Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 44 additions & 32 deletions tests/test_telemetry.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import itertools
import json
from typing import Any
from collections import defaultdict
Expand Down Expand Up @@ -49,7 +50,33 @@ def is_v1_payload(data: dict):
return get_request_content(data).get("api_version") == "v1"


@rfc("https://docs.google.com/document/d/1Fai2gZlkvfa_WQs_yJsmi3nUwiOsMtNu2LFBnbTHJRA/edit#heading=h.llmi584vls7r")
def _first_lifecycle_per_runtime(
telemetry_data: list[dict[str, Any]],
skip_request_types: frozenset[str],
) -> list[tuple[str, str]]:
"""Extract (runtime_id, request_type) for the first lifecycle message per runtime.

Sorts by (runtime_id, seq_id, batch_index) so the first per runtime is correct.
"""
messages: list[tuple[str, int, int, str]] = []
for data in telemetry_data:
content: dict[str, Any] = data["request"]["content"]
runtime_id: str = content.get("runtime_id", "")
seq_id: int = content.get("seq_id", 0)
if content.get("request_type") == "message-batch":
for i, msg in enumerate(content.get("payload", [])):
rt: str | None = msg.get("request_type")
if rt and rt not in skip_request_types:
messages.append((runtime_id, seq_id, i, rt))
else:
rt = content.get("request_type")
if rt and rt not in skip_request_types:
messages.append((runtime_id, seq_id, 0, rt))

messages.sort(key=lambda m: (m[0], m[1], m[2]))
return [(rid, next(grp)[3]) for rid, grp in itertools.groupby(messages, key=lambda m: m[0])]


@features.telemetry_instrumentation
class Test_Telemetry:
"""Test that instrumentation telemetry is sent"""
Expand Down Expand Up @@ -214,38 +241,23 @@ def test_app_started_sent_exactly_once(self):
assert count == 1, f"runtime id {runtime_id} did not reported exactly one app-started event"

@features.telemetry_app_started_event
def test_app_started_is_first_message(self):
"""Request type app-started is the first telemetry message or the first message in the first batch"""
telemetry_data = list(interfaces.library.get_telemetry_data(flatten_message_batches=False))
def test_app_started_is_first_message(self) -> None:
"""Request type app-started is the first telemetry message by seq_id (per runtime_id).

Uses seq_id ordering for deterministic testing, where the first captured batch has to be the first one sent.
"""
skip_request_types: frozenset[str] = frozenset({"sketches", "generate-metrics", "logs", "distributions"})
telemetry_data: list[dict[str, Any]] = list(
interfaces.library.get_telemetry_data(flatten_message_batches=False)
)
assert len(telemetry_data) > 0, "No telemetry messages"
for batch in telemetry_data:
if batch["request"]["content"].get("request_type") == "message-batch":
if all(
message.get("request_type") in ["sketches", "generate-metrics", "logs"]
for message in batch["request"]["content"]["payload"]
):
# In some cases (e.g. with the trace exporter) a telemetry payload without app-lifecycles messages can be sent first.
# If the batch contains only messages not related to app-lifecycle we can ignore it.
continue
first_message = batch["request"]["content"]["payload"][0]
assert first_message.get("request_type") == "app-started", (
"app-started was not the first message in the first batch"
)
return
else:
# In theory, app-started must have seq_id 1, but tracers may skip seq_ids if sending messages fail.
# So we will check that app-started is the first message by seq_id, rather than strictly seq_id 1.
telemetry_data = sorted(telemetry_data, key=lambda x: x["request"]["content"]["seq_id"])
app_started = [
d for d in telemetry_data if d["request"]["content"].get("request_type") == "app-started"
]
assert app_started, "app-started message not found"
min_seq_id = min(d["request"]["content"]["seq_id"] for d in telemetry_data)
assert app_started[0]["request"]["content"]["seq_id"] == min_seq_id, (
"app-started is not the first message by seq_id"
)
return
raise ValueError("app-started message not found")

first_per_runtime: list[tuple[str, str]] = _first_lifecycle_per_runtime(telemetry_data, skip_request_types)
assert first_per_runtime, "No lifecycle telemetry messages found (only sketches/generate-metrics/logs?)"
for runtime_id, first_type in first_per_runtime:
assert first_type == "app-started", (
f"runtime_id {runtime_id}: first lifecycle message is {first_type!r}, expected app-started"
)

@bug(context.agent_version > "7.53.0", reason="APMAPI-926")
def test_proxy_forwarding(self):
Expand Down
Loading