diff --git a/bats_ai/core/tasks/nabat/nabat_data_retrieval.py b/bats_ai/core/tasks/nabat/nabat_data_retrieval.py index 0d38cbf4..d507bc64 100644 --- a/bats_ai/core/tasks/nabat/nabat_data_retrieval.py +++ b/bats_ai/core/tasks/nabat/nabat_data_retrieval.py @@ -172,53 +172,45 @@ def nabat_recording_initialize(self, recording_id: int, survey_event_id: int, ap def create_nabat_recording_from_response(response_data, recording_id, survey_event_id): - try: - # Extract the batch data from the response - nabat_recording_data = response_data["data"] - - # Optional fields - recording_location_data = nabat_recording_data["surveyEventById"][ - "eventGeometryByEventGeometryId" - ]["geom"]["geojson"] - file_name = nabat_recording_data["acousticFileById"]["fileName"] - - # Create geometry for the recording location if available - if recording_location_data: - coordinates = recording_location_data.get("coordinates", []) - recording_location = ( - Point(coordinates[0], coordinates[1]) if len(coordinates) == 2 else None - ) - else: - recording_location = None - - # Create the NABatRecording instance - nabat_recording = NABatRecording.objects.create( - recording_id=recording_id, - survey_event_id=survey_event_id, - name=file_name, - recording_location=recording_location, + # Extract the batch data from the response + nabat_recording_data = response_data["data"] + + # Optional fields + recording_location_data = nabat_recording_data["surveyEventById"][ + "eventGeometryByEventGeometryId" + ]["geom"]["geojson"] + file_name = nabat_recording_data["acousticFileById"]["fileName"] + + # Create geometry for the recording location if available + if recording_location_data: + coordinates = recording_location_data.get("coordinates", []) + recording_location = ( + Point(coordinates[0], coordinates[1]) if len(coordinates) == 2 else None ) + else: + recording_location = None + + # Create the NABatRecording instance + nabat_recording = NABatRecording.objects.create( + recording_id=recording_id, + survey_event_id=survey_event_id, + name=file_name, + recording_location=recording_location, + ) - acoustic_batches_nodes = nabat_recording_data["surveyEventById"][ - "acousticBatchesBySurveyEventId" - ]["nodes"] - if len(acoustic_batches_nodes) > 0: - batch_data = acoustic_batches_nodes[0]["acousticFileBatchesByBatchId"]["nodes"] - for node in batch_data: - species_id = node.get("manualId", False) - if species_id is not False: - annotation = NABatRecordingAnnotation.objects.create( - nabat_recording=nabat_recording, - user_email=node["vetter"], - ) - species = Species.objects.get(pk=species_id) - annotation.species.add(species) - - return nabat_recording - - except KeyError: - logger.exception("Missing key") - raise - except Exception: - logger.exception("Error creating NABatRecording") - raise + acoustic_batches_nodes = nabat_recording_data["surveyEventById"][ + "acousticBatchesBySurveyEventId" + ]["nodes"] + if len(acoustic_batches_nodes) > 0: + batch_data = acoustic_batches_nodes[0]["acousticFileBatchesByBatchId"]["nodes"] + for node in batch_data: + species_id = node.get("manualId", False) + if species_id is not False: + annotation = NABatRecordingAnnotation.objects.create( + nabat_recording=nabat_recording, + user_email=node["vetter"], + ) + species = Species.objects.get(pk=species_id) + annotation.species.add(species) + + return nabat_recording diff --git a/bats_ai/core/views/grts_cells.py b/bats_ai/core/views/grts_cells.py index 02337ad2..417bd739 100644 --- a/bats_ai/core/views/grts_cells.py +++ b/bats_ai/core/views/grts_cells.py @@ -2,6 +2,7 @@ from django.contrib.gis.geos import Point, Polygon from django.http import HttpRequest, JsonResponse +from django.shortcuts import get_list_or_404 from ninja import Query from ninja.pagination import RouterPaginated @@ -34,88 +35,82 @@ def get_grid_cell_id( @router.get("/{pk}") def get_cell_center(request: HttpRequest, pk: int, quadrant: str | None = None): - try: - cells = GRTSCells.objects.filter(grts_cell_id=pk) + cells = get_list_or_404(GRTSCells, grts_cell_id=pk) - # Define a custom order for sample_frame_id - custom_order = GRTSCells.sort_order() # Define your custom order here + # Define a custom order for sample_frame_id + custom_order = GRTSCells.sort_order() # Define your custom order here - # Define a custom key function to sort cells based on the custom order - def custom_sort_key(cell): - return custom_order.index(cell.sample_frame_id) + # Define a custom key function to sort cells based on the custom order + def custom_sort_key(cell): + return custom_order.index(cell.sample_frame_id) - # Sort the cells queryset based on the custom order - sorted_cells = sorted(cells, key=custom_sort_key) - cell = sorted_cells[0] - geom_4326 = cell.geom_4326 + # Sort the cells queryset based on the custom order + sorted_cells = sorted(cells, key=custom_sort_key) + cell = sorted_cells[0] + geom_4326 = cell.geom_4326 - # Get the centroid of the entire cell polygon - center = geom_4326.centroid + # Get the centroid of the entire cell polygon + center = geom_4326.centroid - if quadrant: - # If quadrant is specified, divide the cell polygon into quadrants - min_x, min_y, max_x, max_y = geom_4326.extent - mid_x = (min_x + max_x) / 2 - mid_y = (min_y + max_y) / 2 + if quadrant: + # If quadrant is specified, divide the cell polygon into quadrants + min_x, min_y, max_x, max_y = geom_4326.extent + mid_x = (min_x + max_x) / 2 + mid_y = (min_y + max_y) / 2 - # Determine the bounding box coordinates of the specified quadrant - if quadrant.upper() == "NW": - bbox = (min_x, mid_y, mid_x, max_y) - elif quadrant.upper() == "SE": - bbox = (mid_x, min_y, max_x, mid_y) - elif quadrant.upper() == "SW": - bbox = (min_x, min_y, mid_x, mid_y) - elif quadrant.upper() == "NE": - bbox = (mid_x, mid_y, max_x, max_y) + # Determine the bounding box coordinates of the specified quadrant + if quadrant.upper() == "NW": + bbox = (min_x, mid_y, mid_x, max_y) + elif quadrant.upper() == "SE": + bbox = (mid_x, min_y, max_x, mid_y) + elif quadrant.upper() == "SW": + bbox = (min_x, min_y, mid_x, mid_y) + elif quadrant.upper() == "NE": + bbox = (mid_x, mid_y, max_x, max_y) - quadrant_polygon = Polygon.from_bbox(bbox) + quadrant_polygon = Polygon.from_bbox(bbox) - # Intersect the cell polygon with the specified quadrant's polygon - quadrant_polygon = geom_4326.intersection(quadrant_polygon) + # Intersect the cell polygon with the specified quadrant's polygon + quadrant_polygon = geom_4326.intersection(quadrant_polygon) - # Get the centroid of the intersected polygon - center = quadrant_polygon.centroid + # Get the centroid of the intersected polygon + center = quadrant_polygon.centroid - # Get the latitude and longitude of the centroid - center_latitude = center.y - center_longitude = center.x + # Get the latitude and longitude of the centroid + center_latitude = center.y + center_longitude = center.x - return JsonResponse({"latitude": center_latitude, "longitude": center_longitude}) - except GRTSCells.DoesNotExist: - return JsonResponse({"error": f"Cell with cellId={pk} does not exist"}, status=200) + return JsonResponse({"latitude": center_latitude, "longitude": center_longitude}) @router.get("/{pk}/bbox") def get_grts_cell_bbox(request: HttpRequest, pk: int): - try: - cells = GRTSCells.objects.filter(grts_cell_id=pk) - custom_order = GRTSCells.sort_order() - - def custom_sort_key(cell): - return custom_order.index(cell.sample_frame_id) - - sorted_cells = sorted(cells, key=custom_sort_key) - cell = sorted_cells[0] - geom = cell.geom_4326 - - min_x, min_y, max_x, max_y = geom.extent - - geojson = { - "type": "Feature", - "geometry": { - "type": "Polygon", - "coordinates": [ - [min_x, min_y], - [min_x, max_y], - [max_x, max_y], - [max_x, min_y], - ], - }, - "properties": { - "grts_cell_id": pk, - "annotationType": "rectangle", - }, - } - return JsonResponse(geojson) - except (GRTSCells.DoesNotExist, IndexError): - return JsonResponse({"error": f"Cell with id {pk} does not exist"}, status=200) + cells = get_list_or_404(GRTSCells, grts_cell_id=pk) + custom_order = GRTSCells.sort_order() + + def custom_sort_key(cell): + return custom_order.index(cell.sample_frame_id) + + sorted_cells = sorted(cells, key=custom_sort_key) + cell = sorted_cells[0] + geom = cell.geom_4326 + + min_x, min_y, max_x, max_y = geom.extent + + geojson = { + "type": "Feature", + "geometry": { + "type": "Polygon", + "coordinates": [ + [min_x, min_y], + [min_x, max_y], + [max_x, max_y], + [max_x, min_y], + ], + }, + "properties": { + "grts_cell_id": pk, + "annotationType": "rectangle", + }, + } + return JsonResponse(geojson) diff --git a/bats_ai/core/views/nabat/nabat_configuration.py b/bats_ai/core/views/nabat/nabat_configuration.py index 50344774..ff54c989 100644 --- a/bats_ai/core/views/nabat/nabat_configuration.py +++ b/bats_ai/core/views/nabat/nabat_configuration.py @@ -10,6 +10,7 @@ from django.db import transaction from django.db.models import Count from django.http import HttpRequest, JsonResponse +from django.shortcuts import get_object_or_404 from django.utils.timezone import now from ninja import Query, Router, Schema from ninja.pagination import paginate @@ -161,10 +162,7 @@ def recording_annotations( if not request.user.is_authenticated or not request.user.is_superuser: return JsonResponse({"error": "Permission denied"}, status=403) - try: - recording = NABatRecording.objects.get(pk=recording_id) - except NABatRecording.DoesNotExist: - return JsonResponse({"error": "Recording not found"}, status=404) + recording = get_object_or_404(NABatRecording, pk=recording_id) annotations = NABatRecordingAnnotation.objects.filter(nabat_recording=recording) diff --git a/bats_ai/core/views/nabat/nabat_recording.py b/bats_ai/core/views/nabat/nabat_recording.py index 0e4d498b..aadb82eb 100644 --- a/bats_ai/core/views/nabat/nabat_recording.py +++ b/bats_ai/core/views/nabat/nabat_recording.py @@ -8,6 +8,7 @@ from django.db import transaction from django.db.models import Q from django.http import HttpRequest, JsonResponse +from django.shortcuts import get_object_or_404 from ninja import Form, Schema from ninja.pagination import RouterPaginated from oauth2_provider.models import AccessToken @@ -115,13 +116,9 @@ def get_email_if_authorized( return JsonResponse( {"error": "Either recording_id or recording_pk must be provided"}, status=400 ) - try: - nabat_recording = NABatRecording.objects.get(pk=recording_pk) - recording_id = nabat_recording.recording_id - except NABatRecording.DoesNotExist: - return JsonResponse( - {"error": f"NABatRecording with id {recording_pk} does not exist"}, status=404 - ) + + nabat_recording = get_object_or_404(NABatRecording, pk=recording_pk) + recording_id = nabat_recording.recording_id # Verify access with NABat API headers = {"Authorization": f"Bearer {api_token}", "Content-Type": "application/json"} @@ -269,10 +266,7 @@ def generate_nabat_recording( @router.get("/{pk}/spectrogram", auth=admin_auth) def get_spectrogram(request: HttpRequest, pk: int): - try: - nabat_recording = NABatRecording.objects.get(pk=pk) - except NABatRecording.DoesNotExist: - return {"error": "Recording not found"} + nabat_recording = get_object_or_404(NABatRecording, pk=pk) spectrogram = nabat_recording.spectrogram @@ -311,10 +305,7 @@ def get_spectrogram_compressed( pk: int, apiToken: str, # noqa: N803 ): - try: - nabat_recording = NABatRecording.objects.get(pk=pk) - except NABatRecording.DoesNotExist: - return JsonResponse({"error": "Recording does not exist"}, status=404) + nabat_recording = get_object_or_404(NABatRecording, pk=pk) email_or_response = get_email_if_authorized(request, apiToken, nabat_recording.recording_id) if isinstance(email_or_response, JsonResponse): @@ -440,15 +431,13 @@ def get_recording_annotation( if isinstance(email_or_response, JsonResponse): return email_or_response user_email = email_or_response # safe to use - try: - annotation = NABatRecordingAnnotation.objects.get(pk=pk) - if user_email: - annotation = annotation.filter(Q(user_email=user_email) | Q(user_email__isnull=True)) + annotation = get_object_or_404(NABatRecordingAnnotation, pk=pk) + + if user_email: + annotation = annotation.filter(Q(user_email=user_email) | Q(user_email__isnull=True)) - return NABatRecordingAnnotationSchema.from_orm(annotation).dict() - except NABatRecordingAnnotation.DoesNotExist: - return JsonResponse({"error": "Recording annotation not found."}, 404) + return NABatRecordingAnnotationSchema.from_orm(annotation).dict() @router.get( @@ -465,14 +454,12 @@ def get_recording_annotation_details( if isinstance(email_or_response, JsonResponse): return email_or_response user_email = email_or_response # safe to use - try: - annotation = NABatRecordingAnnotation.objects.get( - Q(pk=pk) & (Q(user_email=user_email) | Q(user_email__isnull=True)) - ) - return NABatRecordingAnnotationDetailsSchema.from_orm(annotation).dict() - except NABatRecordingAnnotation.DoesNotExist: - return JsonResponse({"error": "Recording annotation not found."}, 404) + annotation = get_object_or_404( + NABatRecordingAnnotation, Q(pk=pk) & (Q(user_email=user_email) | Q(user_email__isnull=True)) + ) + + return NABatRecordingAnnotationDetailsSchema.from_orm(annotation).dict() @router.put("recording-annotation", auth=None, response={200: str}) @@ -487,28 +474,23 @@ def create_recording_annotation(request: HttpRequest, data: NABatCreateRecording token_data = decode_jwt(data.apiToken) user_id = token_data["sub"] - try: - recording = NABatRecording.objects.get(pk=data.recordingId) - - # Create the recording annotation - annotation = NABatRecordingAnnotation.objects.create( - nabat_recording=recording, - user_email=user_email, - user_id=user_id, - comments=data.comments, - model=data.model, - confidence=data.confidence, - ) + recording = get_object_or_404(NABatRecording, pk=data.recordingId) - # Add species - for species_id in data.species: - species = Species.objects.get(pk=species_id) - annotation.species.add(species) - return "Recording annotation created successfully." - except NABatRecording.DoesNotExist: - return JsonResponse({"error": "Recording not found."}, 404) - except Species.DoesNotExist: - return JsonResponse({"error": "One or more species IDs not found."}, 404) + # Create the recording annotation + annotation = NABatRecordingAnnotation.objects.create( + nabat_recording=recording, + user_email=user_email, + user_id=user_id, + comments=data.comments, + model=data.model, + confidence=data.confidence, + ) + + # Add species + for species_id in data.species: + species = get_object_or_404(Species, pk=species_id) + annotation.species.add(species) + return "Recording annotation created successfully." @router.patch("recording-annotation/{pk}", auth=None, response={200: str}) @@ -530,29 +512,25 @@ def update_recording_annotation( if isinstance(email_or_response, JsonResponse): return email_or_response user_email = email_or_response # safe to use - try: - annotation = NABatRecordingAnnotation.objects.get(pk=pk, user_email=user_email) - # Check permission - - # Update fields if provided - if data.comments is not None: - annotation.comments = data.comments - if data.model is not None: - annotation.model = data.model - if data.confidence is not None: - annotation.confidence = data.confidence - if data.species is not None: - annotation.species.clear() # Clear existing species - for species_id in data.species: - species = Species.objects.get(pk=species_id) - annotation.species.add(species) - - annotation.save() - return "Recording annotation updated successfully." - except NABatRecordingAnnotation.DoesNotExist: - return JsonResponse({"error": "Recording not found."}, 404) - except Species.DoesNotExist: - return JsonResponse({"error": "One or more species IDs not found."}, 404) + + annotation = get_object_or_404(NABatRecordingAnnotation, pk=pk, user_email=user_email) + # Check permission + + # Update fields if provided + if data.comments is not None: + annotation.comments = data.comments + if data.model is not None: + annotation.model = data.model + if data.confidence is not None: + annotation.confidence = data.confidence + if data.species is not None: + annotation.species.clear() # Clear existing species + for species_id in data.species: + species = get_object_or_404(Species, pk=species_id) + annotation.species.add(species) + + annotation.save() + return "Recording annotation updated successfully." @router.patch("recording-annotation/{pk}/push-to-nabat", auth=None, response={200: str}) @@ -567,35 +545,30 @@ def update_nabat_recording_annotation( return email_or_response user_email = email_or_response # safe to use - try: - annotation = NABatRecordingAnnotation.objects.get(pk=pk, user_email=user_email) - # Check permission - - # Update fields if provided - if data.comments is not None: - annotation.comments = data.comments - if data.model is not None: - annotation.model = data.model - if data.confidence is not None: - annotation.confidence = data.confidence - if data.species is not None: - if len(data.species) == 1: - species_id = data.species[0] - return update_nabat_species( - species_id, - data.apiToken, - annotation.nabat_recording.recording_id, - annotation.nabat_recording.survey_event_id, - ) - elif len(data.species) > 1: - return JsonResponse( - {"error": "NABat only supports one species per recording annotation."}, - status=400, - ) - except NABatRecordingAnnotation.DoesNotExist: - return JsonResponse({"error": "Recording not found."}, 404) - except Species.DoesNotExist: - return JsonResponse({"error": "One or more species IDs not found."}, 404) + annotation = get_object_or_404(NABatRecordingAnnotation, pk=pk, user_email=user_email) + # Check permission + + # Update fields if provided + if data.comments is not None: + annotation.comments = data.comments + if data.model is not None: + annotation.model = data.model + if data.confidence is not None: + annotation.confidence = data.confidence + if data.species is not None: + if len(data.species) == 1: + species_id = data.species[0] + return update_nabat_species( + species_id, + data.apiToken, + annotation.nabat_recording.recording_id, + annotation.nabat_recording.survey_event_id, + ) + else: + return JsonResponse( + {"error": "NABat only supports one species per recording annotation."}, + status=400, + ) # TODO: Determine if this will be implemented for NABat @@ -610,11 +583,9 @@ def delete_recording_annotation( if isinstance(email_or_response, JsonResponse): return email_or_response user_email = email_or_response # safe to use - try: - annotation = NABatRecordingAnnotation.objects.get(pk=pk, user_email=user_email) - # Check permission - annotation.delete() - return "Recording annotation deleted successfully." - except NABatRecordingAnnotation.DoesNotExist: - return JsonResponse({"error": "Recording not found for this user."}, 404) + annotation = get_object_or_404(NABatRecordingAnnotation, pk=pk, user_email=user_email) + + # Check permission + annotation.delete() + return "Recording annotation deleted successfully." diff --git a/bats_ai/core/views/recording.py b/bats_ai/core/views/recording.py index 23c9cdde..8ed2f539 100644 --- a/bats_ai/core/views/recording.py +++ b/bats_ai/core/views/recording.py @@ -10,6 +10,7 @@ from django.contrib.postgres.aggregates import ArrayAgg from django.core.files.storage import default_storage from django.db.models import Count, Exists, OuterRef, Prefetch, Q, QuerySet +from django.shortcuts import get_object_or_404 from ninja import File, Form, Query, Schema # Django-Ninja accesses additional params directly, so we need to ignore the type checker. @@ -347,10 +348,7 @@ def create_recording( @router.patch("/{pk}") def update_recording(request: HttpRequest, pk: int, recording_data: RecordingUploadSchema): - try: - recording = Recording.objects.get(pk=pk, owner=request.user) - except Recording.DoesNotExist: - return {"error": "Recording not found"} + recording = get_object_or_404(Recording, pk=pk, owner=request.user) if recording_data.name: recording.name = recording_data.name @@ -459,23 +457,15 @@ def delete_recording( request, pk: int, ): - try: - recording = Recording.objects.get(pk=pk) + recording = get_object_or_404(Recording, pk=pk) - # Check if the user owns the recording - if recording.owner == request.user: - # Delete the annotation - recording.delete() - return {"message": "Recording deleted successfully"} - else: - return { - "error": "Permission denied. You do not own this recording, and it is not public." - } - - except Recording.DoesNotExist: - return {"error": "Recording not found"} - except Annotations.DoesNotExist: - return {"error": "Annotation not found"} + # Check if the user owns the recording + if recording.owner == request.user: + # Delete the annotation + recording.delete() + return {"message": "Recording deleted successfully"} + else: + return {"error": "Permission denied. You do not own this recording, and it is not public."} @router.get("/", response=RecordingPaginatedResponse) @@ -647,68 +637,56 @@ def get_unsubmitted_neighbors( @router.get("/{pk}/") def get_recording(request: HttpRequest, pk: int): - # Filter recordings based on the owner's id or public=True - try: - recordings = ( - Recording.objects.filter(pk=pk) - .annotate(tags_text=ArrayAgg("tags__text", filter=Q(tags__text__isnull=False))) - .values() - ) - if len(recordings) > 0: - recording = recordings[0] - - user = User.objects.get(id=recording["owner_id"]) - recording["owner_username"] = user.username - recording["audio_file_presigned_url"] = default_storage.url(recording["audio_file"]) - recording["hasSpectrogram"] = Recording.objects.get(id=recording["id"]).has_spectrogram - if recording["recording_location"]: - recording["recording_location"] = json.loads(recording["recording_location"].json) - annotation_owners = ( - Annotations.objects.filter(recording_id=recording["id"]) - .values_list("owner", flat=True) - .distinct() - ) - recording_annotation_owners = ( - RecordingAnnotation.objects.filter(recording_id=recording["id"]) - .values_list("owner", flat=True) - .distinct() - ) + recording = get_object_or_404( + Recording.objects.annotate( + tags_text=ArrayAgg("tags__text", filter=Q(tags__text__isnull=False)) + ).values(), + pk=pk, + ) - # Combine the sets of owners and count unique entries - unique_users_with_annotations = len( - set(annotation_owners).union(set(recording_annotation_owners)) - ) - recording["userAnnotations"] = unique_users_with_annotations - user_has_annotations = ( - Annotations.objects.filter( - recording_id=recording["id"], owner=request.user - ).exists() - or RecordingAnnotation.objects.filter( - recording_id=recording["id"], owner=request.user - ).exists() - ) - recording["userMadeAnnotations"] = user_has_annotations - # Only expose file-level annotations owned by the current user - file_annotations = RecordingAnnotation.objects.filter( - recording=pk, owner=request.user - ).order_by("confidence") - recording["fileAnnotations"] = [ - RecordingAnnotationSchema.from_orm(fileAnnotation).dict() - for fileAnnotation in file_annotations - ] - return recording - else: - return {"error": "Recording not found"} - except Recording.DoesNotExist: - return {"error": "Recording not found"} + user = User.objects.get(id=recording["owner_id"]) + recording["owner_username"] = user.username + recording["audio_file_presigned_url"] = default_storage.url(recording["audio_file"]) + recording["hasSpectrogram"] = len(recording["spectrograms"]) > 0 + if recording["recording_location"]: + recording["recording_location"] = json.loads(recording["recording_location"].json) + annotation_owners = ( + Annotations.objects.filter(recording_id=recording["id"]) + .values_list("owner", flat=True) + .distinct() + ) + recording_annotation_owners = ( + RecordingAnnotation.objects.filter(recording_id=recording["id"]) + .values_list("owner", flat=True) + .distinct() + ) + + # Combine the sets of owners and count unique entries + unique_users_with_annotations = len( + set(annotation_owners).union(set(recording_annotation_owners)) + ) + recording["userAnnotations"] = unique_users_with_annotations + user_has_annotations = ( + Annotations.objects.filter(recording_id=recording["id"], owner=request.user).exists() + or RecordingAnnotation.objects.filter( + recording_id=recording["id"], owner=request.user + ).exists() + ) + recording["userMadeAnnotations"] = user_has_annotations + # Only expose file-level annotations owned by the current user + file_annotations = RecordingAnnotation.objects.filter( + recording=pk, owner=request.user + ).order_by("confidence") + recording["fileAnnotations"] = [ + RecordingAnnotationSchema.from_orm(fileAnnotation).dict() + for fileAnnotation in file_annotations + ] + return recording @router.get("/{recording_id}/recording-annotations") def get_recording_annotations(request: HttpRequest, recording_id: int): - try: - recording = Recording.objects.get(pk=recording_id) - except Recording.DoesNotExist: - return {"error": "Recording not found"} + recording = get_object_or_404(Recording, pk=recording_id) if recording.owner != request.user and not recording.public: return {"error": "Permission denied. You do not own this recording, and it is not public."} # Only return file-level annotations owned by the current user (same as pulse) @@ -727,10 +705,7 @@ def get_recording_annotations(request: HttpRequest, recording_id: int): @router.get("/{pk}/spectrogram") def get_spectrogram(request: HttpRequest, pk: int): - try: - recording = Recording.objects.get(pk=pk) - except Recording.DoesNotExist: - return {"error": "Recording not found"} + recording = get_object_or_404(Recording, pk=pk) spectrogram = recording.spectrogram @@ -799,13 +774,8 @@ def get_spectrogram(request: HttpRequest, pk: int): @router.get("/{pk}/spectrogram/compressed") def get_spectrogram_compressed(request: HttpRequest, pk: int): - try: - recording = Recording.objects.get(pk=pk) - compressed_spectrogram = CompressedSpectrogram.objects.filter(recording=pk).first() - except compressed_spectrogram.DoesNotExist: - return {"error": "Compressed Spectrogram"} - except recording.DoesNotExist: - return {"error": "Recording does not exist"} + recording = get_object_or_404(Recording, pk=pk) + compressed_spectrogram = get_object_or_404(recording.compressedspectrogram_set) spectro_data = { "urls": compressed_spectrogram.image_url_list, @@ -870,118 +840,90 @@ def get_spectrogram_compressed(request: HttpRequest, pk: int): @router.get("/{pk}/annotations") def get_annotations(request: HttpRequest, pk: int): - try: - recording = Recording.objects.get(pk=pk) - - # Check if the user owns the recording or if the recording is public - if recording.owner == request.user or recording.public: - # Query annotations associated with the recording that are owned by the current user - annotations_qs = Annotations.objects.filter(recording=recording, owner=request.user) + recording = get_object_or_404(Recording, pk=pk) - # Serialize the annotations using AnnotationSchema - return [ - AnnotationSchema.from_orm(annotation, owner_email=request.user.email).dict() - for annotation in annotations_qs - ] + # Check if the user owns the recording or if the recording is public + if recording.owner == request.user or recording.public: + # Query annotations associated with the recording that are owned by the current user + annotations_qs = Annotations.objects.filter(recording=recording, owner=request.user) - else: - return { - "error": "Permission denied. You do not own this recording, and it is not public." - } + # Serialize the annotations using AnnotationSchema + return [ + AnnotationSchema.from_orm(annotation, owner_email=request.user.email).dict() + for annotation in annotations_qs + ] - except Recording.DoesNotExist: - return {"error": "Recording not found"} + else: + return {"error": "Permission denied. You do not own this recording, and it is not public."} @router.get("/{pk}/pulse_contours") def get_pulse_contours(request: HttpRequest, pk: int): - try: - recording = Recording.objects.get(pk=pk) - if recording.owner == request.user or recording.public: - computed_pulse_annotation_qs = PulseMetadata.objects.filter( - recording=recording - ).order_by("index") - return [ - PulseContourSchema.from_orm(pulse) for pulse in computed_pulse_annotation_qs.all() - ] - else: - return { - "error": "Permission denied. You do not own this recording, and it is not public." - } - except Recording.DoesNotExist: - return {"error": "Recording not found"} + recording = get_object_or_404(Recording, pk=pk) + if recording.owner == request.user or recording.public: + computed_pulse_annotation_qs = PulseMetadata.objects.filter(recording=recording).order_by( + "index" + ) + return [PulseContourSchema.from_orm(pulse) for pulse in computed_pulse_annotation_qs.all()] + else: + return {"error": "Permission denied. You do not own this recording, and it is not public."} @router.get("/{pk}/pulse_data") def get_pulse_data(request: HttpRequest, pk: int): - try: - recording = Recording.objects.get(pk=pk) - if recording.owner == request.user or recording.public: - computed_pulse_annotation_qs = PulseMetadata.objects.filter( - recording=recording - ).order_by("index") - return [ - PulseMetadataSchema.from_orm(pulse) for pulse in computed_pulse_annotation_qs.all() - ] - else: - return { - "error": "Permission denied. You do not own this recording, and it is not public." - } - except Recording.DoesNotExist: - return {"error": "Recording not found"} + recording = get_object_or_404(Recording, pk=pk) + if recording.owner == request.user or recording.public: + computed_pulse_annotation_qs = PulseMetadata.objects.filter(recording=recording).order_by( + "index" + ) + return [PulseMetadataSchema.from_orm(pulse) for pulse in computed_pulse_annotation_qs.all()] + else: + return {"error": "Permission denied. You do not own this recording, and it is not public."} @router.get("/{pk}/annotations/other_users") def get_other_user_annotations(request: HttpRequest, pk: int): - try: - recording = Recording.objects.get(pk=pk) - - # Check if the user owns the recording or if the recording is public - if recording.owner == request.user or request.user.is_superuser: - # Query annotations associated with the recording that are owned by other users - annotations_qs = Annotations.objects.filter(recording=recording).exclude( - owner=request.user - ) - sequence_qs = SequenceAnnotations.objects.filter(recording=recording).exclude( - owner=request.user - ) - - # Create a dictionary to store annotations for each user - annotations_by_user = {} + recording = get_object_or_404(Recording, pk=pk) + + # Check if the user owns the recording or if the recording is public + if recording.owner == request.user or request.user.is_superuser: + # Query annotations associated with the recording that are owned by other users + annotations_qs = Annotations.objects.filter(recording=recording).exclude(owner=request.user) + sequence_qs = SequenceAnnotations.objects.filter(recording=recording).exclude( + owner=request.user + ) - # Serialize the annotations using AnnotationSchema - for annotation in annotations_qs: - user_email = annotation.owner.email + # Create a dictionary to store annotations for each user + annotations_by_user = {} - # If user_email is not already a key in the dictionary, initialize it with - # an empty list - annotations_by_user.setdefault(user_email, {"annotations": [], "sequence": []}) + # Serialize the annotations using AnnotationSchema + for annotation in annotations_qs: + user_email = annotation.owner.email - # Append the annotation to the list for the corresponding user_email - annotations_by_user[user_email]["annotations"].append( - AnnotationSchema.from_orm(annotation, owner_email=user_email).dict() - ) + # If user_email is not already a key in the dictionary, initialize it with + # an empty list + annotations_by_user.setdefault(user_email, {"annotations": [], "sequence": []}) - for annotation in sequence_qs: - user_email = annotation.owner.email + # Append the annotation to the list for the corresponding user_email + annotations_by_user[user_email]["annotations"].append( + AnnotationSchema.from_orm(annotation, owner_email=user_email).dict() + ) - # If user_email is not already a key in the dictionary, initialize it with - # an empty list - annotations_by_user.setdefault(user_email, {"annotations": [], "sequence": []}) + for annotation in sequence_qs: + user_email = annotation.owner.email - # Append the annotation to the list for the corresponding user_email - annotations_by_user[user_email]["sequence"].append( - SequenceAnnotationSchema.from_orm(annotation, owner_email=user_email).dict() - ) + # If user_email is not already a key in the dictionary, initialize it with + # an empty list + annotations_by_user.setdefault(user_email, {"annotations": [], "sequence": []}) - return annotations_by_user - else: - return { - "error": "Permission denied. You do not own this recording, and it is not public." - } + # Append the annotation to the list for the corresponding user_email + annotations_by_user[user_email]["sequence"].append( + SequenceAnnotationSchema.from_orm(annotation, owner_email=user_email).dict() + ) - except Recording.DoesNotExist: - return {"error": "Recording not found"} + return annotations_by_user + else: + return {"error": "Permission denied. You do not own this recording, and it is not public."} @router.get("/{pk}/annotations/user/{userId}") @@ -990,24 +932,18 @@ def get_user_annotations( pk: int, userId: int, # noqa: N803 ): - try: - recording = Recording.objects.get(pk=pk) - - # Check if the user owns the recording or if the recording is public - if recording.owner == request.user or recording.public: - # Query annotations associated with the recording that are owned by the current user - annotations_qs = Annotations.objects.filter(recording=recording, owner=userId) + recording = get_object_or_404(Recording, pk=pk) - # Serialize the annotations using AnnotationSchema - return [AnnotationSchema.from_orm(annotation).dict() for annotation in annotations_qs] + # Check if the user owns the recording or if the recording is public + if recording.owner == request.user or recording.public: + # Query annotations associated with the recording that are owned by the current user + annotations_qs = Annotations.objects.filter(recording=recording, owner=userId) - else: - return { - "error": "Permission denied. You do not own this recording, and it is not public." - } + # Serialize the annotations using AnnotationSchema + return [AnnotationSchema.from_orm(annotation).dict() for annotation in annotations_qs] - except Recording.DoesNotExist: - return {"error": "Recording not found"} + else: + return {"error": "Permission denied. You do not own this recording, and it is not public."} @router.put("/{pk}/annotations") @@ -1017,38 +953,28 @@ def put_annotation( annotation: AnnotationSchema, species_ids: list[int], ): - try: - recording = Recording.objects.get(pk=pk) - if recording.owner == request.user or recording.public: - # Create a new annotation - new_annotation = Annotations.objects.create( - recording=recording, - owner=request.user, - start_time=annotation.start_time, - end_time=annotation.end_time, - low_freq=annotation.low_freq, - high_freq=annotation.high_freq, - comments=annotation.comments, - type=annotation.type, - ) + recording = get_object_or_404(Recording, pk=pk) + if recording.owner == request.user or recording.public: + # Create a new annotation + new_annotation = Annotations.objects.create( + recording=recording, + owner=request.user, + start_time=annotation.start_time, + end_time=annotation.end_time, + low_freq=annotation.low_freq, + high_freq=annotation.high_freq, + comments=annotation.comments, + type=annotation.type, + ) - # Add species to the annotation based on the provided species_ids - for species_id in species_ids: - try: - species_obj = Species.objects.get(pk=species_id) - new_annotation.species.add(species_obj) - except Species.DoesNotExist: - # Handle the case where the species with the given ID doesn't exist - return {"error": f"Species with ID {species_id} not found"} - - return {"message": "Annotation added successfully", "id": new_annotation.pk} - else: - return { - "error": "Permission denied. You do not own this recording, and it is not public." - } + # Add species to the annotation based on the provided species_ids + for species_id in species_ids: + species_obj = get_object_or_404(Species, pk=species_id) + new_annotation.species.add(species_obj) - except Recording.DoesNotExist: - return {"error": "Recording not found"} + return {"message": "Annotation added successfully", "id": new_annotation.pk} + else: + return {"error": "Permission denied. You do not own this recording, and it is not public."} @router.patch("/{recording_pk}/annotations/{annotation_pk}") @@ -1059,56 +985,42 @@ def patch_annotation( annotation: UpdateAnnotationsSchema, species_ids: list[int] | None, ): - try: - recording = Recording.objects.get(pk=recording_pk) + recording = get_object_or_404(Recording, pk=recording_pk) - # Check if the user owns the recording or if the recording is public - if recording.owner == request.user or recording.public: - annotation_instance = Annotations.objects.get( - pk=annotation_pk, recording=recording, owner=request.user - ) - if annotation_instance is None: - return {"error": "Annotation not found"} - - # Update annotation details - if annotation.start_time is not None: - annotation_instance.start_time = annotation.start_time - if annotation.end_time: - annotation_instance.end_time = annotation.end_time - if annotation.low_freq: - annotation_instance.low_freq = annotation.low_freq - if annotation.high_freq: - annotation_instance.high_freq = annotation.high_freq - if annotation.type: - annotation_instance.type = annotation.type - else: - annotation_instance.type = None - if annotation.comments: - annotation_instance.comments = annotation.comments - annotation_instance.save() - - # Clear existing species associations - if species_ids is not None: - annotation_instance.species.clear() - # Add species to the annotation based on the provided species_ids - for species_id in species_ids: - try: - species_obj = Species.objects.get(pk=species_id) - annotation_instance.species.add(species_obj) - except Species.DoesNotExist: - # Handle the case where the species with the given ID doesn't exist - return {"error": f"Species with ID {species_id} not found"} - - return {"message": "Annotation updated successfully", "id": annotation_instance.pk} + # Check if the user owns the recording or if the recording is public + if recording.owner == request.user or recording.public: + annotation_instance = get_object_or_404( + Annotations, pk=annotation_pk, recording=recording, owner=request.user + ) + + # Update annotation details + if annotation.start_time is not None: + annotation_instance.start_time = annotation.start_time + if annotation.end_time: + annotation_instance.end_time = annotation.end_time + if annotation.low_freq: + annotation_instance.low_freq = annotation.low_freq + if annotation.high_freq: + annotation_instance.high_freq = annotation.high_freq + if annotation.type: + annotation_instance.type = annotation.type else: - return { - "error": "Permission denied. You do not own this recording, and it is not public." - } + annotation_instance.type = None + if annotation.comments: + annotation_instance.comments = annotation.comments + annotation_instance.save() + + # Clear existing species associations + if species_ids is not None: + annotation_instance.species.clear() + # Add species to the annotation based on the provided species_ids + for species_id in species_ids: + species_obj = get_object_or_404(Species, pk=species_id) + annotation_instance.species.add(species_obj) - except Recording.DoesNotExist: - return {"error": "Recording not found"} - except Annotations.DoesNotExist: - return {"error": "Annotation not found"} + return {"message": "Annotation updated successfully", "id": annotation_instance.pk} + else: + return {"error": "Permission denied. You do not own this recording, and it is not public."} @router.patch("/{recording_pk}/sequence-annotations/{sequence_annotation_pk}") @@ -1119,76 +1031,56 @@ def patch_sequence_annotation( annotation: UpdateSequenceAnnotationSchema, species_ids: list[int] | None, ): - try: - recording = Recording.objects.get(pk=recording_pk) + recording = get_object_or_404(Recording, pk=recording_pk) - # Check if the user owns the recording or if the recording is public - if recording.owner == request.user or recording.public: - annotation_instance = SequenceAnnotations.objects.get( - pk=sequence_annotation_pk, recording=recording, owner=request.user - ) + # Check if the user owns the recording or if the recording is public + if recording.owner == request.user or recording.public: + annotation_instance = get_object_or_404( + SequenceAnnotations, pk=sequence_annotation_pk, recording=recording, owner=request.user + ) - # Update annotation details - if annotation.start_time is not None: - annotation_instance.start_time = annotation.start_time - if annotation.end_time: - annotation_instance.end_time = annotation.end_time - if annotation.comments: - annotation_instance.comments = annotation.comments - if annotation.type: - annotation_instance.type = annotation.type - else: - annotation_instance.type = None - annotation_instance.save() - - # Clear existing species associations - if species_ids is not None: - annotation_instance.species.clear() - # Add species to the annotation based on the provided species_ids - for species_id in species_ids: - try: - species_obj = Species.objects.get(pk=species_id) - annotation_instance.species.add(species_obj) - except Species.DoesNotExist: - # Handle the case where the species with the given ID doesn't exist - return {"error": f"Species with ID {species_id} not found"} - - return {"message": "Annotation updated successfully", "id": annotation_instance.pk} + # Update annotation details + if annotation.start_time is not None: + annotation_instance.start_time = annotation.start_time + if annotation.end_time: + annotation_instance.end_time = annotation.end_time + if annotation.comments: + annotation_instance.comments = annotation.comments + if annotation.type: + annotation_instance.type = annotation.type else: - return { - "error": "Permission denied. You do not own this recording, and it is not public." - } + annotation_instance.type = None + annotation_instance.save() - except Recording.DoesNotExist: - return {"error": "Recording not found"} - except Annotations.DoesNotExist: - return {"error": "Annotation not found"} + # Clear existing species associations + if species_ids is not None: + annotation_instance.species.clear() + # Add species to the annotation based on the provided species_ids + for species_id in species_ids: + species_obj = get_object_or_404(Species, pk=species_id) + annotation_instance.species.add(species_obj) + + return {"message": "Annotation updated successfully", "id": annotation_instance.pk} + else: + return {"error": "Permission denied. You do not own this recording, and it is not public."} @router.delete("/{recording_pk}/annotations/{annotation_pk}") def delete_annotation(request, recording_pk: int, annotation_pk: int): - try: - recording = Recording.objects.get(pk=recording_pk) + recording = get_object_or_404(Recording, pk=recording_pk) - # Check if the user owns the recording or if the recording is public - if recording.owner == request.user or recording.public: - annotation_instance = Annotations.objects.get( - pk=annotation_pk, recording=recording, owner=request.user - ) + # Check if the user owns the recording or if the recording is public + if recording.owner == request.user or recording.public: + annotation_instance = get_object_or_404( + Annotations, pk=annotation_pk, recording=recording, owner=request.user + ) - # Delete the annotation - annotation_instance.delete() + # Delete the annotation + annotation_instance.delete() - return {"message": "Annotation deleted successfully"} - else: - return { - "error": "Permission denied. You do not own this recording, and it is not public." - } - - except Recording.DoesNotExist: - return {"error": "Recording not found"} - except Annotations.DoesNotExist: - return {"error": "Annotation not found"} + return {"message": "Annotation deleted successfully"} + else: + return {"error": "Permission denied. You do not own this recording, and it is not public."} # SEQUENCE ANNOTATIONS @@ -1196,29 +1088,21 @@ def delete_annotation(request, recording_pk: int, annotation_pk: int): @router.get("/{pk}/sequence-annotations") def get_sequence_annotations(request: HttpRequest, pk: int): - try: - recording = Recording.objects.get(pk=pk) + recording = get_object_or_404(Recording, pk=pk) - # Check if the user owns the recording or if the recording is public - if recording.owner == request.user or recording.public: - # Query annotations associated with the recording that are owned by the current user - annotations_qs = SequenceAnnotations.objects.filter( - recording=recording, owner=request.user - ) + # Check if the user owns the recording or if the recording is public + if recording.owner == request.user or recording.public: + # Query annotations associated with the recording that are owned by the current user + annotations_qs = SequenceAnnotations.objects.filter(recording=recording, owner=request.user) - # Serialize the annotations using AnnotationSchema - return [ - SequenceAnnotationSchema.from_orm(annotation, owner_email=request.user.email).dict() - for annotation in annotations_qs - ] - - else: - return { - "error": "Permission denied. You do not own this recording, and it is not public." - } + # Serialize the annotations using AnnotationSchema + return [ + SequenceAnnotationSchema.from_orm(annotation, owner_email=request.user.email).dict() + for annotation in annotations_qs + ] - except Recording.DoesNotExist: - return {"error": "Recording not found"} + else: + return {"error": "Permission denied. You do not own this recording, and it is not public."} @router.put("/{pk}/sequence-annotations") @@ -1228,50 +1112,36 @@ def put_sequence_annotation( annotation: SequenceAnnotationSchema, species_ids: list[int] | None, ): - try: - recording = Recording.objects.get(pk=pk) - if recording.owner == request.user or recording.public: - # Create a new annotation - new_annotation = SequenceAnnotations.objects.create( - recording=recording, - owner=request.user, - start_time=annotation.start_time, - end_time=annotation.end_time, - type=annotation.type, - comments=annotation.comments, - ) - - return {"message": "Annotation added successfully", "id": new_annotation.pk} - else: - return { - "error": "Permission denied. You do not own this recording, and it is not public." - } + recording = get_object_or_404(Recording, pk=pk) + if recording.owner == request.user or recording.public: + # Create a new annotation + new_annotation = SequenceAnnotations.objects.create( + recording=recording, + owner=request.user, + start_time=annotation.start_time, + end_time=annotation.end_time, + type=annotation.type, + comments=annotation.comments, + ) - except Recording.DoesNotExist: - return {"error": "Recording not found"} + return {"message": "Annotation added successfully", "id": new_annotation.pk} + else: + return {"error": "Permission denied. You do not own this recording, and it is not public."} @router.delete("/{recording_pk}/sequence-annotations/{sequence_annotation_pk}") def delete_sequence_annotation(request, recording_pk: int, sequence_annotation_pk: int): - try: - recording = Recording.objects.get(pk=recording_pk) + recording = get_object_or_404(Recording, pk=recording_pk) - # Check if the user owns the recording or if the recording is public - if recording.owner == request.user or recording.public: - annotation_instance = SequenceAnnotations.objects.get( - pk=sequence_annotation_pk, recording=recording, owner=request.user - ) - - # Delete the annotation - annotation_instance.delete() + # Check if the user owns the recording or if the recording is public + if recording.owner == request.user or recording.public: + annotation_instance = get_object_or_404( + SequenceAnnotations, pk=sequence_annotation_pk, recording=recording, owner=request.user + ) - return {"message": "Annotation deleted successfully"} - else: - return { - "error": "Permission denied. You do not own this recording, and it is not public." - } + # Delete the annotation + annotation_instance.delete() - except Recording.DoesNotExist: - return {"error": "Recording not found"} - except Annotations.DoesNotExist: - return {"error": "Annotation not found"} + return {"message": "Annotation deleted successfully"} + else: + return {"error": "Permission denied. You do not own this recording, and it is not public."} diff --git a/bats_ai/core/views/recording_annotation.py b/bats_ai/core/views/recording_annotation.py index 5be4e47f..bd0cc77b 100644 --- a/bats_ai/core/views/recording_annotation.py +++ b/bats_ai/core/views/recording_annotation.py @@ -3,6 +3,7 @@ import logging from typing import TYPE_CHECKING +from django.shortcuts import get_object_or_404 from ninja import Router, Schema from ninja.errors import HttpError @@ -92,155 +93,130 @@ class UpdateRecordingAnnotationSchema(Schema): @router.get("/{pk}", response=RecordingAnnotationSchema) def get_recording_annotation(request: HttpRequest, pk: int): - try: - annotation = RecordingAnnotation.objects.get(pk=pk) + annotation = get_object_or_404(RecordingAnnotation, pk=pk) - # Check permission - if annotation.recording.owner != request.user and not annotation.recording.public: - raise HttpError(403, "Permission denied.") + # Check permission + if annotation.recording.owner != request.user and not annotation.recording.public: + raise HttpError(403, "Permission denied.") - return RecordingAnnotationSchema.from_orm(annotation).dict() - except RecordingAnnotation.DoesNotExist as e: - raise HttpError(404, "Recording annotation not found.") from e + return RecordingAnnotationSchema.from_orm(annotation).dict() @router.get("/{pk}/details", response=RecordingAnnotationDetailsSchema) def get_recording_annotation_details(request: HttpRequest, pk: int): - try: - annotation = RecordingAnnotation.objects.get(pk=pk) + annotation = get_object_or_404(RecordingAnnotation, pk=pk) - # Check permission - if annotation.recording.owner != request.user and not annotation.recording.public: - raise HttpError(403, "Permission denied.") + # Check permission + if annotation.recording.owner != request.user and not annotation.recording.public: + raise HttpError(403, "Permission denied.") - return RecordingAnnotationDetailsSchema.from_orm(annotation).dict() - except RecordingAnnotation.DoesNotExist as e: - raise HttpError(404, "Recording annotation not found.") from e + return RecordingAnnotationDetailsSchema.from_orm(annotation).dict() @router.put("/", response={200: str}) def create_recording_annotation(request: HttpRequest, data: CreateRecordingAnnotationSchema): - try: - recording = Recording.objects.get(pk=data.recordingId) - - # Check permission - if recording.owner != request.user and not recording.public: - raise HttpError(403, "Permission denied.") - - # Create the recording annotation - annotation = RecordingAnnotation.objects.create( - recording=recording, - owner=request.user, - comments=data.comments, - model=data.model, - confidence=data.confidence, + recording = get_object_or_404(Recording, pk=data.recordingId) + + # Check permission + if recording.owner != request.user and not recording.public: + raise HttpError(403, "Permission denied.") + + # Create the recording annotation + annotation = RecordingAnnotation.objects.create( + recording=recording, + owner=request.user, + comments=data.comments, + model=data.model, + confidence=data.confidence, + ) + + # Add species in order (through model allows duplicates) + for order, species_id in enumerate(data.species): + species = get_object_or_404(Species, pk=species_id) + RecordingAnnotationSpecies.objects.create( + recording_annotation=annotation, + species=species, + order=order, ) - # Add species in order (through model allows duplicates) + return "Recording annotation created successfully." + + +@router.patch("/{pk}", response={200: str}) +def update_recording_annotation( + request: HttpRequest, pk: int, data: UpdateRecordingAnnotationSchema +): + annotation = get_object_or_404( + RecordingAnnotation.objects.select_related("recording", "recording__owner"), pk=pk + ) + + # Check permission + if annotation.owner != request.user: + raise HttpError(403, "Permission denied.") + + if annotation.recording.owner != request.user and not annotation.recording.public: + raise HttpError(403, "Permission denied.") + + # Update fields if provided + if data.comments is not None: + annotation.comments = data.comments + if data.model is not None: + annotation.model = data.model + if data.confidence is not None: + annotation.confidence = data.confidence + if data.species is not None: + # Rebuild ordered species with duplicates via through model + unique_ids = set(data.species) + id_to_species = {s.pk: s for s in Species.objects.filter(pk__in=unique_ids)} + if len(id_to_species) != len(unique_ids): + raise HttpError(404, "One or more species IDs not found.") + RecordingAnnotationSpecies.objects.filter(recording_annotation=annotation).delete() for order, species_id in enumerate(data.species): - species = Species.objects.get(pk=species_id) + species = id_to_species[species_id] RecordingAnnotationSpecies.objects.create( recording_annotation=annotation, species=species, order=order, ) - return "Recording annotation created successfully." - except Recording.DoesNotExist as e: - raise HttpError(404, "Recording not found.") from e - except Species.DoesNotExist as e: - raise HttpError(404, "One or more species IDs not found.") from e - - -@router.patch("/{pk}", response={200: str}) -def update_recording_annotation( - request: HttpRequest, pk: int, data: UpdateRecordingAnnotationSchema -): - try: - annotation = RecordingAnnotation.objects.select_related( - "recording", "recording__owner" - ).get(pk=pk) - - # Check permission - if annotation.owner != request.user: - raise HttpError(403, "Permission denied.") - - if annotation.recording.owner != request.user and not annotation.recording.public: - raise HttpError(403, "Permission denied.") - - # Update fields if provided - if data.comments is not None: - annotation.comments = data.comments - if data.model is not None: - annotation.model = data.model - if data.confidence is not None: - annotation.confidence = data.confidence - if data.species is not None: - # Rebuild ordered species with duplicates via through model - unique_ids = set(data.species) - id_to_species = {s.pk: s for s in Species.objects.filter(pk__in=unique_ids)} - if len(id_to_species) != len(unique_ids): - raise HttpError(404, "One or more species IDs not found.") - RecordingAnnotationSpecies.objects.filter(recording_annotation=annotation).delete() - for order, species_id in enumerate(data.species): - species = id_to_species[species_id] - RecordingAnnotationSpecies.objects.create( - recording_annotation=annotation, - species=species, - order=order, - ) - - annotation.save() - return "Recording annotation updated successfully." - except RecordingAnnotation.DoesNotExist as e: - raise HttpError(404, "Recording annotation not found.") from e - except Species.DoesNotExist as e: - raise HttpError(404, "One or more species IDs not found.") from e + annotation.save() + return "Recording annotation updated successfully." # DELETE Endpoint @router.delete("/{pk}", response={200: str}) def delete_recording_annotation(request: HttpRequest, pk: int): - try: - configuration = Configuration.objects.first() - vetting_enabled = ( - configuration.mark_annotations_completed_enabled if configuration else False + configuration = Configuration.objects.first() + vetting_enabled = configuration.mark_annotations_completed_enabled if configuration else False + annotation = get_object_or_404(RecordingAnnotation, pk=pk) + + # Check permission: only the annotation owner may delete their own + if annotation.owner != request.user: + raise HttpError(403, "Permission denied.") + + # In vetting mode, non-staff may only delete blank annotations (no species) + if vetting_enabled and not request.user.is_staff and annotation.species.exists(): + raise HttpError( + 403, + "Permission denied. Only blank annotations can be deleted while vetting is enabled.", ) - annotation = RecordingAnnotation.objects.get(pk=pk) - - # Check permission: only the annotation owner may delete their own - if annotation.owner != request.user: - raise HttpError(403, "Permission denied.") - - # In vetting mode, non-staff may only delete blank annotations (no species) - if vetting_enabled and not request.user.is_staff and annotation.species.exists(): - raise HttpError( - 403, - "Permission denied. Only blank annotations can be deleted " - "while vetting is enabled.", - ) - annotation.delete() - return "Recording annotation deleted successfully." - except RecordingAnnotation.DoesNotExist as e: - raise HttpError(404, "Recording annotation not found.") from e + annotation.delete() + return "Recording annotation deleted successfully." # Submit endpoint @router.patch("/{pk}/submit", response={200: dict}) def submit_recording_annotation(request: HttpRequest, pk: int): - try: - annotation = RecordingAnnotation.objects.get(pk=pk) - - # Check permission - if annotation.owner != request.user: - raise HttpError(403, "Permission denied.") - - annotation.submitted = True - annotation.save() - return { - "id": pk, - "submitted": annotation.submitted, - } - except RecordingAnnotation.DoesNotExist as e: - raise HttpError(404, "Recording annotation not found.") from e + annotation = get_object_or_404(RecordingAnnotation, pk=pk) + + # Check permission + if annotation.owner != request.user: + raise HttpError(403, "Permission denied.") + + annotation.submitted = True + annotation.save() + return { + "id": pk, + "submitted": annotation.submitted, + }