Skip to content

Commit 4e47a58

Browse files
author
AgentPatterns
committed
feat(examples/python): add runnable example
1 parent 26b040f commit 4e47a58

File tree

6 files changed

+741
-0
lines changed

6 files changed

+741
-0
lines changed
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Orchestrator Agent - Python Implementation
2+
3+
Runnable implementation of an orchestrator agent that plans sub-tasks,
4+
runs workers in parallel, and composes one final operations answer.
5+
6+
---
7+
8+
## Quick start
9+
10+
```bash
11+
# (optional) create venv
12+
python -m venv .venv && source .venv/bin/activate
13+
14+
# install dependencies
15+
pip install -r requirements.txt
16+
17+
# set API key
18+
export OPENAI_API_KEY="sk-..."
19+
20+
# run the agent
21+
python main.py
22+
```
23+
24+
## Full walkthrough
25+
26+
Read the complete implementation guide:
27+
https://agentpatterns.tech/en/agent-patterns/orchestrator-agent
28+
29+
## What's inside
30+
31+
- Plan step (`kind=plan`) with strict schema validation
32+
- Separate policy and execution allowlists for workers
33+
- Parallel dispatch with per-task timeout and retry-on-timeout
34+
- Global runtime deadline enforced in gateway dispatch
35+
- Critical vs non-critical task handling
36+
- Final synthesis after aggregation
37+
- Execution trace and stop reasons for debugging
38+
39+
## Project layout
40+
41+
```text
42+
examples/
43+
agent-patterns/
44+
orchestrator-agent/
45+
python/
46+
README.md
47+
main.py
48+
llm.py
49+
gateway.py
50+
workers.py
51+
requirements.txt
52+
```
53+
54+
## Notes
55+
56+
- Code and README are English-only by design.
57+
- The website provides multilingual explanations and theory.
58+
59+
## License
60+
61+
MIT
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
from __future__ import annotations
2+
3+
import hashlib
4+
import json
5+
import threading
6+
import time
7+
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed
8+
from dataclasses import dataclass
9+
from typing import Any, Callable
10+
11+
12+
class StopRun(Exception):
    """Control-flow exception that halts an orchestration run.

    The ``reason`` attribute carries a short machine-readable code
    (e.g. ``"task_timeout"``, ``"max_dispatches"``) which doubles as the
    exception message, so trace logs and ``str(exc)`` agree.
    """

    def __init__(self, reason: str):
        self.reason = reason
        super().__init__(reason)
16+
17+
18+
@dataclass(frozen=True)
class Budget:
    """Immutable resource limits for one orchestration run.

    Frozen so a budget can be shared across threads without risk of
    mid-run mutation.
    """

    # Maximum number of tasks a single plan may contain.
    max_tasks: int = 4
    # Upper bound on worker tasks running concurrently.
    max_parallel: int = 3
    # Extra attempts allowed per task after its first dispatch.
    max_retries_per_task: int = 1
    # Hard cap on total worker dispatches across the whole run,
    # retries included.
    max_dispatches: int = 8
    # Wall-clock timeout for a single worker call, in seconds.
    task_timeout_seconds: float = 2.0
    # Global wall-clock deadline for the entire run, in seconds.
    max_seconds: int = 25
26+
27+
28+
def args_hash(args: dict[str, Any]) -> str:
    """Return a short deterministic fingerprint of *args*.

    The mapping is serialized to canonical JSON (sorted keys, compact
    separators, ASCII-only) so logically equal dicts always produce the
    same 12-hex-character SHA-256 prefix.
    """
    canonical = json.dumps(
        args,
        ensure_ascii=True,
        sort_keys=True,
        separators=(",", ":"),
    )
    digest = hashlib.sha256(canonical.encode("utf-8"))
    return digest.hexdigest()[:12]
