Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#5015](https://github.com/open-telemetry/opentelemetry-python/pull/5015))
- `opentelemetry-sdk`: cache TracerConfig into the tracer, this changes an internal interface. Only one Tracer with the same instrumentation scope will be created
([#5007](https://github.com/open-telemetry/opentelemetry-python/pull/5007))
- `opentelemetry-exporter-otlp-proto-grpc`, `opentelemetry-exporter-otlp-proto-http`: add docker-tests coverage of metrics export
([#5030](https://github.com/open-telemetry/opentelemetry-python/pull/5030))

## Version 1.40.0/0.61b0 (2026-03-04)

Expand Down
6 changes: 2 additions & 4 deletions tests/opentelemetry-docker-tests/tests/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: '3'

services:
otopencensus:
image: rafaeljesus/opencensus-collector:latest
Expand All @@ -8,7 +6,7 @@ services:
- "8888:8888"
- "55678:55678"
otcollector:
image: otel/opentelemetry-collector:0.31.0
image: otel/opentelemetry-collector:0.149.0
ports:
- "4317:4317"
- "4318:55681"
- "4318:4318"
124 changes: 124 additions & 0 deletions tests/opentelemetry-docker-tests/tests/otlpexporter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import time
from abc import ABC, abstractmethod

from opentelemetry.context import attach, detach, set_value
from opentelemetry.sdk.metrics._internal.export import (
MetricExportResult,
PeriodicExportingMetricReader,
)
from opentelemetry.sdk.metrics._internal.point import (
Metric,
NumberDataPoint,
Sum,
)
from opentelemetry.sdk.metrics.export import (
MetricsData,
ResourceMetrics,
ScopeMetrics,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.util.instrumentation import InstrumentationScope


class ExportStatusSpanProcessor(SimpleSpanProcessor):
Expand All @@ -29,13 +46,37 @@ def on_end(self, span):
detach(token)


class ExportStatusMetricReader(PeriodicExportingMetricReader):
def __init__(self, exporter, **kwargs):
# Very short export interval for testing
super().__init__(exporter, export_interval_millis=1, **kwargs)
self.export_status = []

def _receive_metrics(self, metrics_data, timeout_millis=10_000, **kwargs):
token = attach(set_value("suppress_instrumentation", True))
try:
export_result = self._exporter.export(
metrics_data, timeout_millis=timeout_millis
)
self.export_status.append(export_result)
except Exception:
self.export_status.append(MetricExportResult.FAILURE)
finally:
detach(token)


class BaseTestOTLPExporter(ABC):
@abstractmethod
def get_span_processor(self):
pass

@abstractmethod
def get_metric_reader(self):
pass

# pylint: disable=no-member
def test_export(self):
"""Test span export"""
with self.tracer.start_as_current_span("foo"):
with self.tracer.start_as_current_span("bar"):
with self.tracer.start_as_current_span("baz"):
Expand All @@ -46,3 +87,86 @@ def test_export(self):
for export_status in self.span_processor.export_status:
self.assertEqual(export_status.name, "SUCCESS")
self.assertEqual(export_status.value, 0)

def test_metrics_export(self):
"""Test metrics export from full metrics SDK pipeline"""
counter = self.meter.create_counter("test_counter")
histogram = self.meter.create_histogram("test_histogram")
up_down_counter = self.meter.create_up_down_counter(
"test_up_down_counter"
)

counter.add(1, {"key1": "value1"})
counter.add(2, {"key2": "value2"})
histogram.record(1.5, {"key3": "value3"})
histogram.record(2.5, {"key4": "value4"})
up_down_counter.add(3, {"key5": "value5"})
up_down_counter.add(-1, {"key6": "value6"})
self.metric_reader.force_flush(timeout_millis=5000)
time.sleep(0.1)

# Verify at least one export happened
self.assertTrue(len(self.metric_reader.export_status) >= 1)
# Verify all exports succeeded
for export_status in self.metric_reader.export_status:
self.assertEqual(export_status.name, "SUCCESS")
self.assertEqual(export_status.value, 0)

@abstractmethod
def test_metrics_export_batch_size_two(self):
"""Test metrics max_export_batch_size=2 directly through exporter"""

def _create_test_metrics_data(self, num_data_points=6):
"""Create test metrics data with specified number of data points."""
data_points = []
for i in range(num_data_points):
dp = NumberDataPoint(
attributes={"key": f"value{i}"},
start_time_unix_nano=1000000 + i,
time_unix_nano=2000000 + i,
value=i + 1.0,
)
data_points.append(dp)
metric = Metric(
name="otel_test_counter_foobar",
description="Test counter metric for batch verification",
unit="1",
data=Sum(
data_points=data_points,
aggregation_temporality=1, # CUMULATIVE
is_monotonic=True,
),
)
scope_metrics = ScopeMetrics(
scope=InstrumentationScope(name="test_scope"),
metrics=[metric],
schema_url=None,
)
resource_metrics = ResourceMetrics(
resource=Resource.create({"service.name": "test-service"}),
scope_metrics=[scope_metrics],
schema_url=None,
)

return MetricsData(resource_metrics=[resource_metrics]), data_points

def _verify_batch_export_result(
self, result, data_points, batch_counter, max_batch_size=2
):
"""Verify export result and batch count for export batching tests."""
self.assertEqual(
result.name, "SUCCESS", f"Expected SUCCESS, got: {result}"
)
self.assertEqual(
result.value, 0, f"Expected result code 0, got: {result.value}"
)

expected_batches = (
len(data_points) + max_batch_size - 1
) // max_batch_size
self.assertEqual(
batch_counter.export_call_count,
expected_batches,
f"Expected {expected_batches} export calls with max_export_batch_size={max_batch_size} and {len(data_points)} data points, "
f"but got {batch_counter.export_call_count} calls",
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,38 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from opentelemetry import trace
from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
OTLPMetricExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.test.globals_test import (
reset_metrics_globals,
reset_trace_globals,
)
from opentelemetry.test.test_base import TestBase

from . import BaseTestOTLPExporter, ExportStatusSpanProcessor
from . import (
BaseTestOTLPExporter,
ExportStatusMetricReader,
ExportStatusSpanProcessor,
)


class BatchCountingGRPCExporter(OTLPMetricExporter):
"""gRPC exporter that counts actual batch export calls for testing."""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.export_call_count = 0

def _export(self, *args, **kwargs):
self.export_call_count += 1
return super()._export(*args, **kwargs)


class TestOTLPGRPCExporter(BaseTestOTLPExporter, TestBase):
Expand All @@ -29,11 +53,37 @@ def get_span_processor(self):
OTLPSpanExporter(insecure=True, timeout=1)
)

def get_metric_reader(self):
return ExportStatusMetricReader(
OTLPMetricExporter(
insecure=True, timeout=1, max_export_batch_size=2
)
)

def setUp(self):
super().setUp()

reset_trace_globals()
trace.set_tracer_provider(TracerProvider())
self.tracer = trace.get_tracer(__name__)
self.span_processor = self.get_span_processor()

trace.get_tracer_provider().add_span_processor(self.span_processor)

reset_metrics_globals()
self.metric_reader = self.get_metric_reader()
meter_provider = MeterProvider(metric_readers=[self.metric_reader])
metrics.set_meter_provider(meter_provider)
self.meter = metrics.get_meter(__name__)

def test_metrics_export_batch_size_two(self):
"""Test metrics max_export_batch_size=2 directly through gRPC exporter"""
batch_counter = BatchCountingGRPCExporter(
endpoint="localhost:4317", insecure=True, max_export_batch_size=2
)
metrics_data, data_points = self._create_test_metrics_data(
num_data_points=6
)
result = batch_counter.export(metrics_data)
self._verify_batch_export_result(
result, data_points, batch_counter, max_batch_size=2
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,75 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from opentelemetry import trace
from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
OTLPMetricExporter,
)
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter,
)
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.test.globals_test import (
reset_metrics_globals,
reset_trace_globals,
)
from opentelemetry.test.test_base import TestBase

from . import BaseTestOTLPExporter, ExportStatusSpanProcessor
from . import (
BaseTestOTLPExporter,
ExportStatusMetricReader,
ExportStatusSpanProcessor,
)


class BatchCountingHTTPExporter(OTLPMetricExporter):
"""HTTP exporter that counts actual batch export calls for testing."""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.export_call_count = 0

def _export(self, *args, **kwargs):
self.export_call_count += 1
return super()._export(*args, **kwargs)


class TestOTLPHTTPExporter(BaseTestOTLPExporter, TestBase):
# pylint: disable=no-self-use
def get_span_processor(self):
return ExportStatusSpanProcessor(OTLPSpanExporter())

def get_metric_reader(self):
return ExportStatusMetricReader(
OTLPMetricExporter(max_export_batch_size=2)
)

def setUp(self):
super().setUp()

reset_trace_globals()
trace.set_tracer_provider(TracerProvider())
self.tracer = trace.get_tracer(__name__)
self.span_processor = self.get_span_processor()

trace.get_tracer_provider().add_span_processor(self.span_processor)

reset_metrics_globals()
self.metric_reader = self.get_metric_reader()
meter_provider = MeterProvider(metric_readers=[self.metric_reader])
metrics.set_meter_provider(meter_provider)
self.meter = metrics.get_meter(__name__)

def test_metrics_export_batch_size_two(self):
"""Test metrics max_export_batch_size=2 directly through HTTP exporter"""
batch_counter = BatchCountingHTTPExporter(
endpoint="http://localhost:4318/v1/metrics",
max_export_batch_size=2,
)
metrics_data, data_points = self._create_test_metrics_data(
num_data_points=6
)
result = batch_counter.export(metrics_data)
self._verify_batch_export_result(
result, data_points, batch_counter, max_batch_size=2
)
Loading