diff --git a/.deepsource.toml b/.deepsource.toml new file mode 100644 index 0000000..01a1066 --- /dev/null +++ b/.deepsource.toml @@ -0,0 +1,9 @@ +version = 1 + +[[analyzers]] +name = "python" +enabled = true + + [analyzers.meta] + runtime_version = "3.x.x" + max_line_length = 200 diff --git a/.github/workflows/api-validation.yml b/.github/workflows/api-validation.yml new file mode 100644 index 0000000..51ed331 --- /dev/null +++ b/.github/workflows/api-validation.yml @@ -0,0 +1,132 @@ +name: API Validation with Schemathesis + +on: + pull_request: + push: + branches: [ main ] + +jobs: + schemathesis: + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + with: + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Checkout schema validator repository + uses: actions/checkout@v4 + with: + repository: doe-iri/iri-facility-api-docs + ref: main + path: schema-validator + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.14" + + - name: Install uv + run: pip install uv + + - name: Build an image + run: docker build --platform=linux/amd64 -t iri-facility-api-base . 
+ + - name: Run Facility API container + run: | + docker run -d \ + -p 8000:8000 \ + --platform=linux/amd64 \ + --name iri-facility-api-base \ + -e IRI_API_ADAPTER_facility=app.demo_adapter.DemoAdapter \ + -e IRI_API_ADAPTER_status=app.demo_adapter.DemoAdapter \ + -e IRI_API_ADAPTER_account=app.demo_adapter.DemoAdapter \ + -e IRI_API_ADAPTER_compute=app.demo_adapter.DemoAdapter \ + -e IRI_API_ADAPTER_filesystem=app.demo_adapter.DemoAdapter \ + -e IRI_API_ADAPTER_task=app.demo_adapter.DemoAdapter \ + -e API_URL_ROOT=http://127.0.0.1:8000 \ + -e IRI_API_TOKEN=12345 \ + iri-facility-api-base + + - name: Wait for API to be ready + run: | + for i in {1..60}; do + if curl -fs http://127.0.0.1:8000/openapi.json; then + echo "API ready" + exit 0 + fi + sleep 2 + done + echo "API did not start" + exit 1 + + - name: Create venv & install validator dependencies + run: | + uv venv + source .venv/bin/activate + uv pip install -r schema-validator/verification/requirements.txt + + - name: Run Schemathesis validation (local spec) + id: schemathesis_local + env: + IRI_API_TOKEN: "12345" # This is dummy token for testing (mock adapter) + run: | + set +e + source .venv/bin/activate + python schema-validator/verification/api-validator.py \ + --baseurl http://127.0.0.1:8000 \ + --report-name schemathesis-local + echo "exitcode=$?" >> $GITHUB_OUTPUT + + - name: Run Schemathesis validation (official spec) + id: schemathesis_official + env: + IRI_API_TOKEN: "12345" + run: | + set +e + source .venv/bin/activate + python schema-validator/verification/api-validator.py \ + --baseurl http://localhost:8000 \ + --schema-url https://raw.githubusercontent.com/doe-iri/iri-facility-api-docs/refs/heads/main/specification/openapi/openapi_iri_facility_api_v1.json \ + --report-name schemathesis-official + echo "exitcode=$?" 
>> $GITHUB_OUTPUT + + - name: Fail if any Schemathesis run failed + if: always() + run: | + if [ "${{ steps.schemathesis_local.outputs.exitcode }}" != "0" ] || \ + [ "${{ steps.schemathesis_official.outputs.exitcode }}" != "0" ]; then + echo "One or more Schemathesis validations failed" + exit 1 + else + echo "Both Schemathesis validations passed" + fi + + - name: Upload Schemathesis report # This only works on git actions + if: always() && env.ACT != 'true' + uses: actions/upload-artifact@v4 + with: + if-no-files-found: warn + name: schemathesis-report + path: | + schemathesis-local.html + schemathesis-local.xml + schemathesis-official.html + schemathesis-official.xml + + - name: Save Schemathesis reports locally # This only works if run locally with act + if: always() && env.ACT == 'true' + run: | + mkdir -p artifacts + cp schemathesis-local.html schemathesis-local.xml artifacts/ || true + cp schemathesis-official.html schemathesis-official.xml artifacts/ || true + + - name: Dump API logs + if: always() + run: docker logs iri-facility-api-base || true + + - name: Stop container + if: always() + run: docker stop iri-facility-api-base || true diff --git a/Dockerfile b/Dockerfile index f3ad071..93c80d9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3 +FROM python:3.14 RUN mkdir /app COPY . 
/app diff --git a/Makefile b/Makefile index 5bd9034..abd87e4 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,12 @@ dev : .venv @source ./.venv/bin/activate && \ + IRI_API_ADAPTER_facility=app.demo_adapter.DemoAdapter \ IRI_API_ADAPTER_status=app.demo_adapter.DemoAdapter \ IRI_API_ADAPTER_account=app.demo_adapter.DemoAdapter \ IRI_API_ADAPTER_compute=app.demo_adapter.DemoAdapter \ IRI_API_ADAPTER_filesystem=app.demo_adapter.DemoAdapter \ IRI_API_ADAPTER_task=app.demo_adapter.DemoAdapter \ + OPENTELEMETRY_ENABLED=true \ API_URL_ROOT='http://127.0.0.1:8000' fastapi dev diff --git a/README.md b/README.md index 5ef41a6..02b796c 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,8 @@ If using docker (see next section), your dockerfile could extend this reference - `API_URL_ROOT`: the base url when constructing links returned by the api (eg.: https://iri.myfacility.com) - `API_PREFIX`: the path prefix where the api is hosted. Defaults to `/`. (eg.: `/api`) - `API_URL`: the path to the api itself. Defaults to `api/v1`. +- `OPENTELEMETRY_ENABLED`: Enables OpenTelemetry. If enabled, the application will use OpenTelemetry SDKs and emit traces, metrics, and logs. Defaults to `false`. +- `OTLP_ENDPOINT`: OpenTelemetry Protocol collector endpoint to export telemetry data. If empty or not set, telemetry data is written to the console instead. Default: "" Links to data, created by this api, will concatenate these values producing links, eg: `https://iri.myfacility.com/my_api_prefix/my_api_url/projects/123` diff --git a/VALIDATION.MD b/VALIDATION.MD new file mode 100644 index 0000000..26f77ef --- /dev/null +++ b/VALIDATION.MD @@ -0,0 +1,23 @@ +# API Validation with Schemathesis + +On every pull request or push to the `main` branch, GitHub Actions runs the steps below, which validate an IRI Facility API implementation against the OpenAPI spec using Schemathesis. + +1. Builds the Facility API Docker image from the Dockerfile. +2. Runs the API container with the demo adapter. +3.
Waits for `/openapi.json` to become available on localhost:8000. +4. Runs Schemathesis validation twice: + - Against the Facility API’s own OpenAPI spec. (http://localhost:8000/openapi.json) + - Against the official IRI Facility API OpenAPI spec. (https://github.com/doe-iri/iri-facility-api-docs/blob/main/specification/openapi/openapi_iri_facility_api_v1.json) +5. Fails the workflow if either validation fails. +6. Saves the Schemathesis HTML/XML reports as artifacts (or saves them locally when run with `act`). +7. Dumps the API container logs and stops the container. + +## Running locally + +```bash +act -W .github/workflows/api-validation.yml -s GITHUB_TOKEN= +``` + +## Known issues + +The Python implementation does not yet fully align with the official specification. Validation against the official spec will continue to fail until either the spec or the Python implementation is fixed. diff --git a/app/config.py b/app/config.py index 078b6a5..71a7c20 100644 --- a/app/config.py +++ b/app/config.py @@ -40,3 +40,8 @@ API_URL_ROOT = os.environ.get("API_URL_ROOT", "https://api.iri.nersc.gov") API_PREFIX = os.environ.get("API_PREFIX", "/") API_URL = os.environ.get("API_URL", "api/v1") + +OPENTELEMETRY_ENABLED = os.environ.get("OPENTELEMETRY_ENABLED", "false").lower() == "true" +OPENTELEMETRY_DEBUG = os.environ.get("OPENTELEMETRY_DEBUG", "false").lower() == "true" +OTLP_ENDPOINT = os.environ.get("OTLP_ENDPOINT", "") +OTEL_SAMPLE_RATE = float(os.environ.get("OTEL_SAMPLE_RATE", "0.2")) diff --git a/app/demo_adapter.py b/app/demo_adapter.py index 9cbc796..50a9267 100644 --- a/app/demo_adapter.py +++ b/app/demo_adapter.py @@ -1,7 +1,6 @@ import datetime import random import uuid -import time import os import stat import pwd @@ -10,9 +9,11 @@ import subprocess import pathlib import base64 -from pydantic import BaseModel from typing import Any, Tuple +from pydantic import BaseModel from fastapi import HTTPException +from .routers.common import AllocationUnit, Capability +from .routers.facility import models as
facility_models, facility_adapter as facility_adapter from .routers.status import models as status_models, facility_adapter as status_adapter from .routers.account import models as account_models, facility_adapter as account_adapter from .routers.compute import models as compute_models, facility_adapter as compute_adapter @@ -35,14 +36,28 @@ def get_base_temp_dir(cls): os.makedirs(cls._base_temp_dir, exist_ok=True) # create a test file - with open(f"{cls._base_temp_dir}/test.txt", "w") as f: + with open(f"{cls._base_temp_dir}/test.txt", encoding="utf-8", mode="w") as f: f.write("hello world") return cls._base_temp_dir +def demo_uuid(kind: str, name: str) -> str: + return str(uuid.uuid5(uuid.NAMESPACE_DNS, f"demo:{kind}:{name}")) + + +def utc_now() -> datetime.datetime: + """Return current UTC datetime timestamp""" + return datetime.datetime.now(datetime.timezone.utc) + + +def utc_timestamp() -> int: + """Return current UTC datetime timestamp as integer""" + return int(utc_now().timestamp()) + + class DemoAdapter(status_adapter.FacilityAdapter, account_adapter.FacilityAdapter, compute_adapter.FacilityAdapter, filesystem_adapter.FacilityAdapter, - task_adapter.FacilityAdapter): + task_adapter.FacilityAdapter, facility_adapter.FacilityAdapter): def __init__(self): self.resources = [] self.incidents = [] @@ -52,17 +67,63 @@ def __init__(self): self.projects = [] self.project_allocations = [] self.user_allocations = [] - + self.facility = {} + self.locations = [] + self.sites = [] self._init_state() def _init_state(self): - day_ago = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=1) + now = utc_now() + + site1 = facility_models.Site( + id=demo_uuid("site", "demo_site_1"), + name="Demo Site 1", + description="The first demo site", + last_modified=now, + short_name="DS1", + operating_organization="Demo Org", + country_name="USA", + locality_name="Demo City", + state_or_province_name="DC", + latitude=36.173357, + longitude=-234.51452, + 
resource_uris=[]) + + site2 = facility_models.Site( + id=demo_uuid("site", "demo_site_2"), + name="Demo Site 2", + description="The second demo site", + last_modified=now, + short_name="DS2", + operating_organization="Demo Org", + country_name="USA", + locality_name="Example Town", + state_or_province_name="ET", + latitude=38.410558, + longitude=-286.36999, + resource_uris=[]) + + self.facility = facility_models.Facility( + id=demo_uuid("facility", "demo_facility"), + name="Demo Facility", + description="A demo facility for testing the IRI Facility API", + last_modified=now, + short_name="DEMO", + organization_name="Demo Organization", + support_uri="https://support.demo.example", + site_uris=[site1.self_uri, site2.self_uri] + ) + + self.sites = [site1, site2] + + + day_ago = utc_now() - datetime.timedelta(days=1) self.capabilities = { - "cpu": account_models.Capability(id=str(uuid.uuid4()), name="CPU Nodes", units=[account_models.AllocationUnit.node_hours]), - "gpu": account_models.Capability(id=str(uuid.uuid4()), name="GPU Nodes", units=[account_models.AllocationUnit.node_hours]), - "hpss": account_models.Capability(id=str(uuid.uuid4()), name="Tape Storage", units=[account_models.AllocationUnit.bytes, account_models.AllocationUnit.inodes]), - "gpfs": account_models.Capability(id=str(uuid.uuid4()), name="GPFS Storage", units=[account_models.AllocationUnit.bytes, account_models.AllocationUnit.inodes]), + "cpu": Capability(id=str(uuid.uuid4()), name="CPU Nodes", units=[AllocationUnit.node_hours]), + "gpu": Capability(id=str(uuid.uuid4()), name="GPU Nodes", units=[AllocationUnit.node_hours]), + "hpss": Capability(id=str(uuid.uuid4()), name="Tape Storage", units=[AllocationUnit.bytes, AllocationUnit.inodes]), + "gpfs": Capability(id=str(uuid.uuid4()), name="GPFS Storage", units=[AllocationUnit.bytes, AllocationUnit.inodes]), } pm = status_models.Resource(id=str(uuid.uuid4()), group="perlmutter", name="compute nodes", description="the perlmutter computer compute 
nodes", capability_ids=[ @@ -182,6 +243,64 @@ def _init_state(self): d += datetime.timedelta(minutes=int(random.random() * 15 + 1)) + # ---------------------------- + # Facility API + # ---------------------------- + + async def get_facility( + self: "DemoAdapter", + modified_since: str | None = None, + ) -> facility_models.Facility: + return self.facility + + + async def list_sites( + self: "DemoAdapter", + modified_since: str | None = None, + name: str | None = None, + offset: int | None = None, + limit: int | None = None, + short_name: str | None = None, + ) -> list[facility_models.Site]: + + sites = self.sites + + if name: + sites = [s for s in sites if name.lower() in s.name.lower()] + + if short_name: + sites = [s for s in sites if s.short_name == short_name] + + if modified_since: + ms = datetime.datetime.fromisoformat(str(modified_since)) + sites = [s for s in sites if s.last_modified > ms] + + o = offset or 0 + l = limit or len(sites) + return sites[o:o+l] + + + async def get_site( + self: "DemoAdapter", + site_id: str, + modified_since: str | None = None, + ) -> facility_models.Site: + + site = next((s for s in self.sites if s.id == site_id), None) + if not site: + raise HTTPException(status_code=404, detail="Site not found") + + if modified_since: + ms = datetime.datetime.fromisoformat(str(modified_since)) + if site.last_modified <= ms: + raise HTTPException(status_code=304, headers={"Last-Modified": site.last_modified.isoformat()}) + + return site + + + # ---------------------------- + # Status API + # ---------------------------- async def get_resources( self : "DemoAdapter", @@ -192,6 +311,8 @@ async def get_resources( group : str | None = None, modified_since : datetime.datetime | None = None, resource_type : status_models.ResourceType | None = None, + current_status : status_models.Status | None = None, + capability: Capability | None = None ) -> list[status_models.Resource]: return status_models.Resource.find(self.resources, name, description, 
group, modified_since, resource_type)[offset:offset + limit] @@ -241,6 +362,7 @@ async def get_incidents( time_ : datetime.datetime | None = None, modified_since : datetime.datetime | None = None, resource_id : str | None = None, + resolution: status_models.Resolution | None = None, ) -> list[status_models.Incident]: return status_models.Incident.find(self.incidents, name, description, status, type, from_, to, time_, modified_since, resource_id)[offset:offset + limit] @@ -254,7 +376,7 @@ async def get_incident( async def get_capabilities( self : "DemoAdapter", - ) -> list[account_models.Capability]: + ) -> list[Capability]: return self.capabilities.values() @@ -316,7 +438,7 @@ async def submit_job( id="job_123", status=compute_models.JobStatus( state=compute_models.JobState.NEW, - time=time.time(), + time=utc_timestamp(), message="job submitted", exit_code=None, meta_data={ "account": "account1" }, @@ -335,7 +457,7 @@ async def submit_job_script( id="job_123", status=compute_models.JobStatus( state=compute_models.JobState.NEW, - time=time.time(), + time=utc_timestamp(), message="job submitted", exit_code=None, meta_data={ "account": "account1" }, @@ -354,7 +476,7 @@ async def update_job( id=job_id, status=compute_models.JobStatus( state=compute_models.JobState.ACTIVE, - time=time.time(), + time=utc_timestamp(), message="job updated", exit_code=None, meta_data={ "account": "account1" }, @@ -374,7 +496,7 @@ async def get_job( id=job_id, status=compute_models.JobStatus( state=compute_models.JobState.COMPLETED, - time=time.time(), + time=utc_timestamp(), message="job completed successfully", exit_code=0, meta_data={ "account": "account1" }, @@ -396,7 +518,7 @@ async def get_jobs( id=f"job_{i}", status=compute_models.JobStatus( state=random.choice([s for s in compute_models.JobState]), - time=time.time() - (random.random() * 100), + time=utc_timestamp() - int(random.random() * 100), message="", exit_code=random.choice([0, 0, 0, 0, 0, 1, 1, 128, 127]), meta_data={ 
"account": "account1" }, @@ -864,7 +986,7 @@ class DemoTaskQueue: @staticmethod async def _process_tasks(da: DemoAdapter): - now = time.time() + now = utc_timestamp() _tasks = [] for t in DemoTaskQueue.tasks: if now - t.start > 5 * 60 and t.status in [task_models.TaskStatus.completed, task_models.TaskStatus.canceled, task_models.TaskStatus.failed]: @@ -885,5 +1007,5 @@ async def _process_tasks(da: DemoAdapter): @staticmethod def _create_task(user: account_models.User, resource: status_models.Resource, command: task_models.TaskCommand) -> str: task_id = f"task_{len(DemoTaskQueue.tasks)}" - DemoTaskQueue.tasks.append(DemoTask(id=task_id, body=command.model_dump_json(), user=user, resource=resource, start=time.time())) + DemoTaskQueue.tasks.append(DemoTask(id=task_id, body=command.model_dump_json(), user=user, resource=resource, start=utc_timestamp())) return task_id diff --git a/app/main.py b/app/main.py index be5feaa..fa3f1ed 100644 --- a/app/main.py +++ b/app/main.py @@ -2,8 +2,16 @@ """Main API application""" import logging from fastapi import FastAPI +from opentelemetry import trace +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ConsoleSpanExporter, BatchSpanProcessor, SimpleSpanProcessor +from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from app.routers.error_handlers import install_error_handlers +from app.routers.facility import facility from app.routers.status import status from app.routers.account import account from app.routers.compute import compute @@ -12,14 +20,40 @@ from . 
import config +# ------------------------------------------------------------------ +# OpenTelemetry Tracing Configuration +# ------------------------------------------------------------------ +if config.OPENTELEMETRY_ENABLED: + resource = Resource.create({ + "service.name": "iri-facility-api", + "service.version": config.API_VERSION, + "service.endpoint": config.API_URL_ROOT}) + + samplerate = 1.0 if config.OPENTELEMETRY_DEBUG else config.OTEL_SAMPLE_RATE  # must be a float: TraceIdRatioBased compares it numerically + provider = TracerProvider(resource=resource, sampler=ParentBased(TraceIdRatioBased(samplerate))) + trace.set_tracer_provider(provider) + + if config.OTLP_ENDPOINT: + exporter = OTLPSpanExporter(endpoint=config.OTLP_ENDPOINT, insecure=True) + span_processor = BatchSpanProcessor(exporter) + else: + exporter = ConsoleSpanExporter() + span_processor = SimpleSpanProcessor(exporter) + provider.add_span_processor(span_processor) + tracer = trace.get_tracer(__name__) +# ------------------------------------------------------------------ APP = FastAPI(**config.API_CONFIG) +if config.OPENTELEMETRY_ENABLED: + FastAPIInstrumentor.instrument_app(APP) + install_error_handlers(APP) api_prefix = f"{config.API_PREFIX}{config.API_URL}" # Attach routers under the prefix +APP.include_router(facility.router, prefix=api_prefix) APP.include_router(status.router, prefix=api_prefix) APP.include_router(account.router, prefix=api_prefix) APP.include_router(compute.router, prefix=api_prefix) diff --git a/app/routers/account/account.py b/app/routers/account/account.py index 951fc9b..f856312 100644 --- a/app/routers/account/account.py +++ b/app/routers/account/account.py @@ -1,7 +1,8 @@ -from fastapi import HTTPException, Request, Depends +from fastapi import HTTPException, Request, Depends, Query from . import models, facility_adapter from ..
import iri_router from ..error_handlers import DEFAULT_RESPONSES +from ..common import forbidExtraQueryParams, StrictDateTime, Capability router = iri_router.IriRouter( @@ -21,7 +22,12 @@ ) async def get_capabilities( request : Request, - ) -> list[models.Capability]: + name : str = Query(default=None, min_length=1), + modified_since: StrictDateTime = Query(default=None), + offset : int = Query(default=0, ge=0, le=1000), + limit : int = Query(default=100, ge=0, le=1000), + _forbid = Depends(forbidExtraQueryParams("name", "modified_since", "offset", "limit")), + ) -> list[Capability]: return await router.adapter.get_capabilities() @@ -35,7 +41,9 @@ async def get_capabilities( async def get_capability( capability_id : str, request : Request, - ) -> models.Capability: + modified_since: StrictDateTime = Query(default=None), + _forbid = Depends(forbidExtraQueryParams("modified_since")), + ) -> Capability: caps = await router.adapter.get_capabilities() cc = next((c for c in caps if c.id == capability_id), None) if not cc: @@ -53,6 +61,7 @@ async def get_capability( ) async def get_projects( request : Request, + _forbid = Depends(forbidExtraQueryParams()), ) -> list[models.Project]: user = await router.adapter.get_user(request.state.current_user_id, request.state.api_key, iri_router.get_client_ip(request)) if not user: @@ -71,6 +80,7 @@ async def get_projects( async def get_project( project_id : str, request : Request, + _forbid = Depends(forbidExtraQueryParams()), ) -> models.Project: user = await router.adapter.get_user(request.state.current_user_id, request.state.api_key, iri_router.get_client_ip(request)) if not user: @@ -93,6 +103,7 @@ async def get_project( async def get_project_allocations( project_id: str, request : Request, + _forbid = Depends(forbidExtraQueryParams()), ) -> list[models.ProjectAllocation]: user = await router.adapter.get_user(request.state.current_user_id, request.state.api_key, iri_router.get_client_ip(request)) if not user: @@ -116,6 +127,7 @@ 
async def get_project_allocation( project_id: str, project_allocation_id : str, request : Request, + _forbid = Depends(forbidExtraQueryParams()), ) -> models.ProjectAllocation: user = await router.adapter.get_user(request.state.current_user_id, request.state.api_key, iri_router.get_client_ip(request)) if not user: @@ -141,6 +153,7 @@ async def get_user_allocations( project_id: str, project_allocation_id : str, request : Request, + _forbid = Depends(forbidExtraQueryParams()), ) -> list[models.UserAllocation]: user = await router.adapter.get_user(request.state.current_user_id, request.state.api_key, iri_router.get_client_ip(request)) if not user: @@ -169,6 +182,7 @@ async def get_user_allocation( project_allocation_id : str, user_allocation_id : str, request : Request, + _forbid = Depends(forbidExtraQueryParams()), ) -> models.UserAllocation: user = await router.adapter.get_user(request.state.current_user_id, request.state.api_key, iri_router.get_client_ip(request)) if not user: diff --git a/app/routers/account/facility_adapter.py b/app/routers/account/facility_adapter.py index 78b622f..235a2f7 100644 --- a/app/routers/account/facility_adapter.py +++ b/app/routers/account/facility_adapter.py @@ -1,5 +1,6 @@ from abc import abstractmethod from . import models as account_models +from ..common import Capability from ..iri_router import AuthenticatedAdapter @@ -13,7 +14,7 @@ class FacilityAdapter(AuthenticatedAdapter): @abstractmethod async def get_capabilities( self : "FacilityAdapter", - ) -> list[account_models.Capability]: + ) -> list[Capability]: pass diff --git a/app/routers/account/models.py b/app/routers/account/models.py index 6ed69ea..1a9333d 100644 --- a/app/routers/account/models.py +++ b/app/routers/account/models.py @@ -1,28 +1,9 @@ -from pydantic import BaseModel, computed_field, Field -import enum +from pydantic import computed_field, Field from ... 
import config +from ..common import IRIBaseModel, AllocationUnit -class AllocationUnit(enum.Enum): - node_hours = "node_hours" - bytes = "bytes" - inodes = "inodes" - - -class Capability(BaseModel): - """ - An aspect of a resource that can have an allocation. - For example, Perlmutter nodes with GPUs - For some resources at a facility, this will be 1 to 1 with the resource. - It is a way to further subdivide a resource into allocatable sub-resources. - The word "capability" is also known to users as something they need for a job to run. (eg. gpu) - """ - id: str - name: str - units: list[AllocationUnit] - - -class User(BaseModel): +class User(IRIBaseModel): """A user of the facility""" id: str name: str @@ -31,7 +12,7 @@ class User(BaseModel): # we could expose more fields here (eg. email) but it might be against policy -class Project(BaseModel): +class Project(IRIBaseModel): """A project and its users at a facility""" id: str name: str @@ -39,14 +20,14 @@ class Project(BaseModel): user_ids: list[str] -class AllocationEntry(BaseModel): +class AllocationEntry(IRIBaseModel): """Base class for allocations.""" allocation: float # how much this allocation can spend usage: float # how much this allocation has spent unit: AllocationUnit -class ProjectAllocation(BaseModel): +class ProjectAllocation(IRIBaseModel): """ A project's allocation for a capability. (aka. repo) This allocation is a piece of the total allocation for the capability. (eg. 5% of the total node hours of Perlmutter GPU nodes) @@ -71,7 +52,7 @@ def capability_uri(self) -> str: return f"{config.API_URL_ROOT}{config.API_PREFIX}{config.API_URL}/account/capabilities/{self.capability_id}" -class UserAllocation(BaseModel): +class UserAllocation(IRIBaseModel): """ A user's allcation in a project. This allocation is a piece of the project's allocation. 
diff --git a/app/routers/common.py b/app/routers/common.py new file mode 100644 index 0000000..fd2882f --- /dev/null +++ b/app/routers/common.py @@ -0,0 +1,204 @@ +"""Default models used by multiple routers.""" +import datetime +import enum +from typing import Optional +from urllib.parse import parse_qs + +from pydantic_core import core_schema +from pydantic import BaseModel, ConfigDict, Field, computed_field, model_serializer +from fastapi import Request, HTTPException + +from .. import config + + +# These are Pydantic custom types for strict validation +# that are not implmented in Pydantic by default. +# ----------------------------------------------------------------------- +# StrictBool: a strict boolean type +class StrictBool: + """Strict boolean: + - Accepts: real booleans, 'true', 'false' + - Rejects everything else. + """ + + @classmethod + def __get_pydantic_core_schema__(cls, source, handler): + return core_schema.no_info_plain_validator_function(cls.validate) + + @staticmethod + def validate(value): + """Validate the input value as a strict boolean.""" + if isinstance(value, bool): + return value + if isinstance(value, str): + v = value.strip().lower() + if v == "true": + return True + if v == "false": + return False + raise ValueError("Invalid boolean value. Expected 'true' or 'false'.") + raise ValueError("Invalid boolean value. Expected true/false or 'true'/'false'.") + + @classmethod + def __get_pydantic_json_schema__(cls, schema, handler): + return { + "type": "boolean", + "description": "Strict boolean. Only true/false allowed (bool or string)." + } + +# ----------------------------------------------------------------------- +# StrictDateTime: a strict ISO8601 datetime type + +class StrictDateTime: + """ + Strict ISO8601 datetime: + - Accepts datetime objects + - Accepts ISO8601 strings: 2025-12-06T10:00:00Z, 2025-12-06T10:00:00+00:00 + - Converts 'Z' → UTC + - Converts naive datetimes → UTC + - Rejects integers ("0"), null, garbage strings, etc. 
+ """ + + @classmethod + def __get_pydantic_core_schema__(cls, source, handler): + return core_schema.no_info_plain_validator_function(cls.validate) + + @staticmethod + def validate(value): + if isinstance(value, datetime.datetime): + return StrictDateTime._normalize(value) + if not isinstance(value, str): + raise ValueError("Invalid datetime value. Expected ISO8601 datetime string.") + v = value.strip() + if v.endswith("Z"): + v = v[:-1] + "+00:00" + try: + dt = datetime.datetime.fromisoformat(v) + except Exception as ex: + raise ValueError("Invalid datetime format. Expected ISO8601 string.") from ex + + return StrictDateTime._normalize(dt) + + @staticmethod + def _normalize(dt: datetime.datetime) -> datetime.datetime: + if dt.tzinfo is None: + return dt.replace(tzinfo=datetime.timezone.utc) + return dt + + @classmethod + def __get_pydantic_json_schema__(cls, schema, handler): + return { + "type": "string", + "format": "date-time", + "description": "Strict ISO8601 datetime. Only valid ISO8601 datetime strings are accepted." 
+ } + + +def forbidExtraQueryParams(*allowedParams: str, multiParams: set[str] | None = None): + multiParams = multiParams or set() + + async def checker(req: Request): + if "*" in allowedParams: + return + + raw_qs = req.scope.get("query_string", b"") + parsed = parse_qs(raw_qs.decode("utf-8", errors="strict"), keep_blank_values=True) + + allowed = set(allowedParams) + + for key, values in parsed.items(): + if key not in allowed: + raise HTTPException(status_code=422, + detail=[{"type": "extra_forbidden", + "loc": ["query", key], + "msg": f"Unexpected query parameter: {key}"}]) + + + if len(values) > 1 and key not in multiParams: + raise HTTPException(status_code=422, + detail=[{"type": "duplicate_forbidden", + "loc": ["query", key], + "msg": f"Duplicate query parameter: {key}"}]) + + return checker + + + + +class IRIBaseModel(BaseModel): + """Base model for IRI models.""" + model_config = ConfigDict(extra="allow") + + @model_serializer(mode="wrap") + def _hide_extra(self, handler, info): + data = handler(self) + + model_fields = set(self.model_fields or {}) + computed_fields = set(self.model_computed_fields or {}) + extra = getattr(self, "__pydantic_extra__", {}) or {} + for k in extra: + if k not in model_fields and k not in computed_fields: + data.pop(k, None) + return data + + def get_extra(self, key, default=None): + return getattr(self, "__pydantic_extra__", {}).get(key, default) + + +class NamedObject(IRIBaseModel): + id: str = Field(..., description="The unique identifier for the object. 
Typically a UUID or URN.") + def _self_path(self) -> str: + raise NotImplementedError + + @computed_field(description="The canonical URL of this object") + @property + def self_uri(self) -> str: + """Computed self URI property.""" + return f"{config.API_URL_ROOT}{config.API_PREFIX}{config.API_URL}{self._self_path()}" + + name: Optional[str] = Field(None, description="The long name of the object.") + description: Optional[str] = Field(None, description="Human-readable description of the object.") + last_modified: StrictDateTime = Field(..., description="ISO 8601 timestamp when this object was last modified.") + + @staticmethod + def find_by_id(a, id, allow_name: bool|None=False): + # Find a resource by its id. + # If allow_name is True, the id parameter can also match the resource's name. + return next((r for r in a if r.id == id or (allow_name and r.name == id)), None) + + + @staticmethod + def find(a, name, description, modified_since): + def normalize(dt: datetime.datetime) -> datetime.datetime: + # Convert naive datetimes into UTC-aware versions + if dt.tzinfo is None: + return dt.replace(tzinfo=datetime.timezone.utc) + return dt + if name: + a = [aa for aa in a if aa.name == name] + if description: + a = [aa for aa in a if aa.description and description in aa.description]  # description is Optional; skip objects without one + if modified_since: + if modified_since.tzinfo is None: + modified_since = modified_since.replace(tzinfo=datetime.timezone.utc) + a = [aa for aa in a if normalize(aa.last_modified) >= modified_since] + return a + + +class AllocationUnit(enum.Enum): + node_hours = "node_hours" + bytes = "bytes" + inodes = "inodes" + + +class Capability(IRIBaseModel): + """ + An aspect of a resource that can have an allocation. + For example, Perlmutter nodes with GPUs + For some resources at a facility, this will be 1 to 1 with the resource. + It is a way to further subdivide a resource into allocatable sub-resources. + The word "capability" is also known to users as something they need for a job to run. (eg.
gpu) + """ + id: str + name: str + units: list[AllocationUnit] \ No newline at end of file diff --git a/app/routers/compute/compute.py b/app/routers/compute/compute.py index e7481ac..e915048 100644 --- a/app/routers/compute/compute.py +++ b/app/routers/compute/compute.py @@ -1,9 +1,12 @@ -from typing import List, Annotated -from fastapi import HTTPException, Request, Depends, status, Form, Query +"""Compute resource API router""" +from fastapi import HTTPException, Request, Depends, status, Query from . import models, facility_adapter from .. import iri_router + from ..error_handlers import DEFAULT_RESPONSES from ..status.status import router as status_router +from ..common import forbidExtraQueryParams, StrictBool + router = iri_router.IriRouter( facility_adapter.FacilityAdapter, @@ -11,7 +14,6 @@ tags=["compute"], ) - @router.post( "/job/{resource_id:str}", dependencies=[Depends(router.current_user)], @@ -24,6 +26,7 @@ async def submit_job( resource_id: str, job_spec : models.JobSpec, request : Request, + _forbid = Depends(forbidExtraQueryParams()), ): """ Submit a job on a compute resource @@ -45,39 +48,41 @@ async def submit_job( return await router.adapter.submit_job(resource, user, job_spec) -@router.post( - "/job/script/{resource_id:str}", - dependencies=[Depends(router.current_user)], - response_model=models.Job, - response_model_exclude_unset=True, - responses=DEFAULT_RESPONSES, - operation_id="launchJobScript", -) -async def submit_job_path( - resource_id: str, - job_script_path : str, - request : Request, - args : Annotated[List[str], Form()] = [], - ): - """ - Submit a job on a compute resource - - - **resource**: the name of the compute resource to use - - **job_script_path**: path to the job script on the compute resource - - **args**: optional arguments to the job script - - This command will attempt to submit a job and return its id. 
- """ - user = await router.adapter.get_user(request.state.current_user_id, request.state.api_key, iri_router.get_client_ip(request)) - if not user: - raise HTTPException(status_code=404, detail="User not found") - - # look up the resource (todo: maybe ensure it's available) - resource = await status_router.adapter.get_resource(resource_id) - - # the handler can use whatever means it wants to submit the job and then fill in its id - # see: https://exaworks.org/psij-python/docs/v/0.9.11/user_guide.html#submitting-jobs - return await router.adapter.submit_job_script(resource, user, job_script_path, args) +# TODO: this conflicts with PUT commented out while we finalize the API design +#@router.post( +# "/job/script/{resource_id:str}", +# dependencies=[Depends(router.current_user)], +# response_model=models.Job, +# response_model_exclude_unset=True, +# responses=DEFAULT_RESPONSES, +# operation_id="launchJobScript", +#) +#async def submit_job_path( +# resource_id: str, +# job_script_path : str, +# request : Request, +# args : Annotated[List[str], Form()] = [], +# _forbid = Depends(iri_router.forbidExtraQueryParams("job_script_path")), +# ): +# """ +# Submit a job on a compute resource +# +# - **resource**: the name of the compute resource to use +# - **job_script_path**: path to the job script on the compute resource +# - **args**: optional arguments to the job script +# +# This command will attempt to submit a job and return its id. 
+# """ +# user = await router.adapter.get_user(request.state.current_user_id, request.state.api_key, iri_router.get_client_ip(request)) +# if not user: +# raise HTTPException(status_code=404, detail="User not found") +# +# # look up the resource (todo: maybe ensure it's available) +# resource = await status_router.adapter.get_resource(resource_id) +# +# # the handler can use whatever means it wants to submit the job and then fill in its id +# # see: https://exaworks.org/psij-python/docs/v/0.9.11/user_guide.html#submitting-jobs +# return await router.adapter.submit_job_script(resource, user, job_script_path, args) @router.put( @@ -93,6 +98,7 @@ async def update_job( job_id: str, job_spec : models.JobSpec, request : Request, + _forbid = Depends(forbidExtraQueryParams()), ): """ Update a previously submitted job for a resource. @@ -120,14 +126,15 @@ async def update_job( response_model=models.Job, response_model_exclude_unset=True, responses=DEFAULT_RESPONSES, - operation_id="getJobs", + operation_id="getJob", ) async def get_job_status( resource_id : str, job_id : str, request : Request, - historical : bool = False, - include_spec: bool = False, + historical : StrictBool = Query(default=False, description="Whether to include historical jobs. Defaults to false"), + include_spec: StrictBool = Query(default=False, description="Whether to include the job specification. 
Defaults to false"), + _forbid = Depends(forbidExtraQueryParams("historical", "include_spec")), ): """Get a job's status""" user = await router.adapter.get_user(request.state.current_user_id, request.state.api_key, iri_router.get_client_ip(request)) @@ -154,11 +161,12 @@ async def get_job_status( async def get_job_statuses( resource_id : str, request : Request, - offset : int = Query(default=0, ge=0), - limit : int = Query(default=100, le=10000), + offset : int = Query(default=0, ge=0, le=1000), + limit : int = Query(default=100, ge=0, le=1000), filters : dict[str, object] | None = None, - historical : bool = False, - include_spec: bool = False, + historical : StrictBool = Query(default=False, description="Whether to include historical jobs. Defaults to false"), + include_spec: StrictBool = Query(default=False, description="Whether to include the job specification. Defaults to false"), + _forbid = Depends(forbidExtraQueryParams("offset", "limit", "filters", "historical", "include_spec")), ): """Get multiple jobs' statuses""" user = await router.adapter.get_user(request.state.current_user_id, request.state.api_key, iri_router.get_client_ip(request)) @@ -187,6 +195,7 @@ async def cancel_job( resource_id : str, job_id : str, request : Request, + _forbid = Depends(forbidExtraQueryParams()), ): """Cancel a job""" user = await router.adapter.get_user(request.state.current_user_id, request.state.api_key, iri_router.get_client_ip(request)) @@ -196,8 +205,6 @@ async def cancel_job( # look up the resource (todo: maybe ensure it's available) resource = await status_router.adapter.get_resource(resource_id) - try: - await router.adapter.cancel_job(resource, user, job_id) - except Exception as exc: - raise HTTPException(status_code=400, detail=f"Unable to cancel job: {str(exc)}") from exc + await router.adapter.cancel_job(resource, user, job_id) + return None diff --git a/app/routers/compute/models.py b/app/routers/compute/models.py index 167643e..a56d4fe 100644 --- 
a/app/routers/compute/models.py +++ b/app/routers/compute/models.py @@ -1,41 +1,42 @@ from typing import Annotated -from pydantic import BaseModel, field_serializer, ConfigDict, Field from enum import IntEnum +from pydantic import field_serializer, ConfigDict, StrictBool, Field +from ..common import IRIBaseModel -class ResourceSpec(BaseModel): +class ResourceSpec(IRIBaseModel): """ Specification of computational resources required for a job. """ - node_count: Annotated[int | None, Field(description="Number of compute nodes to allocate")] = None - process_count: Annotated[int | None, Field(description="Total number of processes to launch")] = None - processes_per_node: Annotated[int | None, Field(description="Number of processes to launch per node")] = None - cpu_cores_per_process: Annotated[int | None, Field(description="Number of CPU cores to allocate per process")] = None - gpu_cores_per_process: Annotated[int | None, Field(description="Number of GPU cores to allocate per process")] = None - exclusive_node_use: Annotated[bool, Field(description="Whether to request exclusive use of allocated nodes")] = True - memory: Annotated[int | None, Field(description="Amount of memory to allocate in bytes")] = None + node_count: Annotated[int | None, Field(ge=1, description="Number of compute nodes to allocate")] = None + process_count: Annotated[int | None, Field(ge=1, description="Total number of processes to launch")] = None + processes_per_node: Annotated[int | None, Field(ge=1, description="Number of processes to launch per node")] = None + cpu_cores_per_process: Annotated[int | None, Field(ge=1, description="Number of CPU cores to allocate per process")] = None + gpu_cores_per_process: Annotated[int | None, Field(ge=1, description="Number of GPU cores to allocate per process")] = None + exclusive_node_use: Annotated[StrictBool, Field(description="Whether to request exclusive use of allocated nodes")] = True + memory: Annotated[int | None, Field(ge=1,description="Amount 
of memory to allocate in bytes")] = None -class JobAttributes(BaseModel): +class JobAttributes(IRIBaseModel): """ Additional attributes and scheduling parameters for a job. """ - duration: Annotated[int | None, Field(description="Duration in seconds", ge=0, examples=[30, 60, 120])] = None - queue_name: Annotated[str | None, Field(description="Name of the queue or partition to submit the job to")] = None - account: Annotated[str | None, Field(description="Account or project to charge for resource usage")] = None - reservation_id: Annotated[str | None, Field(description="ID of a reservation to use for the job")] = None + duration: Annotated[int | None, Field(description="Duration in seconds", ge=1, examples=[30, 60, 120])] = None + queue_name: Annotated[str | None, Field(min_length=1, description="Name of the queue or partition to submit the job to")] = None + account: Annotated[str | None, Field(min_length=1, description="Account or project to charge for resource usage")] = None + reservation_id: Annotated[str | None, Field(min_length=1, description="ID of a reservation to use for the job")] = None custom_attributes: Annotated[dict[str, str], Field(description="Custom scheduler-specific attributes as key-value pairs")] = {} -class VolumeMount(BaseModel): +class VolumeMount(IRIBaseModel): """ Represents a volume mount for a container. 
""" - source: Annotated[str, Field(description="The source path on the host system to mount")] - target: Annotated[str, Field(description="The target path inside the container where the volume will be mounted")] - read_only: Annotated[bool, Field(description="Whether the mount should be read-only")] = True + source: Annotated[str, Field(min_length=1, description="The source path on the host system to mount")] + target: Annotated[str, Field(min_length=1, description="The target path inside the container where the volume will be mounted")] + read_only: Annotated[StrictBool, Field(description="Whether the mount should be read-only")] = True -class Container(BaseModel): +class Container(IRIBaseModel): """ Represents a container specification for job execution. @@ -44,33 +45,33 @@ class Container(BaseModel): to determine if the container should be run with MPI support. The container should by default. be run with host networking. """ - image: Annotated[str, Field(description="The container image to use (e.g., 'docker.io/library/ubuntu:latest')")] + image: Annotated[str, Field(min_length=1, description="The container image to use (e.g., 'docker.io/library/ubuntu:latest')")] volume_mounts: Annotated[list[VolumeMount], Field(description="List of volume mounts for the container")] = [] -class JobSpec(BaseModel): +class JobSpec(IRIBaseModel): """ Specification for job. """ model_config = ConfigDict(extra="forbid") - executable: Annotated[str | None, Field(description="Path to the executable to run. If container is specified, this will be used as the entrypoint to the container.")] = None + executable: Annotated[str | None, Field(min_length=1, description="Path to the executable to run. 
If container is specified, this will be used as the entrypoint to the container.")] = None container: Annotated[Container | None, Field(description="Container specification for containerized execution")] = None arguments: Annotated[list[str], Field(description="Command-line arguments to pass to the executable or container")] = [] - directory: Annotated[str | None, Field(description="Working directory for the job")] = None - name: Annotated[str | None, Field(description="Name of the job")] = None - inherit_environment: Annotated[bool, Field(description="Whether to inherit the environment variables from the submission environment")] = True + directory: Annotated[str | None, Field(min_length=1, description="Working directory for the job")] = None + name: Annotated[str | None, Field(min_length=1, description="Name of the job")] = None + inherit_environment: Annotated[StrictBool, Field(description="Whether to inherit the environment variables from the submission environment")] = True environment: Annotated[dict[str, str], Field(description="Environment variables to set for the job. 
If container is specified, these will be set inside the container.")] = {} - stdin_path: Annotated[str | None, Field(description="Path to file to use as standard input")] = None - stdout_path: Annotated[str | None, Field(description="Path to file to write standard output")] = None - stderr_path: Annotated[str | None, Field(description="Path to file to write standard error")] = None + stdin_path: Annotated[str | None, Field(min_length=1, description="Path to file to use as standard input")] = None + stdout_path: Annotated[str | None, Field(min_length=1, description="Path to file to write standard output")] = None + stderr_path: Annotated[str | None, Field(min_length=1, description="Path to file to write standard error")] = None resources: Annotated[ResourceSpec | None, Field(description="Resource requirements for the job")] = None attributes: Annotated[JobAttributes | None, Field(description="Additional job attributes such as duration, queue, and account")] = None - pre_launch: Annotated[str | None, Field(description="Script or commands to run before launching the job")] = None - post_launch: Annotated[str | None, Field(description="Script or commands to run after the job completes")] = None - launcher: Annotated[str | None, Field(description="Job launcher to use (e.g., 'mpirun', 'srun')")] = None + pre_launch: Annotated[str | None, Field(min_length=1, description="Script or commands to run before launching the job")] = None + post_launch: Annotated[str | None, Field(min_length=1, description="Script or commands to run after the job completes")] = None + launcher: Annotated[str | None, Field(min_length=1, description="Job launcher to use (e.g., 'mpirun', 'srun')")] = None -class CommandResult(BaseModel): +class CommandResult(IRIBaseModel): status : str result : str | None = None @@ -110,20 +111,19 @@ class JobState(IntEnum): """Represents a job that was canceled by a call to :func:`~psij.Job.cancel()`.""" -class JobStatus(BaseModel): +class JobStatus(IRIBaseModel): 
state : JobState time : float | None = None message : str | None = None exit_code : int | None = None meta_data : dict[str, object] | None = None - @field_serializer('state') def serialize_state(self, state: JobState): return state.name -class Job(BaseModel): +class Job(IRIBaseModel): id : str status : JobStatus | None = None job_spec : JobSpec | None = None diff --git a/app/routers/error_handlers.py b/app/routers/error_handlers.py index 09769ec..337b5fc 100644 --- a/app/routers/error_handlers.py +++ b/app/routers/error_handlers.py @@ -65,6 +65,12 @@ async def validation_error_handler(request: Request, exc: RequestValidationError @app.exception_handler(HTTPException) async def http_exception_handler(request: Request, exc: HTTPException): + if exc.status_code == 304: + return JSONResponse( + status_code=304, + content=None, + headers=exc.headers or {}) + if exc.status_code == 401: return problem_response( request=request, diff --git a/app/routers/facility/__init__.py b/app/routers/facility/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/routers/facility/facility.py b/app/routers/facility/facility.py new file mode 100644 index 0000000..acd4d39 --- /dev/null +++ b/app/routers/facility/facility.py @@ -0,0 +1,44 @@ +from fastapi import Request, Depends, Query +from .. 
import iri_router +from ..error_handlers import DEFAULT_RESPONSES +from .import models, facility_adapter +from ..common import StrictDateTime, forbidExtraQueryParams + + +router = iri_router.IriRouter( + facility_adapter.FacilityAdapter, + prefix="/facility", + tags=["facility"], +) + +@router.get("", responses=DEFAULT_RESPONSES, operation_id="getFacility") +async def get_facility( + request: Request, + modified_since: StrictDateTime = Query(default=None), + _forbid = Depends(forbidExtraQueryParams("modified_since")), + ) -> models.Facility: + """Get facility information""" + return await router.adapter.get_facility(modified_since=modified_since) + +@router.get("/sites", responses=DEFAULT_RESPONSES, operation_id="getSites") +async def list_sites( + request: Request, + modified_since: StrictDateTime = Query(default=None), + name: str = Query(default=None, min_length=1), + offset: int = Query(default=0, ge=0, le=1000), + limit: int = Query(default=100, ge=0, le=1000), + short_name: str = Query(default=None, min_length=1), + _forbid = Depends(forbidExtraQueryParams("modified_since", "name", "offset", "limit", "short_name")), + )-> list[models.Site]: + """List sites""" + return await router.adapter.list_sites(modified_since=modified_since, name=name, offset=offset, limit=limit, short_name=short_name) + +@router.get("/sites/{site_id}", responses=DEFAULT_RESPONSES, operation_id="getSite") +async def get_site( + request: Request, + site_id: str, + modified_since: StrictDateTime = Query(default=None), + _forbid = Depends(forbidExtraQueryParams("modified_since")), + )-> models.Site: + """Get site by ID""" + return await router.adapter.get_site(site_id=site_id, modified_since=modified_since) \ No newline at end of file diff --git a/app/routers/facility/facility_adapter.py b/app/routers/facility/facility_adapter.py new file mode 100644 index 0000000..7758f24 --- /dev/null +++ b/app/routers/facility/facility_adapter.py @@ -0,0 +1,37 @@ +from abc import abstractmethod +from . 
import models as facility_models +from ..iri_router import AuthenticatedAdapter + + +class FacilityAdapter(AuthenticatedAdapter): + """ + Facility-specific code is handled by the implementation of this interface. + Use the `IRI_API_ADAPTER` environment variable (defaults to `app.demo_adapter.FacilityAdapter`) + to install your facility adapter before the API starts. + """ + + @abstractmethod + async def get_facility( + self: "FacilityAdapter", + modified_since: str | None = None, + ) -> facility_models.Facility | None: + pass + + @abstractmethod + async def list_sites( + self: "FacilityAdapter", + modified_since: str | None = None, + name: str | None = None, + offset: int | None = None, + limit: int | None = None, + short_name: str | None = None, + ) -> list[facility_models.Site]: + pass + + @abstractmethod + async def get_site( + self: "FacilityAdapter", + site_id: str, + modified_since: str | None = None, + ) -> facility_models.Site | None: + pass \ No newline at end of file diff --git a/app/routers/facility/models.py b/app/routers/facility/models.py new file mode 100644 index 0000000..a3a781c --- /dev/null +++ b/app/routers/facility/models.py @@ -0,0 +1,30 @@ +"""Facility-related models.""" +from typing import Optional, List +from pydantic import Field, HttpUrl +from ..common import NamedObject + + + +class Site(NamedObject): + def _self_path(self) -> str: + return f"/facility/sites/{self.id}" + short_name: Optional[str] = Field(None, description="Common or short name of the Site.") + operating_organization: str = Field(..., description="Organization operating the Site.") + country_name: Optional[str] = Field(None, description="Country name of the Location.") + locality_name: Optional[str] = Field(None, description="City or locality name of the Location.") + state_or_province_name: Optional[str] = Field(None, description="State or province name of the Location.") + street_address: Optional[str] = Field(None, description="Street address of the Location.") + 
unlocode: Optional[str] = Field(None, description="United Nations trade and transport location code.") + altitude: Optional[float] = Field(None, description="Altitude of the Location.") + latitude: Optional[float] = Field(None, description="Latitude of the Location.") + longitude: Optional[float] = Field(None, description="Longitude of the Location.") + resource_uris: List[HttpUrl] = Field(default_factory=list, description="URIs of Resources hosted at this Site.") + + +class Facility(NamedObject): + def _self_path(self) -> str: + return "/facility" + short_name: Optional[str] = Field(None, description="Common or short name of the Facility.") + organization_name: Optional[str] = Field(None, description="Operating organization’s name.") + support_uri: Optional[HttpUrl] = Field(None, description="Link to facility support portal.") + site_uris: List[HttpUrl] = Field(default_factory=list, description="URIs of associated Sites.") diff --git a/app/routers/filesystem/facility_adapter.py b/app/routers/filesystem/facility_adapter.py index 2c08a3c..a70efb0 100644 --- a/app/routers/filesystem/facility_adapter.py +++ b/app/routers/filesystem/facility_adapter.py @@ -1,15 +1,15 @@ import os from abc import abstractmethod +from typing import Any, Tuple from ..status import models as status_models from ..account import models as account_models from . 
import models as filesystem_models from ..iri_router import AuthenticatedAdapter -from typing import Any, Tuple def to_int(name, default_value): try: - return os.environ.get(name) or default_value + return int(os.environ.get(name) or default_value) except: return default_value diff --git a/app/routers/filesystem/filesystem.py b/app/routers/filesystem/filesystem.py index a111484..d583c64 100644 --- a/app/routers/filesystem/filesystem.py +++ b/app/routers/filesystem/filesystem.py @@ -354,40 +354,13 @@ async def get_view( resource_id: str, request : Request, path: Annotated[str, Query(description="File path")], - size: Annotated[ - int, - Query( - alias="size", - description="Value, in bytes, of the size of data to be retrieved from the file.", - ), - ] = facility_adapter.OPS_SIZE_LIMIT, - offset: Annotated[ - int, - Query( - alias="offset", - description="Value in bytes of the offset.", - ), - ] = 0, -) -> str: - user, resource = await _user_resource(resource_id, request) - if offset < 0: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="`offset` value must be an integer value equal or greater than 0", - ) - - if size <= 0: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="`size` value must be an integer value greater than 0", - ) + size: Annotated[int, Query(description="Value, in bytes, of the size of data to be retrieved from the file.", + ge=1, le=facility_adapter.OPS_SIZE_LIMIT)] = facility_adapter.OPS_SIZE_LIMIT, - if size > facility_adapter.OPS_SIZE_LIMIT: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=f"`size` value must be less than {facility_adapter.OPS_SIZE_LIMIT} bytes", - ) + offset: Annotated[int, Query( description="Value in bytes of the offset.", ge=0)] = 0 + ) -> str: + user, resource = await _user_resource(resource_id, request) return await router.task_adapter.put_task( user, @@ -397,9 +370,8 @@ async def get_view( command="view", args={ "path": path, - "size": size or 
facility_adapter.OPS_SIZE_LIMIT, - "offset": offset or 0, - + "size": size, + "offset": offset, } ) ) diff --git a/app/routers/filesystem/models.py b/app/routers/filesystem/models.py index b24c908..d32ad94 100644 --- a/app/routers/filesystem/models.py +++ b/app/routers/filesystem/models.py @@ -12,11 +12,10 @@ class CompressionType(str, Enum): - none = "none" - bzip2 = "bzip2" - gzip = "gzip" - xz = "xz" - + none = "none" + bzip2 = "bzip2" + gzip = "gzip" + xz = "xz" class ContentUnit(str, Enum): lines = "lines" diff --git a/app/routers/iri_router.py b/app/routers/iri_router.py index dafb970..f0b5b49 100644 --- a/app/routers/iri_router.py +++ b/app/routers/iri_router.py @@ -1,20 +1,17 @@ from abc import ABC, abstractmethod +import traceback import os import logging import importlib -import datetime from fastapi import Request, Depends, HTTPException, APIRouter from fastapi.security import APIKeyHeader -from pydantic_core import core_schema from .account.models import User + bearer_token = APIKeyHeader(name="Authorization") def get_client_ip(request : Request) -> str|None: - # logging.debug("Request headers=%s" % request.headers) - # logging.debug("client=%s" % request.client.host) - forwarded_for = request.headers.get("X-Forwarded-For") if forwarded_for: return forwarded_for.split(",")[0].strip() @@ -91,6 +88,7 @@ async def current_user( user_id = await self.adapter.get_current_user(api_key, get_client_ip(request)) except Exception as exc: logging.getLogger().error(f"Error parsing IRI_API_PARAMS: {exc}") + traceback.print_exc() raise HTTPException(status_code=401, detail="Invalid or malformed Authorization parameters") from exc if not user_id: raise HTTPException(status_code=403, detail="Unauthorized access") @@ -125,62 +123,3 @@ async def get_user( Retrieve additional user information (name, email, etc.) for the given user_id. 
""" pass - - -def forbidExtraQueryParams(*allowedParams: str): - """Dependency to forbid extra query parameters not in allowedParams.""" - - async def checker(_req: Request): - if "*" in allowedParams: - return # Permit anything - incoming = set(_req.query_params.keys()) - allowed = set(allowedParams) - unknown = incoming - allowed - if unknown: - raise HTTPException(status_code=422, - detail=[{"type": "extra_forbidden", "loc": ["query", param], "msg": f"Unexpected query parameter: {param}"} for param in unknown]) - return checker - -class StrictDateTime: - """ - Strict ISO8601 datetime: - ✔ Accepts datetime objects - ✔ Accepts ISO8601 strings: 2025-12-06T10:00:00Z, 2025-12-06T10:00:00+00:00 - ✔ Converts 'Z' → UTC - ✔ Converts naive datetimes → UTC - ✘ Rejects integers ("0"), null, garbage strings, etc. - """ - - @classmethod - def __get_pydantic_core_schema__(cls, source, handler): - return core_schema.no_info_plain_validator_function(cls.validate) - - @staticmethod - def validate(value): - if isinstance(value, datetime.datetime): - return StrictDateTime._normalize(value) - if not isinstance(value, str): - raise ValueError("Invalid datetime value. Expected ISO8601 datetime string.") - v = value.strip() - if v.endswith("Z"): - v = v[:-1] + "+00:00" - try: - dt = datetime.datetime.fromisoformat(v) - except Exception as ex: - raise ValueError("Invalid datetime format. Expected ISO8601 string.") from ex - - return StrictDateTime._normalize(dt) - - @staticmethod - def _normalize(dt: datetime.datetime) -> datetime.datetime: - if dt.tzinfo is None: - return dt.replace(tzinfo=datetime.timezone.utc) - return dt - - @classmethod - def __get_pydantic_json_schema__(cls, schema, handler): - return { - "type": "string", - "format": "date-time", - "description": "Strict ISO8601 datetime. Only valid ISO8601 datetime strings are accepted." 
- } diff --git a/app/routers/status/facility_adapter.py b/app/routers/status/facility_adapter.py index 6753a47..d7358c5 100644 --- a/app/routers/status/facility_adapter.py +++ b/app/routers/status/facility_adapter.py @@ -2,6 +2,7 @@ import datetime from fastapi import Query from . import models as status_models +from ..common import Capability class FacilityAdapter(ABC): @@ -21,7 +22,9 @@ async def get_resources( description : str | None = None, group : str | None = None, modified_since : datetime.datetime | None = None, - resource_type: status_models.ResourceType = Query(default=None) + resource_type: status_models.ResourceType = Query(default=None), + current_status: status_models.Status = Query(default=None), + capability: Capability | None = None, ) -> list[status_models.Resource]: pass @@ -75,6 +78,7 @@ async def get_incidents( time_ : datetime.datetime | None = None, modified_since : datetime.datetime | None = None, resource_id : str | None = None, + resolution: status_models.Resolution | None = None, ) -> list[status_models.Incident]: pass diff --git a/app/routers/status/models.py b/app/routers/status/models.py index b1f3a8b..15a9e36 100644 --- a/app/routers/status/models.py +++ b/app/routers/status/models.py @@ -1,7 +1,10 @@ import datetime import enum -from pydantic import BaseModel, computed_field, Field +from typing import Optional +from pydantic import BaseModel, computed_field, Field, HttpUrl from ... import config +from ..common import NamedObject + class Link(BaseModel): rel : str @@ -15,38 +18,6 @@ class Status(enum.Enum): unknown = "unknown" -class NamedResource(BaseModel): - id : str - name : str - description : str - last_modified : datetime.datetime - - - @staticmethod - def find_by_id(a, id, allow_name: bool|None=False): - # Find a resource by its id. - # If allow_name is True, the id parameter can also match the resource's name. 
- return next((r for r in a if r.id == id or (allow_name and r.name == id)), None) - - - @staticmethod - def find(a, name, description, modified_since): - def normalize(dt: datetime) -> datetime: - # Convert naive datetimes into UTC-aware versions - if dt.tzinfo is None: - return dt.replace(tzinfo=datetime.timezone.utc) - return dt - if name: - a = [aa for aa in a if aa.name == name] - if description: - a = [aa for aa in a if description in aa.description] - if modified_since: - if modified_since.tzinfo is None: - modified_since = modified_since.replace(tzinfo=datetime.timezone.utc) - a = [aa for aa in a if normalize(aa.last_modified) >= modified_since] - return a - - class ResourceType(enum.Enum): website = "website" service = "service" @@ -57,28 +28,27 @@ class ResourceType(enum.Enum): unknown = "unknown" -class Resource(NamedResource): +class Resource(NamedObject): + + def _self_path(self) -> str: + return f"/status/resources/{self.id}" + capability_ids: list[str] = Field(exclude=True) - group: str | None - current_status: Status | None = Field("The current status comes from the status of the last event for this resource") resource_type: ResourceType + group: str | None = Field(None, description="Group this resource belongs to") + current_status: Status | None = Field(None, description="The current status comes from the status of the last event for this resource") + located_at_uri: Optional[HttpUrl] = Field(None, description="Resource located at specific Site") - @computed_field(description="The url of this object") - @property - def self_uri(self) -> str: - return f"{config.API_URL_ROOT}{config.API_PREFIX}{config.API_URL}/status/resources/{self.id}" - - @computed_field(description="The list of past events in this incident") + @computed_field(description="The list of capabilities in this resource") @property def capability_uris(self) -> list[str]: return [f"{config.API_URL_ROOT}{config.API_PREFIX}{config.API_URL}/account/capabilities/{e}" for e in 
self.capability_ids] - @staticmethod def find(resources, name, description, group, modified_since, resource_type): - a = NamedResource.find(resources, name, description, modified_since) + a = NamedObject.find(resources, name, description, modified_since) if group: a = [aa for aa in a if aa.group == group] if resource_type: @@ -86,25 +56,21 @@ def find(resources, name, description, group, modified_since, resource_type): return a -class Event(NamedResource): +class Event(NamedObject): + + def _self_path(self) -> str: + return f"/status/incidents/{self.incident_id}/events/{self.id}" + occurred_at : datetime.datetime status : Status resource_id : str = Field(exclude=True) incident_id : str | None = Field(exclude=True, default=None) - - @computed_field(description="The url of this object") - @property - def self_uri(self) -> str: - return f"{config.API_URL_ROOT}{config.API_PREFIX}{config.API_URL}/status/incidents/{self.incident_id}/events/{self.id}" - - @computed_field(description="The resource belonging to this event") @property def resource_uri(self) -> str: return f"{config.API_URL_ROOT}{config.API_PREFIX}{config.API_URL}/status/resources/{self.resource_id}" - @computed_field(description="The event's incident") @property def incident_uri(self) -> str|None: @@ -123,7 +89,7 @@ def find( time_ : datetime.datetime | None = None, modified_since : datetime.datetime | None = None, ) -> list: - events = NamedResource.find(events, name, description, modified_since) + events = NamedObject.find(events, name, description, modified_since) if resource_id: events = [e for e in events if e.resource_id == resource_id] if status: @@ -140,6 +106,7 @@ def find( class IncidentType(enum.Enum): planned = "planned" unplanned = "unplanned" + reservation = "reservation" class Resolution(enum.Enum): @@ -150,7 +117,12 @@ class Resolution(enum.Enum): pending = "pending" -class Incident(NamedResource): + +class Incident(NamedObject): + + def _self_path(self) -> str: + return 
f"/status/incidents/{self.id}" + status : Status resource_ids : list[str] = Field(exclude=True) event_ids : list[str] = Field(exclude=True) @@ -159,24 +131,17 @@ class Incident(NamedResource): type : IncidentType resolution : Resolution - - @computed_field(description="The url of this object") - @property - def self_uri(self) -> str: - return f"{config.API_URL_ROOT}{config.API_PREFIX}{config.API_URL}/status/incidents/{self.id}" - - @computed_field(description="The list of past events in this incident") @property def event_uris(self) -> list[str]: return [f"{config.API_URL_ROOT}{config.API_PREFIX}{config.API_URL}/status/incidents/{self.id}/events/{e}" for e in self.event_ids] - @computed_field(description="The list of resources that may be impacted by this incident") @property def resource_uris(self) -> list[str]: return [f"{config.API_URL_ROOT}{config.API_PREFIX}{config.API_URL}/status/resources/{r}" for r in self.resource_ids] + @staticmethod def find( incidents : list, name : str | None = None, @@ -189,7 +154,7 @@ def find( modified_since : datetime.datetime | None = None, resource_id : str | None = None, ) -> list: - incidents = NamedResource.find(incidents, name, description, modified_since) + incidents = NamedObject.find(incidents, name, description, modified_since) if resource_id: incidents = [e for e in incidents if resource_id in e.resource_ids] if status: diff --git a/app/routers/status/status.py b/app/routers/status/status.py index 7bb0fd0..7f4fae6 100644 --- a/app/routers/status/status.py +++ b/app/routers/status/status.py @@ -1,7 +1,9 @@ +from typing import Optional, List, Annotated from fastapi import HTTPException, Request, Query, Depends from . import models, facility_adapter from .. 
import iri_router from ..error_handlers import DEFAULT_RESPONSES +from ..common import StrictDateTime, forbidExtraQueryParams, AllocationUnit router = iri_router.IriRouter( facility_adapter.FacilityAdapter, @@ -21,13 +23,15 @@ async def get_resources( name : str = Query(default=None, min_length=1), description : str = Query(default=None, min_length=1), group : str = Query(default=None, min_length=1), - offset : int = Query(default=0, ge=0), - limit : int = Query(default=100, le=1000), - modified_since: iri_router.StrictDateTime = Query(default=None), + offset : int = Query(default=0, ge=0, le=1000), + limit : int = Query(default=100, ge=0, le=1000), + modified_since: StrictDateTime = Query(default=None), resource_type: models.ResourceType = Query(default=None), - _forbid = Depends(iri_router.forbidExtraQueryParams("name", "description", "group", "offset", "limit", "modified_since", "resource_type")), + current_status: models.Status = Query(default=None), + capability: List[AllocationUnit] = Query(default=None, min_length=1), + _forbid = Depends(forbidExtraQueryParams("name", "description", "group", "offset", "limit", "modified_since", "resource_type", "current_status", "capability", multiParams={"capability"})), ) -> list[models.Resource]: - return await router.adapter.get_resources(offset, limit, name, description, group, modified_since, resource_type) + return await router.adapter.get_resources(offset, limit, name, description, group, modified_since, resource_type, current_status, capability) @router.get( @@ -47,7 +51,7 @@ async def get_resource( return item -@router.get( +@router.get( "/incidents", summary="Get all incidents without their events", description="Get a list of all incidents. Each incident will be returned without its events. 
You can optionally filter the returned list by specifying attributes.", @@ -60,16 +64,18 @@ async def get_incidents( description : str = Query(default=None, min_length=1), status : models.Status = Query(default=None), type_: models.IncidentType = Query(alias="type", default=None), - from_: iri_router.StrictDateTime = Query(alias="from", default=None), - time_ : iri_router.StrictDateTime = Query(alias="time", default=None), - to : iri_router.StrictDateTime = Query(default=None), - modified_since : iri_router.StrictDateTime = Query(default=None), + from_: StrictDateTime = Query(alias="from", default=None), + time_ : StrictDateTime = Query(alias="time", default=None), + to : StrictDateTime = Query(default=None), + modified_since : StrictDateTime = Query(default=None), resource_id : str = Query(default=None, min_length=1), - offset : int = Query(default=0, ge=0), - limit : int = Query(default=100, le=1000), - _forbid = Depends(iri_router.forbidExtraQueryParams("name", "description", "status", "type", "from", "to", "time", "modified_since", "resource_id", "offset", "limit")), + offset : int = Query(default=0, ge=0, le=1000), + limit : int = Query(default=100, ge=0, le=1000), + resolution : models.Resolution = Query(default=None), + _forbid = Depends(forbidExtraQueryParams("name", "description", "status", "type", "from", "to", "time", "modified_since", "resource_id", + "offset", "limit", "resolution", "resource_uris", "event_uris", multiParams={"resource_uris", "event_uris"})), ) -> list[models.Incident]: - return await router.adapter.get_incidents(offset, limit, name, description, status, type_, from_, to, time_, modified_since, resource_id) + return await router.adapter.get_incidents(offset, limit, name, description, status, type_, from_, to, time_, modified_since, resource_id, resolution) @router.get( @@ -104,13 +110,13 @@ async def get_events( name : str = Query(default=None, min_length=1), description : str = Query(default=None, min_length=1), status : models.Status 
= Query(default=None), - from_: iri_router.StrictDateTime = Query(alias="from", default=None), - time_ : iri_router.StrictDateTime = Query(alias="time", default=None), - to : iri_router.StrictDateTime = Query(default=None), - modified_since : iri_router.StrictDateTime = Query(default=None), - offset : int = Query(default=0, ge=0), - limit : int = Query(default=100, le=1000), - _forbid = Depends(iri_router.forbidExtraQueryParams("resource_id", "name", "description", "status", "from", "to", "time", "modified_since", "offset", "limit")), + from_: StrictDateTime = Query(alias="from", default=None), + time_ : StrictDateTime = Query(alias="time", default=None), + to : StrictDateTime = Query(default=None), + modified_since : StrictDateTime = Query(default=None), + offset : int = Query(default=0, ge=0, le=1000), + limit : int = Query(default=100, ge=0, le=1000), + _forbid = Depends(forbidExtraQueryParams("resource_id", "name", "description", "status", "from", "to", "time", "modified_since", "offset", "limit")), ) -> list[models.Event]: return await router.adapter.get_events(incident_id, offset, limit, resource_id, name, description, status, from_, to, time_, modified_since) diff --git a/app/routers/task/models.py b/app/routers/task/models.py index da3c5cc..cea9787 100644 --- a/app/routers/task/models.py +++ b/app/routers/task/models.py @@ -1,19 +1,18 @@ -from pydantic import BaseModel import enum +from pydantic import BaseModel class TaskStatus(str, enum.Enum): - pending = "pending" - active = "active" - completed = "completed" - failed = "failed" - canceled = "canceled" - + pending = "pending" + active = "active" + completed = "completed" + failed = "failed" + canceled = "canceled" class TaskCommand(BaseModel): - router: str - command: str - args: dict + router: str + command: str + args: dict class Task(BaseModel): diff --git a/pyproject.toml b/pyproject.toml index 03858a4..63f6c02 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,9 +1,13 @@ [project] name = 
"iri-api-python" version = "0.1.0" -requires-python = ">=3.12" +requires-python = ">=3.14,<3.15" dependencies = [ - "fastapi[standard]>=0.100.0", - "uvicorn[standard]>=0.22.0", - "humps>=0.2.2" -] + "fastapi[standard]>=0.128.0,<0.129.0", + "uvicorn[standard]>=0.40.0,<0.41.0", + "humps>=0.2.2,<0.3.0", + "opentelemetry-api>=1.39.1,<1.40.0", + "opentelemetry-sdk>=1.39.1,<1.40.0", + "opentelemetry-instrumentation-fastapi>=0.60b1,<0.61b0", + "opentelemetry-exporter-otlp>=1.39.1,<1.40.0" +] \ No newline at end of file