Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 7 additions & 36 deletions backend/app/api/routes/admin/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from app.api.dependencies import admin_user
from app.core.correlation import CorrelationContext
from app.domain.enums import EventType
from app.domain.enums import EventType, ExportFormat
from app.domain.events import EventFilter
from app.domain.replay import ReplayFilter
from app.domain.user import User
Expand All @@ -23,7 +23,6 @@
EventStatsResponse,
)
from app.schemas_pydantic.common import ErrorResponse
from app.schemas_pydantic.execution import ExecutionResult
from app.services.admin import AdminEventsService

router = APIRouter(
Expand Down Expand Up @@ -53,30 +52,9 @@ async def get_event_stats(
return EventStatsResponse.model_validate(stats)


@router.get("/export/csv")
async def export_events_csv(
service: FromDishka[AdminEventsService],
event_types: Annotated[list[EventType] | None, Query(description="Event types (repeat param for multiple)")] = None,
start_time: Annotated[datetime | None, Query(description="Start time")] = None,
end_time: Annotated[datetime | None, Query(description="End time")] = None,
limit: Annotated[int, Query(ge=1, le=50000)] = 10000,
) -> StreamingResponse:
"""Export filtered events as a downloadable CSV file."""
export_filter = EventFilter(
event_types=event_types,
start_time=start_time,
end_time=end_time,
)
result = await service.export_events_csv_content(event_filter=export_filter, limit=limit)
return StreamingResponse(
iter([result.content]),
media_type=result.media_type,
headers={"Content-Disposition": f"attachment; filename={result.file_name}"},
)


@router.get("/export/json")
async def export_events_json(
@router.get("/export/{export_format}")
async def export_events(
export_format: ExportFormat,
service: FromDishka[AdminEventsService],
event_types: Annotated[list[EventType] | None, Query(description="Event types (repeat param for multiple)")] = None,
aggregate_id: Annotated[str | None, Query(description="Aggregate ID filter")] = None,
Expand All @@ -87,7 +65,7 @@ async def export_events_json(
end_time: Annotated[datetime | None, Query(description="End time")] = None,
limit: Annotated[int, Query(ge=1, le=50000)] = 10000,
) -> StreamingResponse:
"""Export events as JSON with comprehensive filtering."""
"""Export filtered events as a downloadable file."""
export_filter = EventFilter(
event_types=event_types,
aggregate_id=aggregate_id,
Expand All @@ -97,7 +75,7 @@ async def export_events_json(
start_time=start_time,
end_time=end_time,
)
result = await service.export_events_json_content(event_filter=export_filter, limit=limit)
result = await service.export_events(event_filter=export_filter, limit=limit, export_format=export_format)
return StreamingResponse(
iter([result.content]),
media_type=result.media_type,
Expand Down Expand Up @@ -152,14 +130,7 @@ async def get_replay_status(session_id: str, service: FromDishka[AdminEventsServ
if not status:
raise HTTPException(status_code=404, detail="Replay session not found")

return EventReplayStatusResponse(
**status.session.model_dump(),
estimated_completion=status.estimated_completion,
execution_results=[
ExecutionResult.model_validate(er)
for er in status.execution_results
],
)
return EventReplayStatusResponse.model_validate(status)


@router.delete("/{event_id}", responses={404: {"model": ErrorResponse, "description": "Event not found"}})
Expand Down
10 changes: 3 additions & 7 deletions backend/app/api/routes/admin/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async def get_system_settings(
service: FromDishka[AdminSettingsService],
) -> SystemSettingsSchema:
"""Get the current system-wide settings."""
result = await service.get_system_settings(admin.username)
result = await service.get_system_settings(admin.user_id)
return SystemSettingsSchema.model_validate(result)


Expand All @@ -46,11 +46,7 @@ async def update_system_settings(
) -> SystemSettingsSchema:
"""Replace system-wide settings."""
domain_settings = SystemSettings.model_validate(settings)
result = await service.update_system_settings(
domain_settings,
updated_by=admin.username,
user_id=admin.user_id,
)
result = await service.update_system_settings(domain_settings, admin.user_id)
return SystemSettingsSchema.model_validate(result)


Expand All @@ -64,5 +60,5 @@ async def reset_system_settings(
service: FromDishka[AdminSettingsService],
) -> SystemSettingsSchema:
"""Reset system-wide settings to defaults."""
result = await service.reset_system_settings(admin.username, admin.user_id)
result = await service.reset_system_settings(admin.user_id)
return SystemSettingsSchema.model_validate(result)
28 changes: 12 additions & 16 deletions backend/app/api/routes/admin/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from app.api.dependencies import admin_user
from app.db.repositories import AdminUserRepository
from app.domain.enums import UserRole
from app.domain.rate_limit import RateLimitRule, UserRateLimit
from app.domain.rate_limit import UserRateLimitUpdate
from app.domain.user import User
from app.domain.user import UserUpdate as DomainUserUpdate
from app.schemas_pydantic.admin_user_overview import AdminUserOverview
Expand Down Expand Up @@ -44,7 +44,7 @@ async def list_users(
) -> UserListResponse:
"""List all users with optional search and role filtering."""
result = await admin_user_service.list_users(
admin_username=admin.username,
admin_user_id=admin.user_id,
limit=limit,
offset=offset,
search=search,
Expand All @@ -64,7 +64,7 @@ async def create_user(
admin_user_service: FromDishka[AdminUserService],
) -> UserResponse:
"""Create a new user (admin only)."""
domain_user = await admin_user_service.create_user(admin_username=admin.username, user_data=user_data)
domain_user = await admin_user_service.create_user(admin_user_id=admin.user_id, user_data=user_data)
return UserResponse.model_validate(domain_user)


Expand All @@ -79,7 +79,7 @@ async def get_user(
admin_user_service: FromDishka[AdminUserService],
) -> UserResponse:
"""Get a user by ID."""
user = await admin_user_service.get_user(admin_username=admin.username, user_id=user_id)
user = await admin_user_service.get_user(admin_user_id=admin.user_id, user_id=user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")

Expand Down Expand Up @@ -125,7 +125,7 @@ async def update_user(
domain_update = DomainUserUpdate.model_validate(user_update)

updated_user = await admin_user_service.update_user(
admin_username=admin.username, user_id=user_id, update=domain_update
admin_user_id=admin.user_id, user_id=user_id, update=domain_update
)
if not updated_user:
raise HTTPException(status_code=500, detail="Failed to update user")
Expand All @@ -150,7 +150,7 @@ async def delete_user(
raise HTTPException(status_code=400, detail="Cannot delete your own account")

result = await admin_user_service.delete_user(
admin_username=admin.username, user_id=user_id, cascade=cascade
admin_user_id=admin.user_id, user_id=user_id, cascade=cascade
)
return DeleteUserResponse.model_validate(result)

Expand All @@ -168,7 +168,7 @@ async def reset_user_password(
) -> MessageResponse:
"""Reset a user's password."""
success = await admin_user_service.reset_user_password(
admin_username=admin.username, user_id=user_id, new_password=password_request.new_password
admin_user_id=admin.user_id, user_id=user_id, new_password=password_request.new_password
)
if not success:
raise HTTPException(status_code=500, detail="Failed to reset password")
Expand All @@ -182,7 +182,7 @@ async def get_user_rate_limits(
user_id: str,
) -> UserRateLimitsResponse:
"""Get rate limit configuration for a user."""
result = await admin_user_service.get_user_rate_limits(admin_username=admin.username, user_id=user_id)
result = await admin_user_service.get_user_rate_limits(admin_user_id=admin.user_id, user_id=user_id)
return UserRateLimitsResponse.model_validate(result)


Expand All @@ -194,13 +194,9 @@ async def update_user_rate_limits(
request: RateLimitUpdateRequest,
) -> RateLimitUpdateResponse:
"""Update rate limit rules for a user."""
config = UserRateLimit(
user_id=user_id,
rules=[RateLimitRule(**r.model_dump()) for r in request.rules],
**request.model_dump(exclude={"rules"}),
)
update = UserRateLimitUpdate.model_validate(request)
result = await admin_user_service.update_user_rate_limits(
admin_username=admin.username, user_id=user_id, config=config
admin_user_id=admin.user_id, user_id=user_id, update=update
)
return RateLimitUpdateResponse.model_validate(result)

Expand All @@ -212,7 +208,7 @@ async def reset_user_rate_limits(
user_id: str,
) -> MessageResponse:
"""Reset a user's rate limits to defaults."""
await admin_user_service.reset_user_rate_limits(admin_username=admin.username, user_id=user_id)
await admin_user_service.reset_user_rate_limits(admin_user_id=admin.user_id, user_id=user_id)
return MessageResponse(message=f"Rate limits reset successfully for user {user_id}")


Expand All @@ -228,7 +224,7 @@ async def unlock_user(
lockout_service: FromDishka[LoginLockoutService],
) -> UnlockResponse:
"""Unlock a user account that was locked due to failed login attempts."""
user = await admin_user_service.get_user(admin_username=admin.username, user_id=user_id)
user = await admin_user_service.get_user(admin_user_id=admin.user_id, user_id=user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
await lockout_service.unlock_user(user.username)
Expand Down
6 changes: 1 addition & 5 deletions backend/app/api/routes/dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from app.schemas_pydantic.dlq import (
DLQBatchRetryResponse,
DLQMessageDetail,
DLQMessageResponse,
DLQMessagesResponse,
DLQTopicSummaryResponse,
ManualRetryRequest,
Expand All @@ -41,10 +40,7 @@ async def get_dlq_messages(
status=status, topic=topic, event_type=event_type, limit=limit, offset=offset
)

# Convert domain messages to response models using model_validate
messages = [DLQMessageResponse.model_validate(msg) for msg in result.messages]

return DLQMessagesResponse(messages=messages, total=result.total, offset=result.offset, limit=result.limit)
return DLQMessagesResponse.model_validate(result)


@router.get(
Expand Down
68 changes: 25 additions & 43 deletions backend/app/api/routes/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,29 @@

from dishka import FromDishka
from dishka.integrations.fastapi import DishkaRoute
from fastapi import APIRouter, Query, Request, Response
from fastapi import APIRouter, Depends, Query, Response

from app.api.dependencies import current_user
from app.domain.enums import NotificationChannel, NotificationStatus
from app.domain.notification import DomainSubscriptionUpdate
from app.domain.user import User
from app.schemas_pydantic.notification import (
DeleteNotificationResponse,
NotificationListResponse,
NotificationResponse,
NotificationSubscription,
SubscriptionsResponse,
SubscriptionUpdate,
UnreadCountResponse,
)
from app.services.auth_service import AuthService
from app.services.notification_service import NotificationService

router = APIRouter(prefix="/notifications", tags=["notifications"], route_class=DishkaRoute)


@router.get("", response_model=NotificationListResponse)
async def get_notifications(
request: Request,
user: Annotated[User, Depends(current_user)],
notification_service: FromDishka[NotificationService],
auth_service: FromDishka[AuthService],
status: Annotated[NotificationStatus | None, Query()] = None,
include_tags: Annotated[list[str] | None, Query(description="Only notifications with any of these tags")] = None,
exclude_tags: Annotated[list[str] | None, Query(description="Exclude notifications with any of these tags")] = None,
Expand All @@ -35,100 +35,82 @@ async def get_notifications(
offset: Annotated[int, Query(ge=0)] = 0,
) -> NotificationListResponse:
"""List notifications for the authenticated user."""
current_user = await auth_service.get_current_user(request)
result = await notification_service.list_notifications(
user_id=current_user.user_id,
user_id=user.user_id,
status=status,
limit=limit,
offset=offset,
include_tags=include_tags,
exclude_tags=exclude_tags,
tag_prefix=tag_prefix,
)
return NotificationListResponse(
notifications=[NotificationResponse.model_validate(n) for n in result.notifications],
total=result.total,
unread_count=result.unread_count,
)
return NotificationListResponse.model_validate(result)


@router.put("/{notification_id}/read", status_code=204)
async def mark_notification_read(
notification_id: str,
user: Annotated[User, Depends(current_user)],
notification_service: FromDishka[NotificationService],
request: Request,
auth_service: FromDishka[AuthService],
) -> Response:
"""Mark a single notification as read."""
current_user = await auth_service.get_current_user(request)
await notification_service.mark_as_read(notification_id=notification_id, user_id=current_user.user_id)
await notification_service.mark_as_read(notification_id=notification_id, user_id=user.user_id)
return Response(status_code=204)


@router.post("/mark-all-read", status_code=204)
async def mark_all_read(
notification_service: FromDishka[NotificationService], request: Request, auth_service: FromDishka[AuthService]
user: Annotated[User, Depends(current_user)],
notification_service: FromDishka[NotificationService],
) -> Response:
"""Mark all notifications as read."""
current_user = await auth_service.get_current_user(request)
await notification_service.mark_all_as_read(current_user.user_id)
await notification_service.mark_all_as_read(user.user_id)
return Response(status_code=204)


@router.get("/subscriptions", response_model=SubscriptionsResponse)
async def get_subscriptions(
notification_service: FromDishka[NotificationService], request: Request, auth_service: FromDishka[AuthService]
user: Annotated[User, Depends(current_user)],
notification_service: FromDishka[NotificationService],
) -> SubscriptionsResponse:
"""Get all notification channel subscriptions for the authenticated user."""
current_user = await auth_service.get_current_user(request)
subscriptions_dict = await notification_service.get_subscriptions(current_user.user_id)
return SubscriptionsResponse(
subscriptions=[NotificationSubscription.model_validate(s) for s in subscriptions_dict.values()]
)
result = await notification_service.get_subscriptions(user.user_id)
return SubscriptionsResponse.model_validate(result)


@router.put("/subscriptions/{channel}", response_model=NotificationSubscription)
async def update_subscription(
channel: NotificationChannel,
subscription: SubscriptionUpdate,
user: Annotated[User, Depends(current_user)],
notification_service: FromDishka[NotificationService],
request: Request,
auth_service: FromDishka[AuthService],
) -> NotificationSubscription:
"""Update subscription settings for a notification channel."""
current_user = await auth_service.get_current_user(request)
update_data = DomainSubscriptionUpdate.model_validate(subscription)
updated_sub = await notification_service.update_subscription(
user_id=current_user.user_id,
user_id=user.user_id,
channel=channel,
enabled=subscription.enabled,
webhook_url=subscription.webhook_url,
slack_webhook=subscription.slack_webhook,
severities=subscription.severities,
include_tags=subscription.include_tags,
exclude_tags=subscription.exclude_tags,
update_data=update_data,
)
return NotificationSubscription.model_validate(updated_sub)


@router.get("/unread-count", response_model=UnreadCountResponse)
async def get_unread_count(
notification_service: FromDishka[NotificationService], request: Request, auth_service: FromDishka[AuthService]
user: Annotated[User, Depends(current_user)],
notification_service: FromDishka[NotificationService],
) -> UnreadCountResponse:
"""Get the count of unread notifications."""
current_user = await auth_service.get_current_user(request)
count = await notification_service.get_unread_count(current_user.user_id)

count = await notification_service.get_unread_count(user.user_id)
return UnreadCountResponse(unread_count=count)


@router.delete("/{notification_id}", response_model=DeleteNotificationResponse)
async def delete_notification(
notification_id: str,
user: Annotated[User, Depends(current_user)],
notification_service: FromDishka[NotificationService],
request: Request,
auth_service: FromDishka[AuthService],
) -> DeleteNotificationResponse:
"""Delete a notification."""
current_user = await auth_service.get_current_user(request)
await notification_service.delete_notification(user_id=current_user.user_id, notification_id=notification_id)
await notification_service.delete_notification(user_id=user.user_id, notification_id=notification_id)
return DeleteNotificationResponse(message="Notification deleted")
Loading
Loading