Skip to content

Background Tasks

Background task system for async operations in hooks.

Task Definition

task

task(
    func: Callable[..., Any] | None = None,
    *,
    priority: int = 0,
    ttl: int = 300,
    transform: Callable[[Any], Any] | None = None,
) -> Task | Callable[[Callable[..., Any]], Task]

Decorator to define a background task.

Usage

@task def simple_task(x: int) -> int: return x * 2

@task(priority=2, ttl=600) def slow_task(query: str) -> str: return search_db(query)

@task(transform=lambda r: f"Result: {r}") def formatted_task(x: int) -> str: return x * 2

Task dataclass

A background task definition.

Usage

@task def my_task(x: int) -> int: return x * 2

Or with options

@task(priority=2, ttl=600) def slow_task(query: str) -> str: return search_db(query)

func instance-attribute

func: Callable[..., Any]

priority class-attribute instance-attribute

priority: int = 0

ttl class-attribute instance-attribute

ttl: int = 300

transform class-attribute instance-attribute

transform: Callable[[Any], Any] | None = None

name property

name: str

Get task function name.

__call__

__call__(*args: Any, **kwargs: Any) -> Any

Call the underlying function directly (for testing).

TaskResult dataclass

Result of a background task execution.

id instance-attribute

id: str

session_id instance-attribute

session_id: str

key instance-attribute

key: str

status class-attribute instance-attribute

status: TaskStatus = PENDING

value class-attribute instance-attribute

value: Any = None

error class-attribute instance-attribute

error: Exception | None = None

created_at class-attribute instance-attribute

created_at: float = field(default_factory=time)

started_at class-attribute instance-attribute

started_at: float | None = None

finished_at class-attribute instance-attribute

finished_at: float | None = None

ttl class-attribute instance-attribute

ttl: int = 300

is_finished property

is_finished: bool

Check if task has completed (success or failure).

is_expired property

is_expired: bool

Check if task result has expired based on TTL.

TTL is measured from finished_at (completion time) if available, otherwise from created_at. This ensures results remain available for the full TTL after completion, even for long-running tasks.

TaskStatus

Bases: str, Enum

Status of a background task.

PENDING class-attribute instance-attribute

PENDING = 'pending'

RUNNING class-attribute instance-attribute

RUNNING = 'running'

COMPLETED class-attribute instance-attribute

COMPLETED = 'completed'

FAILED class-attribute instance-attribute

FAILED = 'failed'

CANCELLED class-attribute instance-attribute

CANCELLED = 'cancelled'

Dependencies

Tasks

Unified background tasks dependency (enqueue + results).

Injected into handlers to both enqueue background tasks and access results from previous hook calls.

All operations are non-blocking: - add() submits to thread pool and returns immediately - pop()/get()/has() are dict lookups, return instantly - wait*() methods are async and yield while polling

Keys are optional. If omitted, the task key defaults to the function name. For multiple concurrent calls to the same function, provide an explicit key to avoid collisions (later enqueues overwrite earlier results for the same key).

Usage

@app.on_prompt() def handler(event, tasks: Tasks): if result := tasks.pop(memory_lookup): return allow(message=f"Found: {result}")

tasks.add(memory_lookup, event.prompt)
return allow()

__init__

__init__(backend: BaseBackend, session_id: str)

add

add(
    func: Callable[..., Any] | Task,
    *args: Any,
    key: str | None = None,
    ttl: int = 300,
    **kwargs: Any,
) -> TaskResult

Add a task to be executed in the background.

Parameters:

Name Type Description Default
func Callable[..., Any] | Task

The function or Task to execute

required
*args Any

Positional arguments for the function

()
key str | None

Optional unique key (defaults to function name)

None
ttl int

Time-to-live in seconds for the result (default 300)

300
**kwargs Any

Keyword arguments for the function

{}

Returns:

Type Description
TaskResult

TaskResult with status 'pending'

