如何定义流式 SSE 响应格式并兼容 OpenAI API 语义?

解读

在国内大模型落地场景中,“流式 SSE” 已成为高并发、低延迟服务的标配。面试官问这一题,核心想验证三件事:

  1. 你是否真正理解 Server-Sent Events 的协议细节(MIME、字段、重连、心跳)。
  2. 你是否能把 OpenAI 的增量 delta 语义(如 choices[].delta.content)无损地映射到 SSE 的 data: 行,且字段顺序、空值处理、特殊 token(<|endoftext|>)都与官方对齐。
  3. 你是否能在 国产监管框架 下做二次封装:内容安全审核、敏感词同步拦截、用户级流控,同时仍让前端“零改造”接入。

一句话:既要协议对齐,又要合规可控

知识点

  1. SSE 基础

    • 响应头必须返回 Content-Type: text/event-stream; charset=utf-8Cache-Control: no-cache
    • 每条消息格式为 field: value\n,常见 field 有 event, data, id, retry;连续两个 \n 表示一条消息结束。
    • 浏览器原生 EventSource 仅支持 GET,且默认带 Last-Event-ID 重连;国内 B 端常改用 POST + 自建解析器,需手动维护 id 字段做断点续传。
  2. OpenAI 增量 chunk 语义

    • 每个 chunk 是合法 JSON,顶层字段 object="chat.completion.chunk"
    • 增量内容在 choices[].delta,而非 message;结束标志是 choices[].finish_reason 非空。
    • 最后会发送 [DONE] 作为纯文本行,非 JSON,前端需特判。
  3. 国产合规改造点

    • 内容安全:必须在首 token 到达前完成 同步审核(国内云厂商提供 50ms 内返回的接口),不通过直接返回 event: error 并关闭连接,避免“先生成后审核”导致舆情。
    • 字段脱敏:若客户要求私有化部署,需把 model 字段做别名映射,防止内部代号泄露。
    • 日志留痕:每条 id 建议采用 UUID + 用户哈希后 8 位,满足 《生成式 AI 服务管理暂行办法》 的追溯要求。
  4. 性能与稳定性

    • 使用 异步生成器(Python yield / Java Flux) 逐 chunk 吐出,避免一次性序列化大对象。
    • 网络层开启 TCP_NODELAY 禁用 Nagle,降低 30~50 ms 延迟。
    • 在 K8s 环境给 Pod 加 preStop hook,确保客户端收到 event: close 后再终止,防止“异常断流”触发前端重连风暴。

答案

下面给出可直接落地的 Python FastAPI 代码骨架,同时满足 OpenAI 语义与国内监管要求:

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import uuid, json, time

app = FastAPI()

async def compliant_stream(prompt: str, user_id: str):
    # 1. 同步内容安全审核
    if not sync_audit(prompt):          # 国内云审核接口
        yield f"event: error\ndata: {json.dumps({'error': {'code': 'content_filter', 'message': '输入含敏感信息'}})}\n\n"
        return

    # 2. 生成全局唯一 id,用于日志追溯
    stream_id = f"{uuid.uuid4().hex}#{user_id[-8:]}"

    # 3. 模拟大模型逐 token 生成
    tokens = ["流", "式", "测", "试", "完", "成", "。"]
    for idx, token in enumerate(tokens):
        chunk = {
            "id": stream_id,
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": "gpt-3.5-turbo",          # 可映射内部别名
            "choices": [{
                "index": 0,
                "delta": {"content": token},
                "finish_reason": None
            }]
        }
        yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"

    # 4. 发送 finish 块
    final = {
        "id": stream_id,
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": "gpt-3.5-turbo",
        "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
    }
    yield f"data: {json.dumps(final, ensure_ascii=False)}\n\n"
    yield "data: [DONE]\n\n"

@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    body = await request.json()
    prompt = body["messages"][-1]["content"]
    user_id = body.get("user", "default")
    return StreamingResponse(
        compliant_stream(prompt, user_id),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",        # 禁用 Nginx 缓存
            "Access-Control-Expose-Headers": "X-Request-Id"
        }
    )

关键对齐点

  • 每条 data: 行都是合法 JSON,与 OpenAI 完全一致。
  • 结束标志先发送 finish_reason 非空的 JSON,再发送 [DONE],前端无需改动。
  • 审核不通过时通过 event: error 返回,不会抛出 HTTP 4xx,防止 SDK 异常捕获。

拓展思考

  1. 多模态流式:当输出包含图片 URL 时,可把 delta.content 置为空,而在 delta.image_url 字段放 临时签名 URL,并设置 5 分钟过期,兼顾安全与体验。
  2. 超长上下文续接:利用 SSE 的 id 字段存储 最后一段消息的 sha256,客户端断网重连时带 Last-Event-ID,服务端对比 sha256 若一致可直接 从缓存续传,减少 50% 冗余流量。
  3. 流式 token 级计费:在每次 yield 前累加 token_count,并异步写 Redis 滚动窗口,实现 毫秒级实时账单,满足国内政企客户“先看账后付费”诉求。