SSE协议与流式响应
SSE 协议:不只是 data: 开头的文本
1. SSE 是什么
SSE 全称 Server-Sent Events,直译就是服务端发送的事件。它是 HTML 标准的一部分,目前的权威规范在 WHATWG HTML Living Standard 里,定义了 text/event-stream 这套事件流格式和浏览器端的 EventSource API。
一句话总结:基于 HTTP 的单向服务端推送协议。
客户端发起一个普通的 HTTP 请求,服务端保持连接不关闭,持续向客户端推送事件。注意是单向的——只有服务端往客户端推,客户端不能通过同一个连接往服务端发数据。
和普通 HTTP 请求的区别很直观:
- 普通 HTTP:客户端发请求 → 服务端返回完整响应 → 连接关闭。一问一答。
- SSE:客户端发请求 → 服务端持续推送事件 → 推完了或客户端主动断开 → 连接关闭。一问多答。
打个比方:普通 HTTP 像发短信,你发一条我回一条,每次都是独立的。SSE 像打电话,拨通之后对方一直在说(你只需要听),说完了再挂。
SSE 响应有一个标志性的 HTTP 头:
Content-Type: text/event-stream
浏览器和 HTTP 客户端看到这个 Content-Type,就知道这是一个 SSE 流,我要按 SSE 的规则来解析。
2. SSE 的完整字段格式
其实 SSE 规范一共定义了四个字段和一个注释机制,每个都有明确的用途。一个完整的 SSE 事件流长这样:
retry: 3000: 这是一条注释,客户端会忽略
id: 1
event: message
data: {"content":"你好"}
id: 2
event: token
data: {"content":","}
id: 3
event: token
data: {"content":"有什么可以帮你?"}
id: 4
event: done
data: {"status":"completed", "total_tokens":128}
逐个来看。
2.1 data: 字段——事件数据
你最熟悉的字段。每行以 data: 开头,后面跟的是事件的数据内容。
data: {"content":"你好"}
一个事件可以有多行 data:,它们会被换行符 \n 拼接起来:
data: 第一行
data: 第二行
data: 第三行
客户端收到的数据是 第一行\n第二行\n第三行。
一个事件以两个连续换行(空行)结束。这是 SSE 的事件边界标志——解析器看到空行就知道上一个事件结束了,该处理它了。
2.2 event: 字段——自定义事件类型
如果不写 event: 字段,事件的类型默认是 message。通过 event: 字段可以给事件指定自定义类型:
event: token
data: {"content":"你好"}
event: error
data: {"code":429, "message":"rate limit exceeded"}
event: done
data: {"total_tokens":128}
这样客户端可以根据事件类型做不同的处理——收到 token 事件就拼接内容,收到 error 事件就展示错误,收到 done 事件就结束流。
大模型 API(OpenAI、SiliconFlow 等)通常不使用 event: 字段,所有事件都是默认的 message 类型,靠 data: 里的 JSON 内容来区分。但如果你自己搭建 SSE 服务端,event: 字段就很有用了——下一篇 Spring Boot SSE 服务端实战会用到。
2.3 id: 字段——事件 ID
每个事件可以有一个 ID:
id: 42
data: {"content":"这是第 42 个事件"}
id: 字段的核心作用是支持断线重连。机制是这样的:
- 1.服务端给每个事件编一个
id: - 2.客户端内部会记住最后收到的事件 ID
- 3.如果连接断开,客户端重连时会在 HTTP 请求头里带上
Last-Event-ID: 42 - 4.服务端看到这个头,就知道客户端已经收到了 ID 42 之前的所有事件,从 ID 43 开始继续推送
这个机制让 SSE 天然支持断点续传——连接断了不用从头来,从上次断开的地方接着推。不过大模型 API 通常不使用 id: 字段。因为大模型的流式生成是一次性的——内容是实时生成的,断了就没法从断点接着生成,只能重新请求。
2.4 retry: 字段——重连间隔
服务端可以通过 retry: 告诉客户端如果连接断了,等多久再重连:
retry: 5000
data: {"content":"连接已建立"}
retry: 5000 的意思是如果连接断了,等 5000 毫秒(5 秒)再尝试重连。这个字段通常只在连接建立后发送一次。
浏览器原生的 EventSource API 会自动处理重连(默认间隔约 3 秒)。但在 Java 客户端里,你需要自己实现重连逻辑——OkHttp 的 HTTP 客户端不会自动按 retry: 的值重连。
2.5 注释行——心跳保活
以冒号 : 开头的行是注释,客户端会直接忽略:
: this is a comment
: keepalive
注释看起来没什么用,但在生产环境有一个很重要的作用——心跳保活。
问题背景:SSE 连接在数据推送的间隙可能长时间没有数据传输(比如大模型在思考,几秒甚至十几秒没有输出)。如果中间经过了 Nginx、负载均衡器、CDN 等中间件,它们可能会认为这个连接已经死了,主动把连接断掉。
解决方案:服务端定期发送一个空注释 : keepalive\n\n 或者 :\n\n,客户端会忽略它,但中间件看到有数据在传输,就不会超时断开连接。
SSE vs WebSocket vs 长轮询
你可能会好奇:为什么大模型 API 都用 SSE?不是还有 WebSocket 吗?WebSocket 不是更高级吗?
1. 三种方案的本质区别
先搞清楚三种实时通信方案各自是什么:
- 长轮询(Long Polling):客户端发请求,服务端如果没有新数据就 hold 住不返回(而不是立刻返回空),等有新数据了再返回。客户端收到响应后立刻再发一个新请求。本质上还是一问一答,只是每次答的等待时间变长了。
- SSE(Server-Sent Events):客户端发一个 HTTP 请求,服务端保持连接,持续单向推送数据。基于 HTTP,只能服务端往客户端推。
- WebSocket:客户端和服务端通过一次 HTTP 握手升级到 WebSocket 协议,之后双方可以随时互发消息。这是一个独立于 HTTP 的全双工协议。
2. 为什么大模型 API 选了 SSE
主要是四个原因:
- 方向匹配:大模型流式输出是典型的服务端→客户端单向推送——模型生成内容,逐块发给你。你不需要在同一个连接上往服务端发数据(你的问题在第一次 HTTP 请求里已经发过了)。SSE 刚好满足这个需求,WebSocket 的双向能力完全用不上。
- HTTP 原生:SSE 就是一个普通的 HTTP 响应,只是 Content-Type 变成了
text/event-stream,数据格式是文本流。Nginx、CDN、负载均衡器、API 网关……所有 HTTP 基础设施都能原生支持,不需要额外配置。WebSocket 则需要专门配置代理的 WebSocket 升级支持(proxy_set_header Upgrade $http_upgrade这类配置),在某些企业网络环境下可能被防火墙拦截。 - 自动重连:SSE 规范内置了重连机制(
Last-Event-ID+retry:),浏览器的EventSourceAPI 自动处理。WebSocket 断线后需要你自己写重连逻辑。 - 实现简单:服务端只需要按格式输出文本行,客户端只需要逐行解析。WebSocket 有自己的帧协议(Frame),需要处理帧的编码解码、ping/pong 心跳、关闭帧等,实现复杂度高一个量级。
一个简单的判断标准:如果你的场景是服务端向客户端持续推送数据,优先考虑 SSE。只有当客户端也需要实时向服务端发送数据时(在线协作编辑、实时游戏、聊天室里的“对方正在输入…”),才需要 WebSocket。
大模型流式响应的数据结构
模型调用 API 那篇里已经展示过流式响应的基本格式。这里更系统地梳理一遍完整的数据结构,特别是一些之前没展开讲的细节。
1. 流式 vs 非流式的响应对比
先放一张对比表,有个全局认知:
2. 流式响应的逐 chunk 解析
用一个实际的例子来过一遍完整的流式响应。假设你问模型 SSE 是什么?,模型回答 SSE 是一种基于 HTTP 的服务端推送协议。
服务端推送过来的 SSE 流完整长这样:
data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}
data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{"content":"SSE"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{"content":" 是"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{"content":"一种"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{"content":"基于"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{"content":" HTTP "},"finish_reason":null}]}
data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{"content":"的"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{"content":"服务端"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{"content":"推送"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{"content":"协议。"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{},"finish_reason":"stop"}],"usage":{"prompt_tokens":15,"completion_tokens":12,"total_tokens":27}}
data: [DONE]
逐 chunk 拆解:
| 序号 | delta 内容 | finish_reason | 说明 |
|---|---|---|---|
| 1 | {"role":"assistant","content":""} | null | 开场白:模型说"我准备开始说话了",role 只出现这一次 |
| 2 | {"content":"SSE"} | null | 开始输出内容 |
| 3 | {"content":" 是"} | null | 继续输出 |
| 4~10 | {"content":"..."} | null | 逐块输出内容 |
| 11 | {} | "stop" | delta 为空,finish_reason 为 stop——说话结束 |
| - | [DONE] | - | SSE 流结束标记,不是 JSON |
客户端要做的事情:把第 1~10 个 chunk 的 delta.content 拼接起来,就是完整的回答。
有几个需要注意的细节:
- 第一个 chunk 的
content可能是空字符串,不是所有平台都这样,但你需要兼容。有的平台第一个 chunk 只有role,没有content字段。 - 中间可能出现空
delta:有的 chunk 的delta是空对象{},既没有content也没有role。不要因为拿不到content就报错。 finish_reason只在最后一个 chunk 有值,之前的 chunk 都是null。值不只是"stop"(正常结束),还可能是"length"(达到 max_tokens 上限被截断)、"content_filter"(被安全过滤截断)、"tool_calls"(模型转入工具调用流程)。代码里需要根据不同的值做相应处理。data: [DONE]不是 JSON,不能用 JSON 解析器去解析,要单独判断。
3. 流式模式下的 Token 统计
非流式响应里,usage 字段直接就在响应体里:
{"choices":[...],"usage":{"prompt_tokens":15,"completion_tokens":12,"total_tokens":27}}
流式模式下就麻烦了。不同平台的处理方式不一样:
方式一:最后一个 chunk 里附带 usage
部分平台(包括 SiliconFlow)会在 finish_reason: "stop" 那个 chunk 里附带 usage 字段。前面的示例里已经展示了这种情况。
方式二:需要额外参数 stream_options
OpenAI 的 API 需要在请求中显式加上 stream_options:
{"model":"gpt-4","messages":[...],"stream":true,"stream_options":{"include_usage":true}}
加了这个参数之后,OpenAI 会在流的最后额外发送一个只包含 usage 的 chunk(choices 为空数组)。
方式三:不返回 usage
有些平台在流式模式下压根不返回 usage。这种情况下,completion_tokens 可以在客户端自己估算(统计收到的所有 delta.content 拼接后的字符数或 Token 数),但 prompt_tokens 只有服务端知道,拿不到就拿不到。
实测 SiliconFlow 平台的行为:在最后一个 chunk(
finish_reason: "stop")中会返回usage字段,不需要额外设置stream_options。如果你用的是其他平台,建议先测一下流式响应里有没有usage,没有的话看文档是否支持stream_options。
Java 实战:健壮的 SSE 客户端
作为入门 demo 完全没问题,但在生产环境有几个隐患:
- 没有超时控制——如果服务端卡住不发数据(模型推理异常、网络拥塞),客户端的
readLine()会一直阻塞 - 没有错误处理——连接中断直接抛
IOException,调用方拿不到已经接收到的部分内容 - 没有回调机制——所有内容都是
System.out.print直接打印,没法集成到业务逻辑里(比如实时推送给前端) - 没有 Token 统计——流式模式下
usage的提取逻辑缺失 - 边界情况没处理——空
delta、缺失content字段、JSON 解析失败都没有容错
接下来封装一个相对偏向于生产可用的 SSE 流式客户端。
注意:下面的代码实现的是大模型流式 API 常见的 data-only SSE 消费逻辑,针对 OpenAI 兼容接口的主流场景做了优化。它不是一个完整的通用 SSE 协议解析器——标准 SSE 还有多行
data:拼接、未完成事件不派发等规则,在大模型 API 的场景下用不到,这里不做处理。
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import okhttp3.*;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
public class SseStreamClient {
private static final String API_URL = "https://api.siliconflow.cn/v1/chat/completions";
private static final String API_KEY = "sk-xxx"; // 替换为你的 API Key
// ========== 回调接口 ==========
/**
* SSE 流式响应的事件回调
*/
interface StreamCallback {
/** 收到一个 content 增量(每个 token 调用一次) */
void onToken(String token);
/** 流正常结束,返回完整内容和 Token 统计 */
void onComplete(String fullContent, Usage usage);
/** 发生错误,partialContent 是错误发生前已接收到的内容 */
void onError(Exception e, String partialContent);
}
/**
* Token 用量统计
*/
static class Usage {
int promptTokens;
int completionTokens;
int totalTokens;
@Override
public String toString() {
return String.format("prompt=%d, completion=%d, total=%d",
promptTokens, completionTokens, totalTokens);
}
}
// ========== 核心方法 ==========
/**
* 发起流式请求
*
* @param model 模型 ID
* @param systemPrompt System 消息内容
* @param userMessage 用户消息内容
* @param callback 事件回调
*/
public static void streamChat(String model, String systemPrompt,
String userMessage, StreamCallback callback) {
// 1. 构建请求体
JsonObject requestBody = new JsonObject();
requestBody.addProperty("model", model);
requestBody.addProperty("temperature", 0.7);
requestBody.addProperty("max_tokens", 2048);
requestBody.addProperty("stream", true);
JsonArray messages = new JsonArray();
if (systemPrompt != null && !systemPrompt.isEmpty()) {
JsonObject sysMsg = new JsonObject();
sysMsg.addProperty("role", "system");
sysMsg.addProperty("content", systemPrompt);
messages.add(sysMsg);
}
JsonObject userMsg = new JsonObject();
userMsg.addProperty("role", "user");
userMsg.addProperty("content", userMessage);
messages.add(userMsg);
requestBody.add("messages", messages);
// 2. 创建 HTTP 客户端
// 关键:readTimeout 是"两个数据块之间的最大等待时间",不是整个响应的超时
OkHttpClient client = new OkHttpClient.Builder()
.connectTimeout(15, TimeUnit.SECONDS)
.readTimeout(60, TimeUnit.SECONDS) // 流式场景需要更长
.writeTimeout(15, TimeUnit.SECONDS)
.build();
Request request = new Request.Builder()
.url(API_URL)
.addHeader("Authorization", "Bearer " + API_KEY)
.addHeader("Content-Type", "application/json")
.addHeader("Accept", "text/event-stream") // 明确告诉服务端我要 SSE
.post(RequestBody.create(requestBody.toString(),
MediaType.parse("application/json")))
.build();
// 3. 发起请求并解析 SSE 流
StringBuilder fullContent = new StringBuilder();
Usage usage = null;
try (Response response = client.newCall(request).execute()) {
// 检查 HTTP 状态码
if (!response.isSuccessful()) {
String errorBody = response.body() != null ? response.body().string() : "无响应体";
callback.onError(
new RuntimeException("HTTP " + response.code() + ": " + errorBody),
fullContent.toString()
);
return;
}
// 逐行读取 SSE 流(显式指定 UTF-8,SSE 规范要求 UTF-8 编码)
BufferedReader reader = new BufferedReader(
new InputStreamReader(response.body().byteStream(), StandardCharsets.UTF_8));
String line;
boolean streamDone = false; // 是否收到了 [DONE] 标记
while ((line = reader.readLine()) != null) {
// 跳过空行(SSE 事件分隔符)
if (line.isEmpty()) {
continue;
}
// 跳过注释行(心跳保活)
if (line.startsWith(":")) {
continue;
}
// 只处理 data: 开头的行(兼容 "data: xxx" 和 "data:xxx" 两种格式)
if (!line.startsWith("data:")) {
continue;
}
// 去掉 "data:" 前缀,SSE 标准规定冒号后最多去掉一个可选空格
String data = line.substring(5);
if (data.startsWith(" ")) {
data = data.substring(1);
}
// 检查流结束标记
if ("[DONE]".equals(data)) {
streamDone = true;
break;
}
// 解析 JSON(加容错)
JsonObject chunk;
try {
chunk = JsonParser.parseString(data).getAsJsonObject();
} catch (Exception e) {
// JSON 解析失败,跳过这个 chunk,不要中断整个流
System.err.println("JSON 解析失败,跳过: " + data);
continue;
}
// 提取 choices 数组
JsonArray choices = chunk.getAsJsonArray("choices");
if (choices == null || choices.isEmpty()) {
// 有些平台在最后一个 chunk(stream_options 模式)choices 为空数组
// 但可能有 usage 字段
usage = extractUsage(chunk, usage);
continue;
}
JsonObject choice = choices.get(0).getAsJsonObject();
// 提取 delta 中的 content
JsonObject delta = choice.getAsJsonObject("delta");
if (delta != null && delta.has("content")) {
JsonElement contentElement = delta.get("content");
if (!contentElement.isJsonNull()) {
String token = contentElement.getAsString();
if (!token.isEmpty()) {
fullContent.append(token);
callback.onToken(token);
}
}
}
// 提取 finish_reason
JsonElement finishElement = choice.get("finish_reason");
if (finishElement != null && !finishElement.isJsonNull()) {
String finishReason = finishElement.getAsString();
// finish_reason 不只是 "stop",还可能是:
// - "length":达到 max_tokens 上限,内容被截断
// - "content_filter":被安全过滤截断
// - "tool_calls":模型转入工具调用流程
// 这里统一标记为流结束,调用方可根据 finishReason 做更细的处理
usage = extractUsage(chunk, usage);
}
}
// 判断流是否正常结束
if (streamDone) {
callback.onComplete(fullContent.toString(), usage);
} else {
// readLine() 返回 null 但没收到 [DONE]——连接异常关闭
callback.onError(
new RuntimeException("SSE 流异常结束:未收到 [DONE] 标记"),
fullContent.toString()
);
}
} catch (Exception e) {
// 连接异常(超时、网络中断等),把已接收到的内容传给调用方
callback.onError(e, fullContent.toString());
}
}
/**
* 从 chunk 中提取 usage 信息
*/
private static Usage extractUsage(JsonObject chunk, Usage existing) {
if (!chunk.has("usage") || chunk.get("usage").isJsonNull()) {
return existing;
}
JsonObject usageJson = chunk.getAsJsonObject("usage");
Usage usage = new Usage();
usage.promptTokens = usageJson.has("prompt_tokens")
? usageJson.get("prompt_tokens").getAsInt() : 0;
usage.completionTokens = usageJson.has("completion_tokens")
? usageJson.get("completion_tokens").getAsInt() : 0;
usage.totalTokens = usageJson.has("total_tokens")
? usageJson.get("total_tokens").getAsInt() : 0;
return usage;
}
// ========== 运行示例 ==========
public static void main(String[] args) {
System.out.println("=== SSE 流式调用演示 ===\n");
streamChat(
"Qwen/Qwen3-32B",
"你是一个技术专家,回答简洁清晰。",
"用两三句话解释一下什么是 SSE 协议?",
new StreamCallback() {
@Override
public void onToken(String token) {
// 每收到一个 token 就实时输出(不换行)
System.out.print(token);
}
@Override
public void onComplete(String fullContent, Usage usage) {
System.out.println("\n");
System.out.println("--- 流式输出完毕 ---");
System.out.println("完整内容长度:" + fullContent.length() + " 字符");
if (usage != null) {
System.out.println("Token 统计:" + usage);
} else {
System.out.println("Token 统计:未返回");
}
}
@Override
public void onError(Exception e, String partialContent) {
System.err.println("\n\n--- 发生错误 ---");
System.err.println("错误信息:" + e.getMessage());
if (!partialContent.isEmpty()) {
System.err.println("已接收到的内容:" + partialContent);
}
}
}
);
}
}
运行输出:
=== SSE 流式调用演示 ===
SSE(Server-Sent Events)是基于 HTTP 的**服务器向客户端单向实时通信协议**,通过持久连接持续发送文本事件流(如 `data: message\n\n`)。
不同于 WebSocket,SSE**无需客户端主动发送消息**,适用于如实时通知、股票报价等需服务器主动推送的场景。
其优势包括**自动重连、兼容性好(HTML5 原生支持)**,但仅支持单向传输(服务器→客户端)。
--- 流式输出完毕 ---
完整内容长度:208 字符
Token 统计:prompt=33, completion=503, total=536
代码的几个关键设计点:
- 回调接口:
StreamCallback定义了三个回调方法——onToken(每个增量 token)、onComplete(流正常结束)、onError(出错)。调用方实现这个接口就能把流式内容集成到自己的业务逻辑里,比如实时推送给前端(下一篇会讲)、写入日志、或者显示进度。 - 错误时不丢内容:
onError方法带了一个partialContent参数。如果流传输到一半连接断了,调用方至少能拿到已经收到的部分内容,而不是什么都没有。 - JSON 解析容错:解析失败不中断整个流——跳过这个有问题的 chunk,继续处理后面的。在生产环境中,偶尔会遇到格式不规范的 chunk(服务端 bug 或网络传输问题),直接崩掉是不合适的。
- Usage 提取兼容:先尝试从
finish_reason: stop的 chunk 里提取usage,也兼容choices为空数组但有usage字段的情况(OpenAIstream_options模式)。
3. 常见坑与处理
3.1 流式超时怎么设
OkHttp 的 readTimeout 在非流式和流式场景下含义不同:
- 非流式:
readTimeout是从发出请求到收到完整响应的最大等待时间。设 30 秒意味着如果 30 秒内拿不到完整回答,就超时。 - 流式:
readTimeout是两次数据读取之间的最大等待时间。设 30 秒意味着如果 30 秒内没有收到任何新的 chunk,就超时。
这个区别很重要。流式场景下,整个响应可能持续几十秒甚至几分钟(长文本生成),但只要两个 chunk 之间的间隔不超过 readTimeout,就不会超时。
那设多少合适?
- 设太短(比如 5 秒):模型在思考比较复杂的问题时,两个 token 之间的间隔可能超过 5 秒(特别是用了深度思考模式的模型),会误判为超时。
- 设太长(比如 300 秒):如果服务端真的卡死了,你要等 5 分钟才能发现。
建议值:30~60 秒。对于大部分大模型 API,两个 chunk 之间的间隔通常在毫秒到几秒之间。30 秒的容忍度足够覆盖模型思考的场景,又不至于让真正的故障等太久。
connectTimeout和writeTimeout不受流式影响,正常设置即可(10~15 秒)。
3.2 空 delta 和缺失字段
不同平台、不同模型返回的 delta 结构不完全一致。你可能遇到这些情况:
// 情况 1:delta 有 role 但没有 content{"delta":{"role":"assistant"}}// 情况 2:delta 有 content 但是空字符串{"delta":{"content":""}}// 情况 3:delta 是空对象{"delta":{}}// 情况 4:delta 的 content 是 null{"delta":{"content":null}}
前面的代码里已经做了处理——检查 delta 是否为 null、是否有 content 字段、content 是否为 null 或空字符串。这些检查看着啰嗦,但少一个就可能在某个平台上翻车。
3.3 粘包问题
SSE 的数据是通过 HTTP 的 chunked transfer encoding 传输的。在网络层面,一次 TCP 传输可能包含多个 data: 行,也可能一个 data: 行被截断成两次传输。
如果你用 BufferedReader.readLine() 来读取,不用担心这个问题——readLine() 会帮你按换行符分割,保证每次返回一个完整的行。
但如果你用原始的 InputStream.read(byte[]) 来读取(比如为了更高的性能),就需要自己处理行边界。对于大部分场景,BufferedReader 的性能完全够用,没必要用原始 InputStream 去找麻烦。
3.4 连接中断的处理
网络不稳定时,SSE 连接可能中途断开。表现为 readLine() 抛出 IOException(通常是 SocketTimeoutException 或 SocketException)。
对于大模型 API 的场景,断了通常不能从断点续传——因为 LLM 的生成是有状态的,内部的注意力缓存(KV Cache)在服务端,连接断了这些状态就丢了。要继续的话只能重新发请求。
代码里的处理策略:
- 1.在
catch块里调用onError,把已接收到的部分内容传给调用方 - 2.调用方根据业务需求决定是否重试——如果已经收到了大部分内容,可能展示部分结果比重试更好
- 3.如果决定重试,把之前的问题重新发一次(不是续传,是全新的请求)
// 调用方的重试逻辑示例
public void chatWithRetry(String question, int maxRetries) {
for (int i = 0; i <= maxRetries; i++) {
final int attempt = i;
final boolean[] success = {false};
SseStreamClient.streamChat("Qwen/Qwen3-32B", null, question,
new SseStreamClient.StreamCallback() {
@Override
public void onToken(String token) {
System.out.print(token);
}
@Override
public void onComplete(String fullContent, SseStreamClient.Usage usage) {
success[0] = true;
}
@Override
public void onError(Exception e, String partialContent) {
System.err.printf("第 %d 次请求失败: %s%n", attempt + 1, e.getMessage());
if (!partialContent.isEmpty()) {
System.err.println("已接收: " + partialContent.length() + " 字符");
}
}
}
);
if (success[0]) break;
if (i < maxRetries) {
System.err.println("等待 2 秒后重试...");
try { Thread.sleep(2000); } catch (InterruptedException ignored) {}
}
}
}
重试要注意幂等性——大模型 Chat API 的请求天然是幂等的(同样的输入不一定得到同样的输出,但不会产生副作用),可以放心重试。但如果你的请求里包含了 Function Call 调用外部系统的操作,重试就要小心了。
SpringBoot-SSE服务端实战
为什么需要后端中间层
1. 前端直连大模型 API 的三个问题
你可能会想:前端直接调大模型 API 不就行了?何必多一层后端中间层?
不行。有三个硬伤:
- API Key 暴露:前端代码跑在用户的浏览器里,你的 JavaScript 代码、网络请求对用户是完全透明的。打开 F12 → Network 面板,请求头里的
Authorization: Bearer sk-xxx一目了然。API Key 泄露意味着任何人都能用你的额度调用大模型,账单直接爆炸。 - 跨域限制:大模型 API 的域名(比如
api.siliconflow.cn)和你的前端域名(比如www.yourapp.com)不同,浏览器的同源策略会拦截请求。虽然可以通过 CORS 解决,但 CORS 的响应头需要服务端配置——你控制不了第三方大模型 API 的 CORS 策略。 - 无法加业务逻辑:用户是否登录了?是否有调用权限?单个用户每分钟最多调几次?对话记录要不要存?敏感词要不要过滤?这些业务逻辑都需要在后端处理。前端直连等于跳过了整个业务层,你的系统就是一个没有门禁的大楼。
所以架构必须是这样的:
前端浏览器 → 你的 SpringBoot 后端 → 大模型 API
后端负责鉴权、限流、日志、对话历史存储,然后代理调用大模型 API,把流式响应转发给前端。 Controller 线程不会被长时间占用。Controller 方法创建 SseEmitter 对象后立即返回,线程就释放了。真正的数据推送是在另一个异步线程里进行的。这意味着即使流式生成需要 30 秒,Tomcat 的 Controller 线程也只占用了几毫秒。
Spring Boot SseEmitter 核心机制
1. SseEmitter 是什么
SseEmitter 是 Spring MVC 提供的异步响应工具,专门用于 SSE 服务端推送。
普通的 Controller 返回值(比如 ResponseEntity、String、Map)是同步响应——方法执行完,响应就发了,HTTP 连接就关了。一问一答,干脆利落。
SseEmitter 是异步响应——Controller 方法返回一个 SseEmitter 对象后,HTTP 连接不会关闭。你可以在其他线程里通过这个 SseEmitter 对象持续向客户端推送事件,直到你调用 emitter.complete() 主动关闭连接。
打个比方:普通 Controller 像自动售货机——投币、出货、走人。SseEmitter 像外卖平台——你下了单(发了请求),然后骑手持续给你推送状态更新(已接单、正在配送、即将到达),直到送达(complete())。
2. SseEmitter 的核心 API
SseEmitter 的 API 不多,核心就这几个:
| API | 作用 | 说明 |
|---|---|---|
new SseEmitter(timeout) | 创建实例,设置超时 | 超时单位毫秒,超时后自动关闭连接 |
send(Object data) | 推送默认类型事件 | 事件类型为 message,data 会被序列化为字符串 |
send(SseEventBuilder event) | 推送自定义事件 | 可指定 event 名称、id、data、comment |
complete() | 正常结束 | 关闭 SSE 连接 |
completeWithError(Throwable ex) | 异常结束 | 关闭连接并触发 onError 回调 |
onCompletion(Runnable) | 连接关闭回调 | 不管正常还是异常关闭都会触发 |
onTimeout(Runnable) | 超时回调 | 超时时触发 |
onError(Consumer<Throwable>) | 错误回调 | 出错时触发 |
其中 send(SseEventBuilder) 是最常用的,因为它可以指定事件类型:
// 推送一个 token 事件
emitter.send(SseEmitter.event()
.name("token") // event: token
.data("{\"content\":\"你好\"}")); // data: {"content":"你好"}
// 推送一个 done 事件
emitter.send(SseEmitter.event()
.name("done")
.data("{\"usage\":{\"total_tokens\":128}}"));
// 推送一个注释(心跳保活)
emitter.send(SseEmitter.event()
.comment("keepalive"));
还记得上一篇讲的 SSE event: 字段吗?大模型 API 通常不用 event: 字段,所有事件都是默认的 message 类型。但你自己搭建 SSE 服务端时,event: 字段就很有用了——前端可以根据不同的事件类型做不同的处理。
3. SseEmitter 的线程模型
SseEmitter 的异步本质是理解它的关键。整个过程涉及两个线程:
Controller 线程(Tomcat 线程池里的线程):
- 1.接收 HTTP 请求
- 2.创建
SseEmitter对象 - 3.启动异步任务(把
SseEmitter对象传给异步线程) - 4.返回
SseEmitter对象 - 5.Controller 线程结束,释放回 Tomcat 线程池
推送线程(你自己创建的异步线程):
- 1.拿着
SseEmitter对象 - 2.调用大模型流式 API
- 3.在回调里调用
emitter.send()逐 Token 推送 - 4.调用
emitter.complete()结束
这个设计的好处是 Controller 线程不会被长时间占用。如果不用 SseEmitter,而是在 Controller 里直接写一个循环发送数据,那这个请求会占住一个 Tomcat 线程直到流结束——30 秒的流式生成就占住一个线程 30 秒。Tomcat 默认 200 个线程,200 个并发用户就把线程池打满了。
用 SseEmitter,Controller 线程几毫秒就释放了。推送操作在独立的线程中进行,不占 Tomcat 线程池。
如果你在 Controller 线程里直接调
send()发完所有数据再complete(),那和普通接口没什么区别——线程还是被占住了。SseEmitter的价值在于跨线程推送:Controller 线程返回,推送线程接管。
Java 实战:三个层次的 SSE 服务端
1. 最简版:Hello SSE
下述为示例代码,大家可自行创建项目进行运行验证,也可仅关注其中的核心思想。关于基础的 Spring Boot 框架代码逻辑,本文在概念章节中不再做过多展开。
先用最少的代码跑通一个 SSE 接口,理解 SseEmitter 的基本套路。
@RestController
@RequestMapping("/api/sse")
public class SimpleSseController {
@GetMapping("/hello")
public SseEmitter hello() {
// 创建 SseEmitter,超时 60 秒
SseEmitter emitter = new SseEmitter(60_000L);
// 启动一个新线程做数据推送(不能在 Controller 线程里做)推送逻辑是异步长任务:适合交给独立线程/线程池处理
new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
// 推送事件:event 名称为 "message"(默认),data 是字符串
emitter.send(SseEmitter.event()
.name("token")
.data("第 " + i + " 条消息"));
Thread.sleep(1000); // 每秒推送一条
}
// 推送完毕,关闭连接
emitter.send(SseEmitter.event()
.name("done")
.data("{\"msg\":\"推送完毕\"}"));
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
}).start();
// Controller 线程立即返回,不等推送完成
return emitter;
}
}
启动 Spring Boot 之后,用浏览器直接访问 http://localhost:8080/api/sse/hello,你会看到数据一条一条地出现——每秒出一条,5 秒后结束。
也可以用下面这段 HTML 在浏览器里测试(后面完整版也用这种方式验证):
<!DOCTYPE html><html><body><h3>SSE 测试</h3><div id="output"></div><script>
const output = document.getElementById('output');
const eventSource = new EventSource('/api/sse/hello');
eventSource.addEventListener('token', function(e) {
output.innerHTML += '<p>收到 token: ' + e.data + '</p>';
});
eventSource.addEventListener('done', function(e) {
output.innerHTML += '<p style="color:green">完成: ' + e.data + '</p>';
eventSource.close(); // 收到 done 事件后关闭连接
});
eventSource.onerror = function() {
output.innerHTML += '<p style="color:red">连接断开</p>';
eventSource.close();
};
</script></body></html>
这个最简版能跑通,但离生产可用还有距离——没有接入大模型、没有心跳、没有断连检测。接下来一步步加。
2. 完整版:接入大模型流式转发
完整版要实现的链路是:前端发请求 → Spring Boot 接收 → 调用大模型流式 API → 逐 Token 转发给前端。
事件类型设计:
| 事件名称 | 用途 | data 格式 |
|---|---|---|
token | 内容增量 | {"content": "一段文字"} |
done | 流结束 | {"usage": {"prompt_tokens":15, "completion_tokens":12}} |
error | 错误通知 | {"code": 500, "message": "错误描述"} |
前端根据事件类型做不同处理——收到 token 就拼接内容,收到 done 就结束,收到 error 就展示错误信息。
2.1 请求和响应结构
/** 前端请求体 */publicclassChatRequest{privateString question;// getter/setter 省略}
2.2 Controller 层
@RestController@RequestMapping("/api/chat")publicclassChatController{privatefinalChatService chatService;publicChatController(ChatService chatService){this.chatService = chatService;}@GetMapping("/stream")publicSseEmitterstream(@RequestParamString question){// 创建 SseEmitter,超时 3 分钟SseEmitter emitter =newSseEmitter(180_000L);// 设置回调(连接关闭时的清理逻辑)
emitter.onCompletion(()->System.out.println("SSE 连接关闭"));
emitter.onTimeout(()->System.out.println("SSE 连接超时"));
emitter.onError(e ->System.err.println("SSE 连接异常: "+ e.getMessage()));// 异步线程中调用大模型并转发
chatService.streamChat(question, emitter);return emitter;}}
这里用
@GetMapping+@RequestParam是为了兼容浏览器原生的EventSourceAPI(它只支持 GET 请求)。如果你的前端用fetch来消费 SSE(后面会讲),可以改成@PostMapping+@RequestBody,支持更复杂的请求体。
2.3 Service 层:流式转发核心逻辑
@ServicepublicclassChatService{privatestaticfinalStringAPI_URL="https://api.siliconflow.cn/v1/chat/completions";privatestaticfinalStringAPI_KEY="sk-xxx";// 替换为你的 API KeyprivatestaticfinalStringMODEL="Qwen/Qwen3-32B";privatefinalOkHttpClient httpClient =newOkHttpClient.Builder().connectTimeout(15,TimeUnit.SECONDS).readTimeout(60,TimeUnit.SECONDS).writeTimeout(15,TimeUnit.SECONDS).build();privatefinalGson gson =newGson();/**
* 异步调用大模型流式 API,逐 Token 通过 SseEmitter 转发给前端
*/publicvoidstreamChat(String question,SseEmitter emitter){// 在新线程中执行(生产环境应使用线程池,后面会讲)newThread(()->doStreamChat(question, emitter)).start();}privatevoiddoStreamChat(String question,SseEmitter emitter){// 1. 构建大模型请求体JsonObject requestBody =newJsonObject();
requestBody.addProperty("model",MODEL);
requestBody.addProperty("temperature",0.7);
requestBody.addProperty("max_tokens",2048);
requestBody.addProperty("stream",true);JsonArray messages =newJsonArray();JsonObject systemMsg =newJsonObject();
systemMsg.addProperty("role","system");
systemMsg.addProperty("content","你是一个技术专家,回答简洁清晰。");
messages.add(systemMsg);JsonObject userMsg =newJsonObject();
userMsg.addProperty("role","user");
userMsg.addProperty("content", question);
messages.add(userMsg);
requestBody.add("messages", messages);// 2. 发起 HTTP 请求Request request =newRequest.Builder().url(API_URL).addHeader("Authorization","Bearer "+API_KEY).addHeader("Content-Type","application/json").addHeader("Accept","text/event-stream").post(RequestBody.create(requestBody.toString(),MediaType.parse("application/json"))).build();// 3. 解析 SSE 流并转发try(Response response = httpClient.newCall(request).execute()){if(!response.isSuccessful()){sendError(emitter, response.code(),"大模型 API 调用失败: HTTP "+ response.code());return;}BufferedReader reader =newBufferedReader(newInputStreamReader(response.body().byteStream(),StandardCharsets.UTF_8));String line;while((line = reader.readLine())!=null){if(line.isEmpty()|| line.startsWith(":")){continue;}if(!line.startsWith("data:")){continue;}String data = line.substring(5);if(data.startsWith(" ")){
data = data.substring(1);}if("[DONE]".equals(data)){// 流结束,发送 done 事件
emitter.send(SseEmitter.event().name("done").data("{\"msg\":\"completed\"}"));
emitter.complete();return;}// 解析 JSONJsonObject chunk;try{
chunk =JsonParser.parseString(data).getAsJsonObject();}catch(Exception e){continue;// JSON 解析失败,跳过}// 提取 contentJsonArray choices = chunk.getAsJsonArray("choices");if(choices ==null|| choices.isEmpty()){// 可能是只有 usage 的 chunkif(chunk.has("usage")&&!chunk.get("usage").isJsonNull()){JsonObject usage = chunk.getAsJsonObject("usage");
emitter.send(SseEmitter.event().name("done").data(gson.toJson(usage)));
emitter.complete();return;}continue;}JsonObject choice = choices.get(0).getAsJsonObject();JsonObject delta = choice.getAsJsonObject("delta");if(delta !=null&& delta.has("content")){JsonElement contentElement = delta.get("content");if(!contentElement.isJsonNull()){String token = contentElement.getAsString();if(!token.isEmpty()){// 核心:通过 SseEmitter 把 token 推送给前端
emitter.send(SseEmitter.event().name("token").data("{\"content\":\""+escapeJson(token)+"\"}"));}}}// 检查 finish_reason,提取 usageJsonElement finishElement = choice.get("finish_reason");if(finishElement !=null&&!finishElement.isJsonNull()){if(chunk.has("usage")&&!chunk.get("usage").isJsonNull()){JsonObject usage = chunk.getAsJsonObject("usage");
emitter.send(SseEmitter.event().name("done").data(gson.toJson(usage)));}}}// readLine 返回 null 但没收到 [DONE],连接异常sendError(emitter,500,"流式响应异常结束");}catch(IOException e){sendError(emitter,500,"连接异常: "+ e.getMessage());}}/** 向前端推送错误事件 */privatevoidsendError(SseEmitter emitter,int code,String message){try{
emitter.send(SseEmitter.event().name("error").data("{\"code\":"+ code +",\"message\":\""+escapeJson(message)+"\"}"));
emitter.complete();}catch(IOException e){
emitter.completeWithError(e);}}/** 简单的 JSON 字符串转义 */privateStringescapeJson(String s){return s.replace("\\","\\\\").replace("\"","\\\"").replace("\n","\\n").replace("\r","\\r").replace("\t","\\t");}}
核心逻辑就是一句话:在大模型 SSE 客户端的回调里,调用 emitter.send() 把数据转发给前端。
整个链路是这样的:
- 1.前端发请求 → Controller 创建
SseEmitter,启动异步线程,立即返回 - 2.异步线程调用大模型流式 API → 逐行读取
data:行 - 3.解析出
delta.content→ 通过emitter.send()推送token事件给前端 - 4.收到
[DONE]→ 推送done事件 →emitter.complete()关闭连接 - 5.出错 → 推送
error事件 →emitter.complete()关闭连接
注意
sendError方法的设计——出错时不是直接completeWithError(),而是先通过 SSE 推送一个error事件让前端知道出了什么问题,然后再complete()。如果直接completeWithError(),前端只知道连接断了,不知道为什么断了。
3. 生产版:连接管理与健壮性
完整版能跑了,但在生产环境还有几个问题需要解决。
3.1 超时设置
SseEmitter 的默认超时是 30 秒。大模型生成一次回答可能需要 30~60 秒(特别是长文本、深度思考场景),30 秒一到 SseEmitter 自动关闭连接,前端直接断了。
建议把 SseEmitter 的超时设为 3~5 分钟:
SseEmitter emitter =newSseEmitter(180_000L);// 3 分钟
这里有一个容易混淆的地方:SseEmitter 的超时和 OkHttp 的 readTimeout 是两回事。
| 超时类型 | 含义 | 建议值 |
|---|---|---|
SseEmitter(timeout) | 整个 SSE 连接的生命周期上限 | 3~5 分钟 |
OkHttp readTimeout | 调用大模型 API 时,两个 chunk 之间的最大等待间隔 | 30~60 秒 |
SseEmitter 超时是这次对话最多持续多久,OkHttp readTimeout 是大模型多久没吐出新 token 就认为它卡住了。两个维度不同,不要搞混。
3.2 心跳保活
模型在思考时,可能几秒甚至十几秒没有输出。如果你的 Spring Boot 前面有 Nginx 反向代理,Nginx 的 proxy_read_timeout 默认 60 秒——超过 60 秒没有数据传输,Nginx 会主动断开连接。
解决方案:定期发送心跳注释事件。上一篇讲过,SSE 的注释行(: 开头)会被客户端忽略,但对中间件来说有数据在传输,就不会超时断开。
@ServicepublicclassChatService{// 心跳定时器privatefinalScheduledExecutorService heartbeatScheduler =Executors.newScheduledThreadPool(2);publicvoidstreamChat(String question,SseEmitter emitter){// 启动心跳:每 15 秒发送一次注释事件ScheduledFuture<?> heartbeat = heartbeatScheduler.scheduleAtFixedRate(()->{try{
emitter.send(SseEmitter.event().comment("keepalive"));}catch(IOException e){// 发送失败说明连接已断开,不需要处理}},15,15,TimeUnit.SECONDS);newThread(()->{try{doStreamChat(question, emitter);}finally{// 不管成功还是失败,都要停止心跳
heartbeat.cancel(false);}}).start();}}
心跳间隔建议 15~20 秒。太频繁浪费带宽,太稀疏可能还是会被中间件断开。
3.3 客户端断连检测
用户关闭浏览器标签页、刷新页面、或者网络断开,SSE 连接会从客户端侧断开。这时候后端可能还在调用大模型 API——用户已经走了,你还在花钱调 API,纯浪费。
SseEmitter 提供了三个回调来感知连接状态变化:
SseEmitter emitter =newSseEmitter(180_000L);
emitter.onCompletion(()->{System.out.println("连接关闭(正常或异常)");// 清理资源:停止心跳、从连接池移除});
emitter.onTimeout(()->{System.out.println("连接超时");// 超时也会触发 onCompletion});
emitter.onError(e ->{System.out.println("连接异常: "+ e.getMessage());// 异常也会触发 onCompletion});
onCompletion是最终回调——不管连接是正常complete()结束、超时结束、还是异常结束,onCompletion都会被触发。资源清理逻辑放在onCompletion里就行。
另外,当客户端断开后,emitter.send() 会抛出 IOException。所以推送代码里要做好异常处理:
try{
emitter.send(SseEmitter.event().name("token").data("{\"content\":\""+escapeJson(token)+"\"}"));}catch(IOException e){// 客户端已断开,停止继续推送// 如果还在调用大模型 API,可以尝试取消请求break;}
3.4 线程池管理
前面的代码用 new Thread() 来启动异步任务,这在生产环境是不可接受的——线程创建销毁的开销大,也没法控制并发数。
在 Ragent AI 项目中,大量使用了线程池,如果大家学习过 oneThread 动态线程池项目,可以写 1-2 个亮点到 Ragent AI,两者结合说了属于是。
应该用专门的线程池:
@Configuration
public class ThreadPoolConfig {
/**
* SSE 推送专用线程池
* 注意:不要用 Tomcat 的线程池,SSE 推送是长耗时任务,会阻塞 Tomcat 线程
*/
@Bean("sseExecutor")
public ExecutorService sseExecutor() {
return new ThreadPoolExecutor(
10, // 核心线程数
50, // 最大线程数
60, TimeUnit.SECONDS, // 空闲线程存活时间
new LinkedBlockingQueue<>(100), // 等待队列
new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "sse-push-" + counter.getAndIncrement());
t.setDaemon(true);
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 队列满时由调用线程执行
);
}
}
Service 里改用线程池:
@Service
public class ChatService {
@Resource(name = "sseExecutor")
private ExecutorService sseExecutor;
public void streamChat(String question, SseEmitter emitter) {
sseExecutor.submit(() -> {
// 心跳和推送逻辑...
});
}
}
线程池的参数需要根据你的并发量和服务器配置来调。核心线程数可以参考你的预期并发 SSE 连接数。
CallerRunsPolicy拒绝策略意味着队列满时由调用线程(Controller 线程)执行——相当于降级为同步处理,不会丢失请求。
前端怎么接:EventSource API
后端开发需要知道前端怎么接 SSE,才能设计好接口。这里简要介绍前端消费 SSE 的两种方式。
1. 原生 EventSource 的基本用法
浏览器原生提供了 EventSource API 来消费 SSE:
// 创建 EventSource,自动发起 GET 请求
const eventSource = new EventSource('/api/chat/stream?question=什么是SSE');
// 监听自定义事件(对应后端 SseEmitter.event().name("token") 设置的名称)
eventSource.addEventListener('token', function(event) {
const data = JSON.parse(event.data);
document.getElementById('output').textContent += data.content;
});
eventSource.addEventListener('done', function(event) {
console.log('完成:', event.data);
eventSource.close(); // 必须手动关闭,否则 EventSource 会自动重连
});
eventSource.addEventListener('error', function(event) {
if (event.data) {
console.error('服务端错误:', event.data);
}
eventSource.close();
});
// 也可以监听默认的 message 事件(event 名称为 "message" 或未指定时触发)
// eventSource.onmessage = function(event) { ... };
注意这里和上一篇 SSE 协议的呼应:前端 addEventListener('token', ...) 里的 'token',就是后端 SseEmitter.event().name("token") 里设置的事件名称。所以设计事件类型时,前后端要约定好。
还有一个重要的点:EventSource 在连接断开后会自动重连。如果你在 done 事件里不调用 eventSource.close(),EventSource 发现连接断了会尝试重新连接,导致重复请求。收到结束信号后一定要手动关闭。
2. EventSource 的局限和替代方案
EventSource 有两个主要局限:
只支持 GET 请求。你的问题文本只能放在 URL 参数里。如果问题很长(几百字),URL 长度可能超出限制。更重要的是,如果请求体比较复杂(包含对话历史、系统配置等),塞在 URL 参数里既不优雅也不安全。
不能自定义请求头。想在请求头里加 Authorization Token 做鉴权?EventSource 做不到。
替代方案是用 fetch API + ReadableStream,可以发 POST 请求、自定义请求头:
async function streamChat(question) {
const response = await fetch('/api/chat/stream', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer your-token', // 可以自定义请求头
'Accept': 'text/event-stream'
},
body: JSON.stringify({ question: question })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// 按行分割,处理 SSE 格式
const lines = buffer.split('\n');
buffer = lines.pop(); // 最后一个可能不完整,留到下一次
for (const line of lines) {
if (line.startsWith('event: ')) {
// 解析事件类型
} else if (line.startsWith('data: ')) {
// 解析数据
}
}
}
}
这也是为什么很多 AI 产品的前端不用原生
EventSource,而是用fetch手动处理 SSE 流——因为对话请求通常是 POST 且需要带 Token。如果你的后端用@PostMapping,前端就必须用fetch方案。
生产环境的关键配置
1. Nginx 反向代理 SSE
这是生产环境最常踩的坑。
默认情况下,Nginx 开启了 proxy_buffering——它会把后端的响应数据缓冲在内存里,攒够一定量(或等一段时间)再一次性转发给客户端。对普通 HTTP 请求来说,缓冲可以提升性能,减少后端压力。
但对 SSE 来说,缓冲是致命的。后端 emitter.send() 了一个 token,Nginx 缓冲住了不转发,前端收不到数据。等 Nginx 缓冲区满了或者超时了,一堆数据才一次性涌过来。前端看到的效果就是:卡了好几秒,然后"哗"一下出来一大段——完全不是逐字输出的打字机效果。
解决方案:在 Nginx 的 SSE 接口配置中关闭缓冲。
location /api/chat/stream {
proxy_pass http://your-backend;
# 关键:关闭代理缓冲,数据立即转发
proxy_buffering off;
# 关闭代理缓存
proxy_cache off;
# 设置足够长的读取超时(SSE 是长连接)
proxy_read_timeout 300s;
# 确保使用 HTTP/1.1 长连接
proxy_http_version 1.1;
proxy_set_header Connection '';
# 开启 chunked 传输编码
chunked_transfer_encoding on;
# 告诉 Nginx 不要缓冲 SSE 响应
# 有些 Nginx 版本即使关了 proxy_buffering,也需要这个头
proxy_set_header X-Accel-Buffering no;
}
每个配置项的作用:
| 配置项 | 作用 | 不配的后果 |
|---|---|---|
proxy_buffering off | 关闭代理缓冲,后端数据立即转发 | 数据被缓冲,前端收不到实时推送 |
proxy_cache off | 关闭代理缓存 | SSE 响应可能被缓存,返回过期数据 |
proxy_read_timeout 300s | 读取超时设为 5 分钟 | 默认 60 秒,长文本生成可能超时断开 |
proxy_http_version 1.1 | 使用 HTTP/1.1 | HTTP/1.0 不支持长连接 |
proxy_set_header Connection '' | 清除 Connection 头 | 可能导致连接被提前关闭 |
X-Accel-Buffering no | 额外的缓冲关闭指令 | 某些场景下 proxy_buffering off 不生效 |
你也可以在 Spring Boot 侧通过响应头来关闭缓冲:
@GetMapping("/stream")publicSseEmitterstream(@RequestParamString question,HttpServletResponse response){// 告诉 Nginx 不要缓冲这个响应
response.setHeader("X-Accel-Buffering","no");
response.setHeader("Cache-Control","no-cache");SseEmitter emitter =newSseEmitter(180_000L);// ...return emitter;}
proxy_buffering off和X-Accel-Buffering: no最好都配上。有些 Nginx 版本或配置场景下,只配一个可能不生效。双重保险总没错。
2. 连接数与线程池
SSE 是长连接,每个活跃的 SSE 连接在后端占用的资源包括:
- 一个
SseEmitter对象(内存) - 一个 SSE 推送线程(线程池)
- 一个到大模型 API 的 HTTP 连接(OkHttp 连接池)
- Tomcat 的一个 NIO 连接
SseEmitter 本身是异步的,Controller 线程立即返回,不占 Tomcat 线程池。但它占用 Tomcat 的 NIO 连接数。Tomcat 的 max-connections 默认 8192,通常够用。真正的瓶颈往往在推送线程池和大模型 API 的并发限制上。
线程池的配置建议:
| 参数 | 建议值 | 说明 |
|---|---|---|
| 核心线程数 | 10~20 | 覆盖正常并发量 |
| 最大线程数 | 50~100 | 覆盖峰值 |
| 队列容量 | 100~200 | 超过的请求等待或降级 |
| 拒绝策略 | CallerRunsPolicy | 队列满时降级为同步,不丢失请求 |
还有一个容易忽略的点:调用大模型 API 的 OkHttpClient 应该是全局单例,不要每次请求都 new OkHttpClient()。OkHttpClient 内部维护了连接池和线程池,每次 new 都会创建新的连接池,连接无法复用,资源浪费严重。
3. 内存泄漏防范
SseEmitter 对象在连接期间一直被 Spring 持有。如果连接异常断开但 SseEmitter 没有被 complete(),它会一直留在内存里直到超时。
防范措施:
- 1.设置合理的超时时间:不要设
0(表示永不超时)——如果客户端断开而你没感知到,这个SseEmitter就永远不会被清理。建议 3~5 分钟 - 2.
onCompletion回调里清理资源:停止心跳定时任务、记录日志 - 3.定期巡检连接池:虽然
onCompletion回调应该能覆盖所有清理场景,但作为兜底,可以用一个定时任务每分钟检查一下连接池,清理超过超时时间还存在的连接
4. 负载均衡注意事项
如果你的 Spring Boot 服务做了多实例部署 + 轮询负载均衡,SSE 长连接会带来一个问题:
SSE 是一个持续的 HTTP 连接。连接建立后,所有的 send() 数据都必须通过同一个后端实例推送。如果中间负载均衡器把连接切到了另一个实例,数据就丢了。
对于 SSE 场景,有两种处理方式:
- Sticky Session(会话粘滞):配置负载均衡器,让同一个客户端的请求始终打到同一个后端实例。Nginx 可以用
ip_hash或sticky cookie实现 - 无状态设计:SSE 连接本身是有状态的(绑定在某个实例上),但业务逻辑做成无状态的——对话历史存在 Redis/MySQL 而不是内存里,任何实例都能处理
实际上,对于大模型流式转发这个场景,SSE 连接只在一次对话的生成过程中存在(几秒到几十秒),生成完就断开了。只要负载均衡器不在一次 SSE 连接中间切换后端实例(Nginx 的默认行为不会这么做),就不需要特殊配置。

Comments NOTHING