From cf73a305c8fb2a09544cc2a09f9cf3007b2a890a Mon Sep 17 00:00:00 2001 From: Malak El Kouri Date: Tue, 17 Mar 2026 17:07:05 +0100 Subject: [PATCH 1/3] add logic to replicate file content --- datadog_sync/model/synthetics_tests.py | 72 ++++++++++++++++++++++++-- 1 file changed, 67 insertions(+), 5 deletions(-) diff --git a/datadog_sync/model/synthetics_tests.py b/datadog_sync/model/synthetics_tests.py index 00e5cef7..4d9d2fa3 100644 --- a/datadog_sync/model/synthetics_tests.py +++ b/datadog_sync/model/synthetics_tests.py @@ -4,6 +4,10 @@ # Copyright 2019 Datadog, Inc. from __future__ import annotations + +import aiohttp +import certifi +import ssl from copy import deepcopy from typing import TYPE_CHECKING, Optional, List, Dict, Tuple, cast @@ -13,6 +17,8 @@ if TYPE_CHECKING: from datadog_sync.utils.custom_client import CustomClient +_FILE_DOWNLOAD_PATH = "/api/v2/synthetics/tests/{}/files/download" + class SyntheticsTests(BaseResource): resource_type = "synthetics_tests" @@ -51,6 +57,9 @@ class SyntheticsTests(BaseResource): "status", # Exclude status to prevent overwriting manual changes during sync "stepCount", "steps.public_id", + "config.request.files.bucketKey", + "steps.params.files.bucketKey", + "config.steps.request.files.bucketKey", ], non_nullable_attr=[ "options.monitor_options.on_missing_data", @@ -235,6 +244,44 @@ def _replace_variable_public_id(resource: Dict, source_public_id: str, dest_publ replaced = True return replaced + @staticmethod + def _get_files_with_bucket_key(resource: Dict) -> List[Tuple[Dict, str]]: + """Return (file_dict, bucket_key_prefix) for all files that have a bucketKey. + + Covers: + - API test request files: config.request.files[] (prefix: api-upload-file) + - Multistep API test step files: config.steps[].request.files[] (prefix: api-upload-file) + - Browser test step files: steps[].params.files[] (prefix: browser-upload-file-step) + """ + files = [] + for f in resource.get("config", {}).get("request", {}).get("files", []): + if "bucketKey" in f: + files.append((f, "api-upload-file")) + for step in resource.get("config", {}).get("steps", []): + for f in step.get("request", {}).get("files", []): + if "bucketKey" in f: + files.append((f, "api-upload-file")) + for step in resource.get("steps", []): + for f in step.get("params", {}).get("files", []): + if "bucketKey" in f: + files.append((f, "browser-upload-file-step")) + return files + + async def _download_file(self, source_public_id: str, bucket_key: str) -> bytes: + """Download a file from the source org via the presigned URL endpoint.""" + source_client = self.config.source_client + presigned_url = await source_client.post( + _FILE_DOWNLOAD_PATH.format(source_public_id), + {"bucketKey": bucket_key}, + ) + if isinstance(presigned_url, str): + presigned_url = presigned_url.strip('"') + + ssl_context = ssl.create_default_context(cafile=certifi.where()) + async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=ssl_context)) as session: + async with session.get(presigned_url) as response: + return await response.read() + async def create_resource(self, _id: str, resource: Dict) -> Tuple[str, Dict]: destination_client = self.config.destination_client test_type = resource["type"] @@ -244,14 +291,29 @@ async def create_resource(self, _id: str, resource: Dict) -> Tuple[str, Dict]: # on destination during failover scenarios. Status can be manually changed after creation. resource["status"] = "paused" + source_public_id = _id.split("#")[0] + + # Download files from source and inject content inline so the create API + # stores the file in R2's S3 and generates a new bucketKey for the destination. + for file_dict, _prefix in self._get_files_with_bucket_key(resource): + bucket_key = file_dict.get("bucketKey", "") + try: + content = await self._download_file(source_public_id, bucket_key) + file_dict.pop("bucketKey") + file_dict["content"] = content.decode("utf-8") + file_dict["size"] = len(content) + except Exception: + self.config.logger.error(f"Failed to download file {bucket_key} from source test {source_public_id}") + resp = await self._create_test(destination_client, test_type, resource) - # Now that we have the destination public_id, fix variables that embed the source public_id. - source_public_id = _id.split("#")[0] + # Fix variables that embed the source public_id. dest_public_id = resp["public_id"] - if source_public_id != dest_public_id and self._replace_variable_public_id( - resource, source_public_id, dest_public_id - ): + needs_update = False + if source_public_id != dest_public_id: + needs_update = self._replace_variable_public_id(resource, source_public_id, dest_public_id) + + if needs_update: resp = await self._update_test(destination_client, dest_public_id, resource) # Persist metadata in state so destination JSON has it and diffs compare correctly. From 1799536298024cd3cdef534757d6b0c2fd75f15c Mon Sep 17 00:00:00 2001 From: Malak El Kouri Date: Wed, 18 Mar 2026 12:07:45 +0100 Subject: [PATCH 2/3] fix bucketkey and content field --- datadog_sync/model/synthetics_tests.py | 67 +++++++++++++++++--------- 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/datadog_sync/model/synthetics_tests.py b/datadog_sync/model/synthetics_tests.py index 4d9d2fa3..43381036 100644 --- a/datadog_sync/model/synthetics_tests.py +++ b/datadog_sync/model/synthetics_tests.py @@ -245,27 +245,27 @@ def _replace_variable_public_id(resource: Dict, source_public_id: str, dest_publ return replaced @staticmethod - def _get_files_with_bucket_key(resource: Dict) -> List[Tuple[Dict, str]]: - """Return (file_dict, bucket_key_prefix) for all files that have a bucketKey. + def _get_file_lists_with_bucket_keys(resource: Dict) -> List[Tuple[List, str]]: + """Return (files_list, bucket_key_prefix) for all file lists that contain files with a bucketKey. Covers: - API test request files: config.request.files[] (prefix: api-upload-file) - Multistep API test step files: config.steps[].request.files[] (prefix: api-upload-file) - Browser test step files: steps[].params.files[] (prefix: browser-upload-file-step) """ - files = [] - for f in resource.get("config", {}).get("request", {}).get("files", []): - if "bucketKey" in f: - files.append((f, "api-upload-file")) + result = [] + request_files = resource.get("config", {}).get("request", {}).get("files", []) + if any("bucketKey" in f for f in request_files): + result.append((request_files, "api-upload-file")) for step in resource.get("config", {}).get("steps", []): - for f in step.get("request", {}).get("files", []): - if "bucketKey" in f: - files.append((f, "api-upload-file")) + step_files = step.get("request", {}).get("files", []) + if any("bucketKey" in f for f in step_files): + result.append((step_files, "api-upload-file")) for step in resource.get("steps", []): - for f in step.get("params", {}).get("files", []): - if "bucketKey" in f: - files.append((f, "browser-upload-file-step")) - return files + step_files = step.get("params", {}).get("files", []) + if any("bucketKey" in f for f in step_files): + result.append((step_files, "browser-upload-file-step")) + return result async def _download_file(self, source_public_id: str, bucket_key: str) -> bytes: """Download a file from the source org via the presigned URL endpoint.""" @@ -282,6 +282,33 @@ async def _download_file(self, source_public_id: str, bucket_key: str) -> bytes: async with session.get(presigned_url) as response: return await response.read() + async def _replicate_files(self, source_public_id: str, resource: Dict) -> None: + """Download files from source and inject content inline. + + The create/update API stores the file in R2 and generates a new + bucketKey for the destination. Files whose download fails are + removed from the list so the API never receives an invalid entry. + """ + for files_list, _prefix in self._get_file_lists_with_bucket_keys(resource): + to_remove = [] + for file_dict in files_list: + bucket_key = file_dict.get("bucketKey", "") + if not bucket_key: + continue + try: + content = await self._download_file(source_public_id, bucket_key) + file_dict.pop("bucketKey") + file_dict["content"] = content.decode("utf-8") + file_dict["size"] = len(content) + except Exception: + self.config.logger.error( + f"Failed to download file {bucket_key} from source test {source_public_id}; " + "removing file from payload" + ) + to_remove.append(file_dict) + for f in to_remove: + files_list.remove(f) + async def create_resource(self, _id: str, resource: Dict) -> Tuple[str, Dict]: destination_client = self.config.destination_client test_type = resource["type"] @@ -293,17 +320,7 @@ async def create_resource(self, _id: str, resource: Dict) -> Tuple[str, Dict]: source_public_id = _id.split("#")[0] - # Download files from source and inject content inline so the create API - # stores the file in R2's S3 and generates a new bucketKey for the destination. - for file_dict, _prefix in self._get_files_with_bucket_key(resource): - bucket_key = file_dict.get("bucketKey", "") - try: - content = await self._download_file(source_public_id, bucket_key) - file_dict.pop("bucketKey") - file_dict["content"] = content.decode("utf-8") - file_dict["size"] = len(content) - except Exception: - self.config.logger.error(f"Failed to download file {bucket_key} from source test {source_public_id}") + await self._replicate_files(source_public_id, resource) resp = await self._create_test(destination_client, test_type, resource) @@ -329,6 +346,8 @@ async def update_resource(self, _id: str, resource: Dict) -> Tuple[str, Dict]: dest_public_id = self.config.state.destination[self.resource_type][_id]["public_id"] self._replace_variable_public_id(resource, source_public_id, dest_public_id) + await self._replicate_files(source_public_id, resource) + resp = await self._update_test(destination_client, dest_public_id, resource) # Persist metadata in state so destination JSON has it and diffs compare correctly. if resource.get("metadata"): From be9438133d6c18b48edcd126a7631d4b7e86c534 Mon Sep 17 00:00:00 2001 From: Malak El Kouri Date: Wed, 18 Mar 2026 18:02:47 +0100 Subject: [PATCH 3/3] fix --- datadog_sync/model/synthetics_tests.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/datadog_sync/model/synthetics_tests.py b/datadog_sync/model/synthetics_tests.py index 43381036..813874e8 100644 --- a/datadog_sync/model/synthetics_tests.py +++ b/datadog_sync/model/synthetics_tests.py @@ -7,9 +7,11 @@ import aiohttp import certifi +import json import ssl from copy import deepcopy from typing import TYPE_CHECKING, Optional, List, Dict, Tuple, cast +from yarl import URL from datadog_sync.utils.base_resource import BaseResource, ResourceConfig, TaggingConfig from datadog_sync.model.synthetics_mobile_applications_versions import SyntheticsMobileApplicationsVersions @@ -57,10 +59,11 @@ class SyntheticsTests(BaseResource): "status", # Exclude status to prevent overwriting manual changes during sync "stepCount", "steps.public_id", - "config.request.files.bucketKey", - "steps.params.files.bucketKey", - "config.steps.request.files.bucketKey", ], + deep_diff_config={ + "ignore_order": True, + "exclude_regex_paths": [r".*\['bucketKey'\]"], + }, non_nullable_attr=[ "options.monitor_options.on_missing_data", "options.monitor_options.notify_audit", @@ -279,7 +282,7 @@ async def _download_file(self, source_public_id: str, bucket_key: str) -> bytes: ssl_context = ssl.create_default_context(cafile=certifi.where()) async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=ssl_context)) as session: - async with session.get(presigned_url) as response: + async with session.get(URL(presigned_url, encoded=True)) as response: return await response.read() async def _replicate_files(self, source_public_id: str, resource: Dict) -> None: @@ -296,14 +299,15 @@ async def _replicate_files(self, source_public_id: str, resource: Dict) -> None: if not bucket_key: continue try: - content = await self._download_file(source_public_id, bucket_key) + raw = await self._download_file(source_public_id, bucket_key) + content = json.loads(raw) file_dict.pop("bucketKey") - file_dict["content"] = content.decode("utf-8") + file_dict["content"] = content file_dict["size"] = len(content) - except Exception: + except Exception as e: self.config.logger.error( f"Failed to download file {bucket_key} from source test {source_public_id}; " - "removing file from payload" + f"removing file from payload. Error: {e}" ) to_remove.append(file_dict) for f in to_remove: