Skip to content

API Reference

This section provides a detailed API reference for all public classes and functions in the AgentHelm framework.

Agent

Agent

Source code in orchestrator/agent.py
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class Agent:
    """LLM-driven agent that chooses and executes tools one step at a time.

    The agent asks the LLM client for the next step as a JSON object, runs the
    chosen tool through the tracer, and feeds the observation back into the
    next prompt. When a step fails, previously completed steps are compensated
    in reverse order using the ``compensating_tool`` named in each tool's
    registry contract.
    """

    def __init__(self, tools: list, tracer: ExecutionTracer, client: LLMClient):
        """Store the collaborators and index ``tools`` by function name."""
        self.tools = tools
        self.tracer = tracer
        self.client = client
        # Name -> callable lookup used to resolve the tool name picked by the LLM.
        self.tool_map = {func.__name__: func for func in self.tools}

    def run(self, task: str):
        """Run ``task`` with a single ReAct step and return the result of run_react."""
        return self.run_react(task, max_steps=1)

    def make_prompt(self, task: str, history: str = "") -> tuple[str, str]:
        """Build the ``(system_prompt, user_prompt)`` pair for the next step.

        Args:
            task: The user's request, embedded verbatim in the user prompt.
            history: Text describing the steps executed so far.
        """
        tools_str = self.get_tools_str()

        system_prompt = """
        You are a helpful assistant that completes a task by planning and executing a sequence of tool calls.
        Your goal is to decide the single next step to achieve the user's request.

        You must respond with a single, valid JSON object with four keys:
        1. 'thought': A string explaining your reasoning for the chosen action.
        2. 'confidence': A float from 0.0 to 1.0 indicating your confidence in this choice.
        3. 'tool_name': The name of the tool to call.
        4. 'arguments': A dictionary of arguments for the tool.

        If you believe the task is complete, you must use the special tool 'finish' and provide the final answer.
        Do not add any other text, explanations, or markdown formatting.
        """

        user_prompt = f"""
        Here are the available tools:
        ---
        {tools_str}
        ---
        Here is the history of the steps executed so far:
        ---
        {history}
        ---
        Based on the history, what is the single next step to achieve the following user request?
        User's request: "{task}"
        """
        return system_prompt, user_prompt

    def get_tools_str(self) -> str:
        """Describe the registered tools, plus the built-in 'finish' tool, for the prompt.

        Tools that are not present in TOOL_REGISTRY are omitted.
        """
        tools_str = ""
        for tool_func in self.tools:
            tool_name = tool_func.__name__
            if tool_name in TOOL_REGISTRY:
                contract = TOOL_REGISTRY[tool_name]['contract']
                tools_str += f"Tool Name: {tool_name}\n"
                tools_str += f"Inputs: {contract['inputs']}\n\n"
        tools_str += "Tool Name: finish\n"
        # Plain (non-f) string: single braces, otherwise '{{'/'}}' leak into the prompt.
        tools_str += "Inputs: {'answer': 'The final answer to the user\'s request'}\n\n"
        return tools_str

    def run_react(self, task: str, max_steps: int = 10):
        """Run the ReAct loop for ``task`` for at most ``max_steps`` steps.

        Returns:
            A dict with a ``status`` key. On success it also carries
            ``final_answer``, taken from the arguments of the 'finish' tool.
            A failed step triggers a rollback and a "failed" status; running
            out of steps yields a "max steps" status.
        """
        logging.info(f"Agent is running task with ReAct loop: {task}")

        successful_steps = []
        history = "No steps taken yet."

        for i in range(max_steps):
            logging.info(f"\n--- Step {i + 1} ---")

            # NOTE(review): prompt building and the LLM call happen outside the
            # try block, so an exception raised here propagates to the caller
            # without triggering the rollback below.
            system_prompt, user_prompt = self.make_prompt(task, history)
            llm_response = self.client.predict(system_prompt, user_prompt)
            logging.debug(f"LLM Response: {llm_response}")

            try:
                # Tolerate text around the JSON object by slicing from the
                # first '{' to the last '}'.
                json_start = llm_response.find('{')
                json_end = llm_response.rfind('}') + 1
                clean_json_str = llm_response[json_start:json_end]
                choice = json.loads(clean_json_str)

                thought = choice.get('thought', 'N/A')
                confidence = choice.get('confidence', 1.0)
                tool_name = choice['tool_name']
                arguments = choice['arguments']
                logging.info(f"LLM chose tool: {tool_name} with arguments: {arguments}")
                logging.debug(f"LLM thought: {thought}")

                if tool_name == "finish":
                    logging.info(f"--- Workflow finished. Final Answer: {arguments.get('answer')} ---")
                    return {"status": "Workflow succeeded.", "final_answer": arguments.get('answer')}

                if tool_name in self.tool_map:
                    tool_function = self.tool_map[tool_name]

                    self.tracer.set_trace_context(reasoning=thought, confidence=confidence)

                    result = self.tracer.trace_and_execute(tool_function, **arguments)

                    successful_steps.append({"tool_name": tool_name, "arguments": arguments})
                    history += f"\nStep {i+1}: Executed tool '{tool_name}'. Observation: {result}"
                    logging.info(f"--- Tool execution finished. Result: {result} ---")
                else:
                    raise KeyError(f"LLM chose a tool '{tool_name}' that is not available.")

            except Exception as e:
                logging.error(f"--- Step {i + 1} FAILED: {e} ---")
                logging.warning("--- INITIATING ROLLBACK ---")

                # Undo completed steps, most recent first.
                for step in reversed(successful_steps):
                    completed_tool_name = step["tool_name"]
                    logging.info(f"Checking for rollback for: {completed_tool_name}")

                    # A tool may be in tool_map without being registered; treat
                    # it as having no contract instead of raising KeyError here.
                    contract = TOOL_REGISTRY.get(completed_tool_name, {}).get('contract', {})
                    compensating_tool_name = contract.get('compensating_tool')

                    if compensating_tool_name and compensating_tool_name in self.tool_map:
                        logging.info(f"Found compensating tool: {compensating_tool_name}. Running it now.")
                        compensating_tool_func = self.tool_map[compensating_tool_name]
                        self.tracer.set_trace_context("Executing compensating tool for failed workflow.", 1.0)
                        try:
                            self.tracer.trace_and_execute(compensating_tool_func, **step["arguments"])
                        except Exception as rollback_error:
                            # Keep compensating the remaining steps even if one
                            # compensation fails.
                            logging.error(
                                f"Compensating tool {compensating_tool_name} failed: {rollback_error}"
                            )
                    else:
                        logging.warning(f"No compensating tool found for {completed_tool_name}.")

                logging.info("--- Rollback complete. Halting workflow. ---")
                return {"status": "Workflow failed and was rolled back."}

        logging.warning("--- Workflow finished due to max steps reached. ---")
        return {"status": "Workflow finished due to max steps."}

Functions

__init__(tools, tracer, client)

Source code in orchestrator/agent.py
 9
10
11
12
13
def __init__(self, tools: list, tracer: ExecutionTracer, client: LLMClient):
    """Keep references to the tools, tracer and LLM client, and index the tools by name."""
    self.tools = tools
    self.tracer = tracer
    self.client = client
    # Build the name -> callable lookup explicitly, keyed by each function's __name__.
    name_to_tool = {}
    for tool_func in self.tools:
        name_to_tool[tool_func.__name__] = tool_func
    self.tool_map = name_to_tool

run(task)

Source code in orchestrator/agent.py
15
16
def run(self, task: str):
    """Execute ``task`` as a one-step ReAct loop and return whatever run_react returns."""
    single_step_outcome = self.run_react(task, max_steps=1)
    return single_step_outcome

run_react(task, max_steps=10)

Source code in orchestrator/agent.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def run_react(self, task: str, max_steps: int = 10):
    """Run the ReAct loop for ``task`` for at most ``max_steps`` steps.

    Each step asks the LLM for a JSON object naming the next tool and its
    arguments, executes that tool through the tracer, and appends the
    observation to the history used for the next prompt.

    Returns:
        A dict with a ``status`` key. On success it also carries
        ``final_answer``, taken from the arguments of the 'finish' tool.
        A failed step triggers a rollback and a "failed" status; running out
        of steps yields a "max steps" status.
    """
    logging.info(f"Agent is running task with ReAct loop: {task}")

    successful_steps = []
    history = "No steps taken yet."

    for i in range(max_steps):
        logging.info(f"\n--- Step {i + 1} ---")

        # NOTE(review): prompt building and the LLM call happen outside the
        # try block, so an exception raised here propagates to the caller
        # without triggering the rollback below.
        system_prompt, user_prompt = self.make_prompt(task, history)
        llm_response = self.client.predict(system_prompt, user_prompt)
        logging.debug(f"LLM Response: {llm_response}")

        try:
            # Tolerate text around the JSON object by slicing from the first
            # '{' to the last '}'.
            json_start = llm_response.find('{')
            json_end = llm_response.rfind('}') + 1
            clean_json_str = llm_response[json_start:json_end]
            choice = json.loads(clean_json_str)

            thought = choice.get('thought', 'N/A')
            confidence = choice.get('confidence', 1.0)
            tool_name = choice['tool_name']
            arguments = choice['arguments']
            logging.info(f"LLM chose tool: {tool_name} with arguments: {arguments}")
            logging.debug(f"LLM thought: {thought}")

            if tool_name == "finish":
                logging.info(f"--- Workflow finished. Final Answer: {arguments.get('answer')} ---")
                return {"status": "Workflow succeeded.", "final_answer": arguments.get('answer')}

            if tool_name in self.tool_map:
                tool_function = self.tool_map[tool_name]

                self.tracer.set_trace_context(reasoning=thought, confidence=confidence)

                result = self.tracer.trace_and_execute(tool_function, **arguments)

                successful_steps.append({"tool_name": tool_name, "arguments": arguments})
                history += f"\nStep {i+1}: Executed tool '{tool_name}'. Observation: {result}"
                logging.info(f"--- Tool execution finished. Result: {result} ---")
            else:
                raise KeyError(f"LLM chose a tool '{tool_name}' that is not available.")

        except Exception as e:
            logging.error(f"--- Step {i + 1} FAILED: {e} ---")
            logging.warning("--- INITIATING ROLLBACK ---")

            # Undo completed steps, most recent first.
            for step in reversed(successful_steps):
                completed_tool_name = step["tool_name"]
                logging.info(f"Checking for rollback for: {completed_tool_name}")

                # A tool may be in tool_map without being registered; treat it
                # as having no contract instead of raising KeyError here.
                contract = TOOL_REGISTRY.get(completed_tool_name, {}).get('contract', {})
                compensating_tool_name = contract.get('compensating_tool')

                if compensating_tool_name and compensating_tool_name in self.tool_map:
                    logging.info(f"Found compensating tool: {compensating_tool_name}. Running it now.")
                    compensating_tool_func = self.tool_map[compensating_tool_name]
                    self.tracer.set_trace_context("Executing compensating tool for failed workflow.", 1.0)
                    try:
                        self.tracer.trace_and_execute(compensating_tool_func, **step["arguments"])
                    except Exception as rollback_error:
                        # Keep compensating the remaining steps even if one
                        # compensation fails.
                        logging.error(
                            f"Compensating tool {compensating_tool_name} failed: {rollback_error}"
                        )
                else:
                    logging.warning(f"No compensating tool found for {completed_tool_name}.")

            logging.info("--- Rollback complete. Halting workflow. ---")
            return {"status": "Workflow failed and was rolled back."}

    logging.warning("--- Workflow finished due to max steps reached. ---")
    return {"status": "Workflow finished due to max steps."}

Tools

The @tool decorator

tool(inputs=None, outputs=None, side_effects=None, max_cost=0.0, requires_approval=False, retries=0, compensating_tool=None)

A decorator to register a function as a tool in the orchestration framework. If 'inputs' or 'outputs' are not provided, they will be inferred from the function's type hints.

Source code in orchestrator/core/tool.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def tool(inputs: dict = None,
         outputs: dict = None,
         side_effects: List[str] = None,
         max_cost: float = 0.0,
         requires_approval: bool = False,
         retries: int = 0,
         compensating_tool: str = None):
    """
    A decorator to register a function as a tool in the orchestration framework.
    If 'inputs' or 'outputs' are not provided, they will be inferred from the function's type hints.

    Args:
        inputs: Mapping of input names to type names; inferred when None.
        outputs: Mapping of output names to type names; inferred when None.
        side_effects: Side-effect labels stored in the contract (defaults to []).
        max_cost: Stored in the contract as 'max_cost'.
        requires_approval: Stored in the contract as 'requires_approval'.
        retries: Stored in the contract as 'retries'.
        compensating_tool: Name of the tool stored as 'compensating_tool'.

    Returns:
        A decorator that registers the function and its contract in
        TOOL_REGISTRY under the function's name and returns a wrapper that
        forwards all calls to it.
    """

    def describe_annotation(annotation) -> str:
        """Return a printable name for a type annotation without raising."""
        if annotation is inspect.Parameter.empty:
            # Unannotated: report "Any" rather than the internal "_empty" marker.
            return "Any"
        if annotation is None:
            # `-> None` is the None object itself, which has no __name__.
            return "None"
        if isinstance(annotation, str):
            # Postponed / forward-reference annotations are already text.
            return annotation
        # Some typing constructs (e.g. `int | None`) have no __name__.
        return getattr(annotation, "__name__", None) or str(annotation)

    def tool_decorator(func: Callable) -> Callable:
        """This is the actual decorator that wraps the function and builds the contract."""
        tool_name = func.__name__

        # --- Introspection Logic ---
        sig = inspect.signature(func)

        # Infer inputs from type hints if not provided
        introspected_inputs = {
            param.name: describe_annotation(param.annotation)
            for param in sig.parameters.values()
            if param.kind == param.POSITIONAL_OR_KEYWORD
        }

        # Infer outputs from return type hint if not provided
        introspected_outputs = {}
        if sig.return_annotation is not inspect.Signature.empty:
            # For simplicity, we'll assume the output is a dict with a single key 'result'
            introspected_outputs = {'result': describe_annotation(sig.return_annotation)}

        # --- Contract Creation ---
        final_inputs = inputs if inputs is not None else introspected_inputs
        final_outputs = outputs if outputs is not None else introspected_outputs

        contract = {
            "inputs": final_inputs,
            "outputs": final_outputs,
            "side_effects": side_effects or [],
            "max_cost": max_cost,
            "requires_approval": requires_approval,
            "retries": retries,
            "compensating_tool": compensating_tool,
        }

        # Register the tool and its contract
        TOOL_REGISTRY[tool_name] = {
            "function": func,
            "contract": contract
        }

        @wraps(func)
        def wrapper(*args, **kwargs):
            # The orchestrator will use the registry to perform checks
            # before this wrapper is ever called.
            return func(*args, **kwargs)

        return wrapper

    return tool_decorator

Tool Registry

TOOL_REGISTRY = {} module-attribute

Tracer

ExecutionTracer

Source code in orchestrator/core/tracer.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class ExecutionTracer:
    """Runs tools according to their registered contract and stores one Event per call."""

    def __init__(self, storage: FileStorage, approval_handler: ApprovalHandler = None):
        """Store the event storage and approval handler (a CliHandler when none is given)."""
        self.storage = storage
        self.approval_handler = approval_handler or CliHandler()
        self._current_reasoning = None
        self._current_confidence = 1.0

    def set_trace_context(self, reasoning: str, confidence: float):
        """Sets the LLM reasoning context for the next trace event."""
        self._current_reasoning = reasoning
        self._current_confidence = confidence

    def trace_and_execute(self, tool_func: Callable, *args, **kwargs):
        """Execute ``tool_func`` and save an Event describing the call.

        The tool's contract is looked up in TOOL_REGISTRY by function name.
        If it requires approval, the approval handler is consulted first; the
        call is then attempted up to ``retries + 1`` times with a one-second
        pause between attempts.

        Returns:
            The tool's return value.

        Raises:
            TypeError: If the arguments do not bind to the tool's signature
                (raised before anything is traced).
            RuntimeError: If approval was denied or every attempt failed. The
                message is the failure's text, or the exception class name
                when the exception carried no message.
        """
        pargs = inspect.signature(tool_func).bind(*args, **kwargs).arguments
        timestamp = datetime.now(timezone.utc)
        start_time = time.monotonic()
        output = None
        error_state = None

        try:
            contract = TOOL_REGISTRY.get(tool_func.__name__, {}).get('contract', {})
            requires_approval = contract.get('requires_approval', False)
            if requires_approval:
                user_approval = self.approval_handler.request_approval(tool_func.__name__, pargs)
                if not user_approval:
                    raise PermissionError("User did not approve execution.")

            retries = contract.get('retries', 0)
            for attempt in range(retries + 1):
                try:
                    output = tool_func(*args, **kwargs)
                    error_state = None  # Reset error state on success
                    break  # If successful, exit the loop
                except Exception as e:
                    # Never store an empty string: a message-less exception
                    # must still be recorded as a failure.
                    error_state = str(e) or type(e).__name__
                    logging.warning(f"Attempt {attempt + 1}/{retries + 1} failed: {error_state}")
                    if attempt < retries:
                        time.sleep(1)  # Wait 1 second before the next attempt

        except Exception as e:
            error_state = str(e) or type(e).__name__

        execution_time = time.monotonic() - start_time
        outputs_dict = {"result": output} if error_state is None else {}

        event = Event(timestamp=timestamp,
                      tool_name=tool_func.__name__,
                      inputs=pargs,
                      outputs=outputs_dict,
                      execution_time=execution_time,
                      error_state=error_state,
                      llm_reasoning_trace=self._current_reasoning or "",
                      confidence_score=self._current_confidence)

        # Clear the context for the next run
        self._current_reasoning = None
        self._current_confidence = 1.0

        self.storage.save(event.model_dump())

        # Compare against None, not truthiness, so no failure is reported as success.
        if error_state is not None:
            raise RuntimeError(error_state)

        return output

Functions

__init__(storage, approval_handler=None)

Source code in orchestrator/core/tracer.py
15
16
17
18
19
def __init__(self, storage: FileStorage, approval_handler: ApprovalHandler = None):
    """Remember the storage backend and approval handler, and reset the trace context."""
    self.storage = storage
    # Fall back to the command-line handler when no (truthy) handler is supplied.
    if approval_handler:
        self.approval_handler = approval_handler
    else:
        self.approval_handler = CliHandler()
    self._current_reasoning, self._current_confidence = None, 1.0

set_trace_context(reasoning, confidence)

Sets the LLM reasoning context for the next trace event.

Source code in orchestrator/core/tracer.py
21
22
23
24
def set_trace_context(self, reasoning: str, confidence: float):
    """Record the reasoning text and confidence to attach to the next trace event."""
    self._current_reasoning, self._current_confidence = reasoning, confidence

trace_and_execute(tool_func, *args, **kwargs)

Source code in orchestrator/core/tracer.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def trace_and_execute(self, tool_func: Callable, *args, **kwargs):
    """Execute ``tool_func`` and save an Event describing the call.

    The tool's contract is looked up in TOOL_REGISTRY by function name. If it
    requires approval, the approval handler is consulted first; the call is
    then attempted up to ``retries + 1`` times with a one-second pause between
    attempts.

    Returns:
        The tool's return value.

    Raises:
        TypeError: If the arguments do not bind to the tool's signature
            (raised before anything is traced).
        RuntimeError: If approval was denied or every attempt failed. The
            message is the failure's text, or the exception class name when
            the exception carried no message.
    """
    pargs = inspect.signature(tool_func).bind(*args, **kwargs).arguments
    timestamp = datetime.now(timezone.utc)
    start_time = time.monotonic()
    output = None
    error_state = None

    try:
        contract = TOOL_REGISTRY.get(tool_func.__name__, {}).get('contract', {})
        requires_approval = contract.get('requires_approval', False)
        if requires_approval:
            user_approval = self.approval_handler.request_approval(tool_func.__name__, pargs)
            if not user_approval:
                raise PermissionError("User did not approve execution.")

        retries = contract.get('retries', 0)
        for attempt in range(retries + 1):
            try:
                output = tool_func(*args, **kwargs)
                error_state = None  # Reset error state on success
                break  # If successful, exit the loop
            except Exception as e:
                # Never store an empty string: a message-less exception must
                # still be recorded as a failure.
                error_state = str(e) or type(e).__name__
                logging.warning(f"Attempt {attempt + 1}/{retries + 1} failed: {error_state}")
                if attempt < retries:
                    time.sleep(1)  # Wait 1 second before the next attempt

    except Exception as e:
        error_state = str(e) or type(e).__name__

    execution_time = time.monotonic() - start_time
    outputs_dict = {"result": output} if error_state is None else {}

    event = Event(timestamp=timestamp,
                  tool_name=tool_func.__name__,
                  inputs=pargs,
                  outputs=outputs_dict,
                  execution_time=execution_time,
                  error_state=error_state,
                  llm_reasoning_trace=self._current_reasoning or "",
                  confidence_score=self._current_confidence)

    # Clear the context for the next run
    self._current_reasoning = None
    self._current_confidence = 1.0

    self.storage.save(event.model_dump())

    # Compare against None, not truthiness, so no failure is reported as success.
    if error_state is not None:
        raise RuntimeError(error_state)

    return output

Event Model

Event

Bases: BaseModel

timestamp: When the event happened. tool_name: The name of the tool that was called. inputs: The arguments the tool was called with. outputs: What the tool returned. execution_time: How long it took to run. error_state: Any error that occurred, or null if it succeeded. llm_reasoning_trace: The LLM's reasoning for the call (defaults to the string "placeholder" when not supplied). confidence_score: The LLM's confidence in the call (defaults to 1.0 when not supplied).

Source code in orchestrator/core/event.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Event(BaseModel):
    '''
    Record of a single tool invocation.

    Fields:
        timestamp: When the event happened.
        tool_name: The name of the tool that was called.
        inputs: The arguments the tool was called with, keyed by name.
        outputs: What the tool returned.
        execution_time: How long it took to run.
        error_state: Any error that occurred, or None if it succeeded.
        llm_reasoning_trace: Reasoning text associated with the call;
            defaults to "placeholder" when not supplied.
        confidence_score: Confidence associated with the call;
            defaults to 1.0 when not supplied.
    '''
    timestamp: datetime
    tool_name: str
    inputs: dict[str, Any]
    outputs: dict[str, Any]
    execution_time: float  # presumably seconds — TODO confirm against the producer
    error_state: Optional[str] = None
    llm_reasoning_trace: str = "placeholder"
    confidence_score: float = 1.0

