-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhandler.py
More file actions
113 lines (92 loc) · 4.12 KB
/
handler.py
File metadata and controls
113 lines (92 loc) · 4.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
from functools import partial
import logging
from typing import Any, List, Optional, Type, get_args, get_origin
import httpx
from pydantic import BaseModel, PrivateAttr, ValidationError
from ..http_client import APIHttpClient
from .operations import APIOperation
log = logging.getLogger(__name__)
class _APIOperationExecutor:
    """Turn ``APIOperation`` attributes into awaitable request callables.

    Looking up any attribute whose value is an ``APIOperation`` yields a
    callable bound to that operation instead of the operation object itself;
    awaiting the callable runs the request through ``APIRequestHandler``.
    Every other attribute is returned untouched.
    """

    # Client read by APIRequestHandler when it sends the request.
    # NOTE(review): PrivateAttr only takes effect on pydantic models, so this
    # class is presumably mixed into BaseModel subclasses -- confirm.
    _http_client: Optional[APIHttpClient] = PrivateAttr(default=None)

    def __getattribute__(self, name: str) -> Any:
        """Return the attribute, wrapping ``APIOperation`` values in a callable."""
        value = super().__getattribute__(name)
        if not isinstance(value, APIOperation):
            return value
        # Pre-bind the operation so callers only pass request keyword arguments.
        return partial(self._execute_operation, operation=value)

    async def _execute_operation(self, operation: APIOperation, **kwargs: Any) -> Any:
        """Run ``operation`` with ``kwargs`` and return the handler's result."""
        return await APIRequestHandler(
            executor=self, operation=operation, kwargs=kwargs
        ).execute()
class APIRequestHandler:
    """Run a single ``APIOperation`` on behalf of an executor.

    The handler builds the endpoint and request arguments from the operation
    and the caller's keyword arguments, sends the request through the
    executor's HTTP client, and validates the JSON response against the
    operation's response model.
    """

    def __init__(
        self, executor: _APIOperationExecutor, operation: APIOperation, kwargs: dict
    ):
        """Store the execution context.

        Args:
            executor: Object the operation was invoked on; its
                ``_http_client`` is used to send the request.
            operation: Operation providing ``method``, ``endpoint_template``
                and ``response_model``.
            kwargs: Keyword arguments supplied by the caller.
        """
        self.executor = executor
        self.operation = operation
        self.kwargs = kwargs
        self.http_client = executor._http_client

    async def execute(self) -> Any:
        """Send the request and return the parsed, validated response.

        Returns:
            ``None``, a model instance, or a list of model instances,
            depending on the operation's ``response_model``.
        """
        endpoint, request_kwargs = self._prepare_request_args()
        response = await self._make_request(
            self.operation.method, endpoint, **request_kwargs
        )
        return await self._parse_and_validate_response(
            response, self.operation.response_model, endpoint
        )

    def _prepare_request_args(self) -> tuple[str, dict]:
        """Build the endpoint string and the keyword arguments for the request.

        The endpoint template is formatted with the executor's dumped fields
        (only when the executor is a pydantic model) overlaid with the
        caller's keyword arguments, so explicit arguments win on name clashes.

        Returns:
            ``(endpoint, request_kwargs)`` where ``request_kwargs`` contains
            only those of ``params`` / ``json`` that are not ``None``.
        """
        format_args: dict = {}
        if isinstance(self.executor, BaseModel):
            format_args.update(self.executor.model_dump())
        format_args.update(self.kwargs)
        endpoint = self.operation.endpoint_template.format(**format_args)

        payload = None
        if data := self.kwargs.get("data"):
            if isinstance(data, BaseModel):
                payload = data.model_dump(exclude_none=True)
            elif isinstance(data, (dict, list)):
                # Plain JSON containers used to be dropped silently, which
                # sent the request without a body; forward them unchanged.
                payload = data
            else:
                log.warning(
                    "Ignoring unsupported 'data' payload of type %s",
                    type(data).__name__,
                )

        request_kwargs = {"params": self.kwargs.get("params"), "json": payload}
        return endpoint, {k: v for k, v in request_kwargs.items() if v is not None}

    async def _make_request(
        self, method: str, endpoint: str, **kwargs: Any
    ) -> httpx.Response:
        """Send the request through the executor's HTTP client.

        Raises:
            RuntimeError: If no HTTP client is available.
        """
        if not self.http_client:
            raise RuntimeError("HTTP Client is not initialized.")
        return await self.http_client.request(
            method=method, endpoint=endpoint, **kwargs
        )

    def _inject_client_into_model(self, model_instance: BaseModel) -> BaseModel:
        """Hand this handler's HTTP client to models that have ``_http_client``.

        Returns:
            The same ``model_instance`` that was passed in.
        """
        if hasattr(model_instance, "_http_client"):
            model_instance._http_client = self.http_client
        return model_instance

    async def _parse_and_validate_response(
        self,
        response: httpx.Response,
        response_model: Type[BaseModel] | Type[List[BaseModel]] | None,
        endpoint_for_logging: str,
    ) -> Any:
        """Decode the response body and validate it against ``response_model``.

        Args:
            response: The HTTP response to decode.
            response_model: A model class, a ``list[Model]`` / ``List[Model]``
                alias, or ``None`` when no body is expected.
            endpoint_for_logging: Endpoint string included in log messages.

        Returns:
            ``None`` when ``response_model`` is ``None`` or the response has
            no body; otherwise the validated model instance or list of
            instances, each given this handler's HTTP client when it has an
            ``_http_client`` attribute.

        Raises:
            pydantic.ValidationError: If the JSON does not match the model.
        """
        if response_model is None:
            return None

        try:
            # An empty body (e.g. 204 No Content) cannot be decoded as JSON;
            # treat it as "no body" instead of letting ``response.json()``
            # raise a JSONDecodeError. Malformed non-empty bodies still raise.
            if not response.content:
                log.warning("No JSON response body for %s", endpoint_for_logging)
                return None
            json_response = response.json()
        except httpx.ResponseNotRead:
            log.warning("No JSON response body for %s", endpoint_for_logging)
            return None
        log.debug("Validating JSON response for endpoint: %s", endpoint_for_logging)

        try:
            # get_origin() yields the builtin ``list`` for both ``list[X]``
            # and ``typing.List[X]``.
            if get_origin(response_model) is list:
                item_model = get_args(response_model)[0]
                instances = [
                    self._inject_client_into_model(item_model.model_validate(item))
                    for item in json_response
                ]
                log.debug("Successfully validated response into list of models.")
                return instances

            instance = response_model.model_validate(json_response)
            self._inject_client_into_model(instance)
            log.debug("Successfully validated response into single model.")
            return instance
        except ValidationError as e:
            log.error(
                "Pydantic validation failed for %s. Error: %s",
                endpoint_for_logging,
                e,
            )
            log.error("Failing JSON data: %s", json_response)
            # Bare raise keeps the original traceback intact.
            raise