Skip to content
Go to Dashboard

Python SDK

本页说明如何安装、配置并使用官方 web-agent-sdk Python 包:起 session、跑 task、订阅事件。

bash
pip install web-agent-sdk

需要 Python 3.10+。SDK 是 async-first(asyncio / anyio)。

一个入口:Client

用户面 API 都挂在同一个 Client 上:

python
from web_agent.v1 import Client
资源对应产品用法
client.sessions / messages / eventsDoAnything(开放型)自由输入,agent 自己定路径
client.deep_researchDeepResearch(定型 — 研究→报告)Standalone API
client.web_searchWebSearch(定型 — 查询→结果)默认 wait=true 同步糖
client.trackTrack(监控 → snapshot)长寿命 monitor + webhook 通道

包名是 web-agent-sdk(带连字符),但 import 名是 web_agent——同 python-dateutildateutil 的惯例。

DoAnything —— 开放型任务

python
import asyncio
from web_agent.v1 import Client
from web_agent.v1.types import CreateSessionRequest

async def main():
    async with Client(
        api_key="wa_demo_xxxxxxxxxxxxxxxx",
        project_id="proj_demo_0001",
    ) as client:
        session = await client.sessions.create(CreateSessionRequest(
            instructions="找出 Hacker News 现在 Top 5 的故事,列表返回。",
        ))
        task = session.tasks[0]   # session 创建会隐式排好首个 task
        async for event in client.events.stream(session.id, task.id):
            print(event.type, event.data)
            if event.type == "task.completed":
                break

asyncio.run(main())

省略 api_key / project_id 时自动读 $WEBAGENT_API_KEY / $WEBAGENT_PROJECT_ID

「起新 task」 vs 「往运行中 task 塞消息」

python
# 1. 往当前 task 的对话队列里塞一条消息
#    (agent 在下个 ReAct 边界自己决定要不要消费)
await client.messages.send(
    session.id, task.id,
    content="顺便把每条的评论数也带上。",
)

# 2. 在同一个 session 里起一个新 task
#    (复用 browser / profile / workspace;上一个 task 必须已终态)
from web_agent.v1.types import CreateTaskRequest
new_task = await client.sessions.create_task(
    session.id,
    CreateTaskRequest(instructions="点进第一条,总结评论区。"),
)

回应 input request

python
await client.messages.intervene(
    session.id, task.id,
    input_request_id="ir_01HXX",
    response={"solved": True},
)

取消 / 停止 / 列表

python
await client.sessions.cancel_task(session.id, task.id, reason="user_cancelled")
await client.sessions.stop(session.id, force=False)            # 软停 session
listing = await client.sessions.list(status="running", limit=20)
for s in listing.items:
    print(s.id, s.status)

心跳事件

stream() 默认过滤心跳事件;做连接健康 UI 时传 include_heartbeats=True。续传支持 Last-Event-ID

python
client.events.stream(session.id, task.id, last_event_id="142")

DeepResearch —— 研究 → 报告

DR 是 Standalone API(pidless:/v1/deep_research),项目租户从 Bearer token 解析。

python
async with Client(api_key="wa_...", project_id="proj_demo") as client:
    task = await client.deep_research.run(
        topic="2026 年开源向量数据库格局",
        depth="deep",                       # light / standard / deep
        require_outline_approval=True,      # 默认开 outline HITL gate
    )
    print(task["task_id"], task["status"])

订阅事件流(DR 走的是 DoAnything 的 SSE 通道)+ 响应 outline gate:

python
# task["task_id"] / task["session_id"] 都在创建响应里
async for event in client.events.stream(
    task["session_id"], task["task_id"],
):
    if event.type == "task.input_request":
        # outline 已生成,请求人工批准
        await client.deep_research.intervene(
            task["task_id"],
            request_id=event.data["request_id"],
            response="approve",  # 或 {"action": "approve_with_edits", "edits": [...]}
        )
    if event.type == "task.completed":
        break

