joehunt/toolthread

toolthread

Long-running tool execution with real-time progress updates for AI agents.

AI agents often need to start tasks that take minutes rather than milliseconds: deploying infrastructure, running data pipelines, waiting on human approval. toolthread lets an agent start background work, stream progress updates, ask the customer questions mid-task, and poll for results, all without blocking the conversation.

Features

  • Background task execution with progress streaming
  • Customer Q&A -- tasks can ask the customer questions and block until answered
  • Long-polling for efficient real-time updates (no busy loops)
  • Pluggable storage -- in-memory (default), SQLite, or bring your own
  • Google ADK integration out of the box
  • Thread-safe, zero external dependencies for core
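
To make the long-polling bullet concrete, here is a minimal sketch of the general technique using only the standard library. It is not toolthread's implementation; the `UpdateLog` class and its `append`/`poll` methods are hypothetical names invented for illustration. The poller sleeps on a condition variable and wakes as soon as an update arrives or the timeout expires, so no busy loop is needed.

```python
import threading
from typing import List, Tuple


class UpdateLog:
    """Toy append-only log that supports long-polling (illustrative only)."""

    def __init__(self) -> None:
        self._updates: List[str] = []
        self._cond = threading.Condition()

    def append(self, message: str) -> None:
        with self._cond:
            self._updates.append(message)
            self._cond.notify_all()  # wake any pollers currently waiting

    def poll(self, cursor: int, timeout: float = 30.0) -> Tuple[List[str], int]:
        """Return updates after `cursor`, waiting up to `timeout` seconds for one."""
        with self._cond:
            # wait_for releases the lock and sleeps until the predicate
            # becomes true or the timeout expires; it does not spin.
            self._cond.wait_for(lambda: len(self._updates) > cursor, timeout=timeout)
            new = self._updates[cursor:]
            return new, cursor + len(new)


log = UpdateLog()
# Simulate a background task that reports progress shortly after we start polling.
threading.Timer(0.1, log.append, args=("Pulling latest image...",)).start()
updates, cursor = log.poll(cursor=0, timeout=5.0)
print(updates)
```

The caller passes back the returned cursor on the next poll, so each call only sees updates it has not yet received.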

Installation

pip install toolthread

With Google ADK support:

pip install toolthread[adk]

Quick Start

from toolthread import TaskManager, TaskContext

manager = TaskManager()

@manager.register_task("deploy")
def deploy(ctx: TaskContext, params: dict) -> str:
    ctx.update("Pulling latest image...")
    # ... do work ...
    ctx.update("Running migrations...")
    # ... do work ...
    return "Deployed v1.2.3 to production"

# Start the task (returns immediately)
task_id = manager.start_task("deploy", env="production")

# Poll for updates (long-polls up to 30s by default)
response = manager.poll_task(task_id)
for update in response.updates:
    print(f"[{update.level.value}] {update.message}")

if response.result:
    print(f"Done: {response.result.result}")
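
The snippet above polls once, so a task that is still running may not have a result yet. A small loop like the following sketch keeps polling until one appears. `poll_until_done` is a hypothetical helper, not part of toolthread; it assumes only the response attributes shown above (`updates`, `result`) and, as a further assumption, that each poll returns only updates not yet seen. Stand-in responses are used so the sketch runs without the library installed.

```python
from types import SimpleNamespace
from typing import Any, Callable


def poll_until_done(poll_once: Callable[[], Any], max_polls: int = 100) -> Any:
    """Call poll_once() until the response carries a result, printing updates."""
    for _ in range(max_polls):
        response = poll_once()
        for update in response.updates:
            print(update.message)
        if response.result:
            return response.result
    raise TimeoutError(f"no result after {max_polls} polls")


# Stand-in responses mimicking the shape used in the Quick Start (assumed).
_fake_responses = iter([
    SimpleNamespace(
        updates=[SimpleNamespace(message="Pulling latest image...")],
        result=None,
    ),
    SimpleNamespace(
        updates=[SimpleNamespace(message="Running migrations...")],
        result=SimpleNamespace(result="Deployed v1.2.3 to production"),
    ),
])

final = poll_until_done(lambda: next(_fake_responses))
print(f"Done: {final.result}")
```

With a real manager, the call would presumably look like `poll_until_done(lambda: manager.poll_task(task_id))`.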

Customer Interaction

Tasks can ask the customer questions and wait for answers:

@manager.register_task("setup_database")
def setup_database(ctx: TaskContext, params: dict) -> str:
    ctx.update("Checking existing databases...")
    answer = ctx.ask_customer(
        "Found an existing database. Drop and recreate? (yes/no)",
        timeout=120.0,
    )
    # Normalize the reply so " Yes " and "YES" are accepted too
    if answer.strip().lower() == "yes":
        ctx.update("Dropping and recreating database...")
    else:
        ctx.update("Keeping existing database, running migrations only...")
    return "Database setup complete"

# Agent starts the task, polls, sees the pending question,
# asks the customer, then relays their answer:
task_id = manager.start_task("setup_database")
response = manager.poll_task(task_id)

if response.pending_question:
    # Show the question to the customer, get their answer...
    manager.respond_to_question(
        task_id,
        response.pending_question.question_id,
        "yes",
    )
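
For intuition about how a task can block on a question while another thread supplies the answer, here is a standard-library sketch of that hand-off. It is an illustration of the pattern only, not toolthread's internals; `QuestionBox`, `ask`, and `respond` are hypothetical names.

```python
import queue
import threading
from typing import List, Optional


class QuestionBox:
    """Toy hand-off: a worker thread asks, another thread answers."""

    def __init__(self) -> None:
        self.pending: Optional[str] = None
        self._answers: "queue.Queue[str]" = queue.Queue(maxsize=1)

    def ask(self, question: str, timeout: float) -> str:
        """Publish a question, then block until it is answered or times out."""
        self.pending = question
        try:
            return self._answers.get(timeout=timeout)
        except queue.Empty:
            raise TimeoutError(f"no answer within {timeout}s") from None
        finally:
            self.pending = None  # the question is no longer outstanding

    def respond(self, answer: str) -> None:
        self._answers.put(answer)


box = QuestionBox()
result: List[str] = []
worker = threading.Thread(
    target=lambda: result.append(box.ask("Drop and recreate? (yes/no)", timeout=5.0))
)
worker.start()
box.respond("yes")  # in practice the agent relays the customer's reply here
worker.join()
print(result)
```

The timeout matters: without it, a customer who never replies would leave the worker thread blocked indefinitely.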

Google ADK Integration

from toolthread.adk import create_adk_tools

# Wrap your TaskManager into ADK-compatible tools
tools = create_adk_tools(manager)

# Use the tools with your ADK agent -- they handle
# start_task, poll_task, respond_to_question, and cancel_task

See the Developer Guide for a complete ADK agent example and system instructions template.

Documentation

License

MIT