Attributes

confidence_score = 1.0 class-attribute instance-attribute

error_state = None class-attribute instance-attribute

execution_time instance-attribute

inputs instance-attribute

llm_reasoning_trace = 'placeholder' class-attribute instance-attribute

outputs instance-attribute

timestamp instance-attribute

tool_name instance-attribute

Storage

FileStorage

Source code in orchestrator/core/storage.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class FileStorage:
    """Stores a list of dictionaries in a single JSON file."""

    def __init__(self, file_path: str):
        """Remember ``file_path`` and seed the file with ``[]`` when it is missing."""
        self.file_path = file_path
        if self.exists():
            return
        with open(self.file_path, 'w') as handle:
            json.dump([], handle)

    def save(self, data: Dict[str, Any], override=False) -> None:
        """
        Write ``data`` to the JSON file.

        With ``override=True`` the file is replaced by a list holding only
        ``data``; otherwise ``data`` is appended to the list already stored.
        """
        if override:
            records = [data]
        else:
            # Read-modify-write: the whole list is reloaded and rewritten.
            records = self.load()
            records.append(data)
        with open(self.file_path, 'w') as handle:
            json.dump(records, handle, indent=2, default=str)

    def load(self) -> List[Dict[str, Any]]:
        """Return the stored list, or ``[]`` when the file is missing or not valid JSON."""
        if not self.exists():
            return []
        with open(self.file_path, 'r') as handle:
            raw_text = handle.read()
        try:
            return json.loads(raw_text)
        except json.JSONDecodeError:
            return []

    def exists(self) -> bool:
        """Report whether the storage file is present on disk."""
        is_present = os.path.exists(self.file_path)
        return is_present

Attributes

file_path = file_path instance-attribute

Functions

__init__(file_path)

Source code in orchestrator/core/storage.py
 7
 8
 9
10
11
12
def __init__(self, file_path: str):
    """Remember ``file_path`` and seed the file with an empty JSON list when it is missing."""
    self.file_path = file_path
    if self.exists():
        # An existing file is left untouched.
        return
    with open(self.file_path, 'w') as handle:
        json.dump([], handle)

exists()

Check if the storage file exists.

Source code in orchestrator/core/storage.py
39
40
41
def exists(self) -> bool:
    """Report whether the storage file is present on disk."""
    is_present = os.path.exists(self.file_path)
    return is_present

load()

Load data from a JSON file. Returns a list of dictionaries.

Source code in orchestrator/core/storage.py
29
30
31
32
33
34
35
36
37
def load(self) -> List[Dict[str, Any]]:
    """Return the parsed JSON content, or ``[]`` when the file is missing or not valid JSON."""
    if not self.exists():
        return []
    with open(self.file_path, 'r') as handle:
        raw_text = handle.read()
    try:
        return json.loads(raw_text)
    except json.JSONDecodeError:
        # Unparseable (including empty) content is treated as "no records".
        return []

save(data, override=False)

Save data to a JSON file. If override is True, it will replace the entire file content. If override is False, it will append the data to the existing list.

Source code in orchestrator/core/storage.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def save(self, data: Dict[str, Any], override=False) -> None:
    """
    Write ``data`` to the JSON file.

    With ``override=True`` the file is replaced by a list holding only
    ``data``; otherwise ``data`` is appended to the list already stored.
    """
    if override:
        records = [data]
    else:
        # Read-modify-write: the whole list is reloaded and rewritten.
        records = self.load()
        records.append(data)
    with open(self.file_path, 'w') as handle:
        json.dump(records, handle, indent=2, default=str)

LLM Clients

LLMClient

Bases: object

Source code in orchestrator/llm/base.py
1
2
3
4
5
6
7
8
class LLMClient(object):
    """Base class for LLM clients: holds connection settings and declares ``predict``."""

    def __init__(self, model_name: str, base_url: str, api_key: str):
        """Store the model name, API base URL and API key on the instance."""
        self.model_name, self.base_url, self.api_key = model_name, base_url, api_key

    def predict(self, system_prompt: str, user_prompt: str) -> str:
        """Return the model's reply for the given prompts; subclasses must override this."""
        message = "Subclasses must implement this method"
        raise NotImplementedError(message)

