Skip to content

MCP Integration

AgentHelm integrates with the Model Context Protocol (MCP) to connect agents to external tools and data sources.

Quick Start

import asyncio
from agenthelm import MCPToolAdapter, ToolAgent
import dspy

lm = dspy.LM("mistral/mistral-large-latest")

async def main():
    """Answer one question with a ToolAgent backed by an MCP time server."""
    # Connect to an MCP server
    adapter = MCPToolAdapter(
        server_config={"command": "uvx", "args": ["mcp-server-time"]}
    )
    try:
        await adapter.connect()

        # Get tools from MCP server
        tools = adapter.get_tools()

        # Use with AgentHelm agents
        agent = ToolAgent(name="time_agent", lm=lm, tools=tools)
        result = agent.run("What time is it?")
        print(result.answer)
    finally:
        # Always release the MCP connection, even if connecting or the
        # agent run fails part-way through.
        await adapter.close()

asyncio.run(main())

MCPToolAdapter

Wraps MCP server tools as AgentHelm-compatible callables.

from agenthelm import MCPToolAdapter

# server_config describes how to launch the MCP server
adapter = MCPToolAdapter(
    server_config={
        "command": "uvx",
        "args": ["mcp-server-filesystem", "/path/to/files"],
        "env": {"DEBUG": "true"},  # Optional: extra environment variables for the server
    }
)

# connect() discovers the server's tools; get_tools() returns them as callables
await adapter.connect()
tools = adapter.get_tools()

Server Config

Key Description
command Executable to run (e.g., uvx, npx, python)
args Command arguments
env Environment variables (optional)

Saga Support

Define compensating actions for MCP tools:

# compensations maps a tool name to the name of its compensating tool
adapter = MCPToolAdapter(
    server_config={"command": "uvx", "args": ["mcp-server-files"]},
    compensations={
        "create_file": "delete_file",
        "write_file": "restore_file",
    }
)

On failure, the Orchestrator will run the compensating MCP tool.

MCPClient (Low-Level)

For direct MCP protocol access:

from agenthelm import MCPClient

client = MCPClient({"command": "uvx", "args": ["mcp-server-time"]})
await client.connect()

# List available tools
tools = await client.list_tools()

# Call a tool
result = await client.call_tool("get_time", {"timezone": "UTC"})

await client.close()

Example: File Operations Agent

adapter = MCPToolAdapter(
    server_config={"command": "uvx", "args": ["mcp-server-filesystem", "."]},
    compensations={"write_file": "delete_file"}
)
await adapter.connect()

agent = ToolAgent(
    name="file_agent",
    lm=lm,
    tools=adapter.get_tools(),
    role="You are a file management assistant."
)

result = agent.run("Create a new file called notes.txt with 'Hello World'")

API Reference

MCPToolAdapter

MCPToolAdapter(server_config, compensations=None)

Wraps MCP server tools as AgentHelm-compatible callables.

Example

adapter = MCPToolAdapter({"command": "uvx", "args": ["mcp-server-time"]}) await adapter.connect() tools = adapter.get_tools()

Source code in agenthelm/mcp/adapter.py
def __init__(
    self,
    server_config: dict,
    compensations: dict[str, str] | None = None,
):
    """
    Create an adapter around a new MCPClient for the given server.

    Args:
        server_config: MCP server settings, passed unchanged to MCPClient.
        compensations: Optional mapping of tool name to the name of its
            compensating tool; treated as empty when omitted.
    """
    self._client = MCPClient(server_config)
    # Tool descriptors; empty until connect() fills them in.
    self._tools: list[dict] = []
    self._compensations = compensations or {}

Functions

close async

close()
Source code in agenthelm/mcp/adapter.py
async def close(self):
    """Close the underlying MCP client."""
    await self._client.close()

connect async

connect()

Connect and discover tools.

Source code in agenthelm/mcp/adapter.py
async def connect(self):
    """Connect and discover tools.

    Connects the underlying client, then stores the tool list it reports
    so that get_tools() can wrap it.
    """
    await self._client.connect()
    self._tools = await self._client.list_tools()

