手搓RAG系统 – Ragent AI(12)

eve2333 发布于 19 小时前 5 次阅读


SSE协议与流式响应

SSE 协议:不只是 data: 开头的文本

1. SSE 是什么

SSE 全称 Server-Sent Events,直译就是服务端发送的事件。它是 HTML 标准的一部分,目前的权威规范在 WHATWG HTML Living Standard 里,定义了 text/event-stream 这套事件流格式和浏览器端的 EventSource API。

一句话总结:基于 HTTP 的单向服务端推送协议

客户端发起一个普通的 HTTP 请求,服务端保持连接不关闭,持续向客户端推送事件。注意是单向的——只有服务端往客户端推,客户端不能通过同一个连接往服务端发数据。

和普通 HTTP 请求的区别很直观:

  • 普通 HTTP:客户端发请求 → 服务端返回完整响应 → 连接关闭。一问一答。
  • SSE:客户端发请求 → 服务端持续推送事件 → 推完了或客户端主动断开 → 连接关闭。一问多答。

打个比方:普通 HTTP 像发短信,你发一条我回一条,每次都是独立的。SSE 像打电话,拨通之后对方一直在说(你只需要听),说完了再挂。

SSE 响应有一个标志性的 HTTP 头:

Content-Type: text/event-stream

浏览器和 HTTP 客户端看到这个 Content-Type,就知道这是一个 SSE 流,我要按 SSE 的规则来解析。

2. SSE 的完整字段格式

其实 SSE 规范一共定义了四个字段和一个注释机制,每个都有明确的用途。一个完整的 SSE 事件流长这样:

retry: 3000: 这是一条注释,客户端会忽略

id: 1
event: message
data: {"content":"你好"}

id: 2
event: token
data: {"content":","}

id: 3
event: token
data: {"content":"有什么可以帮你?"}

id: 4
event: done
data: {"status":"completed", "total_tokens":128}

逐个来看。

2.1 data: 字段——事件数据

你最熟悉的字段。每行以 data: 开头,后面跟的是事件的数据内容。

data: {"content":"你好"}

一个事件可以有多行 data:,它们会被换行符 \n 拼接起来:

data: 第一行
data: 第二行
data: 第三行

客户端收到的数据是 第一行\n第二行\n第三行

一个事件以两个连续换行(空行)结束。这是 SSE 的事件边界标志——解析器看到空行就知道上一个事件结束了,该处理它了。

2.2 event: 字段——自定义事件类型

如果不写 event: 字段,事件的类型默认是 message。通过 event: 字段可以给事件指定自定义类型:

event: token
data: {"content":"你好"}

event: error
data: {"code":429, "message":"rate limit exceeded"}

event: done
data: {"total_tokens":128}

这样客户端可以根据事件类型做不同的处理——收到 token 事件就拼接内容,收到 error 事件就展示错误,收到 done 事件就结束流。

大模型 API(OpenAI、SiliconFlow 等)通常不使用 event: 字段,所有事件都是默认的 message 类型,靠 data: 里的 JSON 内容来区分。但如果你自己搭建 SSE 服务端,event: 字段就很有用了——下一篇 Spring Boot SSE 服务端实战会用到。

2.3 id: 字段——事件 ID

每个事件可以有一个 ID:

id: 42
data: {"content":"这是第 42 个事件"}

id: 字段的核心作用是支持断线重连。机制是这样的:

  1. 1.服务端给每个事件编一个 id:
  2. 2.客户端内部会记住最后收到的事件 ID
  3. 3.如果连接断开,客户端重连时会在 HTTP 请求头里带上 Last-Event-ID: 42
  4. 4.服务端看到这个头,就知道客户端已经收到了 ID 42 之前的所有事件,从 ID 43 开始继续推送

这个机制让 SSE 天然支持断点续传——连接断了不用从头来,从上次断开的地方接着推。不过大模型 API 通常不使用 id: 字段。因为大模型的流式生成是一次性的——内容是实时生成的,断了就没法从断点接着生成,只能重新请求。

2.4 retry: 字段——重连间隔

服务端可以通过 retry: 告诉客户端如果连接断了,等多久再重连:

retry: 5000
data: {"content":"连接已建立"}

retry: 5000 的意思是如果连接断了,等 5000 毫秒(5 秒)再尝试重连。这个字段通常只在连接建立后发送一次。

浏览器原生的 EventSource API 会自动处理重连(默认间隔约 3 秒)。但在 Java 客户端里,你需要自己实现重连逻辑——OkHttp 的 HTTP 客户端不会自动按 retry: 的值重连。

2.5 注释行——心跳保活

以冒号 : 开头的行是注释,客户端会直接忽略:

: this is a comment
: keepalive

注释看起来没什么用,但在生产环境有一个很重要的作用——心跳保活

问题背景:SSE 连接在数据推送的间隙可能长时间没有数据传输(比如大模型在思考,几秒甚至十几秒没有输出)。如果中间经过了 Nginx、负载均衡器、CDN 等中间件,它们可能会认为这个连接已经死了,主动把连接断掉。

解决方案:服务端定期发送一个空注释 : keepalive\n\n 或者 :\n\n,客户端会忽略它,但中间件看到有数据在传输,就不会超时断开连接。

SSE vs WebSocket vs 长轮询

你可能会好奇:为什么大模型 API 都用 SSE?不是还有 WebSocket 吗?WebSocket 不是更高级吗?

1. 三种方案的本质区别

先搞清楚三种实时通信方案各自是什么:

  • 长轮询(Long Polling):客户端发请求,服务端如果没有新数据就 hold 住不返回(而不是立刻返回空),等有新数据了再返回。客户端收到响应后立刻再发一个新请求。本质上还是一问一答,只是每次答的等待时间变长了。
  • SSE(Server-Sent Events):客户端发一个 HTTP 请求,服务端保持连接,持续单向推送数据。基于 HTTP,只能服务端往客户端推。
  • WebSocket:客户端和服务端通过一次 HTTP 握手升级到 WebSocket 协议,之后双方可以随时互发消息。这是一个独立于 HTTP 的全双工协议。

2. 为什么大模型 API 选了 SSE

主要是四个原因:

  • 方向匹配:大模型流式输出是典型的服务端→客户端单向推送——模型生成内容,逐块发给你。你不需要在同一个连接上往服务端发数据(你的问题在第一次 HTTP 请求里已经发过了)。SSE 刚好满足这个需求,WebSocket 的双向能力完全用不上。
  • HTTP 原生:SSE 就是一个普通的 HTTP 响应,只是 Content-Type 变成了 text/event-stream,数据格式是文本流。Nginx、CDN、负载均衡器、API 网关……所有 HTTP 基础设施都能原生支持,不需要额外配置。WebSocket 则需要专门配置代理的 WebSocket 升级支持(proxy_set_header Upgrade $http_upgrade 这类配置),在某些企业网络环境下可能被防火墙拦截。
  • 自动重连:SSE 规范内置了重连机制(Last-Event-ID + retry:),浏览器的 EventSource API 自动处理。WebSocket 断线后需要你自己写重连逻辑。
  • 实现简单:服务端只需要按格式输出文本行,客户端只需要逐行解析。WebSocket 有自己的帧协议(Frame),需要处理帧的编码解码、ping/pong 心跳、关闭帧等,实现复杂度高一个量级。

一个简单的判断标准:如果你的场景是服务端向客户端持续推送数据,优先考虑 SSE。只有当客户端也需要实时向服务端发送数据时(在线协作编辑、实时游戏、聊天室里的“对方正在输入…”),才需要 WebSocket。

大模型流式响应的数据结构

模型调用 API 那篇里已经展示过流式响应的基本格式。这里更系统地梳理一遍完整的数据结构,特别是一些之前没展开讲的细节。

1. 流式 vs 非流式的响应对比

先放一张对比表,有个全局认知:

2. 流式响应的逐 chunk 解析

用一个实际的例子来过一遍完整的流式响应。假设你问模型 SSE 是什么?,模型回答 SSE 是一种基于 HTTP 的服务端推送协议。

服务端推送过来的 SSE 流完整长这样:

data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{"content":"SSE"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{"content":" 是"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{"content":"一种"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{"content":"基于"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{"content":" HTTP "},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{"content":"的"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{"content":"服务端"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{"content":"推送"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{"content":"协议。"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","choices":[{"index":0,"delta":{},"finish_reason":"stop"}],"usage":{"prompt_tokens":15,"completion_tokens":12,"total_tokens":27}}

data: [DONE]

逐 chunk 拆解:

序号delta 内容finish_reason说明
1{"role":"assistant","content":""}null开场白:模型说"我准备开始说话了",role 只出现这一次
2{"content":"SSE"}null开始输出内容
3{"content":" 是"}null继续输出
4~10{"content":"..."}null逐块输出内容
11{}"stop"delta 为空,finish_reasonstop——说话结束
-[DONE]-SSE 流结束标记,不是 JSON

客户端要做的事情:把第 1~10 个 chunk 的 delta.content 拼接起来,就是完整的回答。

有几个需要注意的细节:

  • 第一个 chunk 的 content 可能是空字符串,不是所有平台都这样,但你需要兼容。有的平台第一个 chunk 只有 role,没有 content 字段。
  • 中间可能出现空 delta:有的 chunk 的 delta 是空对象 {},既没有 content 也没有 role。不要因为拿不到 content 就报错。
  • finish_reason 只在最后一个 chunk 有值,之前的 chunk 都是 null。值不只是 "stop"(正常结束),还可能是 "length"(达到 max_tokens 上限被截断)、"content_filter"(被安全过滤截断)、"tool_calls"(模型转入工具调用流程)。代码里需要根据不同的值做相应处理。
  • data: [DONE] 不是 JSON,不能用 JSON 解析器去解析,要单独判断。

3. 流式模式下的 Token 统计

非流式响应里,usage 字段直接就在响应体里:

{"choices":[...],"usage":{"prompt_tokens":15,"completion_tokens":12,"total_tokens":27}}

流式模式下就麻烦了。不同平台的处理方式不一样:

方式一:最后一个 chunk 里附带 usage

部分平台(包括 SiliconFlow)会在 finish_reason: "stop" 那个 chunk 里附带 usage 字段。前面的示例里已经展示了这种情况。

方式二:需要额外参数 stream_options

OpenAI 的 API 需要在请求中显式加上 stream_options

{"model":"gpt-4","messages":[...],"stream":true,"stream_options":{"include_usage":true}}

加了这个参数之后,OpenAI 会在流的最后额外发送一个只包含 usage 的 chunk(choices 为空数组)。

方式三:不返回 usage

有些平台在流式模式下压根不返回 usage。这种情况下,completion_tokens 可以在客户端自己估算(统计收到的所有 delta.content 拼接后的字符数或 Token 数),但 prompt_tokens 只有服务端知道,拿不到就拿不到。

实测 SiliconFlow 平台的行为:在最后一个 chunk(finish_reason: "stop")中会返回 usage 字段,不需要额外设置 stream_options。如果你用的是其他平台,建议先测一下流式响应里有没有 usage,没有的话看文档是否支持 stream_options

Java 实战:健壮的 SSE 客户端

作为入门 demo 完全没问题,但在生产环境有几个隐患:

  • 没有超时控制——如果服务端卡住不发数据(模型推理异常、网络拥塞),客户端的 readLine() 会一直阻塞
  • 没有错误处理——连接中断直接抛 IOException,调用方拿不到已经接收到的部分内容
  • 没有回调机制——所有内容都是 System.out.print 直接打印,没法集成到业务逻辑里(比如实时推送给前端)
  • 没有 Token 统计——流式模式下 usage 的提取逻辑缺失
  • 边界情况没处理——空 delta、缺失 content 字段、JSON 解析失败都没有容错

接下来封装一个相对偏向于生产可用的 SSE 流式客户端。

注意:下面的代码实现的是大模型流式 API 常见的 data-only SSE 消费逻辑,针对 OpenAI 兼容接口的主流场景做了优化。它不是一个完整的通用 SSE 协议解析器——标准 SSE 还有多行 data: 拼接、未完成事件不派发等规则,在大模型 API 的场景下用不到,这里不做处理。

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import okhttp3.*;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

public class SseStreamClient {

    private static final String API_URL = "https://api.siliconflow.cn/v1/chat/completions";
    private static final String API_KEY = "sk-xxx"; // 替换为你的 API Key

    // ========== 回调接口 ==========

    /**
     * SSE 流式响应的事件回调
     */
    interface StreamCallback {
        /** 收到一个 content 增量(每个 token 调用一次) */
        void onToken(String token);

        /** 流正常结束,返回完整内容和 Token 统计 */
        void onComplete(String fullContent, Usage usage);

        /** 发生错误,partialContent 是错误发生前已接收到的内容 */
        void onError(Exception e, String partialContent);
    }

    /**
     * Token 用量统计
     */
    static class Usage {
        int promptTokens;
        int completionTokens;
        int totalTokens;

        @Override
        public String toString() {
            return String.format("prompt=%d, completion=%d, total=%d",
                    promptTokens, completionTokens, totalTokens);
        }
    }

    // ========== 核心方法 ==========

    /**
     * 发起流式请求
     *
     * @param model       模型 ID
     * @param systemPrompt System 消息内容
     * @param userMessage  用户消息内容
     * @param callback     事件回调
     */
    public static void streamChat(String model, String systemPrompt,
                                  String userMessage, StreamCallback callback) {
        // 1. 构建请求体
        JsonObject requestBody = new JsonObject();
        requestBody.addProperty("model", model);
        requestBody.addProperty("temperature", 0.7);
        requestBody.addProperty("max_tokens", 2048);
        requestBody.addProperty("stream", true);

        JsonArray messages = new JsonArray();
        if (systemPrompt != null && !systemPrompt.isEmpty()) {
            JsonObject sysMsg = new JsonObject();
            sysMsg.addProperty("role", "system");
            sysMsg.addProperty("content", systemPrompt);
            messages.add(sysMsg);
        }
        JsonObject userMsg = new JsonObject();
        userMsg.addProperty("role", "user");
        userMsg.addProperty("content", userMessage);
        messages.add(userMsg);
        requestBody.add("messages", messages);

        // 2. 创建 HTTP 客户端
        // 关键:readTimeout 是"两个数据块之间的最大等待时间",不是整个响应的超时
        OkHttpClient client = new OkHttpClient.Builder()
                .connectTimeout(15, TimeUnit.SECONDS)
                .readTimeout(60, TimeUnit.SECONDS)   // 流式场景需要更长
                .writeTimeout(15, TimeUnit.SECONDS)
                .build();

        Request request = new Request.Builder()
                .url(API_URL)
                .addHeader("Authorization", "Bearer " + API_KEY)
                .addHeader("Content-Type", "application/json")
                .addHeader("Accept", "text/event-stream")  // 明确告诉服务端我要 SSE
                .post(RequestBody.create(requestBody.toString(),
                        MediaType.parse("application/json")))
                .build();

        // 3. 发起请求并解析 SSE 流
        StringBuilder fullContent = new StringBuilder();
        Usage usage = null;

        try (Response response = client.newCall(request).execute()) {
            // 检查 HTTP 状态码
            if (!response.isSuccessful()) {
                String errorBody = response.body() != null ? response.body().string() : "无响应体";
                callback.onError(
                        new RuntimeException("HTTP " + response.code() + ": " + errorBody),
                        fullContent.toString()
                );
                return;
            }

            // 逐行读取 SSE 流(显式指定 UTF-8,SSE 规范要求 UTF-8 编码)
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(response.body().byteStream(), StandardCharsets.UTF_8));
            String line;
            boolean streamDone = false;  // 是否收到了 [DONE] 标记

            while ((line = reader.readLine()) != null) {
                // 跳过空行(SSE 事件分隔符)
                if (line.isEmpty()) {
                    continue;
                }

                // 跳过注释行(心跳保活)
                if (line.startsWith(":")) {
                    continue;
                }

                // 只处理 data: 开头的行(兼容 "data: xxx" 和 "data:xxx" 两种格式)
                if (!line.startsWith("data:")) {
                    continue;
                }

                // 去掉 "data:" 前缀,SSE 标准规定冒号后最多去掉一个可选空格
                String data = line.substring(5);
                if (data.startsWith(" ")) {
                    data = data.substring(1);
                }

                // 检查流结束标记
                if ("[DONE]".equals(data)) {
                    streamDone = true;
                    break;
                }

                // 解析 JSON(加容错)
                JsonObject chunk;
                try {
                    chunk = JsonParser.parseString(data).getAsJsonObject();
                } catch (Exception e) {
                    // JSON 解析失败,跳过这个 chunk,不要中断整个流
                    System.err.println("JSON 解析失败,跳过: " + data);
                    continue;
                }

                // 提取 choices 数组
                JsonArray choices = chunk.getAsJsonArray("choices");
                if (choices == null || choices.isEmpty()) {
                    // 有些平台在最后一个 chunk(stream_options 模式)choices 为空数组
                    // 但可能有 usage 字段
                    usage = extractUsage(chunk, usage);
                    continue;
                }

                JsonObject choice = choices.get(0).getAsJsonObject();

                // 提取 delta 中的 content
                JsonObject delta = choice.getAsJsonObject("delta");
                if (delta != null && delta.has("content")) {
                    JsonElement contentElement = delta.get("content");
                    if (!contentElement.isJsonNull()) {
                        String token = contentElement.getAsString();
                        if (!token.isEmpty()) {
                            fullContent.append(token);
                            callback.onToken(token);
                        }
                    }
                }

                // 提取 finish_reason
                JsonElement finishElement = choice.get("finish_reason");
                if (finishElement != null && !finishElement.isJsonNull()) {
                    String finishReason = finishElement.getAsString();
                    // finish_reason 不只是 "stop",还可能是:
                    // - "length":达到 max_tokens 上限,内容被截断
                    // - "content_filter":被安全过滤截断
                    // - "tool_calls":模型转入工具调用流程
                    // 这里统一标记为流结束,调用方可根据 finishReason 做更细的处理
                    usage = extractUsage(chunk, usage);
                }
            }

            // 判断流是否正常结束
            if (streamDone) {
                callback.onComplete(fullContent.toString(), usage);
            } else {
                // readLine() 返回 null 但没收到 [DONE]——连接异常关闭
                callback.onError(
                        new RuntimeException("SSE 流异常结束:未收到 [DONE] 标记"),
                        fullContent.toString()
                );
            }

        } catch (Exception e) {
            // 连接异常(超时、网络中断等),把已接收到的内容传给调用方
            callback.onError(e, fullContent.toString());
        }
    }

    /**
     * 从 chunk 中提取 usage 信息
     */
    private static Usage extractUsage(JsonObject chunk, Usage existing) {
        if (!chunk.has("usage") || chunk.get("usage").isJsonNull()) {
            return existing;
        }
        JsonObject usageJson = chunk.getAsJsonObject("usage");
        Usage usage = new Usage();
        usage.promptTokens = usageJson.has("prompt_tokens")
                ? usageJson.get("prompt_tokens").getAsInt() : 0;
        usage.completionTokens = usageJson.has("completion_tokens")
                ? usageJson.get("completion_tokens").getAsInt() : 0;
        usage.totalTokens = usageJson.has("total_tokens")
                ? usageJson.get("total_tokens").getAsInt() : 0;
        return usage;
    }

    // ========== 运行示例 ==========

    public static void main(String[] args) {
        System.out.println("=== SSE 流式调用演示 ===\n");

        streamChat(
            "Qwen/Qwen3-32B",
            "你是一个技术专家,回答简洁清晰。",
            "用两三句话解释一下什么是 SSE 协议?",
            new StreamCallback() {
                @Override
                public void onToken(String token) {
                    // 每收到一个 token 就实时输出(不换行)
                    System.out.print(token);
                }

                @Override
                public void onComplete(String fullContent, Usage usage) {
                    System.out.println("\n");
                    System.out.println("--- 流式输出完毕 ---");
                    System.out.println("完整内容长度:" + fullContent.length() + " 字符");
                    if (usage != null) {
                        System.out.println("Token 统计:" + usage);
                    } else {
                        System.out.println("Token 统计:未返回");
                    }
                }

                @Override
                public void onError(Exception e, String partialContent) {
                    System.err.println("\n\n--- 发生错误 ---");
                    System.err.println("错误信息:" + e.getMessage());
                    if (!partialContent.isEmpty()) {
                        System.err.println("已接收到的内容:" + partialContent);
                    }
                }
            }
        );
    }
}

运行输出:

=== SSE 流式调用演示 ===

SSE(Server-Sent Events)是基于 HTTP 的**服务器向客户端单向实时通信协议**,通过持久连接持续发送文本事件流(如 `data: message\n\n`)。  
不同于 WebSocket,SSE**无需客户端主动发送消息**,适用于如实时通知、股票报价等需服务器主动推送的场景。  
其优势包括**自动重连、兼容性好(HTML5 原生支持)**,但仅支持单向传输(服务器→客户端)。

--- 流式输出完毕 ---
完整内容长度:208 字符
Token 统计:prompt=33, completion=503, total=536

代码的几个关键设计点:

  • 回调接口StreamCallback 定义了三个回调方法——onToken(每个增量 token)、onComplete(流正常结束)、onError(出错)。调用方实现这个接口就能把流式内容集成到自己的业务逻辑里,比如实时推送给前端(下一篇会讲)、写入日志、或者显示进度。
  • 错误时不丢内容onError 方法带了一个 partialContent 参数。如果流传输到一半连接断了,调用方至少能拿到已经收到的部分内容,而不是什么都没有。
  • JSON 解析容错:解析失败不中断整个流——跳过这个有问题的 chunk,继续处理后面的。在生产环境中,偶尔会遇到格式不规范的 chunk(服务端 bug 或网络传输问题),直接崩掉是不合适的。
  • Usage 提取兼容:先尝试从 finish_reason: stop 的 chunk 里提取 usage,也兼容 choices 为空数组但有 usage 字段的情况(OpenAI stream_options 模式)。

3. 常见坑与处理

3.1 流式超时怎么设

OkHttp 的 readTimeout 在非流式和流式场景下含义不同:

  • 非流式readTimeout 是从发出请求到收到完整响应的最大等待时间。设 30 秒意味着如果 30 秒内拿不到完整回答,就超时。
  • 流式readTimeout 是两次数据读取之间的最大等待时间。设 30 秒意味着如果 30 秒内没有收到任何新的 chunk,就超时。

这个区别很重要。流式场景下,整个响应可能持续几十秒甚至几分钟(长文本生成),但只要两个 chunk 之间的间隔不超过 readTimeout,就不会超时。

那设多少合适?

  • 设太短(比如 5 秒):模型在思考比较复杂的问题时,两个 token 之间的间隔可能超过 5 秒(特别是用了深度思考模式的模型),会误判为超时。
  • 设太长(比如 300 秒):如果服务端真的卡死了,你要等 5 分钟才能发现。

建议值:30~60 秒。对于大部分大模型 API,两个 chunk 之间的间隔通常在毫秒到几秒之间。30 秒的容忍度足够覆盖模型思考的场景,又不至于让真正的故障等太久。

connectTimeoutwriteTimeout 不受流式影响,正常设置即可(10~15 秒)。

3.2 空 delta 和缺失字段

不同平台、不同模型返回的 delta 结构不完全一致。你可能遇到这些情况:

// 情况 1:delta 有 role 但没有 content{"delta":{"role":"assistant"}}// 情况 2:delta 有 content 但是空字符串{"delta":{"content":""}}// 情况 3:delta 是空对象{"delta":{}}// 情况 4:delta 的 content 是 null{"delta":{"content":null}}

前面的代码里已经做了处理——检查 delta 是否为 null、是否有 content 字段、content 是否为 null 或空字符串。这些检查看着啰嗦,但少一个就可能在某个平台上翻车。

3.3 粘包问题

SSE 的数据是通过 HTTP 的 chunked transfer encoding 传输的。在网络层面,一次 TCP 传输可能包含多个 data: 行,也可能一个 data: 行被截断成两次传输。

如果你用 BufferedReader.readLine() 来读取,不用担心这个问题——readLine() 会帮你按换行符分割,保证每次返回一个完整的行。

但如果你用原始的 InputStream.read(byte[]) 来读取(比如为了更高的性能),就需要自己处理行边界。对于大部分场景,BufferedReader 的性能完全够用,没必要用原始 InputStream 去找麻烦。

3.4 连接中断的处理

网络不稳定时,SSE 连接可能中途断开。表现为 readLine() 抛出 IOException(通常是 SocketTimeoutExceptionSocketException)。

对于大模型 API 的场景,断了通常不能从断点续传——因为 LLM 的生成是有状态的,内部的注意力缓存(KV Cache)在服务端,连接断了这些状态就丢了。要继续的话只能重新发请求。

代码里的处理策略:

  1. 1.在 catch 块里调用 onError,把已接收到的部分内容传给调用方
  2. 2.调用方根据业务需求决定是否重试——如果已经收到了大部分内容,可能展示部分结果比重试更好
  3. 3.如果决定重试,把之前的问题重新发一次(不是续传,是全新的请求)
// 调用方的重试逻辑示例
public void chatWithRetry(String question, int maxRetries) {
    for (int i = 0; i <= maxRetries; i++) {
        final int attempt = i;
        final boolean[] success = {false};

        SseStreamClient.streamChat("Qwen/Qwen3-32B", null, question,
            new SseStreamClient.StreamCallback() {
                @Override
                public void onToken(String token) {
                    System.out.print(token);
                }

                @Override
                public void onComplete(String fullContent, SseStreamClient.Usage usage) {
                    success[0] = true;
                }

                @Override
                public void onError(Exception e, String partialContent) {
                    System.err.printf("第 %d 次请求失败: %s%n", attempt + 1, e.getMessage());
                    if (!partialContent.isEmpty()) {
                        System.err.println("已接收: " + partialContent.length() + " 字符");
                    }
                }
            }
        );

        if (success[0]) break;
        if (i < maxRetries) {
            System.err.println("等待 2 秒后重试...");
            try { Thread.sleep(2000); } catch (InterruptedException ignored) {}
        }
    }
}

重试要注意幂等性——大模型 Chat API 的请求天然是幂等的(同样的输入不一定得到同样的输出,但不会产生副作用),可以放心重试。但如果你的请求里包含了 Function Call 调用外部系统的操作,重试就要小心了。

SpringBoot-SSE服务端实战

为什么需要后端中间层

1. 前端直连大模型 API 的三个问题

你可能会想:前端直接调大模型 API 不就行了?何必多一层后端中间层?

不行。有三个硬伤:

  • API Key 暴露:前端代码跑在用户的浏览器里,你的 JavaScript 代码、网络请求对用户是完全透明的。打开 F12 → Network 面板,请求头里的 Authorization: Bearer sk-xxx 一目了然。API Key 泄露意味着任何人都能用你的额度调用大模型,账单直接爆炸。
  • 跨域限制:大模型 API 的域名(比如 api.siliconflow.cn)和你的前端域名(比如 www.yourapp.com)不同,浏览器的同源策略会拦截请求。虽然可以通过 CORS 解决,但 CORS 的响应头需要服务端配置——你控制不了第三方大模型 API 的 CORS 策略。
  • 无法加业务逻辑:用户是否登录了?是否有调用权限?单个用户每分钟最多调几次?对话记录要不要存?敏感词要不要过滤?这些业务逻辑都需要在后端处理。前端直连等于跳过了整个业务层,你的系统就是一个没有门禁的大楼。

所以架构必须是这样的:

前端浏览器 → 你的 SpringBoot 后端 → 大模型 API

后端负责鉴权、限流、日志、对话历史存储,然后代理调用大模型 API,把流式响应转发给前端。 Controller 线程不会被长时间占用。Controller 方法创建 SseEmitter 对象后立即返回,线程就释放了。真正的数据推送是在另一个异步线程里进行的。这意味着即使流式生成需要 30 秒,Tomcat 的 Controller 线程也只占用了几毫秒。

Spring Boot SseEmitter 核心机制

1. SseEmitter 是什么

SseEmitter 是 Spring MVC 提供的异步响应工具,专门用于 SSE 服务端推送。

普通的 Controller 返回值(比如 ResponseEntityStringMap)是同步响应——方法执行完,响应就发了,HTTP 连接就关了。一问一答,干脆利落。

SseEmitter异步响应——Controller 方法返回一个 SseEmitter 对象后,HTTP 连接不会关闭。你可以在其他线程里通过这个 SseEmitter 对象持续向客户端推送事件,直到你调用 emitter.complete() 主动关闭连接。

打个比方:普通 Controller 像自动售货机——投币、出货、走人。SseEmitter 像外卖平台——你下了单(发了请求),然后骑手持续给你推送状态更新(已接单、正在配送、即将到达),直到送达(complete())。

2. SseEmitter 的核心 API

SseEmitter 的 API 不多,核心就这几个:

API作用说明
new SseEmitter(timeout)创建实例,设置超时超时单位毫秒,超时后自动关闭连接
send(Object data)推送默认类型事件事件类型为 message,data 会被序列化为字符串
send(SseEventBuilder event)推送自定义事件可指定 event 名称、id、data、comment
complete()正常结束关闭 SSE 连接
completeWithError(Throwable ex)异常结束关闭连接并触发 onError 回调
onCompletion(Runnable)连接关闭回调不管正常还是异常关闭都会触发
onTimeout(Runnable)超时回调超时时触发
onError(Consumer<Throwable>)错误回调出错时触发

其中 send(SseEventBuilder) 是最常用的,因为它可以指定事件类型:

// 推送一个 token 事件
emitter.send(SseEmitter.event()
        .name("token")                    // event: token
        .data("{\"content\":\"你好\"}"));  // data: {"content":"你好"}

// 推送一个 done 事件
emitter.send(SseEmitter.event()
        .name("done")
        .data("{\"usage\":{\"total_tokens\":128}}"));

// 推送一个注释(心跳保活)
emitter.send(SseEmitter.event()
        .comment("keepalive"));

还记得上一篇讲的 SSE event: 字段吗?大模型 API 通常不用 event: 字段,所有事件都是默认的 message 类型。但你自己搭建 SSE 服务端时,event: 字段就很有用了——前端可以根据不同的事件类型做不同的处理。

3. SseEmitter 的线程模型

SseEmitter 的异步本质是理解它的关键。整个过程涉及两个线程:

Controller 线程(Tomcat 线程池里的线程):

  1. 1.接收 HTTP 请求
  2. 2.创建 SseEmitter 对象
  3. 3.启动异步任务(把 SseEmitter 对象传给异步线程)
  4. 4.返回 SseEmitter 对象
  5. 5.Controller 线程结束,释放回 Tomcat 线程池

推送线程(你自己创建的异步线程):

  1. 1.拿着 SseEmitter 对象
  2. 2.调用大模型流式 API
  3. 3.在回调里调用 emitter.send() 逐 Token 推送
  4. 4.调用 emitter.complete() 结束

这个设计的好处是 Controller 线程不会被长时间占用。如果不用 SseEmitter,而是在 Controller 里直接写一个循环发送数据,那这个请求会占住一个 Tomcat 线程直到流结束——30 秒的流式生成就占住一个线程 30 秒。Tomcat 默认 200 个线程,200 个并发用户就把线程池打满了。

SseEmitter,Controller 线程几毫秒就释放了。推送操作在独立的线程中进行,不占 Tomcat 线程池。

如果你在 Controller 线程里直接调 send() 发完所有数据再 complete(),那和普通接口没什么区别——线程还是被占住了。SseEmitter 的价值在于跨线程推送:Controller 线程返回,推送线程接管。

Java 实战:三个层次的 SSE 服务端

1. 最简版:Hello SSE

下述为示例代码,大家可自行创建项目进行运行验证,也可仅关注其中的核心思想。关于基础的 Spring Boot 框架代码逻辑,本文在概念章节中不再做过多展开。

先用最少的代码跑通一个 SSE 接口,理解 SseEmitter 的基本套路。

@RestController
@RequestMapping("/api/sse")
public class SimpleSseController {

    @GetMapping("/hello")
    public SseEmitter hello() {
        // 创建 SseEmitter,超时 60 秒
        SseEmitter emitter = new SseEmitter(60_000L);

        // 启动一个新线程做数据推送(不能在 Controller 线程里做)推送逻辑是异步长任务:适合交给独立线程/线程池处理
        new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    // 推送事件:event 名称为 "message"(默认),data 是字符串
                    emitter.send(SseEmitter.event()
                            .name("token")
                            .data("第 " + i + " 条消息"));
                    Thread.sleep(1000);  // 每秒推送一条
                }
                // 推送完毕,关闭连接
                emitter.send(SseEmitter.event()
                        .name("done")
                        .data("{\"msg\":\"推送完毕\"}"));
                emitter.complete();
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        }).start();

        // Controller 线程立即返回,不等推送完成
        return emitter;
    }
}

启动 Spring Boot 之后,用浏览器直接访问 http://localhost:8080/api/sse/hello,你会看到数据一条一条地出现——每秒出一条,5 秒后结束。

也可以用下面这段 HTML 在浏览器里测试(后面完整版也用这种方式验证):

<!DOCTYPE html><html><body><h3>SSE 测试</h3><div id="output"></div><script>
    const output = document.getElementById('output');
    const eventSource = new EventSource('/api/sse/hello');

    eventSource.addEventListener('token', function(e) {
        output.innerHTML += '<p>收到 token: ' + e.data + '</p>';
    });

    eventSource.addEventListener('done', function(e) {
        output.innerHTML += '<p style="color:green">完成: ' + e.data + '</p>';
        eventSource.close();  // 收到 done 事件后关闭连接
    });

    eventSource.onerror = function() {
        output.innerHTML += '<p style="color:red">连接断开</p>';
        eventSource.close();
    };
</script></body></html>

这个最简版能跑通,但离生产可用还有距离——没有接入大模型、没有心跳、没有断连检测。接下来一步步加。

2. 完整版:接入大模型流式转发

完整版要实现的链路是:前端发请求 → Spring Boot 接收 → 调用大模型流式 API → 逐 Token 转发给前端。

事件类型设计:

事件名称用途data 格式
token内容增量{"content": "一段文字"}
done流结束{"usage": {"prompt_tokens":15, "completion_tokens":12}}
error错误通知{"code": 500, "message": "错误描述"}

前端根据事件类型做不同处理——收到 token 就拼接内容,收到 done 就结束,收到 error 就展示错误信息。

2.1 请求和响应结构

/** 前端请求体 */publicclassChatRequest{privateString question;// getter/setter 省略}

2.2 Controller 层

@RestController@RequestMapping("/api/chat")publicclassChatController{privatefinalChatService chatService;publicChatController(ChatService chatService){this.chatService = chatService;}@GetMapping("/stream")publicSseEmitterstream(@RequestParamString question){// 创建 SseEmitter,超时 3 分钟SseEmitter emitter =newSseEmitter(180_000L);// 设置回调(连接关闭时的清理逻辑)
        emitter.onCompletion(()->System.out.println("SSE 连接关闭"));
        emitter.onTimeout(()->System.out.println("SSE 连接超时"));
        emitter.onError(e ->System.err.println("SSE 连接异常: "+ e.getMessage()));// 异步线程中调用大模型并转发
        chatService.streamChat(question, emitter);return emitter;}}

这里用 @GetMapping + @RequestParam 是为了兼容浏览器原生的 EventSource API(它只支持 GET 请求)。如果你的前端用 fetch 来消费 SSE(后面会讲),可以改成 @PostMapping + @RequestBody,支持更复杂的请求体。

2.3 Service 层:流式转发核心逻辑

@ServicepublicclassChatService{privatestaticfinalStringAPI_URL="https://api.siliconflow.cn/v1/chat/completions";privatestaticfinalStringAPI_KEY="sk-xxx";// 替换为你的 API KeyprivatestaticfinalStringMODEL="Qwen/Qwen3-32B";privatefinalOkHttpClient httpClient =newOkHttpClient.Builder().connectTimeout(15,TimeUnit.SECONDS).readTimeout(60,TimeUnit.SECONDS).writeTimeout(15,TimeUnit.SECONDS).build();privatefinalGson gson =newGson();/**
     * 异步调用大模型流式 API,逐 Token 通过 SseEmitter 转发给前端
     */publicvoidstreamChat(String question,SseEmitter emitter){// 在新线程中执行(生产环境应使用线程池,后面会讲)newThread(()->doStreamChat(question, emitter)).start();}privatevoiddoStreamChat(String question,SseEmitter emitter){// 1. 构建大模型请求体JsonObject requestBody =newJsonObject();
        requestBody.addProperty("model",MODEL);
        requestBody.addProperty("temperature",0.7);
        requestBody.addProperty("max_tokens",2048);
        requestBody.addProperty("stream",true);JsonArray messages =newJsonArray();JsonObject systemMsg =newJsonObject();
        systemMsg.addProperty("role","system");
        systemMsg.addProperty("content","你是一个技术专家,回答简洁清晰。");
        messages.add(systemMsg);JsonObject userMsg =newJsonObject();
        userMsg.addProperty("role","user");
        userMsg.addProperty("content", question);
        messages.add(userMsg);
        requestBody.add("messages", messages);// 2. 发起 HTTP 请求Request request =newRequest.Builder().url(API_URL).addHeader("Authorization","Bearer "+API_KEY).addHeader("Content-Type","application/json").addHeader("Accept","text/event-stream").post(RequestBody.create(requestBody.toString(),MediaType.parse("application/json"))).build();// 3. 解析 SSE 流并转发try(Response response = httpClient.newCall(request).execute()){if(!response.isSuccessful()){sendError(emitter, response.code(),"大模型 API 调用失败: HTTP "+ response.code());return;}BufferedReader reader =newBufferedReader(newInputStreamReader(response.body().byteStream(),StandardCharsets.UTF_8));String line;while((line = reader.readLine())!=null){if(line.isEmpty()|| line.startsWith(":")){continue;}if(!line.startsWith("data:")){continue;}String data = line.substring(5);if(data.startsWith(" ")){
                    data = data.substring(1);}if("[DONE]".equals(data)){// 流结束,发送 done 事件
                    emitter.send(SseEmitter.event().name("done").data("{\"msg\":\"completed\"}"));
                    emitter.complete();return;}// 解析 JSONJsonObject chunk;try{
                    chunk =JsonParser.parseString(data).getAsJsonObject();}catch(Exception e){continue;// JSON 解析失败,跳过}// 提取 contentJsonArray choices = chunk.getAsJsonArray("choices");if(choices ==null|| choices.isEmpty()){// 可能是只有 usage 的 chunkif(chunk.has("usage")&&!chunk.get("usage").isJsonNull()){JsonObject usage = chunk.getAsJsonObject("usage");
                        emitter.send(SseEmitter.event().name("done").data(gson.toJson(usage)));
                        emitter.complete();return;}continue;}JsonObject choice = choices.get(0).getAsJsonObject();JsonObject delta = choice.getAsJsonObject("delta");if(delta !=null&& delta.has("content")){JsonElement contentElement = delta.get("content");if(!contentElement.isJsonNull()){String token = contentElement.getAsString();if(!token.isEmpty()){// 核心:通过 SseEmitter 把 token 推送给前端
                            emitter.send(SseEmitter.event().name("token").data("{\"content\":\""+escapeJson(token)+"\"}"));}}}// 检查 finish_reason,提取 usageJsonElement finishElement = choice.get("finish_reason");if(finishElement !=null&&!finishElement.isJsonNull()){if(chunk.has("usage")&&!chunk.get("usage").isJsonNull()){JsonObject usage = chunk.getAsJsonObject("usage");
                        emitter.send(SseEmitter.event().name("done").data(gson.toJson(usage)));}}}// readLine 返回 null 但没收到 [DONE],连接异常sendError(emitter,500,"流式响应异常结束");}catch(IOException e){sendError(emitter,500,"连接异常: "+ e.getMessage());}}/** 向前端推送错误事件 */privatevoidsendError(SseEmitter emitter,int code,String message){try{
            emitter.send(SseEmitter.event().name("error").data("{\"code\":"+ code +",\"message\":\""+escapeJson(message)+"\"}"));
            emitter.complete();}catch(IOException e){
            emitter.completeWithError(e);}}/** 简单的 JSON 字符串转义 */privateStringescapeJson(String s){return s.replace("\\","\\\\").replace("\"","\\\"").replace("\n","\\n").replace("\r","\\r").replace("\t","\\t");}}