Functions

__init__(model_name, base_url, api_key)

Source code in orchestrator/llm/base.py
2
3
4
5
def __init__(self, model_name: str, base_url: str, api_key: str):
    """Store the model name, API base URL and API key on the instance."""
    self.model_name, self.base_url, self.api_key = model_name, base_url, api_key

predict(system_prompt, user_prompt)

Source code in orchestrator/llm/base.py
7
8
def predict(self, system_prompt: str, user_prompt: str) -> str:
    """Return the model's reply for the given prompts; subclasses must override this."""
    message = "Subclasses must implement this method"
    raise NotImplementedError(message)

MistralClient

Bases: LLMClient

A client for interacting with the Mistral AI API.

Source code in orchestrator/llm/mistral_client.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class MistralClient(LLMClient):
    """A client for interacting with the Mistral AI API."""

    def __init__(self, model_name: str, api_key: str, base_url: str = "https://api.mistral.ai/v1",
                 timeout: float = 120.0):
        """Configure the client.

        Args:
            model_name: Model identifier sent with every request.
            api_key: Key sent as a Bearer token.
            base_url: API root; '/chat/completions' is appended to it.
            timeout: Seconds to wait for the HTTP request before giving up.
        """
        super().__init__(model_name, base_url, api_key)
        self.timeout = timeout

    def predict(self, system_prompt: str, user_prompt: str) -> str:
        """Calls the Mistral Chat Completions API.

        Returns:
            The content of the first choice's message, stripped of
            surrounding whitespace.

        Raises:
            Whatever ``requests`` raises for connection problems, timeouts,
            or (via ``raise_for_status``) HTTP error responses.
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        data = {
            "model": self.model_name,
            "messages": [
                {
                    "role": "system",
                    "content": system_prompt
                },
                {
                    "role": "user",
                    "content": user_prompt
                }
            ]
        }
        # Without a timeout, requests waits indefinitely on a stalled connection.
        response = requests.post(
            f"{self.base_url}/chat/completions",
            headers=headers,
            json=data,
            timeout=self.timeout,
        )
        response.raise_for_status()
        result = response.json()
        return result['choices'][0]['message']['content'].strip()

Functions

__init__(model_name, api_key, base_url='https://api.mistral.ai/v1')

Source code in orchestrator/llm/mistral_client.py
8
9
def __init__(self, model_name: str, api_key: str, base_url: str = "https://api.mistral.ai/v1"):
    """Configure the client, defaulting ``base_url`` to the public Mistral API root."""
    # Passed by keyword because the base initializer takes base_url before api_key.
    super().__init__(model_name=model_name, base_url=base_url, api_key=api_key)

predict(system_prompt, user_prompt)

Calls the Mistral Chat Completions API.

Source code in orchestrator/llm/mistral_client.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def predict(self, system_prompt: str, user_prompt: str) -> str:
    """Calls the Mistral Chat Completions API.

    Returns:
        The content of the first choice's message, stripped of surrounding
        whitespace.

    Raises:
        Whatever ``requests`` raises for connection problems, timeouts, or
        (via ``raise_for_status``) HTTP error responses.
    """
    headers = {
        "Authorization": f"Bearer {self.api_key}",
        "Content-Type": "application/json"
    }
    data = {
        "model": self.model_name,
        "messages": [
            {
                "role": "system",
                "content": system_prompt
            },
            {
                "role": "user",
                "content": user_prompt
            }
        ]
    }
    response = requests.post(
        f"{self.base_url}/chat/completions",
        headers=headers,
        json=data,
        # Without a timeout, requests waits indefinitely on a stalled
        # connection. An instance may override the default by setting a
        # `timeout` attribute (seconds).
        timeout=getattr(self, "timeout", 120.0),
    )
    response.raise_for_status()
    result = response.json()
    return result['choices'][0]['message']['content'].strip()

OpenAIClient

Bases: LLMClient

Source code in orchestrator/llm/openai_client.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class OpenAIClient(LLMClient):
    """A client for an OpenAI-style Chat Completions endpoint."""

    def __init__(self, model_name: str, api_key: str, base_url: str = "https://api.openai.com/v1",
                 timeout: float = 120.0):
        """Configure the client.

        Args:
            model_name: Model identifier sent with every request.
            api_key: Key sent as a Bearer token.
            base_url: API root; '/chat/completions' is appended to it.
            timeout: Seconds to wait for the HTTP request before giving up.
        """
        super().__init__(model_name, base_url, api_key)
        self.timeout = timeout

    def predict(self, system_prompt: str, user_prompt: str) -> str:
        """Send the prompts to the chat completions endpoint and return the reply text.

        The request asks for at most 1000 tokens.

        Returns:
            The content of the first choice's message, stripped of
            surrounding whitespace.

        Raises:
            Whatever ``requests`` raises for connection problems, timeouts,
            or (via ``raise_for_status``) HTTP error responses.
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        data = {
            "model": self.model_name,
            "messages": [
                {
                    "role": "system",
                    "content": system_prompt
                },
                {
                    "role": "user",
                    "content": user_prompt
                }
            ],
            "max_tokens": 1000
        }
        # Without a timeout, requests waits indefinitely on a stalled connection.
        response = requests.post(
            f"{self.base_url}/chat/completions",
            headers=headers,
            json=data,
            timeout=self.timeout,
        )
        response.raise_for_status()
        result = response.json()
        return result['choices'][0]['message']['content'].strip()

