如何定义流式 SSE 响应格式并兼容 OpenAI API 语义?
解读
在国内大模型落地场景中,“流式 SSE” 已成为高并发、低延迟服务的标配。面试官问这一题,核心想验证三件事:
- 你是否真正理解 Server-Sent Events 的协议细节(MIME、字段、重连、心跳)。
- 你是否能把 OpenAI 的增量 delta 语义(如 choices[].delta.content)无损地映射到 SSE 的 data: 行,且字段顺序、空值处理、特殊 token(<|endoftext|>)都与官方对齐。
- 你是否能在 国产监管框架 下做二次封装:内容安全审核、敏感词同步拦截、用户级流控,同时仍让前端“零改造”接入。
一句话:既要协议对齐,又要合规可控。
知识点
-
SSE 基础
- 响应头必须返回
Content-Type: text/event-stream; charset=utf-8和Cache-Control: no-cache。 - 每条消息格式为
field: value\n,常见 field 有event,data,id,retry;连续两个\n表示一条消息结束。 - 浏览器原生
EventSource仅支持 GET,且默认带Last-Event-ID重连;国内 B 端常改用 POST + 自建解析器,需手动维护id字段做断点续传。
- 响应头必须返回
-
OpenAI 增量 chunk 语义
- 每个 chunk 是合法 JSON,顶层字段
object="chat.completion.chunk"。 - 增量内容在
choices[].delta,而非message;结束标志是choices[].finish_reason非空。 - 最后会发送
[DONE]作为纯文本行,非 JSON,前端需特判。
- 每个 chunk 是合法 JSON,顶层字段
-
国产合规改造点
- 内容安全:必须在首 token 到达前完成 同步审核(国内云厂商提供 50ms 内返回的接口),不通过直接返回
event: error并关闭连接,避免“先生成后审核”导致舆情。 - 字段脱敏:若客户要求私有化部署,需把
model字段做别名映射,防止内部代号泄露。 - 日志留痕:每条
id建议采用UUID + 用户哈希后 8 位,满足 《生成式 AI 服务管理暂行办法》 的追溯要求。
- 内容安全:必须在首 token 到达前完成 同步审核(国内云厂商提供 50ms 内返回的接口),不通过直接返回
-
性能与稳定性
- 使用 异步生成器(Python yield / Java Flux) 逐 chunk 吐出,避免一次性序列化大对象。
- 网络层开启 TCP_NODELAY 禁用 Nagle,降低 30~50 ms 延迟。
- 在 K8s 环境给 Pod 加 preStop hook,确保客户端收到
event: close后再终止,防止“异常断流”触发前端重连风暴。
答案
下面给出可直接落地的 Python FastAPI 代码骨架,同时满足 OpenAI 语义与国内监管要求:
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import uuid, json, time
app = FastAPI()
async def compliant_stream(prompt: str, user_id: str):
# 1. 同步内容安全审核
if not sync_audit(prompt): # 国内云审核接口
yield f"event: error\ndata: {json.dumps({'error': {'code': 'content_filter', 'message': '输入含敏感信息'}})}\n\n"
return
# 2. 生成全局唯一 id,用于日志追溯
stream_id = f"{uuid.uuid4().hex}#{user_id[-8:]}"
# 3. 模拟大模型逐 token 生成
tokens = ["流", "式", "测", "试", "完", "成", "。"]
for idx, token in enumerate(tokens):
chunk = {
"id": stream_id,
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": "gpt-3.5-turbo", # 可映射内部别名
"choices": [{
"index": 0,
"delta": {"content": token},
"finish_reason": None
}]
}
yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
# 4. 发送 finish 块
final = {
"id": stream_id,
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": "gpt-3.5-turbo",
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
}
yield f"data: {json.dumps(final, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
body = await request.json()
prompt = body["messages"][-1]["content"]
user_id = body.get("user", "default")
return StreamingResponse(
compliant_stream(prompt, user_id),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # 禁用 Nginx 缓存
"Access-Control-Expose-Headers": "X-Request-Id"
}
)
关键对齐点
- 每条
data:行都是合法 JSON,与 OpenAI 完全一致。 - 结束标志先发送
finish_reason非空的 JSON,再发送[DONE],前端无需改动。 - 审核不通过时通过
event: error返回,不会抛出 HTTP 4xx,防止 SDK 异常捕获。
拓展思考
- 多模态流式:当输出包含图片 URL 时,可把
delta.content置为空,而在delta.image_url字段放 临时签名 URL,并设置 5 分钟过期,兼顾安全与体验。 - 超长上下文续接:利用 SSE 的
id字段存储 最后一段消息的 sha256,客户端断网重连时带Last-Event-ID,服务端对比 sha256 若一致可直接 从缓存续传,减少 50% 冗余流量。 - 流式 token 级计费:在每次
yield前累加token_count,并异步写 Redis 滚动窗口,实现 毫秒级实时账单,满足国内政企客户“先看账后付费”诉求。