核心逻辑就是一句话:在大模型 SSE 客户端的回调里,调用 emitter.send() 把数据转发给前端

整个链路是这样的:

  1. 1.前端发请求 → Controller 创建 SseEmitter,启动异步线程,立即返回
  2. 2.异步线程调用大模型流式 API → 逐行读取 data:
  3. 3.解析出 delta.content → 通过 emitter.send() 推送 token 事件给前端
  4. 4.收到 [DONE] → 推送 done 事件 → emitter.complete() 关闭连接
  5. 5.出错 → 推送 error 事件 → emitter.complete() 关闭连接

注意 sendError 方法的设计——出错时不是直接 completeWithError(),而是先通过 SSE 推送一个 error 事件让前端知道出了什么问题,然后再 complete()。如果直接 completeWithError(),前端只知道连接断了,不知道为什么断了。

3. 生产版:连接管理与健壮性

完整版能跑了,但在生产环境还有几个问题需要解决。

3.1 超时设置

SseEmitter 的默认超时是 30 秒。大模型生成一次回答可能需要 30~60 秒(特别是长文本、深度思考场景),30 秒一到 SseEmitter 自动关闭连接,前端直接断了。

建议把 SseEmitter 的超时设为 3~5 分钟

SseEmitter emitter =newSseEmitter(180_000L);// 3 分钟

