diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5d5f0c4452f..0a76a2f5d1f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## Unreleased
 
+- `opentelemetry-exporter-otlp-proto-http`: Handle HTTP 413 (Payload Too Large) responses in trace and log exporters by splitting the batch in half and retrying each half recursively
+  ([#5032](https://github.com/open-telemetry/opentelemetry-python/pull/5032))
 - `opentelemetry-sdk`: Add shared `_parse_headers` helper for declarative config OTLP exporters
   ([#5021](https://github.com/open-telemetry/opentelemetry-python/pull/5021))
 - `opentelemetry-api`: Replace a broad exception in attribute cleaning tests to satisfy pylint in the `lint-opentelemetry-api` CI job
diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py
index 0658d0968e6..4cf2fc61b41 100644
--- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py
+++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py
@@ -34,6 +34,10 @@ def _is_retryable(resp: requests.Response) -> bool:
     return False
 
 
+def _is_payload_too_large(resp: requests.Response) -> bool:
+    return resp.status_code == 413
+
+
 def _load_session_from_envvar(
     cred_envvar: Literal[
         _OTEL_PYTHON_EXPORTER_OTLP_HTTP_LOGS_CREDENTIAL_PROVIDER,
diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py
index 7aea76be8d2..e53265ecfad 100644
--- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py
+++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py
@@ -31,6 +31,7 @@
     Compression,
 )
 from opentelemetry.exporter.otlp.proto.http._common import (
+    _is_payload_too_large,
     _is_retryable,
     _load_session_from_envvar,
 )
@@ -183,8 +184,14 @@ def export(
             _logger.warning("Exporter already shutdown, ignoring batch")
             return LogRecordExportResult.FAILURE
 
-        serialized_data = encode_logs(batch).SerializeToString()
         deadline_sec = time() + self._timeout
+        return self._export_batch(batch, deadline_sec)
+
+    def _export_batch(
+        self, batch: Sequence[ReadableLogRecord], deadline_sec: float
+    ) -> LogRecordExportResult:
+        serialized_data = encode_logs(batch).SerializeToString()
+
         for retry_num in range(_MAX_RETRYS):
             # multiplying by a random number between .8 and 1.2 introduces a +/-20% jitter to each backoff.
             backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
@@ -201,6 +208,33 @@ def export(
             retryable = _is_retryable(resp)
             status_code = resp.status_code
+            if _is_payload_too_large(resp):
+                # 413 handling always returns here; it will not fall
+                # through to the 'if not retryable' check below.
+                if len(batch) <= 1:
+                    _logger.error(
+                        "Single log record exceeds backend payload size limit, dropping log record"
+                    )
+                    return LogRecordExportResult.FAILURE
+                if time() >= deadline_sec:
+                    _logger.error(
+                        "Payload too large but deadline expired, dropping %d log records",
+                        len(batch),
+                    )
+                    return LogRecordExportResult.FAILURE
+                mid = len(batch) // 2
+                _logger.warning(
+                    "Payload too large (%d log records), splitting into two batches",
+                    len(batch),
+                )
+                first = self._export_batch(list(batch[:mid]), deadline_sec)
+                if first != LogRecordExportResult.SUCCESS:
+                    return LogRecordExportResult.FAILURE
+                second = self._export_batch(
+                    list(batch[mid:]), deadline_sec
+                )
+                return second
+
             if not retryable:
                 _logger.error(
                     "Failed to export logs batch code: %s, reason: %s",
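The control flow of the new 413 branch is easiest to see stripped of the exporter plumbing. A minimal standalone sketch of the same split-and-retry shape, with a hypothetical `send` callable standing in for the HTTP POST (not part of the patch):

    import time

    def export_with_split(send, items, deadline) -> bool:
        status = send(items)          # hypothetical; returns an HTTP status code
        if status != 413:
            return status == 200      # normal success/failure path
        if len(items) <= 1:
            return False              # a single item is already too large: drop it
        if time.time() >= deadline:
            return False              # deadline spent: give up instead of splitting
        mid = len(items) // 2
        if not export_with_split(send, items[:mid], deadline):
            return False              # short-circuit: the second half is never sent
        return export_with_split(send, items[mid:], deadline)

Both exporters follow exactly this shape; the short-circuit on the first half is what the partial-failure tests below pin down.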
diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py
index d02f94adf05..798277a2199 100644
--- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py
+++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py
@@ -33,6 +33,7 @@
     Compression,
 )
 from opentelemetry.exporter.otlp.proto.http._common import (
+    _is_payload_too_large,
    _is_retryable,
     _load_session_from_envvar,
 )
@@ -176,8 +177,14 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
             _logger.warning("Exporter already shutdown, ignoring batch")
             return SpanExportResult.FAILURE
 
-        serialized_data = encode_spans(spans).SerializePartialToString()
         deadline_sec = time() + self._timeout
+        return self._export_batch(spans, deadline_sec)
+
+    def _export_batch(
+        self, spans: Sequence[ReadableSpan], deadline_sec: float
+    ) -> SpanExportResult:
+        serialized_data = encode_spans(spans).SerializePartialToString()
+
         for retry_num in range(_MAX_RETRYS):
             # multiplying by a random number between .8 and 1.2 introduces a +/-20% jitter to each backoff.
             backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
@@ -194,6 +201,33 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
             retryable = _is_retryable(resp)
             status_code = resp.status_code
+            if _is_payload_too_large(resp):
+                # 413 handling always returns here; it will not fall
+                # through to the 'if not retryable' check below.
+                if len(spans) <= 1:
+                    _logger.error(
+                        "Single span exceeds backend payload size limit, dropping span"
+                    )
+                    return SpanExportResult.FAILURE
+                if time() >= deadline_sec:
+                    _logger.error(
+                        "Payload too large but deadline expired, dropping %d spans",
+                        len(spans),
+                    )
+                    return SpanExportResult.FAILURE
+                mid = len(spans) // 2
+                _logger.warning(
+                    "Payload too large (%d spans), splitting into two batches",
+                    len(spans),
+                )
+                first = self._export_batch(list(spans[:mid]), deadline_sec)
+                if first != SpanExportResult.SUCCESS:
+                    return SpanExportResult.FAILURE
+                second = self._export_batch(
+                    list(spans[mid:]), deadline_sec
+                )
+                return second
+
             if not retryable:
                 _logger.error(
                     "Failed to export span batch code: %s, reason: %s",
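One consequence of halving worth keeping in mind: in the degenerate case where every multi-item request is answered with 413, a batch of n items produces at most 2n - 1 requests, since a full binary split tree has n leaves and n - 1 internal nodes; in practice the deadline check bounds this in wall-clock time. A quick sanity check of the bound, ignoring the unrelated 5xx retry loop:

    def max_requests(n: int) -> int:
        # One request for this batch, then the worst case for each half.
        if n <= 1:
            return 1
        mid = n // 2
        return 1 + max_requests(mid) + max_requests(n - mid)

    assert [max_requests(n) for n in (1, 2, 3, 4, 8)] == [1, 3, 5, 7, 15]  # 2n - 1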
diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py
index c86ac1f6ba1..d6204c0ca3d 100644
--- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py
+++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py
@@ -525,7 +525,7 @@ def test_timeout_set_correctly(self, mock_post):
         def export_side_effect(*args, **kwargs):
             # Timeout should be set to something slightly less than 400 milliseconds depending on how much time has passed.
-            self.assertAlmostEqual(0.4, kwargs["timeout"], 2)
+            self.assertAlmostEqual(0.4, kwargs["timeout"], 1)
             return resp
 
         mock_post.side_effect = export_side_effect
@@ -562,3 +562,136 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post):
         )
 
         assert after - before < 0.2
+
+    @patch.object(Session, "post")
+    def test_413_splits_batch_and_succeeds(self, mock_post):
+        """When backend returns 413, the exporter should split the batch in half and retry each half."""
+        exporter = OTLPLogExporter(timeout=10)
+
+        resp_413 = Response()
+        resp_413.status_code = 413
+        resp_413.reason = "Request Entity Too Large"
+
+        resp_ok = Response()
+        resp_ok.status_code = 200
+
+        mock_post.side_effect = [resp_413, resp_ok, resp_ok]
+
+        log_data = self._get_sdk_log_data()
+
+        with self.assertLogs(level=WARNING) as warning:
+            result = exporter.export(log_data)
+
+        self.assertEqual(result, LogRecordExportResult.SUCCESS)
+        # 1 initial call (413) + 2 split calls
+        self.assertEqual(mock_post.call_count, 3)
+        self.assertTrue(
+            any(
+                "Payload too large" in record.message
+                for record in warning.records
+            )
+        )
+
+    @patch.object(Session, "post")
+    def test_413_single_log_returns_failure(self, mock_post):
+        """When a single log record is too large, the exporter should return FAILURE."""
+        exporter = OTLPLogExporter(timeout=10)
+
+        resp_413 = Response()
+        resp_413.status_code = 413
+        resp_413.reason = "Request Entity Too Large"
+
+        mock_post.return_value = resp_413
+
+        log_data = self._get_sdk_log_data()[:1]
+
+        with self.assertLogs(level=WARNING) as warning:
+            result = exporter.export(log_data)
+
+        self.assertEqual(result, LogRecordExportResult.FAILURE)
+        self.assertEqual(mock_post.call_count, 1)
+        self.assertTrue(
+            any(
+                "Single log record exceeds backend payload size limit"
+                in record.message
+                for record in warning.records
+            )
+        )
+
+    @patch.object(Session, "post")
+    def test_413_recursive_splitting(self, mock_post):
+        """When a split half still returns 413, the exporter should continue splitting recursively."""
+        exporter = OTLPLogExporter(timeout=10)
+
+        resp_413 = Response()
+        resp_413.status_code = 413
+        resp_413.reason = "Request Entity Too Large"
+
+        resp_ok = Response()
+        resp_ok.status_code = 200
+
+        log_data = self._get_sdk_log_data()  # returns 3 logs
+
+        # 3 logs: first 413 → split [0],[1,2]
+        # [0] → ok
+        # [1,2] → 413 → split [1],[2] → ok, ok
+        mock_post.side_effect = [resp_413, resp_ok, resp_413, resp_ok, resp_ok]
+
+        with self.assertLogs(level=WARNING):
+            result = exporter.export(log_data)
+
+        self.assertEqual(result, LogRecordExportResult.SUCCESS)
+        self.assertEqual(mock_post.call_count, 5)
+
+    @patch.object(Session, "post")
+    def test_413_partial_failure(self, mock_post):
+        """When the first half fails with a non-retryable error, the second half is not attempted (short-circuit)."""
+        exporter = OTLPLogExporter(timeout=10)
+
+        resp_413 = Response()
+        resp_413.status_code = 413
+        resp_413.reason = "Request Entity Too Large"
+
+        resp_400 = Response()
+        resp_400.status_code = 400
+        resp_400.reason = "Bad Request"
+
+        log_data = self._get_sdk_log_data()
+
+        # First call returns 413, first half gets 400 → short-circuit
+        mock_post.side_effect = [resp_413, resp_400]
+
+        with self.assertLogs(level=WARNING):
+            result = exporter.export(log_data)
+
+        self.assertEqual(result, LogRecordExportResult.FAILURE)
+        self.assertEqual(mock_post.call_count, 2)
+
+    @patch(
+        "opentelemetry.exporter.otlp.proto.http._log_exporter.time",
+    )
+    @patch.object(Session, "post")
+    def test_413_deadline_expired_returns_failure(self, mock_post, mock_time):
+        """When a 413 is received but the deadline has expired, return FAILURE without splitting."""
+        mock_time.side_effect = [100.0, 100.0, 100.6]
+        exporter = OTLPLogExporter(timeout=0.5)
+
+        resp_413 = Response()
+        resp_413.status_code = 413
+        resp_413.reason = "Request Entity Too Large"
+
+        mock_post.return_value = resp_413
+
+        log_data = self._get_sdk_log_data()
+
+        with self.assertLogs(level=WARNING) as warning:
+            result = exporter.export(log_data)
+
+        self.assertEqual(result, LogRecordExportResult.FAILURE)
+        self.assertEqual(mock_post.call_count, 1)
+        self.assertTrue(
+            any(
+                "deadline expired" in record.message
+                for record in warning.records
+            )
+        )
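The `side_effect` lists in these tests encode the order in which the recursion issues requests: the parent batch first, then the entire first-half subtree, then the second half (a preorder walk of the split tree). A hypothetical helper, not part of the patch, that derives such a list from the batch size and a predicate for which sizes the fake backend rejects:

    def expected_statuses(n, rejects):
        # Preorder over the split tree: parent response first, then each half.
        if rejects(n) and n > 1:
            mid = n // 2
            return (
                [413]
                + expected_statuses(mid, rejects)
                + expected_statuses(n - mid, rejects)
            )
        return [413] if rejects(n) else [200]

    # Mirrors test_413_recursive_splitting: 3 logs, every multi-record batch rejected.
    assert expected_statuses(3, lambda n: n > 1) == [413, 200, 413, 200, 200]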
diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py
index 5f61344bbf1..c4f182f4efc 100644
--- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py
+++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py
@@ -345,7 +345,7 @@ def test_timeout_set_correctly(self, mock_post):
         def export_side_effect(*args, **kwargs):
             # Timeout should be set to something slightly less than 400 milliseconds depending on how much time has passed.
-            self.assertAlmostEqual(0.4, kwargs["timeout"], 2)
+            self.assertAlmostEqual(0.4, kwargs["timeout"], 1)
             return resp
 
         mock_post.side_effect = export_side_effect
@@ -380,3 +380,205 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post):
         )
 
         assert after - before < 0.2
+
+    @patch.object(Session, "post")
+    def test_413_splits_batch_and_succeeds(self, mock_post):
+        """When backend returns 413, the exporter should split the batch in half and retry each half."""
+        exporter = OTLPSpanExporter(timeout=10)
+
+        resp_413 = Response()
+        resp_413.status_code = 413
+        resp_413.reason = "Request Entity Too Large"
+
+        resp_ok = Response()
+        resp_ok.status_code = 200
+
+        # First call returns 413, subsequent calls succeed
+        mock_post.side_effect = [resp_413, resp_ok, resp_ok]
+
+        span1 = _Span(
+            "span1",
+            context=Mock(
+                **{
+                    "trace_state": {"a": "b"},
+                    "span_id": 10217189687419569865,
+                    "trace_id": 67545097771067222548457157018666467027,
+                }
+            ),
+        )
+        span2 = _Span(
+            "span2",
+            context=Mock(
+                **{
+                    "trace_state": {"a": "b"},
+                    "span_id": 10217189687419569866,
+                    "trace_id": 67545097771067222548457157018666467027,
+                }
+            ),
+        )
+
+        with self.assertLogs(level=WARNING) as warning:
+            result = exporter.export([span1, span2])
+
+        self.assertEqual(result, SpanExportResult.SUCCESS)
+        # 1 initial call (413) + 2 split calls (each succeeds)
+        self.assertEqual(mock_post.call_count, 3)
+        self.assertIn(
+            "Payload too large (2 spans), splitting into two batches",
+            warning.records[0].message,
+        )
+
+    @patch.object(Session, "post")
+    def test_413_single_span_returns_failure(self, mock_post):
+        """When a single span is too large, the exporter should return FAILURE."""
+        exporter = OTLPSpanExporter(timeout=10)
+
+        resp_413 = Response()
+        resp_413.status_code = 413
+        resp_413.reason = "Request Entity Too Large"
+
+        mock_post.return_value = resp_413
+
+        with self.assertLogs(level=WARNING) as warning:
+            result = exporter.export([BASIC_SPAN])
+
+        self.assertEqual(result, SpanExportResult.FAILURE)
+        self.assertEqual(mock_post.call_count, 1)
+        self.assertTrue(
+            any(
+                "Single span exceeds backend payload size limit"
+                in record.message
+                for record in warning.records
+            )
+        )
+
+    @patch.object(Session, "post")
+    def test_413_recursive_splitting(self, mock_post):
+        """When a split half still returns 413, the exporter should continue splitting recursively."""
+        exporter = OTLPSpanExporter(timeout=10)
+
+        resp_413 = Response()
+        resp_413.status_code = 413
+        resp_413.reason = "Request Entity Too Large"
+
+        resp_ok = Response()
+        resp_ok.status_code = 200
+
+        spans = []
+        for idx in range(4):
+            spans.append(
+                _Span(
+                    f"span{idx}",
+                    context=Mock(
+                        **{
+                            "trace_state": {},
+                            "span_id": 10217189687419569865 + idx,
+                            "trace_id": 67545097771067222548457157018666467027,
+                        }
+                    ),
+                )
+            )
+
+        # 4 spans: first 413 → split [0,1],[2,3]
+        # [0,1] → 413 → split [0],[1] → ok, ok
+        # [2,3] → ok
+        mock_post.side_effect = [resp_413, resp_413, resp_ok, resp_ok, resp_ok]
+
+        with self.assertLogs(level=WARNING):
+            result = exporter.export(spans)
+
+        self.assertEqual(result, SpanExportResult.SUCCESS)
+        self.assertEqual(mock_post.call_count, 5)
+
+    @patch.object(Session, "post")
+    def test_413_partial_failure(self, mock_post):
+        """When the first half fails with a non-retryable error, the second half is not attempted (short-circuit)."""
+        exporter = OTLPSpanExporter(timeout=10)
+
+        resp_413 = Response()
+        resp_413.status_code = 413
+        resp_413.reason = "Request Entity Too Large"
+
+        resp_400 = Response()
+        resp_400.status_code = 400
+        resp_400.reason = "Bad Request"
+
+        span1 = _Span(
+            "span1",
+            context=Mock(
+                **{
+                    "trace_state": {},
+                    "span_id": 10217189687419569865,
+                    "trace_id": 67545097771067222548457157018666467027,
+                }
+            ),
+        )
+        span2 = _Span(
+            "span2",
+            context=Mock(
+                **{
+                    "trace_state": {},
+                    "span_id": 10217189687419569866,
+                    "trace_id": 67545097771067222548457157018666467027,
+                }
+            ),
+        )
+
+        # First call returns 413, first half gets 400 (non-retryable) → short-circuit, second half never attempted
+        mock_post.side_effect = [resp_413, resp_400]
+
+        with self.assertLogs(level=WARNING):
+            result = exporter.export([span1, span2])
+
+        self.assertEqual(result, SpanExportResult.FAILURE)
+        # Only 2 calls: initial 413 + first half 400. Second half never attempted.
+        self.assertEqual(mock_post.call_count, 2)
+
+    @patch(
+        "opentelemetry.exporter.otlp.proto.http.trace_exporter.time",
+    )
+    @patch.object(Session, "post")
+    def test_413_deadline_expired_returns_failure(self, mock_post, mock_time):
+        """When a 413 is received but the deadline has expired, return FAILURE without splitting."""
+        # time() calls: export() deadline_sec setup, _export timeout calc, deadline check in 413 handler
+        mock_time.side_effect = [100.0, 100.0, 100.6]
+        exporter = OTLPSpanExporter(timeout=0.5)
+
+        resp_413 = Response()
+        resp_413.status_code = 413
+        resp_413.reason = "Request Entity Too Large"
+
+        mock_post.return_value = resp_413
+
+        span1 = _Span(
+            "span1",
+            context=Mock(
+                **{
+                    "trace_state": {},
+                    "span_id": 10217189687419569865,
+                    "trace_id": 67545097771067222548457157018666467027,
+                }
+            ),
+        )
+        span2 = _Span(
+            "span2",
+            context=Mock(
+                **{
+                    "trace_state": {},
+                    "span_id": 10217189687419569866,
+                    "trace_id": 67545097771067222548457157018666467027,
+                }
+            ),
+        )
+
+        with self.assertLogs(level=WARNING) as warning:
+            result = exporter.export([span1, span2])
+
+        self.assertEqual(result, SpanExportResult.FAILURE)
+        self.assertEqual(mock_post.call_count, 1)
+        self.assertTrue(
+            any(
+                "deadline expired" in record.message
+                for record in warning.records
+            )
+        )
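Nothing in the change requires new configuration; any pipeline already exporting over OTLP/HTTP picks up the 413 handling transparently. For reference, a typical setup using only the stable public API, with the endpoint shown at the default OTLP/HTTP port:

    from opentelemetry import trace
    from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor

    provider = TracerProvider()
    provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces"))
    )
    trace.set_tracer_provider(provider)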