get_tools

get_tools()

Return MCP tools as callable functions.

Registers each tool in TOOL_REGISTRY with compensation info.

Source code in agenthelm/mcp/adapter.py
def get_tools(self) -> list[Callable]:
    """
    Return MCP tools as callable functions.

    Each tool stored by ``connect()`` is wrapped in a synchronous function
    that is named after the tool, carries the tool description as its
    docstring, and forwards its keyword arguments to
    ``self._client.call_tool``. Each wrapper is also registered in
    TOOL_REGISTRY with its input schema and compensation info.

    Returns:
        One callable per stored tool, in the stored order; an empty list
        if no tools have been discovered.

    Note:
        The wrappers block the calling thread for up to 30 seconds while
        the call runs on an event loop. A wrapper invoked on the thread
        that is running that loop stalls the loop, so the call cannot
        complete and the wait times out. Inside async code, run the agent
        or tool from a worker thread (for example ``asyncio.to_thread``).
    """
    # Remember the loop this method is called on so wrappers invoked later
    # from worker threads can hand the call back to it.
    # NOTE(review): presumably the same loop connect() was awaited on - confirm.
    try:
        owner_loop = asyncio.get_running_loop()
    except RuntimeError:
        owner_loop = None

    # Create wrapper function (the factory binds tool_name per tool)
    def make_tool_func(tool_name: str):
        def tool_func(**kwargs) -> Any:
            """Sync wrapper for async MCP call."""
            # Bridge async -> sync for DSPy compatibility
            if owner_loop is not None and owner_loop.is_running():
                target_loop = owner_loop
            else:
                # get_running_loop() replaces the deprecated
                # get_event_loop(), which raises in worker threads.
                try:
                    target_loop = asyncio.get_running_loop()
                except RuntimeError:
                    target_loop = None

            call = self._client.call_tool(tool_name, kwargs)
            if target_loop is None:
                # No loop anywhere: drive the call on a temporary loop.
                return asyncio.run(call)

            future = asyncio.run_coroutine_threadsafe(call, target_loop)
            return future.result(timeout=30)

        return tool_func

    callables = []
    for tool_info in self._tools:
        name = tool_info["name"]
        # `or` also covers a description that is present but None.
        description = tool_info.get("description") or "MCP tool"

        func = make_tool_func(name)
        func.__name__ = name
        func.__doc__ = description

        # Register in TOOL_REGISTRY with compensation
        compensate = self._compensations.get(name)
        TOOL_REGISTRY[name] = {
            "function": func,
            "contract": {
                "inputs": self._extract_input_schema(tool_info),
                "outputs": {"result": "Any"},
                "compensating_tool": compensate,
                "tags": ["mcp"],
            },
        }

        callables.append(func)
    return callables

options: show_source: false

MCPClient

MCPClient(server_config)

Low-level MCP protocol client.

Note: On Windows, MCP stdio transport may have issues with subprocess buffering and pipe handling. Using PYTHONUNBUFFERED=1 can help.

Initialize MCP client.

Parameters:

Name Type Description Default
server_config dict

Dict with 'command', 'args', and optional 'env'. Example: {"command": "uvx", "args": ["mcp-server-time"]}

required
Source code in agenthelm/mcp/client.py
def __init__(self, server_config: dict):
    """
    Initialize MCP client.

    Only stores the configuration; nothing is launched until connect().

    Args:
        server_config: Dict with 'command', 'args', and optional 'env'
            Example: {"command": "uvx", "args": ["mcp-server-time"]}
    """
    self.server_config = server_config
    # Connection state; all None until connect() populates it.
    self._session: ClientSession | None = None
    # Transport context manager, kept so close() can exit it later.
    self._context = None
    # The two stream ends produced by entering the transport context.
    self._read = None
    self._write = None

Attributes

server_config instance-attribute

server_config = server_config

Functions

__aenter__ async

__aenter__()

Async context manager entry.

Source code in agenthelm/mcp/client.py
async def __aenter__(self):
    """Async context manager entry: connect and return this client."""
    await self.connect()
    return self

