Python SDK
This page covers how to install, configure, and use the official web-agent-sdk Python package: open a session, run a task, and stream events.
pip install web-agent-sdkRequires Python 3.10+. The SDK is async-first (asyncio / anyio).
One entry point: Client
User-facing API products live on the same Client:
from web_agent.v1 import Client| Resource | Product | Use case |
|---|---|---|
client.sessions / messages / events | DoAnything (open-ended) | Free-form input; the agent picks the path. |
client.deep_research | DeepResearch (research → report) | Standalone API. |
client.web_search | WebSearch (query → results) | Synchronous by default (wait=true). |
client.track | Track (monitor → snapshot) | Long-lived monitors with webhook delivery. |
The package name is
web-agent-sdk(hyphen) but the import isweb_agent— same convention aspython-dateutil→dateutil.
DoAnything — open-ended tasks
import asyncio
from web_agent.v1 import Client
from web_agent.v1.types import CreateSessionRequest
async def main():
async with Client(
api_key="wa_demo_xxxxxxxxxxxxxxxx",
project_id="proj_demo_0001",
) as client:
session = await client.sessions.create(CreateSessionRequest(
instructions="Search Hacker News for the top 5 stories today, return them as a list.",
))
task = session.tasks[0] # session-create implicitly queues the first task
async for event in client.events.stream(session.id, task.id):
print(event.type, event.data)
if event.type == "task.completed":
break
asyncio.run(main())api_key and project_id default to $WEBAGENT_API_KEY / $WEBAGENT_PROJECT_ID if you omit them.
Follow-up task vs. inflight message
# 1. Push a message into the *current* task's chat queue
# (agent peeks the queue at the next ReAct boundary)
await client.messages.send(
session.id, task.id,
content="Also include the comment count for each.",
)
# 2. Start a NEW task in the SAME session
# (reuses browser, profile, workspace; previous task must be terminal)
from web_agent.v1.types import CreateTaskRequest
new_task = await client.sessions.create_task(
session.id,
CreateTaskRequest(instructions="Click into the first post and summarise it."),
)Answer an input request
await client.messages.intervene(
session.id, task.id,
input_request_id="ir_01HXX",
response={"solved": True},
)Cancel / stop / list
await client.sessions.cancel_task(session.id, task.id, reason="user_cancelled")
await client.sessions.stop(session.id, force=False) # soft stop session
listing = await client.sessions.list(status="running", limit=20)
for s in listing.items:
print(s.id, s.status)Heartbeats and resume
stream() filters heartbeats by default; pass include_heartbeats=True for connection-health UIs. Resume an interrupted stream with Last-Event-ID:
client.events.stream(session.id, task.id, last_event_id="142")DeepResearch — research → report
DR is a Standalone API (pidless: /v1/deep_research); the project tenant resolves from the Bearer token.
async with Client(api_key="wa_...", project_id="proj_demo") as client:
task = await client.deep_research.run(
topic="Open-source vector DB landscape 2026",
depth="deep", # light / standard / deep
require_outline_approval=True, # outline HITL gate (default on)
)
print(task["task_id"], task["status"])Subscribe to events (DR uses the DoAnything SSE channel) and respond to the outline gate:
async for event in client.events.stream(
task["session_id"], task["task_id"],
):
if event.type == "task.input_request":
# outline ready, awaiting human approval
await client.deep_research.intervene(
task["task_id"],
request_id=event.data["request_id"],
response="approve", # or {"action": "approve_with_edits", "edits": [...]}
)
if event.type == "task.completed":
break
# Pull the three-piece artifact set (final.md / citations.json / confidence.json)
artifacts = await client.deep_research.list_artifacts(task["task_id"])
final = await client.deep_research.get_artifact(
task["task_id"], artifacts[0]["id"],
)WebSearch — query → results
WS is a project-scoped API. run() defaults to wait=true: the server blocks for ≤30s and returns the done envelope; on timeout it returns 202 — call get(task_id) to poll.
# Synchronous (default)
result = await client.web_search.run(
queries=["best Python ORM 2026"],
engines=["tavily"],
summarize=True,
)
for hit in result["results"]["results"]:
print(hit["title"], hit["url"])
# Async
pending = await client.web_search.run_async(queries=["best Python ORM 2026"])
detail = await client.web_search.get(pending["task_id"])
# Refine (re-run within the same task)
await client.web_search.refine(
pending["task_id"],
text="add site:reddit.com and re-run",
)Track — long-lived monitors
Track is a project-scoped API. A monitor is a long-lived background job: a cron / interval / event schedule + an extraction goal + a notify channel (webhook). Each tick produces a snapshot row; whenever the trigger DSL judges the diff worth notifying, the configured channel fires.
mon = await client.track.create(
intent="Notify me when the iPhone 17 Pro listing on apple.com goes below $999",
schedule={"kind": "interval", "every_seconds": 3600},
notify_channel={"kind": "callback_url", "url": "https://hooks.example.com/track"},
)
# Lifecycle controls — pause / resume / refine via patch:
await client.track.pause(mon["id"], reason="manual review")
await client.track.resume(mon["id"])
await client.track.refine(mon["id"], trigger_dsl={"op": "lt", "field": "price", "value": 999})
# Manually fire a tick (bypasses schedule); inspect the per-tick payload:
outcome = await client.track.run_now(mon["id"])
# Pull the snapshot history (newest first):
snapshots = await client.track.list_snapshots(mon["id"])
snap = await client.track.get_snapshot(mon["id"], snapshots["items"][0]["id"])
# Inspect webhook outbox + retry a dead row:
deliveries = await client.track.list_deliveries(mon["id"], include_payload=True)
await client.track.retry_delivery(mon["id"], deliveries["items"][0]["id"])
# Cancel terminates the monitor (terminal state):
await client.track.cancel(mon["id"]) # equivalent: await client.track.delete(mon["id"])Alignment HITL (optional)
If the supervisor needs you to disambiguate intent (e.g. "did you mean SKU A or SKU B?"), the monitor moves to pending_clarification and emits an alignment.input_request event. Answer with intervene():
await client.track.intervene(
mon["id"],
request_id="req_align_1",
response="SKU A",
)You can also push free-text guidance into the alignment queue at any time via client.track.message(mon_id, content="…").
Errors
The SDK raises typed exceptions you can catch by class:
from web_agent.v1 import (
UnauthorizedError, InsufficientCreditsError, RateLimitedError,
)
try:
await client.sessions.create(CreateSessionRequest(instructions="…"))
except InsufficientCreditsError as e:
print("top up:", e.detail, e.extra)Every exception subclasses ApiError and carries code / detail / extra matching the API error envelope.
| Exception class | HTTP | code |
|---|---|---|
UnauthorizedError | 401 | unauthorized |
ForbiddenError | 403 | forbidden, safety_boundary_violated |
NotFoundError | 404 | *_not_found |
ConflictError | 409 | conflict |
ValidationError | 422 | validation_error |
RateLimitedError | 429 | rate_limit_exceeded |
InsufficientCreditsError | 402 | insufficient_credits |
BudgetExceededError | 402 | budget_exceeded |
Type stubs
DoAnything resources (Session, Task, Event, etc.) are dataclasses re-exported from web_agent.v1:
from web_agent.v1 import Session, Task, Event, TaskStatusDR / DS / WS responses are returned as dict[str, Any] (the OpenAPI envelope verbatim) — index by key (task["task_id"] / task["status"]). mypy --strict is supported.
Next steps
- TypeScript SDK — same surface in JS/TS.
- Errors & retries — recommended retry policy, idempotency keys.
- Sessions & Tasks — lifecycle, profiles, workspaces.