|
| 1 | +import time |
| 2 | +from http import HTTPStatus |
| 3 | + |
| 4 | +import requests |
| 5 | + |
| 6 | +from groundlight.client import EdgeNotAvailableError |
| 7 | +from groundlight.edge.config import EdgeEndpointConfig |
| 8 | + |
| 9 | +_EDGE_METHOD_UNAVAILABLE_HINT = ( |
| 10 | + "Make sure the client is pointed at a running Edge Endpoint " |
| 11 | + "(via GROUNDLIGHT_ENDPOINT env var or the endpoint= constructor arg)." |
| 12 | +) |
| 13 | + |
| 14 | + |
| 15 | +class EdgeEndpointApi: |
| 16 | + """ |
| 17 | + Namespace for operations that are specific to the Edge Endpoint, |
| 18 | + such as setting and getting the EdgeEndpoint configuration. |
| 19 | + """ |
| 20 | + |
| 21 | + def __init__(self, client) -> None: |
| 22 | + self._client = client |
| 23 | + |
| 24 | + def _base_url(self) -> str: |
| 25 | + return self._client.edge_base_url() |
| 26 | + |
| 27 | + def _request(self, method: str, path: str, **kwargs) -> requests.Response: |
| 28 | + url = f"{self._base_url()}{path}" |
| 29 | + headers = self._client.get_raw_headers() |
| 30 | + try: |
| 31 | + response = requests.request( |
| 32 | + method, url, headers=headers, verify=self._client.configuration.verify_ssl, timeout=10, **kwargs |
| 33 | + ) |
| 34 | + response.raise_for_status() |
| 35 | + except requests.exceptions.HTTPError as e: |
| 36 | + if e.response is not None and e.response.status_code == HTTPStatus.NOT_FOUND: |
| 37 | + raise EdgeNotAvailableError( |
| 38 | + f"Edge method not available at {url}. {_EDGE_METHOD_UNAVAILABLE_HINT}" |
| 39 | + ) from e |
| 40 | + raise |
| 41 | + except requests.exceptions.ConnectionError as e: |
| 42 | + raise EdgeNotAvailableError( |
| 43 | + f"Could not connect to {self._base_url()}. {_EDGE_METHOD_UNAVAILABLE_HINT}" |
| 44 | + ) from e |
| 45 | + return response |
| 46 | + |
| 47 | + def get_config(self) -> EdgeEndpointConfig: |
| 48 | + """Retrieve the active edge endpoint configuration.""" |
| 49 | + response = self._request("GET", "/edge-config") |
| 50 | + return EdgeEndpointConfig.from_payload(response.json()) |
| 51 | + |
| 52 | + def get_detector_readiness(self) -> dict[str, bool]: |
| 53 | + """Check which configured detectors have inference pods ready to serve. |
| 54 | +
|
| 55 | + :return: Dict mapping detector_id to readiness (True/False). |
| 56 | + """ |
| 57 | + response = self._request("GET", "/edge-detector-readiness") |
| 58 | + return {det_id: info["ready"] for det_id, info in response.json().items()} |
| 59 | + |
| 60 | + def set_config( |
| 61 | + self, |
| 62 | + config: EdgeEndpointConfig, |
| 63 | + timeout_sec: float = 600, |
| 64 | + ) -> EdgeEndpointConfig: |
| 65 | + """Replace the edge endpoint configuration and wait until all detectors are ready. |
| 66 | +
|
| 67 | + :param config: The new configuration to apply. |
| 68 | + :param timeout_sec: Max seconds to wait for all detectors to become ready. |
| 69 | + :return: The applied configuration as reported by the edge endpoint. |
| 70 | + """ |
| 71 | + self._request("PUT", "/edge-config", json=config.to_payload()) |
| 72 | + |
| 73 | + desired_ids = {d.detector_id for d in config.detectors} |
| 74 | + if not desired_ids: |
| 75 | + return self.get_config() |
| 76 | + |
| 77 | + deadline = time.time() + timeout_sec |
| 78 | + while time.time() < deadline: |
| 79 | + readiness = self.get_detector_readiness() |
| 80 | + if all(readiness.get(did, False) for did in desired_ids): |
| 81 | + return self.get_config() |
| 82 | + time.sleep(1) |
| 83 | + |
| 84 | + raise TimeoutError( |
| 85 | + f"Edge detectors were not all ready within {timeout_sec}s. " |
| 86 | + "The edge endpoint may still be converging, or may have encountered an error." |
| 87 | + ) |
0 commit comments