31+
32+
33+
def validate_orchestration_plan(
    raw_plan: dict[str, Any], *, allowed_workers: set[str], max_tasks: int
) -> list[dict[str, Any]]:
    """Validate an untrusted plan object and normalize it into task dicts.

    Args:
        raw_plan: Parsed JSON expected to have ``kind == "plan"`` and a
            ``tasks`` list of ``{id, worker, args, critical}`` objects.
        allowed_workers: Policy allowlist of worker names the plan may use.
        max_tasks: Upper bound on the number of tasks in the plan.

    Returns:
        A list of normalized task dicts containing only the contract
        fields, with ``id``/``worker`` whitespace-stripped and ``args``
        defensively copied.

    Raises:
        StopRun: with a machine-readable ``invalid_plan:*`` reason on any
            contract violation.
    """
    if not isinstance(raw_plan, dict):
        raise StopRun("invalid_plan:non_json")
    if raw_plan.get("kind") != "plan":
        raise StopRun("invalid_plan:kind")

    tasks = raw_plan.get("tasks")
    if not isinstance(tasks, list):
        raise StopRun("invalid_plan:tasks")
    if not (1 <= len(tasks) <= max_tasks):
        raise StopRun("invalid_plan:max_tasks")

    normalized: list[dict[str, Any]] = []
    seen_ids: set[str] = set()
    required_keys = {"id", "worker", "args", "critical"}

    for task in tasks:
        if not isinstance(task, dict):
            raise StopRun("invalid_plan:task_shape")
        if not required_keys.issubset(task.keys()):
            raise StopRun("invalid_plan:missing_keys")

        # Ignore unknown keys and keep only contract fields.
        task_id = task["id"]
        worker = task["worker"]
        args = task["args"]
        critical = task["critical"]

        if not isinstance(task_id, str) or not task_id.strip():
            raise StopRun("invalid_plan:task_id")
        # Normalize BEFORE the duplicate check: previously the dedup ran
        # on the raw value, so "a" and " a " both passed and normalized
        # to identical ids in the output.
        task_id = task_id.strip()
        if task_id in seen_ids:
            raise StopRun("invalid_plan:duplicate_task_id")
        seen_ids.add(task_id)

        if not isinstance(worker, str) or not worker.strip():
            raise StopRun("invalid_plan:worker")
        # Check the allowlist against the normalized name, matching what
        # is actually stored and later dispatched.
        worker = worker.strip()
        if worker not in allowed_workers:
            raise StopRun(f"invalid_plan:worker_not_allowed:{worker}")

        if not isinstance(args, dict):
            raise StopRun("invalid_plan:args")
        if not isinstance(critical, bool):
            raise StopRun("invalid_plan:critical")

        normalized.append(
            {
                "id": task_id,
                "worker": worker,
                "args": dict(args),  # defensive copy: caller may mutate theirs
                "critical": critical,
            }
        )

    return normalized
