2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0)

## Unreleased

- `opentelemetry-exporter-otlp-proto-http`: Handle HTTP 413 (Payload Too Large) responses in trace and log exporters by splitting the batch in half and retrying each half recursively
([#5032](https://github.com/open-telemetry/opentelemetry-python/pull/5032))
- `opentelemetry-sdk`: Add shared `_parse_headers` helper for declarative config OTLP exporters
([#5021](https://github.com/open-telemetry/opentelemetry-python/pull/5021))
- `opentelemetry-api`: Replace a broad exception in attribute cleaning tests to satisfy pylint in the `lint-opentelemetry-api` CI job
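The `_parse_headers` helper mentioned in the changelog entry above is not part of this diff. As a rough sketch of the job such a helper performs — parsing the comma-separated `key=value` list format that `OTEL_EXPORTER_OTLP_HEADERS` uses, where values may be percent-encoded — something like the following would do. The name, signature, and error handling here are assumptions for illustration, not the shipped helper:

```python
from urllib.parse import unquote


def parse_headers(raw: str) -> dict:
    """Parse 'k1=v1,k2=v2' header pairs; values may be percent-encoded."""
    headers = {}
    for pair in raw.split(","):
        if "=" not in pair:
            continue  # skip malformed entries rather than raising
        key, _, value = pair.partition("=")
        headers[key.strip()] = unquote(value.strip())
    return headers


assert parse_headers("api-key=abc%20123,tenant=t1") == {
    "api-key": "abc 123",
    "tenant": "t1",
}
```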
@@ -34,6 +34,10 @@ def _is_retryable(resp: requests.Response) -> bool:
return False


def _is_payload_too_large(resp: requests.Response) -> bool:
return resp.status_code == 413


def _load_session_from_envvar(
cred_envvar: Literal[
_OTEL_PYTHON_EXPORTER_OTLP_HTTP_LOGS_CREDENTIAL_PROVIDER,
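For illustration, the new predicate can be exercised the same way the tests below construct responses — a bare `requests.Response` with the status code set by hand:

```python
from requests import Response

from opentelemetry.exporter.otlp.proto.http._common import (
    _is_payload_too_large,
)

resp = Response()
resp.status_code = 413  # HTTP 413: Payload Too Large
assert _is_payload_too_large(resp)

resp.status_code = 200
assert not _is_payload_too_large(resp)
```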
@@ -31,6 +31,7 @@
Compression,
)
from opentelemetry.exporter.otlp.proto.http._common import (
_is_payload_too_large,
_is_retryable,
_load_session_from_envvar,
)
@@ -183,8 +184,14 @@ def export(
_logger.warning("Exporter already shutdown, ignoring batch")
return LogRecordExportResult.FAILURE

deadline_sec = time() + self._timeout
return self._export_batch(batch, deadline_sec)

def _export_batch(
self, batch: Sequence[ReadableLogRecord], deadline_sec: float
) -> LogRecordExportResult:
serialized_data = encode_logs(batch).SerializeToString()

for retry_num in range(_MAX_RETRYS):
# multiplying by a random number between .8 and 1.2 introduces a +/-20% jitter to each backoff.
backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
@@ -201,6 +208,33 @@ def export(
retryable = _is_retryable(resp)
status_code = resp.status_code

if _is_payload_too_large(resp):
# 413 handling always returns here; will not fall through
# to the 'if not retryable' check below.
if len(batch) <= 1:
_logger.error(
"Single log record exceeds backend payload size limit, dropping log record"
)
return LogRecordExportResult.FAILURE
if time() >= deadline_sec:
_logger.error(
"Payload too large but deadline expired, dropping %d log records",
len(batch),
)
return LogRecordExportResult.FAILURE
mid = len(batch) // 2
_logger.warning(
"Payload too large (%d log records), splitting into two batches",
len(batch),
)
first = self._export_batch(list(batch[:mid]), deadline_sec)
if first != LogRecordExportResult.SUCCESS:
return LogRecordExportResult.FAILURE
second = self._export_batch(
list(batch[mid:]), deadline_sec
)
return second

if not retryable:
_logger.error(
"Failed to export logs batch code: %s, reason: %s",
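Stripped of exporter plumbing, the new 413 branch implements the following recursion. This is an illustrative sketch, not the shipped code: `send` and `now` are hypothetical stand-ins for the HTTP POST and the clock, and the retry/backoff handling for non-413 statuses is omitted:

```python
from typing import Callable, Sequence


def export_batch(
    batch: Sequence,
    deadline_sec: float,
    send: Callable[[Sequence], int],  # returns an HTTP status code
    now: Callable[[], float],
) -> bool:
    status = send(batch)
    if status == 413:
        if len(batch) <= 1:
            return False  # a single record can never fit: drop it
        if now() >= deadline_sec:
            return False  # time budget exhausted: drop the batch
        mid = len(batch) // 2
        # Short-circuit: if the first half fails, the second is never sent.
        if not export_batch(batch[:mid], deadline_sec, send, now):
            return False
        return export_batch(batch[mid:], deadline_sec, send, now)
    return status == 200
```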
@@ -33,6 +33,7 @@
Compression,
)
from opentelemetry.exporter.otlp.proto.http._common import (
_is_payload_too_large,
_is_retryable,
_load_session_from_envvar,
)
@@ -176,8 +177,14 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
_logger.warning("Exporter already shutdown, ignoring batch")
return SpanExportResult.FAILURE

deadline_sec = time() + self._timeout
return self._export_batch(spans, deadline_sec)

def _export_batch(
self, spans: Sequence[ReadableSpan], deadline_sec: float
) -> SpanExportResult:
serialized_data = encode_spans(spans).SerializePartialToString()

for retry_num in range(_MAX_RETRYS):
# multiplying by a random number between .8 and 1.2 introduces a +/-20% jitter to each backoff.
backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
@@ -194,6 +201,33 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
retryable = _is_retryable(resp)
status_code = resp.status_code

if _is_payload_too_large(resp):
# 413 handling always returns here; will not fall through
# to the 'if not retryable' check below.
if len(spans) <= 1:
_logger.error(
"Single span exceeds backend payload size limit, dropping span"
)
return SpanExportResult.FAILURE
if time() >= deadline_sec:
_logger.error(
"Payload too large but deadline expired, dropping %d spans",
len(spans),
)
return SpanExportResult.FAILURE
mid = len(spans) // 2
_logger.warning(
"Payload too large (%d spans), splitting into two batches",
len(spans),
)
first = self._export_batch(list(spans[:mid]), deadline_sec)
if first != SpanExportResult.SUCCESS:
return SpanExportResult.FAILURE
second = self._export_batch(
list(spans[mid:]), deadline_sec
)
return second

if not retryable:
_logger.error(
"Failed to export span batch code: %s, reason: %s",
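One consequence of the halving strategy worth noting: if every batch larger than one item draws a 413, the recursion issues one request per node of a binary tree with n leaves, i.e. at most 2n - 1 requests for n spans or log records. A quick check of that bound (the function name is ours, purely illustrative):

```python
def worst_case_requests(n: int) -> int:
    """Requests issued when every batch of size > 1 draws a 413."""
    if n <= 1:
        return 1
    mid = n // 2
    return 1 + worst_case_requests(mid) + worst_case_requests(n - mid)


assert worst_case_requests(3) == 5  # matches test_413_recursive_splitting below
assert all(worst_case_requests(n) == 2 * n - 1 for n in range(1, 50))
```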
@@ -525,7 +525,7 @@ def test_timeout_set_correctly(self, mock_post):

def export_side_effect(*args, **kwargs):
# Timeout should be set to something slightly less than 400 milliseconds depending on how much time has passed.
self.assertAlmostEqual(0.4, kwargs["timeout"], 1)
return resp

mock_post.side_effect = export_side_effect
@@ -562,3 +562,136 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post):
)

assert after - before < 0.2

@patch.object(Session, "post")
def test_413_splits_batch_and_succeeds(self, mock_post):
"""When backend returns 413, the exporter should split the batch in half and retry each half."""
exporter = OTLPLogExporter(timeout=10)

resp_413 = Response()
resp_413.status_code = 413
resp_413.reason = "Request Entity Too Large"

resp_ok = Response()
resp_ok.status_code = 200

mock_post.side_effect = [resp_413, resp_ok, resp_ok]

log_data = self._get_sdk_log_data()

with self.assertLogs(level=WARNING) as warning:
result = exporter.export(log_data)

self.assertEqual(result, LogRecordExportResult.SUCCESS)
# 1 initial call (413) + 2 split calls
self.assertEqual(mock_post.call_count, 3)
self.assertTrue(
any(
"Payload too large" in record.message
for record in warning.records
)
)

@patch.object(Session, "post")
def test_413_single_log_returns_failure(self, mock_post):
"""When a single log record is too large, the exporter should return FAILURE."""
exporter = OTLPLogExporter(timeout=10)

resp_413 = Response()
resp_413.status_code = 413
resp_413.reason = "Request Entity Too Large"

mock_post.return_value = resp_413

log_data = self._get_sdk_log_data()[:1]

with self.assertLogs(level=WARNING) as warning:
result = exporter.export(log_data)

self.assertEqual(result, LogRecordExportResult.FAILURE)
self.assertEqual(mock_post.call_count, 1)
self.assertTrue(
any(
"Single log record exceeds backend payload size limit"
in record.message
for record in warning.records
)
)

@patch.object(Session, "post")
def test_413_recursive_splitting(self, mock_post):
"""When a split half still returns 413, the exporter should continue splitting recursively."""
exporter = OTLPLogExporter(timeout=10)

resp_413 = Response()
resp_413.status_code = 413
resp_413.reason = "Request Entity Too Large"

resp_ok = Response()
resp_ok.status_code = 200

log_data = self._get_sdk_log_data() # returns 3 logs

# 3 logs: first 413 → split [0],[1,2]
# [0] → ok
# [1,2] → 413 → split [1],[2] → ok, ok
mock_post.side_effect = [resp_413, resp_ok, resp_413, resp_ok, resp_ok]

with self.assertLogs(level=WARNING):
result = exporter.export(log_data)

self.assertEqual(result, LogRecordExportResult.SUCCESS)
self.assertEqual(mock_post.call_count, 5)

@patch.object(Session, "post")
def test_413_partial_failure(self, mock_post):
"""When the first half fails with a non-retryable error, the second half is not attempted (short-circuit)."""
exporter = OTLPLogExporter(timeout=10)

resp_413 = Response()
resp_413.status_code = 413
resp_413.reason = "Request Entity Too Large"

resp_400 = Response()
resp_400.status_code = 400
resp_400.reason = "Bad Request"

log_data = self._get_sdk_log_data()

# First call returns 413, first half gets 400 → short-circuit
mock_post.side_effect = [resp_413, resp_400]

with self.assertLogs(level=WARNING):
result = exporter.export(log_data)

self.assertEqual(result, LogRecordExportResult.FAILURE)
self.assertEqual(mock_post.call_count, 2)

@patch(
"opentelemetry.exporter.otlp.proto.http._log_exporter.time",
)
@patch.object(Session, "post")
def test_413_deadline_expired_returns_failure(self, mock_post, mock_time):
"""When a 413 is received but the deadline has expired, return FAILURE without splitting."""
mock_time.side_effect = [100.0, 100.0, 100.6]
exporter = OTLPLogExporter(timeout=0.5)

resp_413 = Response()
resp_413.status_code = 413
resp_413.reason = "Request Entity Too Large"

mock_post.return_value = resp_413

log_data = self._get_sdk_log_data()

with self.assertLogs(level=WARNING) as warning:
result = exporter.export(log_data)

self.assertEqual(result, LogRecordExportResult.FAILURE)
self.assertEqual(mock_post.call_count, 1)
self.assertTrue(
any(
"deadline expired" in record.message
for record in warning.records
)
)