Orchestration¶
AgentHelm's orchestration layer executes plans by routing steps to registered agents.
Quick Start¶
import asyncio
from agenthelm import (
    AgentRegistry, Orchestrator, PlannerAgent, ToolAgent, tool
)
import dspy
lm = dspy.LM("mistral/mistral-large-latest")
# Define tools
@tool()
def search(query: str) -> str:
    """Search for information."""
    return f"Results for: {query}"
@tool()
def write(content: str) -> str:
    """Write content to file."""
    return f"Wrote: {content}"
# Create and register agents
registry = AgentRegistry()
registry.register(ToolAgent(name="researcher", lm=lm, tools=[search]))
registry.register(ToolAgent(name="writer", lm=lm, tools=[write]))
# Generate a plan
planner = PlannerAgent(name="planner", lm=lm, tools=[search, write])
plan = planner.plan("Research AI trends and write a summary")
# Review and approve
print(plan.to_yaml())
plan.approved = True
# Execute
orchestrator = Orchestrator(registry)
result = asyncio.run(orchestrator.execute(plan))
print(result.answer)
AgentRegistry¶
Container for named agents that can be looked up during orchestration.
from agenthelm import AgentRegistry
registry = AgentRegistry()
# Register agents
registry.register(researcher_agent) # Uses agent.name as key
registry.register(writer_agent)
# Lookup
agent = registry["researcher"]
agent = registry.get("researcher") # Returns None if not found
# Check membership
if "researcher" in registry:
    ...
# Iterate
for name in registry:
    print(name)
# Properties
registry.names # ["researcher", "writer"]
len(registry) # 2
# Management
registry.unregister("researcher")
registry.clear()
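The operations above map onto Python's container protocol. The following is a minimal stand-alone sketch of such a name-keyed registry, not AgentHelm's actual implementation: `SketchRegistry` and `StubAgent` are hypothetical names, and the silent no-op in `unregister` is an assumption.

```python
from dataclasses import dataclass


@dataclass
class StubAgent:
    """Hypothetical stand-in for an agent; only the name matters here."""
    name: str


class SketchRegistry:
    """Illustrative name-keyed container (not the library's class)."""

    def __init__(self):
        self._agents = {}  # dicts preserve insertion order

    def register(self, agent):
        if agent.name in self._agents:
            raise ValueError(f"Agent '{agent.name}' is already registered")
        self._agents[agent.name] = agent

    def unregister(self, name):
        # Assumption: removing an unknown name is a silent no-op.
        self._agents.pop(name, None)

    def get(self, name):
        return self._agents.get(name)  # None if not found

    def clear(self):
        self._agents.clear()

    @property
    def names(self):
        return list(self._agents)

    def __getitem__(self, name):
        return self._agents[name]  # raises KeyError if not found

    def __contains__(self, name):
        return name in self._agents

    def __iter__(self):
        return iter(self._agents)

    def __len__(self):
        return len(self._agents)


registry = SketchRegistry()
registry.register(StubAgent("researcher"))
registry.register(StubAgent("writer"))

# Registering a second agent under an existing name is rejected.
try:
    registry.register(StubAgent("writer"))
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
```

Backing the registry with a plain dict gives iteration in registration order and constant-time lookup by name.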
Orchestrator¶
Executes approved plans by routing steps to agents.
from agenthelm import Orchestrator
orchestrator = Orchestrator(
    registry=registry,
    default_agent=fallback_agent,  # Optional: for steps without agent_name
)
# Execute a plan (async)
result = await orchestrator.execute(plan)
Execution Flow¶
Plan.get_ready_steps()
          │
          ▼
┌───────────────────┐
│  Parallel Batch   │  Steps with no pending dependencies
│ (asyncio.gather)  │  run concurrently
└───────────────────┘
          │
          ▼
   Mark completed
          │
          ▼
    Next batch...
          │
          ▼
     AgentResult
Parallel Execution¶
Steps with no pending dependencies run concurrently in the same batch:
plan = Plan(
    goal="Research and summarize",
    approved=True,
    steps=[
        PlanStep(id="a", agent_name="researcher", description="Search topic A"),
        PlanStep(id="b", agent_name="researcher", description="Search topic B"),
        PlanStep(id="c", agent_name="writer", description="Combine results",
                 depends_on=["a", "b"]),  # Waits for a and b
    ]
)
# Steps a and b run in parallel
# Step c runs after both complete
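The batching behaviour can be approximated with the standard library alone. The sketch below is an illustration under stated assumptions, not the orchestrator's real code: `Step`, `ready_steps`, `run_step`, and `run_in_batches` are hypothetical names, and the agent call is replaced by a short sleep.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Step:
    """Hypothetical stand-in for a plan step."""
    id: str
    depends_on: list = field(default_factory=list)
    done: bool = False


def ready_steps(steps):
    """Steps not yet run whose dependencies have all completed."""
    finished = {s.id for s in steps if s.done}
    return [s for s in steps
            if not s.done and all(d in finished for d in s.depends_on)]


async def run_step(step):
    await asyncio.sleep(0.01)  # placeholder for the agent call
    return step.id


async def run_in_batches(steps):
    batches = []
    while True:
        batch = ready_steps(steps)
        if not batch:
            break
        # Every step in the batch runs concurrently.
        ids = await asyncio.gather(*(run_step(s) for s in batch))
        for s in batch:
            s.done = True
        batches.append(list(ids))
    return batches


steps = [Step("a"), Step("b"), Step("c", depends_on=["a", "b"])]
batches = asyncio.run(run_in_batches(steps))
print(batches)  # [['a', 'b'], ['c']]
```

`asyncio.gather` returns results in the order the awaitables were passed, so the batch contents are deterministic even though the steps overlap in time.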
Error Handling¶
A failed step is marked StepStatus.FAILED and records its error:
result = await orchestrator.execute(plan)
if not result.success:
    failed = [s for s in plan.steps if s.status == StepStatus.FAILED]
    for step in failed:
        print(f"Step {step.id} failed: {step.error}")
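One way a batch runner could record failures like this is to collect exceptions instead of letting the first one propagate. This is a sketch under assumptions, not the library's internals: `Status`, `Step`, `call_agent`, and `run_batch` are hypothetical names.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class Status(Enum):
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class Step:
    """Hypothetical stand-in for a plan step with status tracking."""
    id: str
    status: Status = Status.PENDING
    error: Optional[str] = None


async def call_agent(step):
    # Placeholder for real work; step "b" fails on purpose.
    if step.id == "b":
        raise RuntimeError("search backend unavailable")
    return f"ok:{step.id}"


async def run_batch(batch):
    # return_exceptions=True returns exceptions as results
    # instead of raising the first one.
    outcomes = await asyncio.gather(
        *(call_agent(s) for s in batch), return_exceptions=True
    )
    for step, outcome in zip(batch, outcomes):
        if isinstance(outcome, Exception):
            step.status, step.error = Status.FAILED, str(outcome)
        else:
            step.status = Status.COMPLETED
    return all(s.status is Status.COMPLETED for s in batch)


batch = [Step("a"), Step("b")]
success = asyncio.run(run_batch(batch))
failed = [s for s in batch if s.status is Status.FAILED]
```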
Plan Approval Flow¶
Plans must be approved before execution:
# Generate plan
plan = planner.plan("Do something complex")
# Review (CLI or programmatic)
print(plan.to_yaml())
# Approve
plan.approved = True
# Execute
result = await orchestrator.execute(plan)
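The approval requirement amounts to a guard in front of execution. The sketch below shows the idea only; `DraftPlan` and `execute_if_approved` are hypothetical, and raising an exception for an unapproved plan is an assumption, since this page does not say how the orchestrator reacts.

```python
from dataclasses import dataclass


@dataclass
class DraftPlan:
    """Hypothetical stand-in for a plan with an approval flag."""
    goal: str
    approved: bool = False


def execute_if_approved(plan):
    # Assumption: an unapproved plan is rejected with an exception.
    if not plan.approved:
        raise PermissionError(f"Plan '{plan.goal}' has not been approved")
    return f"executing: {plan.goal}"


plan = DraftPlan(goal="Do something complex")
try:
    execute_if_approved(plan)
    rejected = False
except PermissionError:
    rejected = True

plan.approved = True
outcome = execute_if_approved(plan)
```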
CLI Approval (Week 5)¶
Default Agent¶
For steps without agent_name, a default agent can be used:
default = ToolAgent(name="default", lm=lm, tools=[...])
orchestrator = Orchestrator(registry, default_agent=default)
# This step uses default_agent
PlanStep(id="step_1", tool_name="search", description="...")
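The lookup order implied here is: use the named agent if the step has one, otherwise fall back to the default. A minimal sketch, assuming a plain dict in place of the registry; `resolve_agent` is a hypothetical helper, and raising when neither is available is an assumption.

```python
def resolve_agent(agents, agent_name=None, default_agent=None):
    """Pick the agent for a step: the named one, else the fallback."""
    if agent_name is not None:
        return agents[agent_name]  # KeyError for an unknown name
    if default_agent is None:
        # Assumption: a step with no agent and no fallback is an error.
        raise LookupError("step has no agent_name and no default agent is set")
    return default_agent


# Strings stand in for agent objects.
agents = {"researcher": "researcher-agent"}
named = resolve_agent(agents, agent_name="researcher",
                      default_agent="default-agent")
fallback = resolve_agent(agents, default_agent="default-agent")
```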
Saga Pattern (Rollback on Failure)¶
AgentHelm supports the Saga pattern for compensating actions when steps fail.
How It Works¶
- Steps execute normally
- If a step fails, rollback runs for completed steps in reverse order
- Each completed step's compensating action is called
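The three points above can be sketched in a few lines of plain Python. This illustrates the general Saga pattern rather than AgentHelm's code: `run_saga` and the `(name, action, compensation)` triples are hypothetical, and steps run sequentially here for simplicity.

```python
def run_saga(steps):
    """Run (name, action, compensation) triples; undo completed ones on failure."""
    completed = []
    log = []
    for name, action, compensate in steps:
        try:
            action()
            log.append(f"run {name}")
            completed.append((name, compensate))
        except Exception as exc:
            log.append(f"fail {name}: {exc}")
            # Roll back completed steps, most recent first.
            for done_name, undo in reversed(completed):
                if undo is not None:
                    undo()
                    log.append(f"undo {done_name}")
            return False, log
    return True, log


def fail():
    raise RuntimeError("smtp down")


ok, log = run_saga([
    ("create", lambda: None, lambda: None),
    ("upload", lambda: None, lambda: None),
    ("send", fail, None),
])
```

The failed step itself is not compensated, because it never completed; only `upload` and then `create` are undone.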
Defining Compensating Actions¶
Option 1: Per-Tool (default)
@tool(compensating_tool="delete_file")
def create_file(path: str) -> str:
    """Create a file. Rollback: delete it."""
    ...
Option 2: Per-Step (override)
PlanStep(
    id="step_1",
    tool_name="create_file",
    args={"path": "/tmp/data.txt"},
    compensate_tool="archive_file",  # Overrides tool default
    compensate_args={"path": "/tmp/data.txt"},
)
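The precedence between the two options can be written as a small lookup. A sketch only: `pick_compensating_tool` is a hypothetical helper, and the step and the per-tool defaults are modelled as plain dicts rather than AgentHelm objects.

```python
def pick_compensating_tool(step, tool_defaults):
    """Return the step-level override if set, else the tool's default."""
    override = step.get("compensate_tool")
    if override is not None:
        return override
    return tool_defaults.get(step["tool_name"])  # None if the tool has no default


# Per-tool defaults, as declared on the tool itself.
tool_defaults = {"create_file": "delete_file"}

with_override = pick_compensating_tool(
    {"tool_name": "create_file", "compensate_tool": "archive_file"},
    tool_defaults,
)
without_override = pick_compensating_tool(
    {"tool_name": "create_file"}, tool_defaults
)
```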
Enabling Rollback¶
# Rollback enabled by default
orchestrator = Orchestrator(registry, enable_rollback=True)
# Disable if needed
orchestrator = Orchestrator(registry, enable_rollback=False)
Example¶
plan = Plan(
    goal="Create and send report",
    approved=True,
    steps=[
        PlanStep(
            id="create",
            agent_name="writer",
            tool_name="create_report",
            description="Create report file",
            compensate_tool="delete_report",  # Rollback action
        ),
        PlanStep(
            id="send",
            agent_name="sender",
            tool_name="send_email",
            description="Send report via email",
            depends_on=["create"],
        ),
    ],
)
# If "send" fails, "create" is rolled back via delete_report
result = await orchestrator.execute(plan)
API Reference¶
AgentRegistry ¶
Registry for named agents.
Allows registering and looking up agents by name for orchestration.
Example
registry = AgentRegistry()
registry.register(researcher_agent)
registry.register(writer_agent)
agent = registry["researcher"]
agent.run("Find information about...")
Source code in agenthelm/orchestration/registry.py
Attributes¶
Functions¶
__contains__ ¶
__getitem__ ¶
Get an agent by name, raises KeyError if not found.
__iter__ ¶
__len__ ¶
clear ¶
get ¶
register ¶
Register an agent by its name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agent | BaseAgent | Agent to register (uses agent.name as key) | required |
Raises:
| Type | Description |
|---|---|
| ValueError | If an agent with the same name is already registered |
Orchestrator ¶
Executes plans by routing steps to registered agents.
Supports:
- Sequential execution (steps with dependencies)
- Parallel execution (independent steps)
- Saga pattern: rollback on failure via compensating actions
- Error handling and step failure tracking
Example
registry = AgentRegistry()
registry.register(researcher)
registry.register(writer)
orchestrator = Orchestrator(registry)
result = await orchestrator.execute(plan)
Initialize orchestrator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| registry | AgentRegistry | Registry of named agents | required |
| default_agent | BaseAgent \| None | Fallback agent for steps without agent_name | None |
| enable_rollback | bool | If True, run compensating actions on failure | True |
Source code in agenthelm/orchestration/orchestrator.py
Attributes¶
Functions¶
execute (async) ¶
Execute a plan by routing steps to agents.
On failure, if enable_rollback is True, runs compensating actions for completed steps in reverse order (Saga pattern).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| plan | Plan | The plan to execute | required |
Returns:
| Type | Description |
|---|---|
| AgentResult | AgentResult with aggregated events and metrics |