cancel

cancel(key: str | Callable[..., Any] | Task) -> bool

Cancel a pending/running task by key.

cancel_all

cancel_all() -> int

Cancel all tasks for this session. Returns count cancelled.

get

get(
    key: str | Callable[..., Any] | Task,
) -> TaskResult | None

Get a task result without removing it.

pop

pop(key: str | Callable[..., Any] | Task) -> Any | None

Pop a completed result value, removing it from storage.

Returns None if task not found or not yet completed.

pop_all

pop_all() -> list[Any]

Pop all completed results for this session.

pop_errors

pop_errors() -> list[tuple[str, Exception]]

Pop all failed results, returning (key, error) pairs.

has

has(
    key: str | Callable[..., Any] | Task | None = None,
) -> bool

Check if there are completed results.

Parameters:

Name Type Description Default
key str | Callable[..., Any] | Task | None

Specific task key to check, or None to check any

None

wait async

wait(
    key: str | Callable[..., Any] | Task,
    timeout: float = 30.0,
) -> Any | None

Wait for a specific task to complete.

wait_all async

wait_all(
    keys: list[str | Callable[..., Any] | Task],
    timeout: float = 30.0,
) -> dict[str, Any]

Wait for multiple tasks to complete. Returns dict of key -> value.

wait_any async

wait_any(
    keys: list[str | Callable[..., Any] | Task],
    timeout: float = 30.0,
) -> tuple[str, Any] | None

Wait for any task to complete. Returns (key, value) or None.

BackgroundTasks

FastAPI-style task enqueueing dependency.

Injected into handlers to allow adding background tasks.

Usage

@app.pre_tool("Write") def on_write(event, tasks: BackgroundTasks): tasks.add(review_code, event.content, key="review") return allow()

__init__

__init__(backend: BaseBackend, session_id: str)

add

add(
    func: Callable[..., Any] | Task,
    *args: Any,
    key: str,
    ttl: int = 300,
    **kwargs: Any,
) -> TaskResult

Add a task to be executed in the background.

Parameters:

Name Type Description Default
func Callable[..., Any] | Task

The function or Task to execute

required
*args Any

Positional arguments for the function

()
key str

Unique key for this task (used to retrieve results)

required
ttl int

Time-to-live in seconds for the result (default 300)

300
**kwargs Any

Keyword arguments for the function

{}

Returns:

Type Description
TaskResult

TaskResult with status 'pending'

cancel

cancel(key: str) -> bool

Cancel a pending/running task by key.

cancel_all

cancel_all() -> int

Cancel all tasks for this session. Returns count cancelled.

PendingResults

Access to completed background task results.

Injected into handlers to retrieve results from previous tasks.

Usage

@app.on_prompt() def check_memory(event, pending: PendingResults): if result := pending.pop("memory"): return allow(message=f"Found: {result}") return allow()

__init__

__init__(backend: BaseBackend, session_id: str)

get

get(key: str) -> TaskResult | None

Get a task result without removing it.

pop

pop(key: str) -> Any | None

Pop a completed result value, removing it from storage.

Returns None if task not found or not yet completed.

pop_all

pop_all() -> list[Any]

Pop all completed results for this session.

pop_errors

pop_errors() -> list[tuple[str, Exception]]

Pop all failed results, returning (key, error) pairs.

has

has(key: str | None = None) -> bool

Check if there are completed results.

Parameters:

Name Type Description Default
key str | None

Specific task key to check, or None to check any

None

wait async

wait(key: str, timeout: float = 30.0) -> Any | None

Wait for a specific task to complete.

Parameters:

Name Type Description Default
key str

Task key to wait for

required
timeout float

Maximum seconds to wait

30.0

Returns:

Type Description
Any | None

Task result value, or None if timeout/failed/cancelled

wait_all async

wait_all(
    keys: list[str], timeout: float = 30.0
) -> dict[str, Any]