89+
90+
91+
class OrchestratorGateway:
    """Policy-enforcing dispatcher for planned worker tasks.

    Enforces an execution allowlist, a run-wide dispatch budget, a
    per-task timeout with retry-on-timeout, and a global monotonic
    deadline. Results are returned as per-task record dicts in plan
    order; failures are recorded with a ``stop_reason`` rather than
    raised, except for run-level aborts (``StopRun``).
    """

    def __init__(
        self,
        *,
        allow: set[str],
        registry: dict[str, Callable[..., dict[str, Any]]],
        budget: Budget,
    ) -> None:
        # Copy the allowlist so later mutation by the caller cannot
        # widen policy after construction.
        self.allow = set(allow)
        self.registry = registry
        self.budget = budget
        self.dispatches = 0
        # Guards the dispatch counter; dispatches happen from worker threads.
        self._lock = threading.Lock()

    def _consume_dispatch_budget(self) -> None:
        """Atomically count one dispatch; abort once the run-wide cap is hit."""
        with self._lock:
            self.dispatches += 1
            if self.dispatches > self.budget.max_dispatches:
                raise StopRun("max_dispatches")

    def _call_once(
        self, worker_name: str, args: dict[str, Any], *, deadline_monotonic: float
    ) -> dict[str, Any]:
        """Invoke one worker, bounded by the task timeout and the global deadline.

        Raises:
            StopRun: worker denied/missing, deadline expired, task timeout,
                bad call arguments, or a non-dict worker result.
        """
        if worker_name not in self.allow:
            raise StopRun(f"worker_denied:{worker_name}")
        fn = self.registry.get(worker_name)
        if fn is None:
            raise StopRun(f"worker_missing:{worker_name}")

        remaining = deadline_monotonic - time.monotonic()
        if remaining <= 0:
            raise StopRun("max_seconds")
        # Never let a single task run past the global deadline.
        task_timeout = min(self.budget.task_timeout_seconds, max(0.01, remaining))

        # Deliberately NOT `with ThreadPoolExecutor(...)`: __exit__ calls
        # shutdown(wait=True), which would block until a hung worker
        # finishes and silently defeat the timeout. Shut down without
        # waiting instead; a timed-out worker thread is left to complete
        # in the background.
        pool = ThreadPoolExecutor(max_workers=1)
        try:
            future = pool.submit(fn, **args)
            try:
                result = future.result(timeout=task_timeout)
            except TimeoutError as exc:
                raise StopRun("task_timeout") from exc
            except TypeError as exc:
                # Plan-supplied args did not match the worker's signature.
                raise StopRun(f"worker_bad_args:{worker_name}") from exc
        finally:
            pool.shutdown(wait=False, cancel_futures=True)

        if not isinstance(result, dict):
            raise StopRun(f"worker_bad_result:{worker_name}")
        return result

    def _run_task_with_retry(
        self, task: dict[str, Any], request_id: str, deadline_monotonic: float
    ) -> dict[str, Any]:
        """Run one planned task, retrying only on timeout, and record the outcome.

        Returns a record dict with ``status`` of ``"done"`` or ``"failed"``;
        never raises — all ``StopRun`` reasons are folded into the record.
        """
        task_id = task["id"]
        worker_name = task["worker"]
        # Hash only the semantic args; request_id is transport metadata
        # and must not change the task's fingerprint.
        semantic_args = dict(task["args"])
        semantic_hash = args_hash(semantic_args)
        base_args = dict(semantic_args)
        base_args["request_id"] = request_id

        attempts_total = self.budget.max_retries_per_task + 1
        last_reason = "unknown"

        for attempt in range(1, attempts_total + 1):
            try:
                self._consume_dispatch_budget()
                observation = self._call_once(
                    worker_name, base_args, deadline_monotonic=deadline_monotonic
                )
                return {
                    "task_id": task_id,
                    "worker": worker_name,
                    "critical": task["critical"],
                    "status": "done",
                    "attempts_used": attempt,
                    "retried": attempt > 1,
                    "args_hash": semantic_hash,
                    "observation": observation,
                }
            except StopRun as exc:
                last_reason = exc.reason
                # Only timeouts are retried; every other reason is terminal.
                if exc.reason == "task_timeout" and attempt < attempts_total:
                    continue
                return {
                    "task_id": task_id,
                    "worker": worker_name,
                    "critical": task["critical"],
                    "status": "failed",
                    "attempts_used": attempt,
                    "retried": attempt > 1,
                    "args_hash": semantic_hash,
                    "stop_reason": last_reason,
                }

        # Defensive fallback: every loop iteration returns, so this is
        # unreachable unless attempts_total is ever made zero.
        return {
            "task_id": task_id,
            "worker": worker_name,
            "critical": task["critical"],
            "status": "failed",
            "attempts_used": attempts_total,
            "retried": True,
            "args_hash": semantic_hash,
            "stop_reason": last_reason,
        }

    def dispatch_parallel(
        self,
        tasks: list[dict[str, Any]],
        *,
        request_id: str,
        deadline_monotonic: float,
    ) -> list[dict[str, Any]]:
        """Run tasks concurrently and return per-task records in plan order.

        Raises:
            StopRun("max_seconds"): when the global deadline has already
                expired or expires while awaiting results.
        """
        if not tasks:
            return []

        # Check the deadline BEFORE submitting anything, so an already
        # expired run does not burn dispatch budget on doomed tasks.
        remaining = deadline_monotonic - time.monotonic()
        if remaining <= 0:
            raise StopRun("max_seconds")

        indexed_tasks = list(enumerate(tasks))
        output: list[tuple[int, dict[str, Any]]] = []
        parallelism = min(self.budget.max_parallel, len(tasks))

        with ThreadPoolExecutor(max_workers=parallelism) as pool:
            future_to_idx = {
                pool.submit(
                    self._run_task_with_retry, task, request_id, deadline_monotonic
                ): idx
                for idx, task in indexed_tasks
            }
            try:
                # as_completed enforces the global deadline across the batch.
                for future in as_completed(
                    future_to_idx, timeout=max(0.01, remaining)
                ):
                    idx = future_to_idx[future]
                    output.append((idx, future.result()))
            except TimeoutError as exc:
                raise StopRun("max_seconds") from exc

        # as_completed yields in completion order; restore plan order.
        output.sort(key=lambda item: item[0])
        return [item[1] for item in output]

0 commit comments

Comments
 (0)