这里有一个容易混淆的地方:SseEmitter 的超时和 OkHttp 的 readTimeout 是两回事。

超时类型含义建议值
SseEmitter(timeout)整个 SSE 连接的生命周期上限3~5 分钟
OkHttp readTimeout调用大模型 API 时,两个 chunk 之间的最大等待间隔30~60 秒

SseEmitter 超时是这次对话最多持续多久,OkHttp readTimeout 是大模型多久没吐出新 token 就认为它卡住了。两个维度不同,不要搞混。

3.2 心跳保活

模型在思考时,可能几秒甚至十几秒没有输出。如果你的 Spring Boot 前面有 Nginx 反向代理,Nginx 的 proxy_read_timeout 默认 60 秒——超过 60 秒没有数据传输,Nginx 会主动断开连接。

解决方案:定期发送心跳注释事件。上一篇讲过,SSE 的注释行(: 开头)会被客户端忽略,但对中间件来说有数据在传输,就不会超时断开。

@ServicepublicclassChatService{// 心跳定时器privatefinalScheduledExecutorService heartbeatScheduler =Executors.newScheduledThreadPool(2);publicvoidstreamChat(String question,SseEmitter emitter){// 启动心跳:每 15 秒发送一次注释事件ScheduledFuture<?> heartbeat = heartbeatScheduler.scheduleAtFixedRate(()->{try{
                emitter.send(SseEmitter.event().comment("keepalive"));}catch(IOException e){// 发送失败说明连接已断开,不需要处理}},15,15,TimeUnit.SECONDS);newThread(()->{try{doStreamChat(question, emitter);}finally{// 不管成功还是失败,都要停止心跳
                heartbeat.cancel(false);}}).start();}}

