Skip to content

Commit e134707

Browse files
author
AgentPatterns
committed
feat(examples/python): add research-agent runnable example
1 parent d7d1c13 commit e134707

7 files changed

Lines changed: 771 additions & 0 deletions

File tree

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Research Agent (Python)
2+
3+
Runnable production-style demo of a bounded research pipeline:
4+
5+
1. Search
6+
2. Dedupe URLs
7+
3. Policy check
8+
4. Read and extract notes with provenance
9+
5. Verify notes
10+
6. Synthesize grounded answer with citations
11+
12+
Run:
13+
14+
```bash
15+
python main.py
16+
```
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
from __future__ import annotations
2+
3+
from typing import Any
4+
5+
6+
def propose_research_plan(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    """Return the fixed five-step research plan for a request.

    Args:
        goal: Accepted but not used by this implementation.
        request: Mapping that must contain ``request["request"]["question"]``;
            the question becomes the search query of the first step.

    Returns:
        ``{"steps": [...]}`` where each step has ``id``, ``action`` and ``args``.

    Raises:
        KeyError: If the question is missing from ``request``.
    """
    del goal  # unused; the plan below does not depend on it
    question = request["request"]["question"]

    # (step id, action name, arguments) in execution order.
    pipeline = (
        ("r1", "search_sources", {"query": question}),
        ("r2", "dedupe_urls", {}),
        ("r3", "read_extract_notes", {}),
        ("r4", "verify_notes", {}),
        ("r5", "synthesize_answer", {}),
    )
    return {
        "steps": [
            {"id": step_id, "action": action, "args": args}
            for step_id, action, args in pipeline
        ]
    }
24+
25+
26+
def synthesize_from_notes(
    *,
    goal: str,
    notes: list[dict[str, Any]],
    max_claims: int = 3,
) -> dict[str, Any]:
    """Build a short answer from the leading notes and cite them.

    Args:
        goal: Accepted but not used by this implementation.
        notes: Note mappings; each used note must provide ``id`` and ``claim``.
        max_claims: Maximum number of leading notes folded into the answer.
            Defaults to 3, the previously hard-coded limit. Values of zero or
            less select no notes.

    Returns:
        ``{"answer": str, "citations": list[str]}``. Both are empty when no
        note is selected.

    Raises:
        KeyError: If a selected note lacks ``id`` or ``claim``.
    """
    del goal  # unused; synthesis depends only on the notes

    # Clamp so a negative cap cannot turn into a "drop from the end" slice.
    selected = notes[: max(max_claims, 0)]
    if not selected:
        return {"answer": "", "citations": []}

    citations = [str(note["id"]) for note in selected]
    claims = " ".join(str(note["claim"]).strip() for note in selected)
    answer = f"Research brief: {claims} Timeline values are estimates and may change."

    return {"answer": answer, "citations": citations}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from __future__ import annotations
2+
3+
from typing import Any
4+
5+
6+
def build_request(*, report_date: str, region: str) -> dict[str, Any]:
    """Build the demo research request together with its policy hints.

    Args:
        report_date: Stored unchanged under ``request["report_date"]``.
        region: Region code; upper-cased and used both as ``request["region"]``
            and inside the question text.

    Returns:
        A dict with two keys: ``request`` (report date, region, question) and
        ``policy_hints`` (domain allowlists and numeric limits).
    """
    region_code = region.upper()
    return {
        "request": {
            "report_date": report_date,
            "region": region_code,
            # The question follows the requested region instead of always
            # naming "US"; for region "US" the text is unchanged.
            "question": (
                f"What is the current {region_code} payments incident status and what enterprise SLA "
                "commitments apply for uptime and P1 response time?"
            ),
        },
        "policy_hints": {
            "allowed_domains_policy": [
                "official-status.example.com",
                "vendor.example.com",
                "regulator.example.org",
            ],
            # The execution allowlist omits regulator.example.org.
            "allowed_domains_execution": [
                "official-status.example.com",
                "vendor.example.com",
            ],
            "max_urls": 6,
            "max_read_pages": 3,
            "max_notes": 6,
            "max_answer_chars": 850,
        },
    }
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
from __future__ import annotations
2+
3+
from dataclasses import dataclass
4+
from typing import Any
5+
from urllib.parse import urlparse
6+
7+
8+
class StopRun(Exception):
    """Exception carrying a stop reason and optional structured details.

    Attributes:
        reason: The reason string, also passed to ``Exception`` as the message.
        details: The mapping supplied by the caller, or an empty dict when the
            argument is omitted or falsy.
    """

    def __init__(self, reason: str, *, details: dict[str, Any] | None = None):
        super().__init__(reason)
        self.reason = reason
        self.details = details if details else {}
13+
14+
15+
@dataclass(frozen=True)
class Budget:
    """Immutable set of numeric limits with defaults.

    In this file ``max_notes`` and ``max_answer_chars`` are read by
    ``ResearchGateway``; the remaining limits are presumably enforced by the
    caller (not visible here).
    """

    # Time and step limits.
    max_seconds: int = 25
    max_steps: int = 8
    # Source and reading limits.
    max_urls: int = 6
    max_read_pages: int = 3
    # Output limits.
    max_notes: int = 6
    max_answer_chars: int = 850
23+
24+
25+
@dataclass(frozen=True)
class Decision:
    """Immutable outcome of a gateway check.

    ``ResearchGateway.evaluate_source`` produces ``kind`` values "allow" and
    "deny", each paired with a short ``reason`` code.
    """

    kind: str
    reason: str
29+
30+
31+
# The only action order validate_plan accepts. Kept as a list because
# validate_plan compares it with ``!=`` against a list of received actions.
EXPECTED_ACTION_SEQUENCE = [
    "search_sources",
    "dedupe_urls",
    "read_extract_notes",
    "verify_notes",
    "synthesize_answer",
]
38+
39+
40+
def normalize_url(url: str) -> str:
    """Return a canonical ``scheme://host/path`` form of ``url``.

    The scheme defaults to ``https``; scheme and host are lowercased; an empty
    path becomes ``/``; one trailing slash is removed from longer paths. Query
    string and fragment are dropped.

    Scheme-less input such as ``"Example.com/docs/"`` is treated as
    ``//Example.com/docs/`` so that its host is recognised and normalised
    exactly like the schemed form of the same URL.

    Args:
        url: URL text; surrounding whitespace is ignored.

    Returns:
        The normalised URL string.
    """
    text = str(url).strip()
    parsed = urlparse(text)
    if not parsed.scheme and not parsed.netloc:
        # Without "scheme://" urlparse puts the host into ``path``. Re-parse
        # as a network-path reference so the host lands in ``netloc``. Input
        # that starts with "/" still yields an empty netloc, as before.
        parsed = urlparse(f"//{text}")

    scheme = (parsed.scheme or "https").lower()
    host = parsed.netloc.lower()
    path = parsed.path or "/"
    if path != "/" and path.endswith("/"):
        path = path[:-1]
    return f"{scheme}://{host}{path}"
48+
49+
50+
def get_domain(url: str) -> str:
    """Return the lowercased host of ``url``, or ``""`` when it has none.

    Uses the parsed ``hostname`` rather than the raw ``netloc`` so that
    userinfo (``user:pw@``) and port (``:8443``) do not become part of the
    value compared against domain allowlists.

    Args:
        url: URL text; surrounding whitespace is ignored.

    Returns:
        The host name in lowercase, or an empty string (for example when the
        input has no ``//`` authority part).
    """
    return urlparse(str(url).strip()).hostname or ""
52+
53+
54+
def validate_plan(raw_steps: Any, *, max_steps: int) -> list[dict[str, Any]]:
    """Validate a proposed plan and return its normalised steps.

    Args:
        raw_steps: Expected to be a non-empty list of step dicts, each with a
            non-blank string ``id``, a non-blank string ``action`` and a dict
            ``args``.
        max_steps: Maximum number of steps allowed.

    Returns:
        A new list of ``{"id", "action", "args"}`` dicts with ``id`` and
        ``action`` stripped and ``args`` shallow-copied.

    Raises:
        StopRun: If the container or any step is malformed, there are too many
            steps, or the actions differ from ``EXPECTED_ACTION_SEQUENCE``.
    """
    if not (isinstance(raw_steps, list) and raw_steps):
        raise StopRun("invalid_plan:steps")
    if len(raw_steps) > max_steps:
        raise StopRun("invalid_plan:too_many_steps")

    steps: list[dict[str, Any]] = []
    for candidate in raw_steps:
        if not isinstance(candidate, dict):
            raise StopRun("invalid_step:not_object")

        step_id = candidate.get("id")
        action = candidate.get("action")
        args = candidate.get("args")

        if not (isinstance(step_id, str) and step_id.strip()):
            raise StopRun("invalid_step:id")
        if not (isinstance(action, str) and action.strip()):
            raise StopRun("invalid_step:action")
        if not isinstance(args, dict):
            raise StopRun("invalid_step:args")

        steps.append(
            {
                "id": step_id.strip(),
                "action": action.strip(),
                "args": dict(args),
            }
        )

    # The order is checked only after every step passed its shape checks.
    received = [step["action"] for step in steps]
    if received != EXPECTED_ACTION_SEQUENCE:
        raise StopRun(
            "invalid_plan:step_sequence",
            details={"expected": EXPECTED_ACTION_SEQUENCE, "received": received},
        )

    return steps
92+
93+
94+
def dedupe_urls(*, raw_urls: list[str], max_urls: int) -> list[str]:
    """Normalise URLs, drop duplicates, and cap the result at ``max_urls``.

    Args:
        raw_urls: Candidate URLs in priority order.
        max_urls: Maximum number of URLs to return. Zero or a negative value
            yields an empty list.

    Returns:
        Normalised URLs in first-seen order, at most ``max_urls`` of them.
    """
    # Without this guard the cap is only checked after the first append, so a
    # budget of zero would still let one URL through.
    if max_urls <= 0:
        return []

    seen: set[str] = set()
    unique: list[str] = []
    for raw in raw_urls:
        normalized = normalize_url(raw)
        if normalized in seen:
            continue
        seen.add(normalized)
        unique.append(normalized)
        if len(unique) >= max_urls:
            break
    return unique
106+
107+
108+
class ResearchGateway:
    """Checks sources, extracted notes and the final answer against limits.

    Every ``validate_*`` method returns ``None`` on success and raises
    ``StopRun`` with a reason code on the first failed check.
    """

    def __init__(
        self,
        *,
        allowed_domains_policy: set[str],
        allowed_domains_execution: set[str],
        budget: Budget,
    ):
        # Allowlists are stored lowercased; get_domain also lowercases.
        self.allowed_domains_policy = {domain.lower() for domain in allowed_domains_policy}
        self.allowed_domains_execution = {domain.lower() for domain in allowed_domains_execution}
        self.budget = budget

    def evaluate_source(self, *, url: str) -> Decision:
        """Allow ``url`` only if its domain is on both allowlists.

        The policy allowlist is consulted first, so a domain missing from both
        lists is reported as ``source_denied_policy``.
        """
        domain = get_domain(url)
        if domain in self.allowed_domains_policy:
            if domain in self.allowed_domains_execution:
                return Decision(kind="allow", reason="policy_pass")
            return Decision(kind="deny", reason="source_denied_execution")
        return Decision(kind="deny", reason="source_denied_policy")

    def validate_notes(self, *, notes: list[dict[str, Any]]) -> None:
        """Check that ``notes`` is a non-empty, bounded list of well-formed notes.

        Each note must be a dict with non-blank string ``id``, ``url`` and
        ``claim``, and a string ``quote`` of at least 20 characters once
        stripped.

        Raises:
            StopRun: On the first violated rule.
        """
        if not (isinstance(notes, list) and notes):
            raise StopRun("invalid_notes:empty")
        if len(notes) > self.budget.max_notes:
            raise StopRun("invalid_notes:too_many")

        # Required non-blank text fields, in check order, with their failure reason.
        text_fields = (
            ("id", "invalid_notes:id"),
            ("url", "invalid_notes:url"),
            ("claim", "invalid_notes:claim"),
        )
        for entry in notes:
            if not isinstance(entry, dict):
                raise StopRun("invalid_notes:item")
            for field_name, failure in text_fields:
                value = entry.get(field_name)
                if not (isinstance(value, str) and value.strip()):
                    raise StopRun(failure)
            quote = entry.get("quote")
            if not isinstance(quote, str) or len(quote.strip()) < 20:
                raise StopRun("invalid_notes:quote")

    def validate_synthesis(self, *, answer: str, citations: list[str], notes: list[dict[str, Any]]) -> None:
        """Check the answer text and that every citation names a known note id.

        Raises:
            StopRun: If the answer is blank or longer than
                ``budget.max_answer_chars``, the citations are missing, or a
                citation does not match any note ``id``.
        """
        if not (isinstance(answer, str) and answer.strip()):
            raise StopRun("invalid_answer:empty")
        if len(answer) > self.budget.max_answer_chars:
            raise StopRun("invalid_answer:too_long")

        if not (isinstance(citations, list) and citations):
            raise StopRun("invalid_answer:citations")

        known_ids = {str(note["id"]) for note in notes}
        if any(str(citation) not in known_ids for citation in citations):
            raise StopRun("invalid_answer:citation_unknown")

0 commit comments

Comments
 (0)