Wait for multiple tasks to complete.

Parameters:

Name Type Description Default
keys list[str]

List of task keys to wait for

required
timeout float

Maximum seconds to wait

30.0

Returns:

Type Description
dict[str, Any]

Dict of key -> value for completed tasks

wait_any async

wait_any(
    keys: list[str], timeout: float = 30.0
) -> tuple[str, Any] | None

Wait for any task to complete.

Parameters:

Name Type Description Default
keys list[str]

List of task keys to wait for

required
timeout float

Maximum seconds to wait

30.0

Returns:

Type Description
tuple[str, Any] | None

(key, value) tuple for first completed task, or None

Backends

BaseBackend

Bases: ABC

Abstract base class for task backends.

enqueue abstractmethod

enqueue(
    task: Task,
    args: tuple[Any, ...],
    kwargs: dict[str, Any],
    *,
    session_id: str,
    key: str,
) -> TaskResult

Enqueue a task for execution.

get abstractmethod

get(session_id: str, key: str) -> TaskResult | None

Get a task result by session and key.

pop abstractmethod

pop(session_id: str, key: str) -> Any | None

Pop a completed result value, removing it from storage.

pop_all abstractmethod

pop_all(session_id: str) -> list[Any]

Pop all completed results for a session.

cancel abstractmethod

cancel(session_id: str, key: str) -> bool

Cancel a pending/running task.

cancel_all abstractmethod

cancel_all(session_id: str) -> int

Cancel all tasks for a session. Returns count cancelled.

pop_errors abstractmethod

pop_errors(session_id: str) -> list[tuple[str, Exception]]

Pop all failed results for a session, returning (key, error) pairs.

has abstractmethod

has(session_id: str, key: str | None = None) -> bool

Check if there are any completed results.

wait abstractmethod async

wait(
    session_id: str, key: str, timeout: float = 30.0
) -> Any | None

Wait for a specific task to complete.

wait_all abstractmethod async

wait_all(
    session_id: str, keys: list[str], timeout: float = 30.0
) -> dict[str, Any]

Wait for multiple tasks to complete. Returns dict of key -> value.

wait_any abstractmethod async

wait_any(
    session_id: str, keys: list[str], timeout: float = 30.0
) -> tuple[str, Any] | None

Wait for any task to complete. Returns (key, value) or None.

InMemoryBackend

Bases: BaseBackend

In-memory task backend using ThreadPoolExecutor.

Tasks are executed in a thread pool and results are stored in memory. Results are automatically cleaned up when expired (lazy cleanup on access).

__init__

__init__(max_workers: int = 4)

enqueue

enqueue(
    task: Task,
    args: tuple[Any, ...],
    kwargs: dict[str, Any],
    *,
    session_id: str,
    key: str,
) -> TaskResult

Enqueue a task for background execution.

get

get(session_id: str, key: str) -> TaskResult | None

Get a task result by session and key.

pop

pop(session_id: str, key: str) -> Any | None

Pop a completed result value, removing it from storage.

pop_all

pop_all(session_id: str) -> list[Any]

Pop all completed results for a session.

shutdown

shutdown(wait: bool = True) -> None

Shutdown the thread pool.

Testing

ImmediateBackend

Bases: BaseBackend

Backend that executes tasks immediately (synchronously).

Useful for testing when you want tasks to complete before assertions. Tracks full lifecycle (status transitions, timestamps).

Example

backend = ImmediateBackend() app = HookApp(task_backend=backend)

Tasks complete immediately

client = TestClient(app) response = client.send(MockEvent.bash(command="ls"))

Results are immediately available

assert backend.get("session", "key").status == TaskStatus.COMPLETED

enqueue

enqueue(
    task: Task,
    args: tuple[Any, ...],
    kwargs: dict[str, Any],
    *,
    session_id: str,
    key: str,
) -> TaskResult

Execute task immediately (synchronous).