心跳间隔建议 15~20 秒。太频繁浪费带宽,太稀疏可能还是会被中间件断开。

3.3 客户端断连检测

用户关闭浏览器标签页、刷新页面、或者网络断开,SSE 连接会从客户端侧断开。这时候后端可能还在调用大模型 API——用户已经走了,你还在花钱调 API,纯浪费。

SseEmitter 提供了三个回调来感知连接状态变化:

SseEmitter emitter =newSseEmitter(180_000L);

emitter.onCompletion(()->{System.out.println("连接关闭(正常或异常)");// 清理资源:停止心跳、从连接池移除});

emitter.onTimeout(()->{System.out.println("连接超时");// 超时也会触发 onCompletion});

emitter.onError(e ->{System.out.println("连接异常: "+ e.getMessage());// 异常也会触发 onCompletion});

onCompletion 是最终回调——不管连接是正常 complete() 结束、超时结束、还是异常结束,onCompletion 都会被触发。资源清理逻辑放在 onCompletion 里就行。

另外,当客户端断开后,emitter.send() 会抛出 IOException。所以推送代码里要做好异常处理:

try{
    emitter.send(SseEmitter.event().name("token").data("{\"content\":\""+escapeJson(token)+"\"}"));}catch(IOException e){// 客户端已断开,停止继续推送// 如果还在调用大模型 API,可以尝试取消请求break;}

3.4 线程池管理

前面的代码用 new Thread() 来启动异步任务,这在生产环境是不可接受的——线程创建销毁的开销大,也没法控制并发数。

在 Ragent AI 项目中,大量使用了线程池,如果大家学习过 oneThread 动态线程池项目,可以写 1-2 个亮点到 Ragent AI,两者结合说了属于是。

应该用专门的线程池:

@Configuration
public class ThreadPoolConfig {

    /**
     * SSE 推送专用线程池
     * 注意:不要用 Tomcat 的线程池,SSE 推送是长耗时任务,会阻塞 Tomcat 线程
     */
    @Bean("sseExecutor")
    public ExecutorService sseExecutor() {
        return new ThreadPoolExecutor(
                10,                          // 核心线程数
                50,                          // 最大线程数
                60, TimeUnit.SECONDS,        // 空闲线程存活时间
                new LinkedBlockingQueue<>(100),  // 等待队列
                new ThreadFactory() {
                    private final AtomicInteger counter = new AtomicInteger(1);
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r, "sse-push-" + counter.getAndIncrement());
                        t.setDaemon(true);
                        return t;
                    }
                },
                new ThreadPoolExecutor.CallerRunsPolicy()  // 队列满时由调用线程执行
        );
    }
}

Service 里改用线程池:

@Service
public class ChatService {

    @Resource(name = "sseExecutor")
    private ExecutorService sseExecutor;

    public void streamChat(String question, SseEmitter emitter) {
        sseExecutor.submit(() -> {
            // 心跳和推送逻辑...
        });
    }
}

线程池的参数需要根据你的并发量和服务器配置来调。核心线程数可以参考你的预期并发 SSE 连接数。CallerRunsPolicy 拒绝策略意味着队列满时由调用线程(Controller 线程)执行——相当于降级为同步处理,不会丢失请求。

前端怎么接:EventSource API

后端开发需要知道前端怎么接 SSE,才能设计好接口。这里简要介绍前端消费 SSE 的两种方式。

1. 原生 EventSource 的基本用法

浏览器原生提供了 EventSource API 来消费 SSE:

// 创建 EventSource,自动发起 GET 请求
const eventSource = new EventSource('/api/chat/stream?question=什么是SSE');

// 监听自定义事件(对应后端 SseEmitter.event().name("token") 设置的名称)
eventSource.addEventListener('token', function(event) {
    const data = JSON.parse(event.data);
    document.getElementById('output').textContent += data.content;
});

eventSource.addEventListener('done', function(event) {
    console.log('完成:', event.data);
    eventSource.close();  // 必须手动关闭,否则 EventSource 会自动重连
});

eventSource.addEventListener('error', function(event) {
    if (event.data) {
        console.error('服务端错误:', event.data);
    }
    eventSource.close();
});

// 也可以监听默认的 message 事件(event 名称为 "message" 或未指定时触发)
// eventSource.onmessage = function(event) { ... };

注意这里和上一篇 SSE 协议的呼应:前端 addEventListener('token', ...) 里的 'token',就是后端 SseEmitter.event().name("token") 里设置的事件名称。所以设计事件类型时,前后端要约定好。

还有一个重要的点:EventSource 在连接断开后会自动重连。如果你在 done 事件里不调用 eventSource.close()EventSource 发现连接断了会尝试重新连接,导致重复请求。收到结束信号后一定要手动关闭。

2. EventSource 的局限和替代方案

EventSource 有两个主要局限:

只支持 GET 请求。你的问题文本只能放在 URL 参数里。如果问题很长(几百字),URL 长度可能超出限制。更重要的是,如果请求体比较复杂(包含对话历史、系统配置等),塞在 URL 参数里既不优雅也不安全。

不能自定义请求头。想在请求头里加 Authorization Token 做鉴权?EventSource 做不到。

替代方案是用 fetch API + ReadableStream,可以发 POST 请求、自定义请求头:

async function streamChat(question) {
    const response = await fetch('/api/chat/stream', {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer your-token',  // 可以自定义请求头
            'Accept': 'text/event-stream'
        },
        body: JSON.stringify({ question: question })
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        // 按行分割,处理 SSE 格式
        const lines = buffer.split('\n');
        buffer = lines.pop();  // 最后一个可能不完整,留到下一次

        for (const line of lines) {
            if (line.startsWith('event: ')) {
                // 解析事件类型
            } else if (line.startsWith('data: ')) {
                // 解析数据
            }
        }
    }
}

这也是为什么很多 AI 产品的前端不用原生 EventSource,而是用 fetch 手动处理 SSE 流——因为对话请求通常是 POST 且需要带 Token。如果你的后端用 @PostMapping,前端就必须用 fetch 方案。

生产环境的关键配置

1. Nginx 反向代理 SSE

这是生产环境最常踩的坑

默认情况下,Nginx 开启了 proxy_buffering——它会把后端的响应数据缓冲在内存里,攒够一定量(或等一段时间)再一次性转发给客户端。对普通 HTTP 请求来说,缓冲可以提升性能,减少后端压力。

但对 SSE 来说,缓冲是致命的。后端 emitter.send() 了一个 token,Nginx 缓冲住了不转发,前端收不到数据。等 Nginx 缓冲区满了或者超时了,一堆数据才一次性涌过来。前端看到的效果就是:卡了好几秒,然后"哗"一下出来一大段——完全不是逐字输出的打字机效果。

解决方案:在 Nginx 的 SSE 接口配置中关闭缓冲。

location /api/chat/stream {
    proxy_pass http://your-backend;

    # 关键:关闭代理缓冲,数据立即转发
    proxy_buffering off;

    # 关闭代理缓存
    proxy_cache off;

    # 设置足够长的读取超时(SSE 是长连接)
    proxy_read_timeout 300s;

    # 确保使用 HTTP/1.1 长连接
    proxy_http_version 1.1;
    proxy_set_header Connection '';

    # 开启 chunked 传输编码
    chunked_transfer_encoding on;

    # 告诉 Nginx 不要缓冲 SSE 响应
    # 有些 Nginx 版本即使关了 proxy_buffering,也需要这个头
    proxy_set_header X-Accel-Buffering no;
}

每个配置项的作用:

配置项作用不配的后果
proxy_buffering off关闭代理缓冲,后端数据立即转发数据被缓冲,前端收不到实时推送
proxy_cache off关闭代理缓存SSE 响应可能被缓存,返回过期数据
proxy_read_timeout 300s读取超时设为 5 分钟默认 60 秒,长文本生成可能超时断开
proxy_http_version 1.1使用 HTTP/1.1HTTP/1.0 不支持长连接
proxy_set_header Connection ''清除 Connection 头可能导致连接被提前关闭
X-Accel-Buffering no额外的缓冲关闭指令某些场景下 proxy_buffering off 不生效

你也可以在 Spring Boot 侧通过响应头来关闭缓冲:

@GetMapping("/stream")publicSseEmitterstream(@RequestParamString question,HttpServletResponse response){// 告诉 Nginx 不要缓冲这个响应
    response.setHeader("X-Accel-Buffering","no");
    response.setHeader("Cache-Control","no-cache");SseEmitter emitter =newSseEmitter(180_000L);// ...return emitter;}

proxy_buffering offX-Accel-Buffering: no 最好都配上。有些 Nginx 版本或配置场景下,只配一个可能不生效。双重保险总没错。

2. 连接数与线程池

SSE 是长连接,每个活跃的 SSE 连接在后端占用的资源包括:

  • 一个 SseEmitter 对象(内存)
  • 一个 SSE 推送线程(线程池)
  • 一个到大模型 API 的 HTTP 连接(OkHttp 连接池)
  • Tomcat 的一个 NIO 连接

SseEmitter 本身是异步的,Controller 线程立即返回,不占 Tomcat 线程池。但它占用 Tomcat 的 NIO 连接数。Tomcat 的 max-connections 默认 8192,通常够用。真正的瓶颈往往在推送线程池和大模型 API 的并发限制上。

线程池的配置建议:

参数建议值说明
核心线程数10~20覆盖正常并发量
最大线程数50~100覆盖峰值
队列容量100~200超过的请求等待或降级
拒绝策略CallerRunsPolicy队列满时降级为同步,不丢失请求

还有一个容易忽略的点:调用大模型 API 的 OkHttpClient 应该是全局单例,不要每次请求都 new OkHttpClient()。OkHttpClient 内部维护了连接池和线程池,每次 new 都会创建新的连接池,连接无法复用,资源浪费严重。

3. 内存泄漏防范

SseEmitter 对象在连接期间一直被 Spring 持有。如果连接异常断开但 SseEmitter 没有被 complete(),它会一直留在内存里直到超时。

防范措施:

  1. 1.设置合理的超时时间:不要设 0(表示永不超时)——如果客户端断开而你没感知到,这个 SseEmitter 就永远不会被清理。建议 3~5 分钟
  2. 2.onCompletion 回调里清理资源:停止心跳定时任务、记录日志
  3. 3.定期巡检连接池:虽然 onCompletion 回调应该能覆盖所有清理场景,但作为兜底,可以用一个定时任务每分钟检查一下连接池,清理超过超时时间还存在的连接

4. 负载均衡注意事项

如果你的 Spring Boot 服务做了多实例部署 + 轮询负载均衡,SSE 长连接会带来一个问题:

SSE 是一个持续的 HTTP 连接。连接建立后,所有的 send() 数据都必须通过同一个后端实例推送。如果中间负载均衡器把连接切到了另一个实例,数据就丢了。

对于 SSE 场景,有两种处理方式:

  • Sticky Session(会话粘滞):配置负载均衡器,让同一个客户端的请求始终打到同一个后端实例。Nginx 可以用 ip_hashsticky cookie 实现
  • 无状态设计:SSE 连接本身是有状态的(绑定在某个实例上),但业务逻辑做成无状态的——对话历史存在 Redis/MySQL 而不是内存里,任何实例都能处理

实际上,对于大模型流式转发这个场景,SSE 连接只在一次对话的生成过程中存在(几秒到几十秒),生成完就断开了。只要负载均衡器不在一次 SSE 连接中间切换后端实例(Nginx 的默认行为不会这么做),就不需要特殊配置。