__aexit__ async

__aexit__(exc_type, exc_val, exc_tb)

Async context manager exit.

Source code in agenthelm/mcp/client.py
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Async context manager exit: close the connection.

    The exception details are ignored, and nothing is returned, so an
    exception raised inside the ``async with`` body still propagates.
    """
    await self.close()

call_tool async

call_tool(name, arguments)

Call a tool on the MCP server.

Source code in agenthelm/mcp/client.py
async def call_tool(self, name: str, arguments: dict) -> Any:
    """Invoke the named tool on the MCP server and return its content.

    Args:
        name: Name of the tool to call.
        arguments: Arguments passed to the tool.

    Returns:
        The ``content`` attribute of the session's call result.

    Raises:
        RuntimeError: If there is no active session.
    """
    session = self._session
    if not session:
        raise RuntimeError("Not connected. Call connect() first.")

    response = await session.call_tool(name, arguments)
    return response.content

close async

close()

Close the connection and clean up resources.

Source code in agenthelm/mcp/client.py
async def close(self):
    """Close the connection and clean up resources.

    Safe to call when not connected and safe to call repeatedly. The
    client's connection state is cleared before the transport context is
    exited, so the client is left disconnected even if that exit is
    interrupted (for example by task cancellation).

    Errors raised while exiting the context are treated as best-effort
    cleanup failures: they are logged at debug level and not re-raised.
    """
    import logging

    # Detach all state first so a failure below cannot leave stale handles.
    context, self._context = self._context, None
    self._session = None
    self._read = None
    self._write = None

    if not context:
        return

    try:
        await context.__aexit__(None, None, None)
    except Exception:
        # Deliberately best-effort, but keep a trace for debugging.
        logging.getLogger(__name__).debug(
            "Ignoring error while closing MCP connection", exc_info=True
        )

connect async

connect()

Connect to the MCP server.

Source code in agenthelm/mcp/client.py
async def connect(self):
    """Connect to the MCP server.

    Starts the stdio transport for the configured command, opens a client
    session on its streams and initializes that session. Both the
    transport and the session are entered on a single exit stack stored in
    ``self._context``, so ``close()`` unwinds them together.

    Calling connect() while already connected closes the previous
    connection first. If any step fails, everything opened so far is
    released before the error propagates and the client stays
    disconnected.

    Raises:
        KeyError: If ``server_config`` has no ``"command"`` entry.
    """
    from contextlib import AsyncExitStack

    # Reconnecting: release the previous connection instead of leaking it.
    if self._context is not None:
        await self.close()

    # Build environment with unbuffered Python (helps on Windows)
    env = os.environ.copy()
    env["PYTHONUNBUFFERED"] = "1"
    if self.server_config.get("env"):
        env.update(self.server_config["env"])

    params = StdioServerParameters(
        command=self.server_config["command"],
        args=self.server_config.get("args", []),
        env=env,
    )

    stack = AsyncExitStack()
    try:
        read, write = await stack.enter_async_context(stdio_client(params))
        # The session is itself a context manager and must be entered
        # before initialize() is awaited.
        session = await stack.enter_async_context(ClientSession(read, write))
        await session.initialize()
    except BaseException:
        await stack.aclose()
        raise

    # Publish state only after the handshake succeeded; close() exits the
    # stack through its __aexit__.
    self._context = stack
    self._read, self._write = read, write
    self._session = session

list_tools async

list_tools()

List available tools from the MCP server.

Source code in agenthelm/mcp/client.py
async def list_tools(self) -> list[dict]:
    """Fetch the server's tool list as plain dictionaries.

    Returns:
        One dict per tool, produced by calling ``model_dump()`` on each
        entry of the session's tool listing.

    Raises:
        RuntimeError: If there is no active session.
    """
    session = self._session
    if not session:
        raise RuntimeError("Not connected. Call connect() first.")

    listing = await session.list_tools()
    dumped = []
    for tool in listing.tools:
        dumped.append(tool.model_dump())
    return dumped

options: show_source: false