Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions backend/app/api/routes/admin/events.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from datetime import datetime
from typing import Annotated
from uuid import uuid4

from dishka import FromDishka
from dishka.integrations.fastapi import DishkaRoute
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse

from app.api.dependencies import admin_user
from app.core.correlation import CorrelationContext
from app.domain.enums import EventType, ExportFormat
from app.domain.events import EventFilter
from app.domain.replay import ReplayFilter
Expand Down Expand Up @@ -58,7 +58,6 @@ async def export_events(
service: FromDishka[AdminEventsService],
event_types: Annotated[list[EventType] | None, Query(description="Event types (repeat param for multiple)")] = None,
aggregate_id: Annotated[str | None, Query(description="Aggregate ID filter")] = None,
correlation_id: Annotated[str | None, Query(description="Correlation ID filter")] = None,
user_id: Annotated[str | None, Query(description="User ID filter")] = None,
service_name: Annotated[str | None, Query(description="Service name filter")] = None,
start_time: Annotated[datetime | None, Query(description="Start time")] = None,
Expand All @@ -69,7 +68,6 @@ async def export_events(
export_filter = EventFilter(
event_types=event_types,
aggregate_id=aggregate_id,
correlation_id=correlation_id,
user_id=user_id,
service_name=service_name,
start_time=start_time,
Expand Down Expand Up @@ -105,11 +103,11 @@ async def replay_events(
request: EventReplayRequest, background_tasks: BackgroundTasks, service: FromDishka[AdminEventsService]
) -> EventReplayResponse:
"""Replay events by filter criteria, with optional dry-run mode."""
replay_correlation_id = f"replay_{CorrelationContext.get_correlation_id()}"
replay_id = f"replay-{uuid4().hex}"
result = await service.prepare_or_schedule_replay(
replay_filter=ReplayFilter.model_validate(request),
dry_run=request.dry_run,
replay_correlation_id=replay_correlation_id,
replay_id=replay_id,
target_service=request.target_service,
)

Expand Down
94 changes: 37 additions & 57 deletions backend/app/api/routes/auth.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from datetime import timedelta

import structlog
from dishka import FromDishka
from dishka.integrations.fastapi import DishkaRoute
from fastapi import APIRouter, Depends, HTTPException, Request, Response
Expand Down Expand Up @@ -40,18 +40,16 @@ async def login(
security_service: FromDishka[SecurityService],
runtime_settings: FromDishka[RuntimeSettingsLoader],
lockout_service: FromDishka[LoginLockoutService],
logger: FromDishka[logging.Logger],
logger: FromDishka[structlog.stdlib.BoundLogger],
form_data: OAuth2PasswordRequestForm = Depends(),
) -> LoginResponse:
"""Authenticate and receive session cookies."""
logger.info(
"Login attempt",
extra={
"username": form_data.username,
"client_ip": get_client_ip(request),
"endpoint": "/login",
"user_agent": request.headers.get("user-agent"),
},
username=form_data.username,
client_ip=get_client_ip(request),
endpoint="/login",
user_agent=request.headers.get("user-agent"),
)

if await lockout_service.check_locked(form_data.username):
Expand All @@ -65,11 +63,9 @@ async def login(
if not user:
logger.warning(
"Login failed - user not found",
extra={
"username": form_data.username,
"client_ip": get_client_ip(request),
"user_agent": request.headers.get("user-agent"),
},
username=form_data.username,
client_ip=get_client_ip(request),
user_agent=request.headers.get("user-agent"),
)
locked = await lockout_service.record_failed_attempt(form_data.username)
if locked:
Expand All @@ -86,11 +82,9 @@ async def login(
if not security_service.verify_password(form_data.password, user.hashed_password):
logger.warning(
"Login failed - invalid password",
extra={
"username": form_data.username,
"client_ip": get_client_ip(request),
"user_agent": request.headers.get("user-agent"),
},
username=form_data.username,
client_ip=get_client_ip(request),
user_agent=request.headers.get("user-agent"),
)
locked = await lockout_service.record_failed_attempt(form_data.username)
if locked:
Expand All @@ -111,12 +105,10 @@ async def login(

logger.info(
"Login successful",
extra={
"username": user.username,
"client_ip": get_client_ip(request),
"user_agent": request.headers.get("user-agent"),
"token_expires_in_minutes": session_timeout,
},
username=user.username,
client_ip=get_client_ip(request),
user_agent=request.headers.get("user-agent"),
token_expires_in_minutes=session_timeout,
)

access_token_expires = timedelta(minutes=session_timeout)
Expand Down Expand Up @@ -169,17 +161,15 @@ async def register(
user_repo: FromDishka[UserRepository],
security_service: FromDishka[SecurityService],
runtime_settings: FromDishka[RuntimeSettingsLoader],
logger: FromDishka[logging.Logger],
logger: FromDishka[structlog.stdlib.BoundLogger],
) -> UserResponse:
"""Register a new user account."""
logger.info(
"Registration attempt",
extra={
"username": user.username,
"client_ip": get_client_ip(request),
"endpoint": "/register",
"user_agent": request.headers.get("user-agent"),
},
username=user.username,
client_ip=get_client_ip(request),
endpoint="/register",
user_agent=request.headers.get("user-agent"),
)

effective = await runtime_settings.get_effective_settings()
Expand All @@ -191,11 +181,9 @@ async def register(
if db_user:
logger.warning(
"Registration failed - username taken",
extra={
"username": user.username,
"client_ip": get_client_ip(request),
"user_agent": request.headers.get("user-agent"),
},
username=user.username,
client_ip=get_client_ip(request),
user_agent=request.headers.get("user-agent"),
)
raise HTTPException(status_code=409, detail="Username already registered")

Expand All @@ -212,11 +200,9 @@ async def register(

logger.info(
"Registration successful",
extra={
"username": created_user.username,
"client_ip": get_client_ip(request),
"user_agent": request.headers.get("user-agent"),
},
username=created_user.username,
client_ip=get_client_ip(request),
user_agent=request.headers.get("user-agent"),
)

return UserResponse.model_validate(created_user)
Expand All @@ -227,18 +213,16 @@ async def get_current_user_profile(
request: Request,
response: Response,
auth_service: FromDishka[AuthService],
logger: FromDishka[logging.Logger],
logger: FromDishka[structlog.stdlib.BoundLogger],
) -> UserResponse:
"""Get the authenticated user's profile."""
current_user = await auth_service.get_current_user(request)

logger.info(
"User profile request",
extra={
"username": current_user.username,
"client_ip": get_client_ip(request),
"endpoint": "/me",
},
username=current_user.username,
client_ip=get_client_ip(request),
endpoint="/me",
)

# Set cache control headers
Expand All @@ -252,16 +236,14 @@ async def get_current_user_profile(
async def logout(
request: Request,
response: Response,
logger: FromDishka[logging.Logger],
logger: FromDishka[structlog.stdlib.BoundLogger],
) -> MessageResponse:
"""Log out and clear session cookies."""
logger.info(
"Logout attempt",
extra={
"client_ip": get_client_ip(request),
"endpoint": "/logout",
"user_agent": request.headers.get("user-agent"),
},
client_ip=get_client_ip(request),
endpoint="/logout",
user_agent=request.headers.get("user-agent"),
)

# Clear the httpOnly cookie
Expand All @@ -278,10 +260,8 @@ async def logout(

logger.info(
"Logout successful",
extra={
"client_ip": get_client_ip(request),
"user_agent": request.headers.get("user-agent"),
},
client_ip=get_client_ip(request),
user_agent=request.headers.get("user-agent"),
)

return MessageResponse(message="Logout successful")
22 changes: 10 additions & 12 deletions backend/app/api/routes/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
from dishka import FromDishka
from dishka.integrations.fastapi import DishkaRoute, inject
from fastapi import APIRouter, Depends, Header, HTTPException, Path, Query, Request
from opentelemetry import trace

from app.api.dependencies import admin_user, current_user
from app.core.tracing import EventAttributes, add_span_attributes
from app.core.utils import get_client_ip
from app.domain.enums import EventType, ExecutionStatus, UserRole
from app.domain.events import DomainEvent
Expand Down Expand Up @@ -57,17 +57,15 @@ async def create_execution(
idempotency_key: Annotated[str | None, Header(alias="Idempotency-Key")] = None,
) -> ExecutionResponse:
"""Submit a script for execution in an isolated Kubernetes pod."""
add_span_attributes(
**{
"http.method": "POST",
"http.route": "/api/v1/execute",
"execution.language": execution.lang,
"execution.language_version": execution.lang_version,
"execution.script_length": len(execution.script),
EventAttributes.USER_ID: current_user.user_id,
"client.address": get_client_ip(request),
}
)
trace.get_current_span().set_attributes({
"http.method": "POST",
"http.route": "/api/v1/execute",
"execution.language": execution.lang,
"execution.language_version": execution.lang_version,
"execution.script_length": len(execution.script),
"user.id": current_user.user_id,
"client.address": get_client_ip(request),
})

exec_result = await execution_service.execute_script_idempotent(
script=execution.script,
Expand Down
Loading
Loading