Skip to content

Orchestration

AgentHelm's orchestration layer executes plans by routing steps to registered agents.

Quick Start

import asyncio
from agenthelm import (
    AgentRegistry, Orchestrator, PlannerAgent, ToolAgent, tool
)
import dspy

lm = dspy.LM("mistral/mistral-large-latest")

# Define tools
@tool()
def search(query: str) -> str:
    """Search for information."""
    return f"Results for: {query}"

@tool()
def write(content: str) -> str:
    """Write content to file."""
    return f"Wrote: {content}"

# Create and register agents
registry = AgentRegistry()
registry.register(ToolAgent(name="researcher", lm=lm, tools=[search]))
registry.register(ToolAgent(name="writer", lm=lm, tools=[write]))

# Generate a plan
planner = PlannerAgent(name="planner", lm=lm, tools=[search, write])
plan = planner.plan("Research AI trends and write a summary")

# Review and approve
print(plan.to_yaml())
plan.approved = True

# Execute
orchestrator = Orchestrator(registry)
result = asyncio.run(orchestrator.execute(plan))
print(result.answer)

AgentRegistry

Container for named agents that can be looked up during orchestration.

from agenthelm import AgentRegistry

registry = AgentRegistry()

# Register agents
registry.register(researcher_agent)  # Uses agent.name as key
registry.register(writer_agent)

# Lookup
agent = registry["researcher"]
agent = registry.get("researcher")  # Returns None if not found

# Check membership
if "researcher" in registry:
    ...

# Iterate
for name in registry:
    print(name)

# Properties
registry.names  # ["researcher", "writer"]
len(registry)   # 2

# Management
registry.unregister("researcher")
registry.clear()

Orchestrator

Executes approved plans by routing steps to agents.

from agenthelm import Orchestrator

orchestrator = Orchestrator(
    registry=registry,
    default_agent=fallback_agent  # Optional: for steps without agent_name
)

# Execute a plan (async)
result = await orchestrator.execute(plan)

Execution Flow

Plan.get_ready_steps()
          │
          ▼
┌───────────────────┐
│  Parallel Batch   │  Steps with no pending dependencies
│  (asyncio.gather) │  run concurrently
└───────────────────┘
          │
          ▼
   Mark completed
          │
          ▼
   Next batch...
          │
          ▼
     AgentResult

Parallel Execution

Steps whose dependencies have all completed are run in parallel, in the same batch:

plan = Plan(
    goal="Research and summarize",
    approved=True,
    steps=[
        PlanStep(id="a", agent_name="researcher", description="Search topic A"),
        PlanStep(id="b", agent_name="researcher", description="Search topic B"),
        PlanStep(id="c", agent_name="writer", description="Combine results",
                 depends_on=["a", "b"]),  # Waits for a and b
    ]
)

# Steps a and b run in parallel
# Step c runs after both complete

Error Handling

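Failed steps are marked as `StepStatus.FAILED`, and the exception message is recorded on the step. Execution stops after the batch in which the first failure occurs: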

result = await orchestrator.execute(plan)

if not result.success:
    failed = [s for s in plan.steps if s.status == StepStatus.FAILED]
    for step in failed:
        print(f"Step {step.id} failed: {step.error}")

Plan Approval Flow

Plans must be approved before execution. `Orchestrator.execute()` raises `ValueError` if `plan.approved` is not set:

# Generate plan
plan = planner.plan("Do something complex")

# Review (CLI or programmatic)
print(plan.to_yaml())

# Approve
plan.approved = True

# Execute
result = await orchestrator.execute(plan)

CLI Approval (Week 5)

$ agenthelm plan "Research AI trends" --approve

Default Agent

For steps without agent_name, a default agent can be used:

default = ToolAgent(name="default", lm=lm, tools=[...])
orchestrator = Orchestrator(registry, default_agent=default)

# This step uses default_agent
PlanStep(id="step_1", tool_name="search", description="...")

Saga Pattern (Rollback on Failure)

AgentHelm supports the Saga pattern for compensating actions when steps fail.

How It Works

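  1. Steps execute normally, batch by batch.
  2. If a step fails, or no step can run because of a dependency deadlock, execution stops. When `enable_rollback` is True, rollback then runs for the completed steps in reverse order.
  3. Each completed step's compensating action is called.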

Defining Compensating Actions

Option 1: Per-Tool (default)

@tool(compensating_tool="delete_file")
def create_file(path: str) -> str:
    """Create a file. Rollback: delete it."""
    ...

Option 2: Per-Step (override)

PlanStep(
    id="step_1",
    tool_name="create_file",
    args={"path": "/tmp/data.txt"},
    compensate_tool="archive_file",  # Overrides tool default
    compensate_args={"path": "/tmp/data.txt"},
)

Enabling Rollback

# Rollback enabled by default
orchestrator = Orchestrator(registry, enable_rollback=True)

# Disable if needed
orchestrator = Orchestrator(registry, enable_rollback=False)

Example

plan = Plan(
    goal="Create and send report",
    approved=True,
    steps=[
        PlanStep(
            id="create",
            agent_name="writer",
            tool_name="create_report",
            description="Create report file",
            compensate_tool="delete_report",  # Rollback action
        ),
        PlanStep(
            id="send",
            agent_name="sender",
            tool_name="send_email",
            description="Send report via email",
            depends_on=["create"],
        ),
    ],
)

# If "send" fails, "create" is rolled back via delete_report
result = await orchestrator.execute(plan)

API Reference

AgentRegistry

AgentRegistry()

Registry for named agents.

Allows registering and looking up agents by name for orchestration.

Example

registry = AgentRegistry()
registry.register(researcher_agent)
registry.register(writer_agent)

agent = registry["researcher"]
agent.run("Find information about...")

Source code in agenthelm/orchestration/registry.py
def __init__(self):
    """Create an empty registry mapping agent names to agent instances."""
    agents: dict[str, BaseAgent] = dict()
    self._agents = agents

Attributes

names property

names

List of all registered agent names.

Functions

__contains__

__contains__(name)

Check if an agent is registered.

Source code in agenthelm/orchestration/registry.py
def __contains__(self, name: str) -> bool:
    """Return True when an agent named *name* is currently registered."""
    known_names = self._agents.keys()
    return name in known_names

__getitem__

__getitem__(name)

Get an agent by name, raises KeyError if not found.

Source code in agenthelm/orchestration/registry.py
def __getitem__(self, name: str) -> BaseAgent:
    """
    Get an agent by name, raises KeyError if not found.

    Args:
        name: Name the agent was registered under.

    Returns:
        The agent registered under *name*.

    Raises:
        KeyError: If no agent is registered under *name*.
    """
    # EAFP: a single dict lookup instead of a membership test plus a second
    # lookup. ``from None`` keeps the traceback limited to the descriptive
    # error, matching the original (which raised without a chained cause).
    try:
        return self._agents[name]
    except KeyError:
        raise KeyError(f"Agent '{name}' not found in registry") from None

__iter__

__iter__()

Iterate over agent names.

Source code in agenthelm/orchestration/registry.py
def __iter__(self) -> Iterator[str]:
    """Iterate over registered agent names in dict insertion order."""
    name_view = self._agents.keys()
    return iter(name_view)

__len__

__len__()

Number of registered agents.

Source code in agenthelm/orchestration/registry.py
def __len__(self) -> int:
    """Return how many agents are currently registered."""
    agents = self._agents
    return len(agents)

clear

clear()

Remove all agents from registry.

Source code in agenthelm/orchestration/registry.py
def clear(self) -> None:
    """Empty the registry in place, dropping every registered agent."""
    agents = self._agents
    agents.clear()

get

get(name)

Get an agent by name, returns None if not found.

Source code in agenthelm/orchestration/registry.py
def get(self, name: str) -> BaseAgent | None:
    """Look up an agent by *name*, yielding None rather than raising when absent."""
    agents = self._agents
    return agents.get(name, None)

register

register(agent)

Register an agent by its name.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `agent` | `BaseAgent` | Agent to register (uses `agent.name` as key) | *required* |

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If an agent with the same name is already registered |

Source code in agenthelm/orchestration/registry.py
def register(self, agent: BaseAgent) -> None:
    """
    Add an agent to the registry, keyed by its ``name`` attribute.

    Args:
        agent: Agent to register (uses agent.name as key)

    Raises:
        ValueError: If agent with same name already registered
    """
    key = agent.name
    agents = self._agents
    if key in agents:
        # Names must be unique; refuse to silently replace an existing agent.
        raise ValueError(f"Agent '{key}' already registered")
    agents[key] = agent

unregister

unregister(name)

Remove an agent from the registry.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `name` | `str` | Name of the agent to remove | *required* |

Returns:

| Type | Description |
|------|-------------|
| `BaseAgent \| None` | The removed agent, or `None` if not found |

Source code in agenthelm/orchestration/registry.py
def unregister(self, name: str) -> BaseAgent | None:
    """
    Take an agent out of the registry.

    Args:
        name: Name of agent to remove

    Returns:
        The removed agent, or None if not found
    """
    agents = self._agents
    if name not in agents:
        return None
    return agents.pop(name)

options: show_source: false

Orchestrator

Orchestrator(
    registry, default_agent=None, enable_rollback=True
)

Executes plans by routing steps to registered agents.

Supports:

- Sequential execution (steps with dependencies)
- Parallel execution (independent steps)
- Saga pattern: rollback on failure via compensating actions
- Error handling and step failure tracking

Example

registry = AgentRegistry()
registry.register(researcher)
registry.register(writer)

orchestrator = Orchestrator(registry)
result = await orchestrator.execute(plan)

Initialize orchestrator.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `registry` | `AgentRegistry` | Registry of named agents | *required* |
| `default_agent` | `BaseAgent \| None` | Fallback agent for steps without `agent_name` | `None` |
| `enable_rollback` | `bool` | If True, run compensating actions on failure | `True` |
Source code in agenthelm/orchestration/orchestrator.py
def __init__(
    self,
    registry: AgentRegistry,
    default_agent: BaseAgent | None = None,
    enable_rollback: bool = True,
):
    """
    Configure the orchestrator.

    Args:
        registry: Registry of named agents that plan steps are routed to
        default_agent: Agent to fall back on when a step has no agent_name
        enable_rollback: When True, compensating actions run after a failure
    """
    # Plain attribute storage; the assignments are independent of each other.
    self.enable_rollback = enable_rollback
    self.default_agent = default_agent
    self.registry = registry

Attributes

default_agent instance-attribute

default_agent = default_agent

enable_rollback instance-attribute

enable_rollback = enable_rollback

registry instance-attribute

registry = registry

Functions

execute async

execute(plan)

Execute a plan by routing steps to agents.

On failure, if enable_rollback is True, runs compensating actions for completed steps in reverse order (Saga pattern).

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `plan` | `Plan` | The plan to execute | *required* |

Returns:

| Type | Description |
|------|-------------|
| `AgentResult` | AgentResult with aggregated events and metrics |

Source code in agenthelm/orchestration/orchestrator.py
async def execute(self, plan: Plan) -> AgentResult:
    """
    Execute a plan by routing steps to agents.

    Steps run in batches: every step returned by ``plan.get_ready_steps()``
    is executed concurrently via ``asyncio.gather``, then marked completed
    or failed before the next batch is requested. Execution stops after
    the first batch containing a failure, or when no step is ready even
    though the plan is not complete (a dependency deadlock).

    On failure, if enable_rollback is True, runs compensating actions
    for completed steps in reverse order (Saga pattern).

    Args:
        plan: The plan to execute

    Returns:
        AgentResult with aggregated events and metrics

    Raises:
        ValueError: If the plan has not been approved.
    """
    if not plan.approved:
        raise ValueError("Plan must be approved before execution")

    result = AgentResult(success=False)
    all_events: list[Event] = []
    failed = False

    while not plan.is_complete:
        ready_steps = plan.get_ready_steps()

        if not ready_steps:
            # No steps ready but plan not complete - deadlock
            result.error = "Plan execution deadlock: no steps ready"
            failed = True
            break

        # Execute ready steps in parallel. With return_exceptions=True a
        # failing step yields its exception object instead of aborting the
        # gather, so every sibling in the batch still gets recorded below.
        step_results = await asyncio.gather(
            *[self._execute_step(step) for step in ready_steps],
            return_exceptions=True,
        )

        # Process results
        for step, step_result in zip(ready_steps, step_results):
            # BaseException, not Exception: a cancelled step comes back as
            # asyncio.CancelledError, which is not an Exception subclass
            # (Python 3.8+). Checking only Exception would try to unpack it
            # as (output, events) and crash before any rollback runs.
            if isinstance(step_result, BaseException):
                # Some exceptions (e.g. a bare TimeoutError()) have an empty
                # message; fall back to the type name so step.error is useful.
                error = str(step_result) or type(step_result).__name__
                plan.mark_failed(step.id, error)
                failed = True
            else:
                output, events = step_result
                plan.mark_completed(step.id, result=output)
                all_events.extend(events)

        # On first failure, break and rollback
        if failed:
            break

    # Saga: rollback completed steps on failure
    if failed and self.enable_rollback:
        rollback_events = await self._rollback(plan)
        all_events.extend(rollback_events)

    # Build final result
    result.success = plan.success
    for event in all_events:
        result.add_event(event)

    if not result.success and not result.error:
        failed_steps = [s for s in plan.steps if s.status == StepStatus.FAILED]
        if failed_steps:
            result.error = f"Steps failed: {[s.id for s in failed_steps]}"

    return result

options: show_source: false