Skip to content
Go to Dashboard

SSE 进度订阅

本页说明如何在调用 POST /api/sessions/{session_id}/messages 写入一批 Message 后,使用 SSE(Server-Sent Events)订阅这批 Message 的 Memory 处理进度。

SSE 适合观察异步处理状态,例如页面上展示“已写入 Message、已生成 Facts、已生成 Summaries、已归入 Topics”。它不会返回原始 Message 内容、Facts 内容或 Summary 内容。

前置条件

  • 已创建 GUMem Project,并获得服务端可用的 API Key。
  • 已创建 Session,并保存 session_id
  • 你的服务端能够保存 Message 写入响应中的 request_id
  • 如果需要等待 factssummariestopics 阶段,后台 worker、Elasticsearch、LLM 和 embedding 配置应处于可用状态。

前端浏览器的原生 EventSource 不能直接设置 Authorization header。生产环境中建议由你的服务端持有 API Key 并转发 SSE,或使用支持自定义 header 的 fetch stream / SSE 客户端。

调用流程

  1. 写入一批 Message。
  2. 从写入响应中读取 data.request_id
  3. 使用同一个 session_idrequest_id 订阅 SSE。
  4. 根据 progresscompletedtimeouterror 事件更新业务状态。

写入 Message

调用 Message 写入接口:

bash
export GUMEM_BASE_URL="http://localhost:8000"
export GUMEM_API_KEY="<api_key>"
export GUMEM_SESSION_ID="session_xxx"

curl -sS -X POST "$GUMEM_BASE_URL/api/sessions/$GUMEM_SESSION_ID/messages" \
  -H "Authorization: Api-Key $GUMEM_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "messages": [
      {
        "role": "user",
        "content": "For team scheduling, use Berlin when I mention Europe."
      },
      {
        "role": "assistant",
        "content": "Got it. I will use Berlin for Europe scheduling."
      }
    ]
  }'

响应中的 data.request_id 是这批 Message 的进度订阅 ID:

json
{
  "code": 0,
  "message": "success",
  "data": {
    "request_id": "req_abc123def4567890"
  }
}

同一次写入请求中的所有 Message 共享同一个 request_id。如果写入空 Message 列表,request_id 可能为 null,此时不应发起 SSE 订阅。

订阅处理进度

使用 GET /api/sessions/{session_id}/messages/sse 订阅处理进度:

bash
export GUMEM_REQUEST_ID="req_abc123def4567890"

curl -N "$GUMEM_BASE_URL/api/sessions/$GUMEM_SESSION_ID/messages/sse?request_id=$GUMEM_REQUEST_ID&wait_until=topics&timeout_seconds=120" \
  -H "Authorization: Api-Key $GUMEM_API_KEY" \
  -H "Accept: text/event-stream"

查询参数:

参数必填默认值说明
request_id-Message 写入响应中的 data.request_id
wait_untiltopics等待到哪个阶段后发送 completed 并关闭连接。可选值:messagesfactssummariestopics
poll_interval_ms1000服务端查询进度的间隔,范围为 2505000 毫秒。
timeout_seconds120最大等待时间,范围为 1600 秒。
include_logsfalse是否附带同一 request_id 的服务端 SSE 日志事件。通常只在调试时开启。

鉴权方式与其他 GUMem API 一致:

http
Authorization: Api-Key <api_key>
Accept: text/event-stream

处理阶段

wait_until 使用对外阶段名:

text
messages -> facts -> summaries -> topics
阶段SSE stage说明
messagesmessages_ready这批 Message 已写入并可被进度查询命中。
factsfacts_ready已从这批 Message 生成 Facts。
summariessummaries_ready已从 Facts 生成 Summaries。
topicstopics_ready已从 Summaries 提取到 Topic。

如果暂时查不到 request_id,SSE 会返回 not_started 状态,并继续等待直到超时或后续处理开始。

事件类型

事件触发条件连接行为
progress阶段或计数发生变化。保持连接。
completed当前阶段达到 wait_until发送后关闭连接。
timeout达到 timeout_seconds 仍未完成。发送最后状态后关闭连接。
error流开始后发生未处理异常。发送错误 payload 后关闭连接。
loginclude_logs=true 且存在同一 request_id 的日志。保持连接。
: keep-alive状态未变化。保持连接。

返回内容说明

返回内容不是一次性的 JSON 响应,而是持续输出的 text/event-stream。每个事件由 eventdata 两部分组成:

返回片段说明
eventSSE 事件类型,例如 progresscompletedtimeouterrorlog
dataJSON 字符串,字段说明见下方 Payload 字段
: keep-aliveSSE 注释行,表示连接仍然存活但状态没有变化;客户端通常可以忽略。
json
{
  "event": "progress",
  "data": {
    "request_id": "req_abc123def4567890",
    "session_id": "session_xxx",
    "stage": "messages_ready",
    "wait_until": "topics",
    "message_count": 2,
    "processed_message_count": 0,
    "facts_count": 0,
    "summaries_count": 0,
    "topics_count": 0,
    "completed": false,
    "timestamp": "2026-05-07T12:00:00.000000+00:00"
  }
}

SSE payload 只返回阶段状态和数量统计。

字段说明
request_id本批 Message 写入 ID。
session_idURL 中的 Session ID。
stage当前处理阶段。可能为 not_startedmessages_readyfacts_readysummaries_readytopics_ready
wait_until本次订阅等待的目标阶段。
message_count该批次 Message 数量。
processed_message_count该批次中状态为 processed 的 Message 数量。
facts_count命中该批 Message 的 Facts 数量。
summaries_count命中该批 Facts 的 Summaries 数量。
topics_countSummaries 中去重后的 Topic 数量。
completed当前阶段是否已经达到 wait_until
timestamp服务端 UTC 时间。

服务端接入示例

如果你的前端需要展示进度,推荐由业务服务端转发 SSE,避免把 GUMem API Key 暴露给浏览器。

ts
async function streamMessageProgress(params: {
  baseUrl: string;
  apiKey: string;
  sessionId: string;
  requestId: string;
}) {
  const url = new URL(
    `/api/sessions/${params.sessionId}/messages/sse`,
    params.baseUrl
  );
  url.searchParams.set("request_id", params.requestId);
  url.searchParams.set("wait_until", "topics");
  url.searchParams.set("timeout_seconds", "120");

  const response = await fetch(url, {
    headers: {
      Authorization: `Api-Key ${params.apiKey}`,
      Accept: "text/event-stream",
    },
  });

  if (!response.ok || !response.body) {
    throw new Error(`SSE request failed: ${response.status}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    const chunk = decoder.decode(value, { stream: true });
    process.stdout.write(chunk);
  }
}

失败处理

  • 401:API Key 缺失或无效。检查 Authorization: Api-Key <api_key>
  • 422:缺少 request_id,或查询参数超出范围。
  • timeout 事件:指定时间内没有达到 wait_until。可以降低 wait_until 验证较早阶段,或延长 timeout_seconds
  • 长时间停留在 messages_ready:通常表示 Message 已写入,但后续 Facts / Summaries / Topics 处理尚未完成。检查后台 worker、Elasticsearch、LLM 和 embedding 配置。
  • 网络断开:重新使用同一个 request_id 发起订阅。SSE 是进度观察通道,不应作为唯一的业务完成依据。

检查结果

本地或无完整 LLM / embedding 配置的环境,建议先使用 wait_until=messages 验证 SSE 连接和鉴权是否正常。生产环境中再根据业务需要等待 factssummariestopics

下一步

阅读 新增记忆 了解如何写入 Message,或阅读 查询记忆 了解如何在后续请求中召回 Memory。