Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 90 additions & 5 deletions datadog_sync/model/synthetics_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,23 @@
# Copyright 2019 Datadog, Inc.

from __future__ import annotations

import aiohttp
import certifi
import json
import ssl
from copy import deepcopy
from typing import TYPE_CHECKING, Optional, List, Dict, Tuple, cast
from yarl import URL

from datadog_sync.utils.base_resource import BaseResource, ResourceConfig, TaggingConfig
from datadog_sync.model.synthetics_mobile_applications_versions import SyntheticsMobileApplicationsVersions

if TYPE_CHECKING:
from datadog_sync.utils.custom_client import CustomClient

_FILE_DOWNLOAD_PATH = "/api/v2/synthetics/tests/{}/files/download"


class SyntheticsTests(BaseResource):
resource_type = "synthetics_tests"
Expand Down Expand Up @@ -52,6 +60,10 @@ class SyntheticsTests(BaseResource):
"stepCount",
"steps.public_id",
],
deep_diff_config={
"ignore_order": True,
"exclude_regex_paths": [r".*\['bucketKey'\]"],
},
non_nullable_attr=[
"options.monitor_options.on_missing_data",
"options.monitor_options.notify_audit",
Expand Down Expand Up @@ -235,6 +247,72 @@ def _replace_variable_public_id(resource: Dict, source_public_id: str, dest_publ
replaced = True
return replaced

@staticmethod
def _get_file_lists_with_bucket_keys(resource: Dict) -> List[Tuple[List, str]]:
"""Return (files_list, bucket_key_prefix) for all file lists that contain files with a bucketKey.

Covers:
- API test request files: config.request.files[] (prefix: api-upload-file)
- Multistep API test step files: config.steps[].request.files[] (prefix: api-upload-file)
- Browser test step files: steps[].params.files[] (prefix: browser-upload-file-step)
"""
result = []
request_files = resource.get("config", {}).get("request", {}).get("files", [])
if any("bucketKey" in f for f in request_files):
result.append((request_files, "api-upload-file"))
for step in resource.get("config", {}).get("steps", []):
step_files = step.get("request", {}).get("files", [])
if any("bucketKey" in f for f in step_files):
result.append((step_files, "api-upload-file"))
for step in resource.get("steps", []):
step_files = step.get("params", {}).get("files", [])
if any("bucketKey" in f for f in step_files):
result.append((step_files, "browser-upload-file-step"))
return result

async def _download_file(self, source_public_id: str, bucket_key: str) -> bytes:
    """Download one uploaded file from the source org.

    The source API is first asked for a presigned URL for *bucket_key* on
    test *source_public_id*; the file bytes are then fetched from that URL.

    Returns:
        The raw file bytes.

    Raises:
        aiohttp.ClientResponseError: if the presigned-URL fetch returns an
            HTTP error status. Surfacing the failure lets the caller drop
            the file instead of treating an error body as file content.
    """
    source_client = self.config.source_client
    presigned_url = await source_client.post(
        _FILE_DOWNLOAD_PATH.format(source_public_id),
        {"bucketKey": bucket_key},
    )
    # The endpoint may hand back the URL as a JSON-encoded string; strip the quotes.
    if isinstance(presigned_url, str):
        presigned_url = presigned_url.strip('"')

    ssl_context = ssl.create_default_context(cafile=certifi.where())
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=ssl_context)) as session:
        # encoded=True stops yarl from re-encoding the presigned URL, which
        # would invalidate its signature.
        async with session.get(URL(presigned_url, encoded=True)) as response:
            # Fail loudly on HTTP errors (e.g. expired signature) rather than
            # silently returning the error body as if it were the file.
            response.raise_for_status()
            return await response.read()

async def _replicate_files(self, source_public_id: str, resource: Dict) -> None:
    """Pull every uploaded file from the source org and embed it inline.

    On create/update the destination API stores the inline content in R2 and
    mints a fresh bucketKey for the destination org. Any file whose download
    fails is dropped from its list so the API never receives an invalid entry.
    """
    for files, _ in self._get_file_lists_with_bucket_keys(resource):
        # Iterate over a snapshot so failed entries can be removed in place.
        for entry in list(files):
            key = entry.get("bucketKey", "")
            if not key:
                # Nothing to fetch for this entry; leave it untouched.
                continue
            try:
                raw_bytes = await self._download_file(source_public_id, key)
                parsed = json.loads(raw_bytes)
                entry.pop("bucketKey")
                entry["content"] = parsed
                # NOTE(review): size is len() of the parsed JSON value, not the
                # raw byte count — presumably matches the API's expectation;
                # confirm against the synthetics upload contract.
                entry["size"] = len(parsed)
            except Exception as err:
                self.config.logger.error(
                    f"Failed to download file {key} from source test {source_public_id}; "
                    f"removing file from payload. Error: {err}"
                )
                files.remove(entry)

async def create_resource(self, _id: str, resource: Dict) -> Tuple[str, Dict]:
destination_client = self.config.destination_client
test_type = resource["type"]
Expand All @@ -244,14 +322,19 @@ async def create_resource(self, _id: str, resource: Dict) -> Tuple[str, Dict]:
# on destination during failover scenarios. Status can be manually changed after creation.
resource["status"] = "paused"

source_public_id = _id.split("#")[0]

await self._replicate_files(source_public_id, resource)

resp = await self._create_test(destination_client, test_type, resource)

# Now that we have the destination public_id, fix variables that embed the source public_id.
source_public_id = _id.split("#")[0]
# Fix variables that embed the source public_id.
dest_public_id = resp["public_id"]
if source_public_id != dest_public_id and self._replace_variable_public_id(
resource, source_public_id, dest_public_id
):
needs_update = False
if source_public_id != dest_public_id:
needs_update = self._replace_variable_public_id(resource, source_public_id, dest_public_id)

if needs_update:
resp = await self._update_test(destination_client, dest_public_id, resource)

# Persist metadata in state so destination JSON has it and diffs compare correctly.
Expand All @@ -267,6 +350,8 @@ async def update_resource(self, _id: str, resource: Dict) -> Tuple[str, Dict]:
dest_public_id = self.config.state.destination[self.resource_type][_id]["public_id"]
self._replace_variable_public_id(resource, source_public_id, dest_public_id)

await self._replicate_files(source_public_id, resource)

resp = await self._update_test(destination_client, dest_public_id, resource)
# Persist metadata in state so destination JSON has it and diffs compare correctly.
if resource.get("metadata"):
Expand Down
Loading