# 拉取产物三件套(final.md / citations.json / confidence.json)
artifacts = await client.deep_research.list_artifacts(task["task_id"])
final = await client.deep_research.get_artifact(
    task["task_id"], artifacts[0]["id"],
)

WebSearch —— 查询 → 结果

WS 是 project-scoped API。run() 默认 wait=true,服务器阻塞 ≤30s 后返回 done envelope;超时退化 202,调 get(task_id) 轮询。

python
# 同步形态(默认)
result = await client.web_search.run(
    queries=["best Python ORM 2026"],
    engines=["tavily"],
    summarize=True,
)
for hit in result["results"]["results"]:
    print(hit["title"], hit["url"])

# 异步形态
pending = await client.web_search.run_async(queries=["best Python ORM 2026"])
detail = await client.web_search.get(pending["task_id"])

# Refine(同 task 内重搜)
await client.web_search.refine(
    pending["task_id"],
    text="再加 site:reddit.com 限制重搜",
)

Track —— 长寿命监控

Track 是 project-scoped API。一个 monitor 是长寿命后台 job:cron / interval / event 调度 + 抽取目标 + 通知通道(webhook)。每次 tick 产生一行 snapshot;trigger DSL 判定 diff 值得通知时,绑定的通道发送 payload。

python
mon = await client.track.create(
    intent="apple.com 上 iPhone 17 Pro 跌破 $999 时通知我",
    schedule={"kind": "interval", "every_seconds": 3600},
    notify_channel={"kind": "callback_url", "url": "https://hooks.example.com/track"},
)

# 生命周期 —— 通过 patch 暂停 / 恢复 / refine:
await client.track.pause(mon["id"], reason="人工核对")
await client.track.resume(mon["id"])
await client.track.refine(mon["id"], trigger_dsl={"op": "lt", "field": "price", "value": 999})

# 手动触发一次 tick(绕开 schedule);返回 per-tick payload:
outcome = await client.track.run_now(mon["id"])

# 拉 snapshot 历史(最新优先):
snapshots = await client.track.list_snapshots(mon["id"])
snap = await client.track.get_snapshot(mon["id"], snapshots["items"][0]["id"])

# webhook outbox 历史 + 重投死行:
deliveries = await client.track.list_deliveries(mon["id"], include_payload=True)
await client.track.retry_delivery(mon["id"], deliveries["items"][0]["id"])

# 取消 monitor(终态):
await client.track.cancel(mon["id"])  # 等价 await client.track.delete(mon["id"])

Alignment HITL(可选)

Supervisor 需要让你澄清意图(如 "你说的是 SKU A 还是 SKU B?")时,monitor 进入 pending_clarification,发出 alignment.input_request 事件。用 intervene() 回答:

python
await client.track.intervene(
    mon["id"],
    request_id="req_align_1",
    response="SKU A",
)

也可以随时往 alignment 队列推自由文本:client.track.message(mon_id, content="…")

错误

SDK 用 typed exception,按类 catch:

python
from web_agent.v1 import (
    UnauthorizedError, InsufficientCreditsError, RateLimitedError,
)

try:
    await client.sessions.create(CreateSessionRequest(instructions="…"))
except InsufficientCreditsError as e:
    print("top up:", e.detail, e.extra)

每个 exception 都继承 ApiError,带 code / detail / extra,与 API 错误信封 对应。

异常类HTTPcode
UnauthorizedError401unauthorized
ForbiddenError403forbiddensafety_boundary_violated
NotFoundError404*_not_found
ConflictError409conflict
ValidationError422validation_error
RateLimitedError429rate_limit_exceeded
InsufficientCreditsError402insufficient_credits
BudgetExceededError402budget_exceeded

类型

SessionTaskEvent 等 DoAnything 资源都是 dataclass,从 web_agent.v1 重新导出:

python
from web_agent.v1 import Session, Task, Event, TaskStatus

DR / DS / WS 的响应直接是 dict[str, Any](按 OpenAPI 信封原样回传),用键名访问即可(task["task_id"] / task["status"])。mypy --strict 通过。

接下来