Functions

__init__(model_name, api_key, base_url='https://api.openai.com/v1')

Source code in orchestrator/llm/openai_client.py
6
7
def __init__(self, model_name: str, api_key: str, base_url: str = "https://api.openai.com/v1"):
    """Configure the client, defaulting ``base_url`` to the public OpenAI API root."""
    # Passed by keyword because the base initializer takes base_url before api_key.
    super().__init__(model_name=model_name, base_url=base_url, api_key=api_key)

predict(system_prompt, user_prompt)

Source code in orchestrator/llm/openai_client.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def predict(self, system_prompt: str, user_prompt: str) -> str:
    """Send the prompts to the chat completions endpoint and return the reply text.

    The request asks for at most 1000 tokens.

    Returns:
        The content of the first choice's message, stripped of surrounding
        whitespace.

    Raises:
        Whatever ``requests`` raises for connection problems, timeouts, or
        (via ``raise_for_status``) HTTP error responses.
    """
    headers = {
        "Authorization": f"Bearer {self.api_key}",
        "Content-Type": "application/json"
    }
    data = {
        "model": self.model_name,
        "messages": [
            {
                "role": "system",
                "content": system_prompt
            },
            {
                "role": "user",
                "content": user_prompt
            }
        ],
        "max_tokens": 1000
    }
    response = requests.post(
        f"{self.base_url}/chat/completions",
        headers=headers,
        json=data,
        # Without a timeout, requests waits indefinitely on a stalled
        # connection. An instance may override the default by setting a
        # `timeout` attribute (seconds).
        timeout=getattr(self, "timeout", 120.0),
    )
    response.raise_for_status()
    result = response.json()
    return result['choices'][0]['message']['content'].strip()

Approval Handlers

ApprovalHandler

Source code in orchestrator/core/handlers.py
1
2
3
class ApprovalHandler:
    """Interface for objects that approve or reject a tool call."""

    def request_approval(self, tool_name: str, arguments: dict) -> bool:
        """Return True to approve calling ``tool_name`` with ``arguments``; must be overridden."""
        raise NotImplementedError

Functions

request_approval(tool_name, arguments)

Source code in orchestrator/core/handlers.py
2
3
def request_approval(self, tool_name: str, arguments: dict) -> bool:
    """Return True to approve calling ``tool_name`` with ``arguments``; must be overridden."""
    raise NotImplementedError

CliHandler

Bases: ApprovalHandler

Source code in orchestrator/core/handlers.py
5
6
7
class CliHandler(ApprovalHandler):
    """Approval handler that asks for confirmation on the command line."""

    def request_approval(self, tool_name: str, arguments: dict) -> bool:
        """Prompt on stdin and return True only for an affirmative answer.

        Surrounding whitespace and letter case are ignored; "y" and "yes" are
        accepted. Any other answer, including an empty one, is a denial.
        """
        answer = input(f"Approve execution for tool {tool_name} with arguments {arguments}? (y/n): ")
        # Strip first so a stray space ("y ") is not silently treated as "no".
        return answer.strip().lower() in ("y", "yes")

Functions

request_approval(tool_name, arguments)

Source code in orchestrator/core/handlers.py
6
7
def request_approval(self, tool_name: str, arguments: dict) -> bool:
    """Prompt on stdin and return True only for an affirmative answer.

    Surrounding whitespace and letter case are ignored; "y" and "yes" are
    accepted. Any other answer, including an empty one, is a denial.
    """
    answer = input(f"Approve execution for tool {tool_name} with arguments {arguments}? (y/n): ")
    # Strip first so a stray space ("y ") is not silently treated as "no".
    return answer.strip().lower() in ("y", "yes")