Skip to content
Go to Dashboard

Python SDK

This page covers how to install, configure, and use the official web-agent-sdk Python package: open a session, run a task, and stream events.

bash
pip install web-agent-sdk

Requires Python 3.10+. The SDK is async-first (asyncio / anyio).

One entry point: Client

User-facing API products live on the same Client:

python
from web_agent.v1 import Client
ResourceProductUse case
client.sessions / messages / eventsDoAnything (open-ended)Free-form input; the agent picks the path.
client.deep_researchDeepResearch (research → report)Standalone API.
client.web_searchWebSearch (query → results)Synchronous by default (wait=true).
client.trackTrack (monitor → snapshot)Long-lived monitors with webhook delivery.

The package name is web-agent-sdk (hyphen) but the import is web_agent — same convention as python-dateutildateutil.

DoAnything — open-ended tasks

python
import asyncio
from web_agent.v1 import Client
from web_agent.v1.types import CreateSessionRequest

async def main():
    async with Client(
        api_key="wa_demo_xxxxxxxxxxxxxxxx",
        project_id="proj_demo_0001",
    ) as client:
        session = await client.sessions.create(CreateSessionRequest(
            instructions="Search Hacker News for the top 5 stories today, return them as a list.",
        ))
        task = session.tasks[0]   # session-create implicitly queues the first task
        async for event in client.events.stream(session.id, task.id):
            print(event.type, event.data)
            if event.type == "task.completed":
                break

asyncio.run(main())

api_key and project_id default to $WEBAGENT_API_KEY / $WEBAGENT_PROJECT_ID if you omit them.

Follow-up task vs. inflight message

python
# 1. Push a message into the *current* task's chat queue
#    (agent peeks the queue at the next ReAct boundary)
await client.messages.send(
    session.id, task.id,
    content="Also include the comment count for each.",
)

# 2. Start a NEW task in the SAME session
#    (reuses browser, profile, workspace; previous task must be terminal)
from web_agent.v1.types import CreateTaskRequest
new_task = await client.sessions.create_task(
    session.id,
    CreateTaskRequest(instructions="Click into the first post and summarise it."),
)

Answer an input request

python
await client.messages.intervene(
    session.id, task.id,
    input_request_id="ir_01HXX",
    response={"solved": True},
)

Cancel / stop / list

python
await client.sessions.cancel_task(session.id, task.id, reason="user_cancelled")
await client.sessions.stop(session.id, force=False)            # soft stop session
listing = await client.sessions.list(status="running", limit=20)
for s in listing.items:
    print(s.id, s.status)

Heartbeats and resume

stream() filters heartbeats by default; pass include_heartbeats=True for connection-health UIs. Resume an interrupted stream with Last-Event-ID:

python
client.events.stream(session.id, task.id, last_event_id="142")

DeepResearch — research → report

DR is a Standalone API (pidless: /v1/deep_research); the project tenant resolves from the Bearer token.

python
async with Client(api_key="wa_...", project_id="proj_demo") as client:
    task = await client.deep_research.run(
        topic="Open-source vector DB landscape 2026",
        depth="deep",                       # light / standard / deep
        require_outline_approval=True,      # outline HITL gate (default on)
    )
    print(task["task_id"], task["status"])

Subscribe to events (DR uses the DoAnything SSE channel) and respond to the outline gate:

python
async for event in client.events.stream(
    task["session_id"], task["task_id"],
):
    if event.type == "task.input_request":
        # outline ready, awaiting human approval
        await client.deep_research.intervene(
            task["task_id"],
            request_id=event.data["request_id"],
            response="approve",  # or {"action": "approve_with_edits", "edits": [...]}
        )
    if event.type == "task.completed":
        break

# Pull the three-piece artifact set (final.md / citations.json / confidence.json)
artifacts = await client.deep_research.list_artifacts(task["task_id"])
final = await client.deep_research.get_artifact(
    task["task_id"], artifacts[0]["id"],
)

WebSearch — query → results

WS is a project-scoped API. run() defaults to wait=true: the server blocks for ≤30s and returns the done envelope; on timeout it returns 202 — call get(task_id) to poll.

python
# Synchronous (default)
result = await client.web_search.run(
    queries=["best Python ORM 2026"],
    engines=["tavily"],
    summarize=True,
)
for hit in result["results"]["results"]:
    print(hit["title"], hit["url"])

# Async
pending = await client.web_search.run_async(queries=["best Python ORM 2026"])
detail = await client.web_search.get(pending["task_id"])

# Refine (re-run within the same task)
await client.web_search.refine(
    pending["task_id"],
    text="add site:reddit.com and re-run",
)

Track — long-lived monitors

Track is a project-scoped API. A monitor is a long-lived background job: a cron / interval / event schedule + an extraction goal + a notify channel (webhook). Each tick produces a snapshot row; whenever the trigger DSL judges the diff worth notifying, the configured channel fires.

python
mon = await client.track.create(
    intent="Notify me when the iPhone 17 Pro listing on apple.com goes below $999",
    schedule={"kind": "interval", "every_seconds": 3600},
    notify_channel={"kind": "callback_url", "url": "https://hooks.example.com/track"},
)

# Lifecycle controls — pause / resume / refine via patch:
await client.track.pause(mon["id"], reason="manual review")
await client.track.resume(mon["id"])
await client.track.refine(mon["id"], trigger_dsl={"op": "lt", "field": "price", "value": 999})

# Manually fire a tick (bypasses schedule); inspect the per-tick payload:
outcome = await client.track.run_now(mon["id"])

# Pull the snapshot history (newest first):
snapshots = await client.track.list_snapshots(mon["id"])
snap = await client.track.get_snapshot(mon["id"], snapshots["items"][0]["id"])

# Inspect webhook outbox + retry a dead row:
deliveries = await client.track.list_deliveries(mon["id"], include_payload=True)
await client.track.retry_delivery(mon["id"], deliveries["items"][0]["id"])

# Cancel terminates the monitor (terminal state):
await client.track.cancel(mon["id"])  # equivalent: await client.track.delete(mon["id"])

Alignment HITL (optional)

If the supervisor needs you to disambiguate intent (e.g. "did you mean SKU A or SKU B?"), the monitor moves to pending_clarification and emits an alignment.input_request event. Answer with intervene():

python
await client.track.intervene(
    mon["id"],
    request_id="req_align_1",
    response="SKU A",
)

You can also push free-text guidance into the alignment queue at any time via client.track.message(mon_id, content="…").

Errors

The SDK raises typed exceptions you can catch by class:

python
from web_agent.v1 import (
    UnauthorizedError, InsufficientCreditsError, RateLimitedError,
)

try:
    await client.sessions.create(CreateSessionRequest(instructions="…"))
except InsufficientCreditsError as e:
    print("top up:", e.detail, e.extra)

Every exception subclasses ApiError and carries code / detail / extra matching the API error envelope.

Exception classHTTPcode
UnauthorizedError401unauthorized
ForbiddenError403forbidden, safety_boundary_violated
NotFoundError404*_not_found
ConflictError409conflict
ValidationError422validation_error
RateLimitedError429rate_limit_exceeded
InsufficientCreditsError402insufficient_credits
BudgetExceededError402budget_exceeded

Type stubs

DoAnything resources (Session, Task, Event, etc.) are dataclasses re-exported from web_agent.v1:

python
from web_agent.v1 import Session, Task, Event, TaskStatus

DR / DS / WS responses are returned as dict[str, Any] (the OpenAPI envelope verbatim) — index by key (task["task_id"] / task["status"]). mypy --strict is supported.

Next steps