Background Tasks¶
Background task system for async operations in hooks.
Task Definition¶
task ¶
task(
func: Callable[..., Any] | None = None,
*,
priority: int = 0,
ttl: int = 300,
transform: Callable[[Any], Any] | None = None,
) -> Task | Callable[[Callable[..., Any]], Task]
Decorator to define a background task.
Usage
@task def simple_task(x: int) -> int: return x * 2
@task(priority=2, ttl=600) def slow_task(query: str) -> str: return search_db(query)
@task(transform=lambda r: f"Result: {r}") def formatted_task(x: int) -> str: return x * 2
Task
dataclass
¶
TaskResult
dataclass
¶
Result of a background task execution.
is_expired
property
¶
Check if task result has expired based on TTL.
TTL is measured from finished_at (completion time) if available, otherwise from created_at. This ensures results remain available for the full TTL after completion, even for long-running tasks.
TaskStatus ¶
Bases: str, Enum
Status of a background task.
Dependencies¶
Tasks ¶
Unified background tasks dependency (enqueue + results).
Injected into handlers to both enqueue background tasks and access results from previous hook calls.
All operations are non-blocking:
- add() submits to thread pool and returns immediately
- pop()/get()/has() are dict lookups, return instantly
- wait*() methods are async and yield while polling
Keys are optional. If omitted, the task key defaults to the function name. For multiple concurrent calls to the same function, provide an explicit key to avoid collisions (later enqueues overwrite earlier results for the same key).
Usage
@app.on_prompt() def handler(event, tasks: Tasks): if result := tasks.pop(memory_lookup): return allow(message=f"Found: {result}")
tasks.add(memory_lookup, event.prompt)
return allow()
add ¶
add(
func: Callable[..., Any] | Task,
*args: Any,
key: str | None = None,
ttl: int = 300,
**kwargs: Any,
) -> TaskResult
Add a task to be executed in the background.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
func
|
Callable[..., Any] | Task
|
The function or Task to execute |
required |
*args
|
Any
|
Positional arguments for the function |
()
|
key
|
str | None
|
Optional unique key (defaults to function name) |
None
|
ttl
|
int
|
Time-to-live in seconds for the result (default 300) |
300
|
**kwargs
|
Any
|
Keyword arguments for the function |
{}
|
Returns:
| Type | Description |
|---|---|
TaskResult
|
TaskResult with status 'pending' |
get ¶
Get a task result without removing it.
pop ¶
Pop a completed result value, removing it from storage.
Returns None if task not found or not yet completed.
pop_errors ¶
Pop all failed results, returning (key, error) pairs.
has ¶
Check if there are completed results.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
str | Callable[..., Any] | Task | None
|
Specific task key to check, or None to check any |
None
|
wait
async
¶
Wait for a specific task to complete.
wait_all
async
¶
Wait for multiple tasks to complete. Returns dict of key -> value.
wait_any
async
¶
wait_any(
keys: list[str | Callable[..., Any] | Task],
timeout: float = 30.0,
) -> tuple[str, Any] | None
Wait for any task to complete. Returns (key, value) or None.
BackgroundTasks ¶
FastAPI-style task enqueueing dependency.
Injected into handlers to allow adding background tasks.
Usage
@app.pre_tool("Write") def on_write(event, tasks: BackgroundTasks): tasks.add(review_code, event.content, key="review") return allow()
add ¶
add(
func: Callable[..., Any] | Task,
*args: Any,
key: str,
ttl: int = 300,
**kwargs: Any,
) -> TaskResult
Add a task to be executed in the background.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
func
|
Callable[..., Any] | Task
|
The function or Task to execute |
required |
*args
|
Any
|
Positional arguments for the function |
()
|
key
|
str
|
Unique key for this task (used to retrieve results) |
required |
ttl
|
int
|
Time-to-live in seconds for the result (default 300) |
300
|
**kwargs
|
Any
|
Keyword arguments for the function |
{}
|
Returns:
| Type | Description |
|---|---|
TaskResult
|
TaskResult with status 'pending' |
PendingResults ¶
Access to completed background task results.
Injected into handlers to retrieve results from previous tasks.
Usage
@app.on_prompt() def check_memory(event, pending: PendingResults): if result := pending.pop("memory"): return allow(message=f"Found: {result}") return allow()
pop ¶
Pop a completed result value, removing it from storage.
Returns None if task not found or not yet completed.
pop_errors ¶
Pop all failed results, returning (key, error) pairs.
has ¶
Check if there are completed results.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
str | None
|
Specific task key to check, or None to check any |
None
|
wait
async
¶
Wait for a specific task to complete.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
str
|
Task key to wait for |
required |
timeout
|
float
|
Maximum seconds to wait |
30.0
|
Returns:
| Type | Description |
|---|---|
Any | None
|
Task result value, or None if timeout/failed/cancelled |
wait_all
async
¶
Wait for multiple tasks to complete.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
keys
|
list[str]
|
List of task keys to wait for |
required |
timeout
|
float
|
Maximum seconds to wait |
30.0
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dict of key -> value for completed tasks |
wait_any
async
¶
Wait for any task to complete.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
keys
|
list[str]
|
List of task keys to wait for |
required |
timeout
|
float
|
Maximum seconds to wait |
30.0
|
Returns:
| Type | Description |
|---|---|
tuple[str, Any] | None
|
(key, value) tuple for first completed task, or None |
Backends¶
BaseBackend ¶
Bases: ABC
Abstract base class for task backends.
enqueue
abstractmethod
¶
enqueue(
task: Task,
args: tuple[Any, ...],
kwargs: dict[str, Any],
*,
session_id: str,
key: str,
) -> TaskResult
Enqueue a task for execution.
get
abstractmethod
¶
Get a task result by session and key.
pop
abstractmethod
¶
Pop a completed result value, removing it from storage.
pop_all
abstractmethod
¶
Pop all completed results for a session.
cancel_all
abstractmethod
¶
Cancel all tasks for a session. Returns count cancelled.
pop_errors
abstractmethod
¶
Pop all failed results for a session, returning (key, error) pairs.
has
abstractmethod
¶
Check if there are any completed results.
wait
abstractmethod
async
¶
Wait for a specific task to complete.
wait_all
abstractmethod
async
¶
Wait for multiple tasks to complete. Returns dict of key -> value.
wait_any
abstractmethod
async
¶
Wait for any task to complete. Returns (key, value) or None.
InMemoryBackend ¶
Bases: BaseBackend
In-memory task backend using ThreadPoolExecutor.
Tasks are executed in a thread pool and results are stored in memory. Results are automatically cleaned up when expired (lazy cleanup on access).
Testing¶
ImmediateBackend ¶
Bases: BaseBackend
Backend that executes tasks immediately (synchronously).
Useful for testing when you want tasks to complete before assertions. Tracks full lifecycle (status transitions, timestamps).
Example
backend = ImmediateBackend() app = HookApp(task_backend=backend)
Tasks complete immediately¶
client = TestClient(app) response = client.send(MockEvent.bash(command="ls"))
Results are immediately available¶
assert backend.get("session", "key").status == TaskStatus.COMPLETED
enqueue ¶
enqueue(
task: Task,
args: tuple[Any, ...],
kwargs: dict[str, Any],
*,
session_id: str,
key: str,
) -> TaskResult
Execute task immediately (synchronous).