-
Notifications
You must be signed in to change notification settings - Fork 865
Expand file tree
/
Copy path rag_simple.py
More file actions
138 lines (125 loc) · 5.27 KB
/
rag_simple.py
File metadata and controls
138 lines (125 loc) · 5.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from collections.abc import AsyncGenerator
from typing import Optional, Union
from agents import Agent, ItemHelpers, ModelSettings, OpenAIChatCompletionsModel, Runner, set_tracing_disabled
from openai import AsyncAzureOpenAI, AsyncOpenAI
from openai.types.responses import ResponseInputItemParam, ResponseTextDeltaEvent
from fastapi_app.api_models import (
AIChatRoles,
ChatRequestOverrides,
ItemPublic,
Message,
RAGContext,
RetrievalResponse,
RetrievalResponseDelta,
ThoughtStep,
)
from fastapi_app.postgres_searcher import PostgresSearcher
from fastapi_app.rag_base import RAGChatBase
# Turn off tracing in the openai-agents SDK for this process.
set_tracing_disabled(disabled=True)
class SimpleRAGChat(RAGChatBase):
    """Simple RAG chat flow: one database search, then one LLM call to answer.

    `prepare_context` retrieves matching rows from Postgres; `answer` /
    `answer_stream` then run a single agent over the past messages plus a
    RAG-augmented user query, returning the response together with
    "thought steps" describing how it was produced (for frontend display).
    """

    def __init__(
        self,
        *,
        messages: list[ResponseInputItemParam],
        overrides: ChatRequestOverrides,
        searcher: PostgresSearcher,
        openai_chat_client: Union[AsyncOpenAI, AsyncAzureOpenAI],
        chat_model: str,
        chat_deployment: Optional[str],  # Not needed for non-Azure OpenAI
    ):
        self.searcher = searcher
        self.chat_params = self.get_chat_params(messages, overrides)
        # Recorded in the ThoughtSteps so clients can see which model/deployment answered.
        self.model_for_thoughts = (
            {"model": chat_model, "deployment": chat_deployment} if chat_deployment else {"model": chat_model}
        )
        openai_agents_model = OpenAIChatCompletionsModel(
            # Azure OpenAI routes requests by deployment name; plain OpenAI by model name.
            model=chat_model if chat_deployment is None else chat_deployment,
            openai_client=openai_chat_client,
        )
        self.answer_agent = Agent(
            name="Answerer",
            instructions=self.answer_prompt_template,
            model=openai_agents_model,
            model_settings=ModelSettings(
                temperature=self.chat_params.temperature,
                max_tokens=self.chat_params.response_token_limit,
                # Pass a seed through only when the caller asked for one (reproducibility).
                extra_body={"seed": self.chat_params.seed} if self.chat_params.seed is not None else {},
            ),
        )

    def _agent_input(self, items: list[ItemPublic]) -> list:
        """Build the agent input: past messages plus the RAG-augmented user query."""
        return self.chat_params.past_messages + [
            {"content": self.prepare_rag_request(self.chat_params.original_user_query, items), "role": "user"}
        ]

    def _answer_context(
        self,
        items: list[ItemPublic],
        earlier_thoughts: list[ThoughtStep],
        run_input,
    ) -> RAGContext:
        """Assemble the RAGContext (data points + thought steps) shared by answer/answer_stream."""
        return RAGContext(
            data_points={item.id: item for item in items},
            thoughts=earlier_thoughts
            + [
                ThoughtStep(
                    title="Prompt to generate answer",
                    # Read the prompt off the configured agent so this stays accurate
                    # even if the instructions source ever changes.
                    description=[{"content": self.answer_agent.instructions}]
                    + ItemHelpers.input_to_new_input_list(run_input),
                    props=self.model_for_thoughts,
                ),
            ],
        )

    async def prepare_context(self) -> tuple[list[ItemPublic], list[ThoughtStep]]:
        """Retrieve relevant rows from the database and build a context for the chat model."""
        results = await self.searcher.search_and_embed(
            self.chat_params.original_user_query,
            top=self.chat_params.top,
            enable_vector_search=self.chat_params.enable_vector_search,
            enable_text_search=self.chat_params.enable_text_search,
        )
        items = [ItemPublic.model_validate(item.to_dict()) for item in results]
        thoughts = [
            ThoughtStep(
                title="Search query for database",
                description=self.chat_params.original_user_query,
                props={
                    "top": self.chat_params.top,
                    "vector_search": self.chat_params.enable_vector_search,
                    "text_search": self.chat_params.enable_text_search,
                },
            ),
            ThoughtStep(
                title="Search results",
                description=items,
            ),
        ]
        return items, thoughts

    async def answer(
        self,
        items: list[ItemPublic],
        earlier_thoughts: list[ThoughtStep],
    ) -> RetrievalResponse:
        """Run the answer agent once and return the full response with context."""
        run_results = await Runner.run(self.answer_agent, input=self._agent_input(items))
        return RetrievalResponse(
            message=Message(content=str(run_results.final_output), role=AIChatRoles.ASSISTANT),
            context=self._answer_context(items, earlier_thoughts, run_results.input),
        )

    async def answer_stream(
        self,
        items: list[ItemPublic],
        earlier_thoughts: list[ThoughtStep],
    ) -> AsyncGenerator[RetrievalResponseDelta, None]:
        """Stream the answer: first a context-only delta, then one delta per text token."""
        run_results = Runner.run_streamed(self.answer_agent, input=self._agent_input(items))
        # Emit the context up front so clients can render sources before the answer arrives.
        yield RetrievalResponseDelta(
            context=self._answer_context(items, earlier_thoughts, run_results.input),
        )
        async for event in run_results.stream_events():
            if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                yield RetrievalResponseDelta(delta=Message(content=str(event.data.delta), role=AIChatRoles.ASSISTANT))
        return