diff --git a/api_schemas/csv_schemas/base_csv_schema.py b/api_schemas/csv_schemas/base_csv_schema.py new file mode 100644 index 00000000..5e6fe731 --- /dev/null +++ b/api_schemas/csv_schemas/base_csv_schema.py @@ -0,0 +1,14 @@ +from typing import Annotated + +from pydantic import AliasChoices, AliasPath, BaseModel, ConfigDict, Field + + +class BaseCsvSchema(BaseModel): + model_config = ConfigDict(from_attributes=True) + + __column_order__: Annotated[list[str] | None, Field(exclude=True)] = None + + +def CsvField(name: str | None = None, from_path: str | AliasPath | AliasChoices | None = None, exclude: bool = False): + """Wrapper for `Field` with `serialization_alias`, `validation_alias` and `exclude`. Can be exchanged for normal `Field` when more advanced settings are necessary.""" + return Field(serialization_alias=name, validation_alias=from_path, exclude=exclude) diff --git a/api_schemas/csv_schemas/candidate_csv_schema.py b/api_schemas/csv_schemas/candidate_csv_schema.py new file mode 100644 index 00000000..9197a1f5 --- /dev/null +++ b/api_schemas/csv_schemas/candidate_csv_schema.py @@ -0,0 +1,18 @@ +from pydantic import AliasPath, computed_field +from api_schemas.csv_schemas.base_csv_schema import BaseCsvSchema, CsvField + + +class CandidatesCsvSchema(BaseCsvSchema): + __column_order__ = ["stil_id", "last_name", "first_name", "name", "email", "post_name", "council_name"] + + first_name: str = CsvField("Förnamn", from_path=AliasPath("candidate", "user", "first_name")) + last_name: str = CsvField("Efternamn", from_path=AliasPath("candidate", "user", "last_name")) + stil_id: str | None = CsvField("Stil-ID", from_path=AliasPath("candidate", "user", "stil_id")) + email: str = CsvField("E-post", from_path=AliasPath("candidate", "user", "email")) + post_name: str = CsvField("Post", from_path=AliasPath("post", "name_sv")) + council_name: str = CsvField("Utskott", from_path=AliasPath("post", "council", "name_sv")) + + @computed_field(alias="Namn", return_type=str) + @property + def name(self): + return f"{self.first_name} {self.last_name}" diff --git a/api_schemas/csv_schemas/event_user_csv_schema.py b/api_schemas/csv_schemas/event_user_csv_schema.py new file mode 100644 index 00000000..ecb6182f --- /dev/null +++ b/api_schemas/csv_schemas/event_user_csv_schema.py @@ -0,0 +1,58 @@ +from pydantic import AliasPath, computed_field, field_validator, validator +from api_schemas.csv_schemas.base_csv_schema import BaseCsvSchema, CsvField +from helpers.types import DRINK_PACKAGES + + +class EventUserCsvSchema(BaseCsvSchema): + __column_order__ = [ + "stil_id", + "last_name", + "first_name", + "name", + "email", + "food_prefs", + "drinkPackage", + "group_name", + "priority", + ] + + first_name: str = CsvField("Förnamn", from_path=AliasPath("user", "first_name")) + last_name: str = CsvField("Efternamn", from_path=AliasPath("user", "last_name")) + stil_id: str | None = CsvField("Stil-ID", from_path=AliasPath("user", "stil_id")) + email: str = CsvField("E-post", from_path=AliasPath("user", "email")) + standard_food_preferences: list[str] | None = CsvField( + from_path=AliasPath("user", "standard_food_preferences"), exclude=True + ) + other_food_preferences: str | None = CsvField(from_path=AliasPath("user", "other_food_preferences"), exclude=True) + drinkPackage: str = CsvField("Dryckespaket") + group_name: str | None = CsvField("Grupp") + priority: str = CsvField("Prioritet") + + @computed_field(alias="Namn", return_type=str) + @property + def name(self): + return f"{self.first_name} {self.last_name}" + + @computed_field(alias="Matpreferens", return_type=str) + @property + def food_prefs(self): + res: list[str] = [] + if self.standard_food_preferences: + res += self.standard_food_preferences + if self.other_food_preferences: + res.append(self.other_food_preferences) + + return ", ".join(res) + + @field_validator("drinkPackage", mode="before") + @classmethod + def validate_drink_package(cls, value: DRINK_PACKAGES) -> str: + match value: + case "Alcohol": + return "Alkohol" + case "AlcoholFree": + return "Alkoholfritt" + case "None": + return "Inget" + case _: + return value diff --git a/helpers/csv_response_factory.py b/helpers/csv_response_factory.py new file mode 100644 index 00000000..6d397e12 --- /dev/null +++ b/helpers/csv_response_factory.py @@ -0,0 +1,56 @@ +from io import StringIO +from typing import Generic, TypeVar + +from fastapi.responses import StreamingResponse +import pandas as pd + +from api_schemas.csv_schemas.base_csv_schema import BaseCsvSchema + + +T = TypeVar("T", bound=BaseCsvSchema) + + +class CsvResponseFactory(Generic[T]): + def __init__(self, none_str: str = "") -> None: + self.__columns: dict[str, list[str]] = {} + self.__headers_initialized = False + self.none_str = none_str + + def append(self, row: T) -> None: + if not self.__headers_initialized: + self.__initialize_headers(row) + + dump = row.model_dump(by_alias=True) + for k in self.__columns.keys(): + self.__columns[k].append(str(dump.get(k) or self.none_str)) + + def __initialize_headers(self, row: T) -> None: + model_fields = { + k: v.serialization_alias or v.alias or k + for k, v in filter(lambda t: not t[1].exclude, row.model_fields.items()) + } + model_computed_fields = {k: v.alias or k for k, v in row.model_computed_fields.items()} + mappings = {**model_fields, **model_computed_fields} + + order = row.__column_order__ + if not order: + order = list(mappings.keys()) + + for c in order: + if c not in mappings: + continue + self.__columns[mappings[c]] = [] + + self.__headers_initialized = True + + def to_csv(self) -> StringIO: + df = pd.DataFrame(data=self.__columns) + csv_file = StringIO() + df.to_csv(csv_file, index=False) + return csv_file + + def to_response(self, filename: str = "data.csv") -> StreamingResponse: + csv_file = self.to_csv() + response = StreamingResponse(iter([csv_file.getvalue()]), media_type="text/csv") + response.headers["Content-Disposition"] = f"attachment; filename={filename}" + return response diff --git a/routes/candidate_router.py b/routes/candidate_router.py index a99a8369..d58ef1c5 100644 --- a/routes/candidate_router.py +++ b/routes/candidate_router.py @@ -1,8 +1,10 @@ from typing import Annotated -from fastapi import APIRouter, HTTPException, status +from fastapi import APIRouter, HTTPException, Response, status from datetime import datetime, timezone + from api_schemas.candidate_schema import CandidateRead, CandidatePostRead +from api_schemas.csv_schemas.candidate_csv_schema import CandidatesCsvSchema from database import DB_dependency from db_models.candidate_model import Candidate_DB from db_models.candidate_post_model import Candidation_DB @@ -10,6 +12,7 @@ from db_models.sub_election_model import SubElection_DB from db_models.election_post_model import ElectionPost_DB from db_models.user_model import User_DB +from helpers.csv_response_factory import CsvResponseFactory from user.permission import Permission @@ -42,6 +45,21 @@ def get_all_sub_election_candidates(sub_election_id: int, db: DB_dependency): return candidates +@candidate_router.get("/sub-election/{sub_election_id}/csv", dependencies=[Permission.require("view", "Election")]) +def get_all_sub_election_candidations_csv(sub_election_id: int, db: DB_dependency): + sub_election = db.query(SubElection_DB).filter(SubElection_DB.sub_election_id == sub_election_id).one_or_none() + if sub_election is None: + raise HTTPException(404, detail="Sub election not found") + + factory: CsvResponseFactory[CandidatesCsvSchema] = CsvResponseFactory() + + for c in sub_election.candidations: + row = CandidatesCsvSchema.model_validate(c) + factory.append(row) + + return factory.to_response("candidations.csv") + + @candidate_router.get( "/my-candidations/{election_id}", response_model=list[CandidatePostRead], dependencies=[Permission.member()] ) diff --git a/routes/event_router.py b/routes/event_router.py index 5bbdd198..dcca34f9 100644 --- a/routes/event_router.py +++ b/routes/event_router.py @@ -1,9 +1,9 @@ from datetime import datetime -from io import StringIO import os from fastapi import APIRouter, File, HTTPException, Response, UploadFile, status -from fastapi.responses import FileResponse, StreamingResponse +from fastapi.responses import FileResponse from psycopg import IntegrityError +from api_schemas.csv_schemas.event_user_csv_schema import EventUserCsvSchema from api_schemas.event_signup_schemas import EventSignupRead from api_schemas.tag_schema import EventTagRead from database import DB_dependency @@ -12,6 +12,7 @@ from db_models.event_user_model import EventUser_DB from db_models.user_model import User_DB from db_models.event_tag_model import EventTag_DB +from helpers.csv_response_factory import CsvResponseFactory from helpers.image_checker import validate_image from db_models.post_model import Post_DB from services.event_service import create_new_event, delete_event, update_event @@ -21,8 +22,6 @@ from pathlib import Path -import pandas as pd - event_router = APIRouter() @@ -320,7 +319,7 @@ def get_event_tags(db: DB_dependency, event_id: int): return event_tags -@event_router.get("/get-event-csv/{event_id}", dependencies=[Permission.require("manage", "Event")]) +@event_router.get("/event-signups/confirmed/{event_id}/csv", dependencies=[Permission.require("manage", "Event")]) def get_event_csv(db: DB_dependency, event_id: int): event = db.query(Event_DB).filter(Event_DB.id == event_id).one_or_none() @@ -330,46 +329,11 @@ def get_event_csv(db: DB_dependency, event_id: int): event_users = event.event_users event_users.sort(key=lambda e_user: e_user.user.last_name) - names: list[str] = [] - telephone_numbers: list[str] = [] - email_addresses: list[str] = [] - food_preferences: list[str] = [] - drink_packages: list[str] = [] - groups: list[str] = [] - priorities: list[str] = [] + factory: CsvResponseFactory[EventUserCsvSchema] = CsvResponseFactory() for event_user in event_users: if event_user.confirmed_status is True: - user = event_user.user - names.append(f"{user.first_name} {user.last_name}") - telephone_numbers.append(user.telephone_number) - email_addresses.append(user.email) - if user.standard_food_preferences and user.other_food_preferences: - user_food_prefs = ", ".join(user.standard_food_preferences) + ", " + user.other_food_preferences - elif user.standard_food_preferences: - user_food_prefs = ", ".join(user.standard_food_preferences) - elif user.other_food_preferences: - user_food_prefs = user.other_food_preferences - else: - user_food_prefs = "" - food_preferences.append(user_food_prefs) - drink_packages.append(event_user.drinkPackage or "None") - groups.append(event_user.group_name or "") - priorities.append(event_user.priority) - - d = { - "Namn": names, - "Telefonnummer": telephone_numbers, - "E-post": email_addresses, - "Matpreferens": food_preferences, - "Dryckespaket": drink_packages, - "Grupp": groups, - "Prioritet": priorities, - } - - df = pd.DataFrame(data=d) - csv_file = StringIO() - df.to_csv(csv_file, index=False) - response = StreamingResponse(iter([csv_file.getvalue()]), media_type="text/csv") - response.headers["Content-Disposition"] = "attachment; filename=event.csv" - return response + row = EventUserCsvSchema.model_validate(event_user) + factory.append(row) + + return factory.to_response("event.csv")