SSE 进度订阅
本页说明如何在调用 POST /api/sessions/{session_id}/messages 写入一批 Message 后,使用 SSE(Server-Sent Events)订阅这批 Message 的 Memory 处理进度。
SSE 适合观察异步处理状态,例如页面上展示“已写入 Message、已生成 Facts、已生成 Summaries、已归入 Topics”。它不会返回原始 Message 内容、Facts 内容或 Summary 内容。
前置条件
- 已创建 GUMem Project,并获得服务端可用的 API Key。
- 已创建 Session,并保存
session_id。 - 你的服务端能够保存 Message 写入响应中的
request_id。 - 如果需要等待
facts、summaries或topics阶段,后台 worker、Elasticsearch、LLM 和 embedding 配置应处于可用状态。
前端浏览器的原生 EventSource 不能直接设置 Authorization header。生产环境中建议由你的服务端持有 API Key 并转发 SSE,或使用支持自定义 header 的 fetch stream / SSE 客户端。
调用流程
- 写入一批 Message。
- 从写入响应中读取
data.request_id。 - 使用同一个
session_id和request_id订阅 SSE。 - 根据
progress、completed、timeout或error事件更新业务状态。
写入 Message
调用 Message 写入接口:
export GUMEM_BASE_URL="http://localhost:8000"
export GUMEM_API_KEY="<api_key>"
export GUMEM_SESSION_ID="session_xxx"
curl -sS -X POST "$GUMEM_BASE_URL/api/sessions/$GUMEM_SESSION_ID/messages" \
-H "Authorization: Api-Key $GUMEM_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"messages": [
{
"role": "user",
"content": "For team scheduling, use Berlin when I mention Europe."
},
{
"role": "assistant",
"content": "Got it. I will use Berlin for Europe scheduling."
}
]
}'响应中的 data.request_id 是这批 Message 的进度订阅 ID:
{
"code": 0,
"message": "success",
"data": {
"request_id": "req_abc123def4567890"
}
}同一次写入请求中的所有 Message 共享同一个 request_id。如果写入空 Message 列表,request_id 可能为 null,此时不应发起 SSE 订阅。
订阅处理进度
使用 GET /api/sessions/{session_id}/messages/sse 订阅处理进度:
export GUMEM_REQUEST_ID="req_abc123def4567890"
curl -N "$GUMEM_BASE_URL/api/sessions/$GUMEM_SESSION_ID/messages/sse?request_id=$GUMEM_REQUEST_ID&wait_until=topics&timeout_seconds=120" \
-H "Authorization: Api-Key $GUMEM_API_KEY" \
-H "Accept: text/event-stream"查询参数:
| 参数 | 必填 | 默认值 | 说明 |
|---|---|---|---|
request_id | 是 | - | Message 写入响应中的 data.request_id。 |
wait_until | 否 | topics | 等待到哪个阶段后发送 completed 并关闭连接。可选值:messages、facts、summaries、topics。 |
poll_interval_ms | 否 | 1000 | 服务端查询进度的间隔,范围为 250 到 5000 毫秒。 |
timeout_seconds | 否 | 120 | 最大等待时间,范围为 1 到 600 秒。 |
include_logs | 否 | false | 是否附带同一 request_id 的服务端 SSE 日志事件。通常只在调试时开启。 |
鉴权方式与其他 GUMem API 一致:
Authorization: Api-Key <api_key>
Accept: text/event-stream处理阶段
wait_until 使用对外阶段名:
messages -> facts -> summaries -> topics| 阶段 | SSE stage | 说明 |
|---|---|---|
messages | messages_ready | 这批 Message 已写入并可被进度查询命中。 |
facts | facts_ready | 已从这批 Message 生成 Facts。 |
summaries | summaries_ready | 已从 Facts 生成 Summaries。 |
topics | topics_ready | 已从 Summaries 提取到 Topic。 |
如果暂时查不到 request_id,SSE 会返回 not_started 状态,并继续等待直到超时或后续处理开始。
事件类型
| 事件 | 触发条件 | 连接行为 |
|---|---|---|
progress | 阶段或计数发生变化。 | 保持连接。 |
completed | 当前阶段达到 wait_until。 | 发送后关闭连接。 |
timeout | 达到 timeout_seconds 仍未完成。 | 发送最后状态后关闭连接。 |
error | 流开始后发生未处理异常。 | 发送错误 payload 后关闭连接。 |
log | include_logs=true 且存在同一 request_id 的日志。 | 保持连接。 |
: keep-alive | 状态未变化。 | 保持连接。 |
返回内容说明
返回内容不是一次性的 JSON 响应,而是持续输出的 text/event-stream。每个事件由 event 和 data 两部分组成:
| 返回片段 | 说明 |
|---|---|
event | SSE 事件类型,例如 progress、completed、timeout、error 或 log。 |
data | JSON 字符串,字段说明见下方 Payload 字段。 |
: keep-alive | SSE 注释行,表示连接仍然存活但状态没有变化;客户端通常可以忽略。 |
{
"event": "progress",
"data": {
"request_id": "req_abc123def4567890",
"session_id": "session_xxx",
"stage": "messages_ready",
"wait_until": "topics",
"message_count": 2,
"processed_message_count": 0,
"facts_count": 0,
"summaries_count": 0,
"topics_count": 0,
"completed": false,
"timestamp": "2026-05-07T12:00:00.000000+00:00"
}
}SSE payload 只返回阶段状态和数量统计。
| 字段 | 说明 |
|---|---|
request_id | 本批 Message 写入 ID。 |
session_id | URL 中的 Session ID。 |
stage | 当前处理阶段。可能为 not_started、messages_ready、facts_ready、summaries_ready 或 topics_ready。 |
wait_until | 本次订阅等待的目标阶段。 |
message_count | 该批次 Message 数量。 |
processed_message_count | 该批次中状态为 processed 的 Message 数量。 |
facts_count | 命中该批 Message 的 Facts 数量。 |
summaries_count | 命中该批 Facts 的 Summaries 数量。 |
topics_count | Summaries 中去重后的 Topic 数量。 |
completed | 当前阶段是否已经达到 wait_until。 |
timestamp | 服务端 UTC 时间。 |
服务端接入示例
如果你的前端需要展示进度,推荐由业务服务端转发 SSE,避免把 GUMem API Key 暴露给浏览器。
async function streamMessageProgress(params: {
baseUrl: string;
apiKey: string;
sessionId: string;
requestId: string;
}) {
const url = new URL(
`/api/sessions/${params.sessionId}/messages/sse`,
params.baseUrl
);
url.searchParams.set("request_id", params.requestId);
url.searchParams.set("wait_until", "topics");
url.searchParams.set("timeout_seconds", "120");
const response = await fetch(url, {
headers: {
Authorization: `Api-Key ${params.apiKey}`,
Accept: "text/event-stream",
},
});
if (!response.ok || !response.body) {
throw new Error(`SSE request failed: ${response.status}`);
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { value, done } = await reader.read();
if (done) break;
const chunk = decoder.decode(value, { stream: true });
process.stdout.write(chunk);
}
}失败处理
401:API Key 缺失或无效。检查Authorization: Api-Key <api_key>。422:缺少request_id,或查询参数超出范围。timeout事件:指定时间内没有达到wait_until。可以降低wait_until验证较早阶段,或延长timeout_seconds。- 长时间停留在
messages_ready:通常表示 Message 已写入,但后续 Facts / Summaries / Topics 处理尚未完成。检查后台 worker、Elasticsearch、LLM 和 embedding 配置。 - 网络断开:重新使用同一个
request_id发起订阅。SSE 是进度观察通道,不应作为唯一的业务完成依据。
检查结果
本地或无完整 LLM / embedding 配置的环境,建议先使用 wait_until=messages 验证 SSE 连接和鉴权是否正常。生产环境中再根据业务需要等待 facts、summaries 或 topics。