From 441391d3405aeb5a3160192f684bc4a17e74e373 Mon Sep 17 00:00:00 2001 From: Jack Berg <34418638+jack-berg@users.noreply.github.com> Date: Fri, 6 Mar 2026 16:45:51 -0600 Subject: [PATCH 1/7] Add bounds to how many response body bytes senders read --- .../current_vs_latest/opentelemetry-api.txt | 16 +------ .../opentelemetry-common.txt | 2 +- .../opentelemetry-context.txt | 2 +- .../opentelemetry-exporter-common.txt | 2 +- .../opentelemetry-exporter-logging-otlp.txt | 2 +- .../opentelemetry-exporter-logging.txt | 2 +- .../opentelemetry-exporter-otlp-common.txt | 2 +- .../opentelemetry-exporter-otlp.txt | 2 +- ...y-exporter-sender-grpc-managed-channel.txt | 2 +- .../opentelemetry-exporter-sender-jdk.txt | 2 +- .../opentelemetry-exporter-sender-okhttp.txt | 2 +- .../opentelemetry-exporter-zipkin.txt | 2 +- .../opentelemetry-extension-kotlin.txt | 2 +- ...ntelemetry-extension-trace-propagators.txt | 2 +- .../opentelemetry-opentracing-shim.txt | 2 +- .../opentelemetry-sdk-common.txt | 9 +++- ...emetry-sdk-extension-autoconfigure-spi.txt | 2 +- ...ntelemetry-sdk-extension-autoconfigure.txt | 2 +- ...ry-sdk-extension-jaeger-remote-sampler.txt | 6 ++- .../opentelemetry-sdk-logs.txt | 2 +- .../opentelemetry-sdk-metrics.txt | 42 +------------------ .../opentelemetry-sdk-testing.txt | 6 +-- .../opentelemetry-sdk-trace.txt | 6 +-- .../current_vs_latest/opentelemetry-sdk.txt | 2 +- .../internal/grpc/GrpcExporterBuilder.java | 4 +- .../grpc/ImmutableGrpcSenderConfig.java | 9 +++- .../internal/http/HttpExporterBuilder.java | 4 +- .../http/ImmutableHttpSenderConfig.java | 9 +++- .../otlp/trace/OltpExporterBenchmark.java | 3 +- .../sender/jdk/internal/JdkHttpSender.java | 25 +++++++++-- .../jdk/internal/JdkHttpSenderProvider.java | 3 +- .../jdk/internal/JdkHttpSenderTest.java | 9 ++-- .../okhttp/internal/OkHttpGrpcSender.java | 15 ++++++- .../internal/OkHttpGrpcSenderProvider.java | 3 +- .../okhttp/internal/OkHttpHttpSender.java | 17 +++++++- .../internal/OkHttpHttpSenderProvider.java | 3 +- .../okhttp/internal/OkHttpGrpcSenderTest.java | 12 ++++-- .../internal/OkHttpGrpcSuppressionTest.java | 3 +- .../internal/OkHttpHttpSuppressionTest.java | 3 +- .../sampler/JaegerRemoteSamplerBuilder.java | 15 ++++++- .../sdk/common/export/GrpcSender.java | 3 ++ .../sdk/common/export/GrpcSenderConfig.java | 10 +++++ .../sdk/common/export/HttpSender.java | 3 ++ .../sdk/common/export/HttpSenderConfig.java | 10 +++++ 44 files changed, 172 insertions(+), 112 deletions(-) diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-api.txt b/docs/apidiffs/current_vs_latest/opentelemetry-api.txt index afe1c3edc71..3b94487fdf0 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-api.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-api.txt @@ -1,14 +1,2 @@ -Comparing source compatibility of opentelemetry-api-1.61.0-SNAPSHOT.jar against opentelemetry-api-1.59.0.jar -*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.logs.LogRecordBuilder (not serializable) - === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 - +++ NEW METHOD: PUBLIC(+) io.opentelemetry.api.logs.LogRecordBuilder setException(java.lang.Throwable) -*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.trace.TraceFlags (not serializable) - === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 - +++ NEW METHOD: PUBLIC(+) STATIC(+) io.opentelemetry.api.trace.TraceFlagsBuilder builder() - +++ NEW METHOD: PUBLIC(+) boolean isTraceIdRandom() -+++ NEW INTERFACE: PUBLIC(+) ABSTRACT(+) io.opentelemetry.api.trace.TraceFlagsBuilder (not serializable) - +++ CLASS FILE FORMAT VERSION: 52.0 <- n.a. - +++ NEW SUPERCLASS: java.lang.Object - +++ NEW METHOD: PUBLIC(+) ABSTRACT(+) io.opentelemetry.api.trace.TraceFlags build() - +++ NEW METHOD: PUBLIC(+) ABSTRACT(+) io.opentelemetry.api.trace.TraceFlagsBuilder setRandomTraceId(boolean) - +++ NEW METHOD: PUBLIC(+) ABSTRACT(+) io.opentelemetry.api.trace.TraceFlagsBuilder setSampled(boolean) +Comparing source compatibility of opentelemetry-api-1.61.0-SNAPSHOT.jar against opentelemetry-api-1.60.0.jar +No changes. \ No newline at end of file diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-common.txt b/docs/apidiffs/current_vs_latest/opentelemetry-common.txt index 9358bf987f0..9eacb01e56e 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-common.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-common.txt @@ -1,2 +1,2 @@ -Comparing source compatibility of opentelemetry-common-1.61.0-SNAPSHOT.jar against opentelemetry-common-1.59.0.jar +Comparing source compatibility of opentelemetry-common-1.61.0-SNAPSHOT.jar against opentelemetry-common-1.60.0.jar No changes. \ No newline at end of file diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-context.txt b/docs/apidiffs/current_vs_latest/opentelemetry-context.txt index f0ee106af16..26cc33fee5e 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-context.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-context.txt @@ -1,2 +1,2 @@ -Comparing source compatibility of opentelemetry-context-1.61.0-SNAPSHOT.jar against opentelemetry-context-1.59.0.jar +Comparing source compatibility of opentelemetry-context-1.61.0-SNAPSHOT.jar against opentelemetry-context-1.60.0.jar No changes. \ No newline at end of file diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-common.txt b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-common.txt index 0ee3f52d4cb..64767f99f6c 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-common.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-common.txt @@ -1,2 +1,2 @@ -Comparing source compatibility of opentelemetry-exporter-common-1.61.0-SNAPSHOT.jar against opentelemetry-exporter-common-1.59.0.jar +Comparing source compatibility of opentelemetry-exporter-common-1.61.0-SNAPSHOT.jar against opentelemetry-exporter-common-1.60.0.jar No changes. \ No newline at end of file diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-logging-otlp.txt b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-logging-otlp.txt index e224f928fe8..9b4ba923034 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-logging-otlp.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-logging-otlp.txt @@ -1,2 +1,2 @@ -Comparing source compatibility of opentelemetry-exporter-logging-otlp-1.61.0-SNAPSHOT.jar against opentelemetry-exporter-logging-otlp-1.59.0.jar +Comparing source compatibility of opentelemetry-exporter-logging-otlp-1.61.0-SNAPSHOT.jar against opentelemetry-exporter-logging-otlp-1.60.0.jar No changes. \ No newline at end of file diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-logging.txt b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-logging.txt index 04807b136e2..889d805af0b 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-logging.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-logging.txt @@ -1,2 +1,2 @@ -Comparing source compatibility of opentelemetry-exporter-logging-1.61.0-SNAPSHOT.jar against opentelemetry-exporter-logging-1.59.0.jar +Comparing source compatibility of opentelemetry-exporter-logging-1.61.0-SNAPSHOT.jar against opentelemetry-exporter-logging-1.60.0.jar No changes. \ No newline at end of file diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-otlp-common.txt b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-otlp-common.txt index b17225aef33..30ba3efb279 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-otlp-common.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-otlp-common.txt @@ -1,2 +1,2 @@ -Comparing source compatibility of opentelemetry-exporter-otlp-common-1.61.0-SNAPSHOT.jar against opentelemetry-exporter-otlp-common-1.59.0.jar +Comparing source compatibility of opentelemetry-exporter-otlp-common-1.61.0-SNAPSHOT.jar against opentelemetry-exporter-otlp-common-1.60.0.jar No changes. \ No newline at end of file diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-otlp.txt b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-otlp.txt index 17d78fa1550..01ed5a88dd7 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-otlp.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-otlp.txt @@ -1,2 +1,2 @@ -Comparing source compatibility of opentelemetry-exporter-otlp-1.61.0-SNAPSHOT.jar against opentelemetry-exporter-otlp-1.59.0.jar +Comparing source compatibility of opentelemetry-exporter-otlp-1.61.0-SNAPSHOT.jar against opentelemetry-exporter-otlp-1.60.0.jar No changes. \ No newline at end of file diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-sender-grpc-managed-channel.txt b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-sender-grpc-managed-channel.txt index 865936d5b2f..15171a0d643 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-sender-grpc-managed-channel.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-sender-grpc-managed-channel.txt @@ -1,2 +1,2 @@ -Comparing source compatibility of opentelemetry-exporter-sender-grpc-managed-channel-1.61.0-SNAPSHOT.jar against opentelemetry-exporter-sender-grpc-managed-channel-1.59.0.jar +Comparing source compatibility of opentelemetry-exporter-sender-grpc-managed-channel-1.61.0-SNAPSHOT.jar against opentelemetry-exporter-sender-grpc-managed-channel-1.60.0.jar No changes. \ No newline at end of file diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-sender-jdk.txt b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-sender-jdk.txt index 134ee5c724e..e6dfcf92873 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-sender-jdk.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-sender-jdk.txt @@ -1,2 +1,2 @@ -Comparing source compatibility of opentelemetry-exporter-sender-jdk-1.61.0-SNAPSHOT.jar against opentelemetry-exporter-sender-jdk-1.59.0.jar +Comparing source compatibility of opentelemetry-exporter-sender-jdk-1.61.0-SNAPSHOT.jar against opentelemetry-exporter-sender-jdk-1.60.0.jar No changes. \ No newline at end of file diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-sender-okhttp.txt b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-sender-okhttp.txt index 215228018d3..4e9a85f1cef 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-sender-okhttp.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-sender-okhttp.txt @@ -1,2 +1,2 @@ -Comparing source compatibility of opentelemetry-exporter-sender-okhttp-1.61.0-SNAPSHOT.jar against opentelemetry-exporter-sender-okhttp-1.59.0.jar +Comparing source compatibility of opentelemetry-exporter-sender-okhttp-1.61.0-SNAPSHOT.jar against opentelemetry-exporter-sender-okhttp-1.60.0.jar No changes. \ No newline at end of file diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-zipkin.txt b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-zipkin.txt index 828b89ea1ef..79f51ef95d7 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-zipkin.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-zipkin.txt @@ -1,2 +1,2 @@ -Comparing source compatibility of opentelemetry-exporter-zipkin-1.61.0-SNAPSHOT.jar against opentelemetry-exporter-zipkin-1.59.0.jar +Comparing source compatibility of opentelemetry-exporter-zipkin-1.61.0-SNAPSHOT.jar against opentelemetry-exporter-zipkin-1.60.0.jar No changes. \ No newline at end of file diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-extension-kotlin.txt b/docs/apidiffs/current_vs_latest/opentelemetry-extension-kotlin.txt index 8c0bdb85881..24bf6e2f784 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-extension-kotlin.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-extension-kotlin.txt @@ -1,2 +1,2 @@ -Comparing source compatibility of opentelemetry-extension-kotlin-1.61.0-SNAPSHOT.jar against opentelemetry-extension-kotlin-1.59.0.jar +Comparing source compatibility of opentelemetry-extension-kotlin-1.61.0-SNAPSHOT.jar against opentelemetry-extension-kotlin-1.60.0.jar No changes. \ No newline at end of file diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-extension-trace-propagators.txt b/docs/apidiffs/current_vs_latest/opentelemetry-extension-trace-propagators.txt index e98f8a72519..aaaaa11493f 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-extension-trace-propagators.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-extension-trace-propagators.txt @@ -1,2 +1,2 @@ -Comparing source compatibility of opentelemetry-extension-trace-propagators-1.61.0-SNAPSHOT.jar against opentelemetry-extension-trace-propagators-1.59.0.jar +Comparing source compatibility of opentelemetry-extension-trace-propagators-1.61.0-SNAPSHOT.jar against opentelemetry-extension-trace-propagators-1.60.0.jar No changes. \ No newline at end of file diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-opentracing-shim.txt b/docs/apidiffs/current_vs_latest/opentelemetry-opentracing-shim.txt index af980e1d026..36c28ce4b36 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-opentracing-shim.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-opentracing-shim.txt @@ -1,2 +1,2 @@ -Comparing source compatibility of opentelemetry-opentracing-shim-1.61.0-SNAPSHOT.jar against opentelemetry-opentracing-shim-1.59.0.jar +Comparing source compatibility of opentelemetry-opentracing-shim-1.61.0-SNAPSHOT.jar against opentelemetry-opentracing-shim-1.60.0.jar No changes. \ No newline at end of file diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-common.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-common.txt index 2de8e95945c..3b9e58d7b95 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-common.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-common.txt @@ -1,2 +1,7 @@ -Comparing source compatibility of opentelemetry-sdk-common-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-common-1.59.0.jar -No changes. \ No newline at end of file +Comparing source compatibility of opentelemetry-sdk-common-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-common-1.60.0.jar +*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.common.export.GrpcSenderConfig (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) long getMaxResponseBodySize() +*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.common.export.HttpSenderConfig (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) long getMaxResponseBodySize() diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-autoconfigure-spi.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-autoconfigure-spi.txt index c779aab340d..154f3a00f10 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-autoconfigure-spi.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-autoconfigure-spi.txt @@ -1,2 +1,2 @@ -Comparing source compatibility of opentelemetry-sdk-extension-autoconfigure-spi-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-extension-autoconfigure-spi-1.59.0.jar +Comparing source compatibility of opentelemetry-sdk-extension-autoconfigure-spi-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-extension-autoconfigure-spi-1.60.0.jar No changes. \ No newline at end of file diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-autoconfigure.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-autoconfigure.txt index e1a50c54b04..c422e64d32a 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-autoconfigure.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-autoconfigure.txt @@ -1,2 +1,2 @@ -Comparing source compatibility of opentelemetry-sdk-extension-autoconfigure-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-extension-autoconfigure-1.59.0.jar +Comparing source compatibility of opentelemetry-sdk-extension-autoconfigure-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-extension-autoconfigure-1.60.0.jar No changes. \ No newline at end of file diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-jaeger-remote-sampler.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-jaeger-remote-sampler.txt index 89709668946..28fbae6e5ea 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-jaeger-remote-sampler.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-jaeger-remote-sampler.txt @@ -1,2 +1,4 @@ -Comparing source compatibility of opentelemetry-sdk-extension-jaeger-remote-sampler-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-extension-jaeger-remote-sampler-1.59.0.jar -No changes. \ No newline at end of file +Comparing source compatibility of opentelemetry-sdk-extension-jaeger-remote-sampler-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-extension-jaeger-remote-sampler-1.60.0.jar +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSamplerBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSamplerBuilder setMaxSamplingStrategyResponseBodySize(long) diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt index 523cbf1118c..ad892f61e4d 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt @@ -1,2 +1,2 @@ -Comparing source compatibility of opentelemetry-sdk-logs-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-logs-1.59.0.jar +Comparing source compatibility of opentelemetry-sdk-logs-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-logs-1.60.0.jar No changes. \ No newline at end of file diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt index 6b290795edf..0c73f6c8d0d 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt @@ -1,40 +1,2 @@ -Comparing source compatibility of opentelemetry-sdk-metrics-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-metrics-1.59.0.jar -*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.metrics.Aggregation (not serializable) - === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 - === UNCHANGED METHOD: PUBLIC STATIC io.opentelemetry.sdk.metrics.Aggregation base2ExponentialBucketHistogram(int, int) - +++ NEW ANNOTATION: java.lang.Deprecated - +++ NEW METHOD: PUBLIC(+) STATIC(+) io.opentelemetry.sdk.metrics.Aggregation base2ExponentialBucketHistogram(io.opentelemetry.sdk.metrics.Base2ExponentialHistogramOptions) - === UNCHANGED METHOD: PUBLIC STATIC io.opentelemetry.sdk.metrics.Aggregation explicitBucketHistogram(java.util.List) - +++ NEW ANNOTATION: java.lang.Deprecated - +++ NEW METHOD: PUBLIC(+) STATIC(+) io.opentelemetry.sdk.metrics.Aggregation explicitBucketHistogram(io.opentelemetry.sdk.metrics.ExplicitBucketHistogramOptions) -+++ NEW CLASS: PUBLIC(+) ABSTRACT(+) io.opentelemetry.sdk.metrics.Base2ExponentialHistogramOptions (not serializable) - +++ CLASS FILE FORMAT VERSION: 52.0 <- n.a. - +++ NEW SUPERCLASS: java.lang.Object - +++ NEW METHOD: PUBLIC(+) STATIC(+) io.opentelemetry.sdk.metrics.Base2ExponentialHistogramOptions$Builder builder() - +++ NEW METHOD: PUBLIC(+) STATIC(+) io.opentelemetry.sdk.metrics.Base2ExponentialHistogramOptions getDefault() - +++ NEW METHOD: PUBLIC(+) ABSTRACT(+) int getMaxBuckets() - +++ NEW METHOD: PUBLIC(+) ABSTRACT(+) int getMaxScale() - +++ NEW METHOD: PUBLIC(+) ABSTRACT(+) boolean getRecordMinMax() - +++ NEW METHOD: PUBLIC(+) ABSTRACT(+) io.opentelemetry.sdk.metrics.Base2ExponentialHistogramOptions$Builder toBuilder() -+++ NEW CLASS: PUBLIC(+) ABSTRACT(+) STATIC(+) io.opentelemetry.sdk.metrics.Base2ExponentialHistogramOptions$Builder (not serializable) - +++ CLASS FILE FORMAT VERSION: 52.0 <- n.a. - +++ NEW SUPERCLASS: java.lang.Object - +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.metrics.Base2ExponentialHistogramOptions build() - +++ NEW METHOD: PUBLIC(+) ABSTRACT(+) io.opentelemetry.sdk.metrics.Base2ExponentialHistogramOptions$Builder setMaxBuckets(int) - +++ NEW METHOD: PUBLIC(+) ABSTRACT(+) io.opentelemetry.sdk.metrics.Base2ExponentialHistogramOptions$Builder setMaxScale(int) - +++ NEW METHOD: PUBLIC(+) ABSTRACT(+) io.opentelemetry.sdk.metrics.Base2ExponentialHistogramOptions$Builder setRecordMinMax(boolean) -+++ NEW CLASS: PUBLIC(+) ABSTRACT(+) io.opentelemetry.sdk.metrics.ExplicitBucketHistogramOptions (not serializable) - +++ CLASS FILE FORMAT VERSION: 52.0 <- n.a. - +++ NEW SUPERCLASS: java.lang.Object - +++ NEW METHOD: PUBLIC(+) STATIC(+) io.opentelemetry.sdk.metrics.ExplicitBucketHistogramOptions$Builder builder() - +++ NEW METHOD: PUBLIC(+) ABSTRACT(+) java.util.List getBucketBoundaries() - +++ NEW ANNOTATION: javax.annotation.Nullable - +++ NEW METHOD: PUBLIC(+) STATIC(+) io.opentelemetry.sdk.metrics.ExplicitBucketHistogramOptions getDefault() - +++ NEW METHOD: PUBLIC(+) ABSTRACT(+) boolean getRecordMinMax() - +++ NEW METHOD: PUBLIC(+) ABSTRACT(+) io.opentelemetry.sdk.metrics.ExplicitBucketHistogramOptions$Builder toBuilder() -+++ NEW CLASS: PUBLIC(+) ABSTRACT(+) STATIC(+) io.opentelemetry.sdk.metrics.ExplicitBucketHistogramOptions$Builder (not serializable) - +++ CLASS FILE FORMAT VERSION: 52.0 <- n.a. - +++ NEW SUPERCLASS: java.lang.Object - +++ NEW METHOD: PUBLIC(+) ABSTRACT(+) io.opentelemetry.sdk.metrics.ExplicitBucketHistogramOptions build() - +++ NEW METHOD: PUBLIC(+) ABSTRACT(+) io.opentelemetry.sdk.metrics.ExplicitBucketHistogramOptions$Builder setBucketBoundaries(java.util.List) - +++ NEW METHOD: PUBLIC(+) ABSTRACT(+) io.opentelemetry.sdk.metrics.ExplicitBucketHistogramOptions$Builder setRecordMinMax(boolean) +Comparing source compatibility of opentelemetry-sdk-metrics-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-metrics-1.60.0.jar +No changes. \ No newline at end of file diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-testing.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-testing.txt index 1b66c0c10d6..5b9ed274e3f 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-testing.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-testing.txt @@ -1,4 +1,2 @@ -Comparing source compatibility of opentelemetry-sdk-testing-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-testing-1.59.0.jar -*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.testing.assertj.LogRecordDataAssert (not serializable) - === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 - +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.testing.assertj.LogRecordDataAssert hasException(java.lang.Throwable) +Comparing source compatibility of opentelemetry-sdk-testing-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-testing-1.60.0.jar +No changes. \ No newline at end of file diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt index 5f2f3ee42e6..6efcc55bd43 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt @@ -1,4 +1,2 @@ -Comparing source compatibility of opentelemetry-sdk-trace-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-trace-1.59.0.jar -*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.trace.IdGenerator (not serializable) - === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 - +++ NEW METHOD: PUBLIC(+) boolean generatesRandomTraceIds() +Comparing source compatibility of opentelemetry-sdk-trace-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-trace-1.60.0.jar +No changes. \ No newline at end of file diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk.txt index 5202901c328..f1f864eff6d 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk.txt @@ -1,2 +1,2 @@ -Comparing source compatibility of opentelemetry-sdk-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-1.59.0.jar +Comparing source compatibility of opentelemetry-sdk-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-1.60.0.jar No changes. \ No newline at end of file diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java index 53a3d5c7c5e..c9ddfdcf18e 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java @@ -230,7 +230,9 @@ public GrpcExporter build() { isPlainHttp ? null : tlsConfigHelper.getSslContext(), isPlainHttp ? null : tlsConfigHelper.getTrustManager(), executorService, - grpcChannel)); + grpcChannel, + // 1kb since don't do anything with responses today + 1024L)); LOGGER.log(Level.FINE, "Using GrpcSender: " + grpcSender.getClass().getName()); return new GrpcExporter( diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcSenderConfig.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcSenderConfig.java index 66a3a2ad5f0..33a430ae920 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcSenderConfig.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcSenderConfig.java @@ -37,7 +37,8 @@ public static ImmutableGrpcSenderConfig create( @Nullable SSLContext sslContext, @Nullable X509TrustManager trustManager, @Nullable ExecutorService executorService, - @Nullable Object managedChannel) { + @Nullable Object managedChannel, + long maxResponseBodySize) { return new AutoValue_ImmutableGrpcSenderConfig( endpoint, fullMethodName, @@ -49,6 +50,10 @@ public static ImmutableGrpcSenderConfig create( sslContext, trustManager, executorService, - managedChannel); + managedChannel, + maxResponseBodySize); } + + @Override + public abstract long getMaxResponseBodySize(); } diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java index 2e06f8caa26..f5168f68927 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java @@ -245,7 +245,9 @@ public HttpExporter build() { retryPolicy, isPlainHttp ? null : tlsConfigHelper.getSslContext(), isPlainHttp ? null : tlsConfigHelper.getTrustManager(), - executorService)); + executorService, + // 1kb since don't do anything with responses today + 1024L)); LOGGER.log(Level.FINE, "Using HttpSender: " + httpSender.getClass().getName()); return new HttpExporter( diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/ImmutableHttpSenderConfig.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/ImmutableHttpSenderConfig.java index 24e5a9bb1bc..805d48e2638 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/ImmutableHttpSenderConfig.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/ImmutableHttpSenderConfig.java @@ -35,7 +35,8 @@ static HttpSenderConfig create( @Nullable RetryPolicy retryPolicy, @Nullable SSLContext sslContext, @Nullable X509TrustManager trustManager, - @Nullable ExecutorService executorService) { + @Nullable ExecutorService executorService, + long maxResponseBodySize) { return new AutoValue_ImmutableHttpSenderConfig( endpoint, contentType, @@ -47,6 +48,10 @@ static HttpSenderConfig create( retryPolicy, sslContext, trustManager, - executorService); + executorService, + maxResponseBodySize); } + + @Override + public abstract long getMaxResponseBodySize(); } diff --git a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java index 22271bc271e..5dbc8f42ba9 100644 --- a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java +++ b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java @@ -110,7 +110,8 @@ public void setUp() { null, null, null, - null), + null, + Long.MAX_VALUE), InternalTelemetryVersion.LATEST, ComponentId.generateLazy(StandardComponentId.ExporterType.OTLP_GRPC_SPAN_EXPORTER), MeterProvider::noop, diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java index aee4990c651..6ef88817700 100644 --- a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java @@ -72,6 +72,7 @@ public final class JdkHttpSender implements HttpSender { private final Supplier>> headerSupplier; @Nullable private final RetryPolicy retryPolicy; private final Predicate retryExceptionPredicate; + private final long maxResponseBodySize; // Visible for testing JdkHttpSender( @@ -82,7 +83,8 @@ public final class JdkHttpSender implements HttpSender { Duration timeout, Supplier>> headerSupplier, @Nullable RetryPolicy retryPolicy, - @Nullable ExecutorService executorService) { + @Nullable ExecutorService executorService, + long maxResponseBodySize) { this.client = client; this.endpoint = endpoint; this.contentType = contentType; @@ -101,6 +103,7 @@ public final class JdkHttpSender implements HttpSender { this.executorService = executorService; this.managedExecutor = false; } + this.maxResponseBodySize = maxResponseBodySize; } JdkHttpSender( @@ -113,7 +116,8 @@ public final class JdkHttpSender implements HttpSender { @Nullable RetryPolicy retryPolicy, @Nullable ProxyOptions proxyOptions, @Nullable SSLContext sslContext, - @Nullable ExecutorService executorService) { + @Nullable ExecutorService executorService, + long maxResponseBodySize) { this( configureClient(sslContext, connectTimeout, proxyOptions), endpoint, @@ -122,7 +126,8 @@ public final class JdkHttpSender implements HttpSender { timeout, headerSupplier, retryPolicy, - executorService); + executorService, + maxResponseBodySize); } private static ExecutorService newExecutor() { @@ -287,7 +292,19 @@ private static String responseStringRepresentation(HttpResponse response) { private HttpResponse sendRequest( HttpRequest.Builder requestBuilder, ByteBufferPool byteBufferPool) throws IOException { try { - return client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofByteArray()); + return client.send( + requestBuilder.build(), + responseInfo -> + HttpResponse.BodySubscribers.mapping( + HttpResponse.BodySubscribers.ofInputStream(), + inputStream -> { + try (inputStream) { + return inputStream.readNBytes( + (int) Math.min(maxResponseBodySize, Integer.MAX_VALUE)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + })); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(e); diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java index 0ece6a9978d..fcd9d64e0fe 100644 --- a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java @@ -29,6 +29,7 @@ public HttpSender createSender(HttpSenderConfig httpSenderConfig) { httpSenderConfig.getRetryPolicy(), httpSenderConfig.getProxyOptions(), httpSenderConfig.getSslContext(), - httpSenderConfig.getExecutorService()); + httpSenderConfig.getExecutorService(), + httpSenderConfig.getMaxResponseBodySize()); } } diff --git a/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java b/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java index c5bddc79fb1..24f764f3857 100644 --- a/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java +++ b/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java @@ -66,7 +66,8 @@ void setup() throws IOException, InterruptedException { Duration.ofSeconds(10), Collections::emptyMap, RetryPolicy.builder().setMaxAttempts(2).setInitialBackoff(Duration.ofMillis(1)).build(), - null); + null, + Long.MAX_VALUE); } @Test @@ -121,7 +122,8 @@ void sendInternal_RetryableConnectException() throws IOException, InterruptedExc Duration.ofSeconds(10), Collections::emptyMap, RetryPolicy.builder().setMaxAttempts(2).setInitialBackoff(Duration.ofMillis(1)).build(), - null); + null, + Long.MAX_VALUE); assertThatThrownBy(() -> sender.sendInternal(new NoOpRequestBodyWriter())) .satisfies( @@ -177,7 +179,8 @@ void connectTimeout() { null, null, null, - null); + null, + Long.MAX_VALUE); assertThat(sender) .extracting("client", as(InstanceOfAssertFactories.type(HttpClient.class))) diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java index 65a75b33ad2..9b6e25108c1 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java @@ -83,6 +83,7 @@ public final class OkHttpGrpcSender implements GrpcSender { private final HttpUrl url; @Nullable private final Compressor compressor; private final Supplier>> headersSupplier; + private final long maxResponseBodySize; /** Creates a new {@link OkHttpGrpcSender}. */ @SuppressWarnings("TooManyParameters") @@ -95,7 +96,8 @@ public OkHttpGrpcSender( @Nullable RetryPolicy retryPolicy, @Nullable SSLContext sslContext, @Nullable X509TrustManager trustManager, - @Nullable ExecutorService executorService) { + @Nullable ExecutorService executorService, + long maxResponseBodySize) { int callTimeoutMillis = (int) Math.min(timeout.toMillis(), Integer.MAX_VALUE); int connectTimeoutMillis = (int) Math.min(connectTimeout.toMillis(), Integer.MAX_VALUE); @@ -133,6 +135,7 @@ public OkHttpGrpcSender( this.compressor = compressor; this.headersSupplier = headersSupplier; this.url = HttpUrl.get(endpoint); + this.maxResponseBodySize = maxResponseBodySize; } @Override @@ -169,7 +172,15 @@ public void onResponse(Call call, Response response) { // Must consume body before accessing trailers byte[] bodyBytes = null; try { - bodyBytes = getResponseMessageBytes(body.bytes()); + Buffer buffer = new Buffer(); + while (buffer.size() < maxResponseBodySize) { + long n = + body.source().read(buffer, maxResponseBodySize - buffer.size()); + if (n == -1L) { + break; + } + } + bodyBytes = getResponseMessageBytes(buffer.readByteArray()); } catch (IOException e) { bodyBytes = new byte[0]; logger.log(Level.FINE, "Failed to read response body", e); diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java index 09bb6eefe4f..fb4b275444f 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java @@ -31,6 +31,7 @@ public GrpcSender createSender(GrpcSenderConfig grpcSenderConfig) { grpcSenderConfig.getRetryPolicy(), grpcSenderConfig.getSslContext(), grpcSenderConfig.getTrustManager(), - grpcSenderConfig.getExecutorService()); + grpcSenderConfig.getExecutorService(), + grpcSenderConfig.getMaxResponseBodySize()); } } diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java index 16b5d90b7f4..200f1f1e514 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java @@ -39,6 +39,7 @@ import okhttp3.RequestBody; import okhttp3.Response; import okhttp3.ResponseBody; +import okio.Buffer; import okio.BufferedSink; import okio.Okio; @@ -58,6 +59,7 @@ public final class OkHttpHttpSender implements HttpSender { private final Supplier>> headerSupplier; private final MediaType mediaType; @Nullable private final Compressor compressor; + private final long maxResponseBodySize; /** Create a sender. */ @SuppressWarnings("TooManyParameters") @@ -72,7 +74,8 @@ public OkHttpHttpSender( @Nullable RetryPolicy retryPolicy, @Nullable SSLContext sslContext, @Nullable X509TrustManager trustManager, - @Nullable ExecutorService executorService) { + @Nullable ExecutorService executorService, + long maxResponseBodySize) { int callTimeoutMillis = (int) Math.min(timeout.toMillis(), Integer.MAX_VALUE); int connectTimeoutMillis = (int) Math.min(connectTimeout.toMillis(), Integer.MAX_VALUE); @@ -111,6 +114,7 @@ public OkHttpHttpSender( this.mediaType = MediaType.parse(contentType); this.compressor = compressor; this.headerSupplier = headerSupplier; + this.maxResponseBodySize = maxResponseBodySize; } @Override @@ -160,7 +164,16 @@ public String getStatusMessage() { public byte[] getResponseBody() { if (bodyBytes == null) { try { - bodyBytes = body.bytes(); + Buffer buffer = new Buffer(); + while (buffer.size() < maxResponseBodySize) { + long n = + body.source() + .read(buffer, maxResponseBodySize - buffer.size()); + if (n == -1L) { + break; + } + } + bodyBytes = buffer.readByteArray(); } catch (IOException e) { bodyBytes = new byte[0]; logger.log(Level.WARNING, "Failed to read response body", e); diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java index 0fabe6e3bdb..581cf9bb549 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java @@ -30,6 +30,7 @@ public HttpSender createSender(HttpSenderConfig httpSenderConfig) { httpSenderConfig.getRetryPolicy(), httpSenderConfig.getSslContext(), httpSenderConfig.getTrustManager(), - httpSenderConfig.getExecutorService()); + httpSenderConfig.getExecutorService(), + httpSenderConfig.getMaxResponseBodySize()); } } diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderTest.java index 2127f92a072..a7f314c93b5 100644 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderTest.java @@ -89,7 +89,8 @@ void shutdown_CompletableResultCodeShouldWaitForThreads() throws Exception { null, null, null, - null); + null, + Long.MAX_VALUE); CompletableResultCode sendResult = new CompletableResultCode(); sender.send( @@ -132,7 +133,8 @@ void shutdown_NonManagedExecutor_ReturnsImmediately() { null, null, null, - customExecutor); // Pass custom executor -> managedExecutor = false + customExecutor, // Pass custom executor -> managedExecutor = false + Long.MAX_VALUE); CompletableResultCode shutdownResult = sender.shutdown(); @@ -169,7 +171,8 @@ void shutdown_ExecutorDoesNotTerminateInTime_LogsWarningButSucceeds() throws Exc null, null, null, - null); // null executor = managed + null, // null executor = managed + Long.MAX_VALUE); // Start multiple requests to ensure threads are busy CountDownLatch blockCallbacks = new CountDownLatch(1); @@ -231,7 +234,8 @@ void shutdown_InterruptedWhileWaiting_StillSucceeds() throws Exception { null, null, null, - null); + null, + Long.MAX_VALUE); // Trigger some activity sender.send(new TestMessageWriter(), response -> {}, error -> {}); diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java index 705f96b601d..3bc73283db7 100644 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java @@ -39,6 +39,7 @@ OkHttpGrpcSender createSender(String endpoint) { null, null, null, - null); + null, + Long.MAX_VALUE); } } diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java index 95699b5aa51..c64247a9b36 100644 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java @@ -46,6 +46,7 @@ OkHttpHttpSender createSender(String endpoint) { null, null, null, - null); + null, + Long.MAX_VALUE); } } diff --git a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java index 66bdb787cdb..abec4b5992e 100644 --- a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java +++ b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java @@ -42,12 +42,14 @@ public final class JaegerRemoteSamplerBuilder { private static final Sampler INITIAL_SAMPLER = Sampler.parentBased(Sampler.traceIdRatioBased(0.001)); private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(10); + private static final long DEFAULT_MAX_RESPONSE_BODY_SIZE = 1024L * 1024L; private URI endpoint = DEFAULT_ENDPOINT; private Sampler initialSampler = INITIAL_SAMPLER; private int pollingIntervalMillis = DEFAULT_POLLING_INTERVAL_MILLIS; private final TlsConfigHelper tlsConfigHelper = new TlsConfigHelper(); private Supplier> headerSupplier = Collections::emptyMap; + private long maxResponseBodySize = DEFAULT_MAX_RESPONSE_BODY_SIZE; @Nullable private String serviceName; @@ -147,6 +149,16 @@ public JaegerRemoteSamplerBuilder setInitialSampler(Sampler initialSampler) { return this; } + /** + * Sets the maximum number of bytes to read from a sampling strategy response body. If unset, + * defaults to 1 MiB. Must be positive. + */ + public JaegerRemoteSamplerBuilder setMaxSamplingStrategyResponseBodySize(long bytes) { + Utils.checkArgument(bytes > 0, "maxSamplingStrategyResponseBodySize must be positive"); + this.maxResponseBodySize = bytes; + return this; + } + /** * Sets the managed channel to use when communicating with the backend. Takes precedence over * {@link #setEndpoint(String)} if both are called. @@ -195,7 +207,8 @@ private GrpcSender resolveGrpcSender() { tlsConfigHelper.getSslContext(), tlsConfigHelper.getTrustManager(), null, - grpcChannel); + grpcChannel, + maxResponseBodySize); return grpcSenderProvider.createSender(grpcSenderConfig); } } diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSender.java b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSender.java index 500b12ad147..047d99eccec 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSender.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSender.java @@ -27,6 +27,9 @@ public interface GrpcSender { * called when the call could not be executed due to cancellation, connectivity problems, or * timeout. * + *

The byte array returned by {@link GrpcResponse#getResponseMessage()} should contain at most + * {@link GrpcSenderConfig#getMaxResponseBodySize()} bytes. + * * @param messageWriter the message writer * @param onResponse the callback to invoke with the gRPC response * @param onError the callback to invoke when the gRPC call could not be executed diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSenderConfig.java b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSenderConfig.java index 53fd2eb556f..811983843c7 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSenderConfig.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSenderConfig.java @@ -88,4 +88,14 @@ public interface GrpcSenderConfig { */ @Nullable ExecutorService getExecutorService(); + + /** + * The maximum number of bytes to read from a response body. Defaults to 1 MiB. + * + *

Warning: setting a high or unbounded limit allows a malicious or misconfigured server to + * cause unbounded heap allocation, potentially leading to out-of-memory errors. + */ + default long getMaxResponseBodySize() { + return 1024L * 1024L; + } } diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSender.java b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSender.java index a039d4ad6d0..bf0d75524f1 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSender.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSender.java @@ -27,6 +27,9 @@ public interface HttpSender { * called when the request could not be executed due to cancellation, connectivity problems, or * timeout. * + *

The byte array returned by {@link HttpResponse#getResponseBody()} should contain at most + * {@link HttpSenderConfig#getMaxResponseBodySize()} bytes. + * * @param messageWriter the request body message writer * @param onResponse the callback to invoke with the HTTP response * @param onError the callback to invoke when the HTTP request could not be executed diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSenderConfig.java b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSenderConfig.java index d67a71968a7..e3c1cb74239 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSenderConfig.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSenderConfig.java @@ -86,4 +86,14 @@ public interface HttpSenderConfig { */ @Nullable ExecutorService getExecutorService(); + + /** + * The maximum number of bytes to read from a response body. Defaults to 1 MiB. + * + *

Warning: setting a high or unbounded limit allows a malicious or misconfigured server to + * cause unbounded heap allocation, potentially leading to out-of-memory errors. + */ + default long getMaxResponseBodySize() { + return 1024L * 1024L; + } } From 440519a5006d5d8548ece65b6410f28d81600a11 Mon Sep 17 00:00:00 2001 From: Jack Berg <34418638+jack-berg@users.noreply.github.com> Date: Tue, 31 Mar 2026 13:19:20 -0500 Subject: [PATCH 2/7] Fix java 11 thread deadlock issue --- .../sender/jdk/internal/JdkHttpSender.java | 70 ++++++++----------- 1 file changed, 29 insertions(+), 41 deletions(-) diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java index 6ef88817700..156dfafb9ed 100644 --- a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java @@ -5,10 +5,9 @@ package io.opentelemetry.exporter.sender.jdk.internal; -import static java.util.stream.Collectors.joining; - import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.Compressor; +import io.opentelemetry.sdk.common.export.HttpResponse; import io.opentelemetry.sdk.common.export.HttpSender; import io.opentelemetry.sdk.common.export.MessageWriter; import io.opentelemetry.sdk.common.export.ProxyOptions; @@ -16,19 +15,19 @@ import io.opentelemetry.sdk.common.internal.DaemonThreadFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.io.UncheckedIOException; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; -import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandlers; import java.nio.ByteBuffer; import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; @@ -156,10 +155,8 @@ private static HttpClient configureClient( @Override public void send( - MessageWriter messageWriter, - Consumer onResponse, - Consumer onError) { - CompletableFuture> unused = + MessageWriter messageWriter, Consumer onResponse, Consumer onError) { + CompletableFuture unused = CompletableFuture.supplyAsync( () -> { try { @@ -175,12 +172,12 @@ public void send( onError.accept(throwable); return; } - onResponse.accept(toHttpResponse(httpResponse)); + onResponse.accept(httpResponse); }); } // Visible for testing - HttpResponse sendInternal(MessageWriter requestBodyWriter) throws IOException { + HttpResponse sendInternal(MessageWriter requestBodyWriter) throws IOException { long startTimeNanos = System.nanoTime(); HttpRequest.Builder requestBuilder = HttpRequest.newBuilder().uri(endpoint).timeout(timeout); Map> headers = headerSupplier.get(); @@ -212,7 +209,7 @@ HttpResponse sendInternal(MessageWriter requestBodyWriter) throws IOExce long attempt = 0; long nextBackoffNanos = retryPolicy.getInitialBackoff().toNanos(); - HttpResponse httpResponse = null; + HttpResponse httpResponse = null; IOException exception = null; do { if (attempt > 0) { @@ -239,7 +236,7 @@ HttpResponse sendInternal(MessageWriter requestBodyWriter) throws IOExce requestBuilder.timeout(timeout.minusNanos(System.nanoTime() - startTimeNanos)); try { httpResponse = sendRequest(requestBuilder, byteBufferPool); - boolean retryable = retryableStatusCodes.contains(httpResponse.statusCode()); + boolean retryable = retryableStatusCodes.contains(httpResponse.getStatusCode()); if (logger.isLoggable(Level.FINER)) { logger.log( Level.FINER, @@ -278,33 +275,16 @@ HttpResponse sendInternal(MessageWriter requestBodyWriter) throws IOExce throw exception; } - private static String responseStringRepresentation(HttpResponse response) { - StringJoiner joiner = new StringJoiner(",", "HttpResponse{", "}"); - joiner.add("code=" + response.statusCode()); - joiner.add( - "headers=" - + response.headers().map().entrySet().stream() - .map(entry -> entry.getKey() + "=" + String.join(",", entry.getValue())) - .collect(joining(",", "[", "]"))); - return joiner.toString(); + private static String responseStringRepresentation(HttpResponse response) { + return "HttpResponse{code=" + response.getStatusCode() + "}"; } - private HttpResponse sendRequest( + private HttpResponse sendRequest( HttpRequest.Builder requestBuilder, ByteBufferPool byteBufferPool) throws IOException { try { - return client.send( - requestBuilder.build(), - responseInfo -> - HttpResponse.BodySubscribers.mapping( - HttpResponse.BodySubscribers.ofInputStream(), - inputStream -> { - try (inputStream) { - return inputStream.readNBytes( - (int) Math.min(maxResponseBodySize, Integer.MAX_VALUE)); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - })); + java.net.http.HttpResponse response = + client.send(requestBuilder.build(), BodyHandlers.ofInputStream()); + return toHttpResponse(response); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(e); @@ -336,22 +316,30 @@ private byte[] buf() { } } - private static io.opentelemetry.sdk.common.export.HttpResponse toHttpResponse( - HttpResponse response) { - return new io.opentelemetry.sdk.common.export.HttpResponse() { + private HttpResponse toHttpResponse(java.net.http.HttpResponse response) { + int statusCode = response.statusCode(); + byte[] bodyBytes; + try (InputStream is = response.body()) { + bodyBytes = is.readNBytes((int) Math.min(maxResponseBodySize, Integer.MAX_VALUE)); + } catch (IOException e) { + bodyBytes = new byte[0]; + logger.log(Level.WARNING, "Failed to read response body", e); + } + byte[] body = bodyBytes; + return new HttpResponse() { @Override public int getStatusCode() { - return response.statusCode(); + return statusCode; } @Override public String getStatusMessage() { - return String.valueOf(response.statusCode()); + return String.valueOf(statusCode); } @Override public byte[] getResponseBody() { - return response.body(); + return body; } }; } From 9746611b844f3f8f4082feccb329b0481c04007e Mon Sep 17 00:00:00 2001 From: Jack Berg <34418638+jack-berg@users.noreply.github.com> Date: Tue, 31 Mar 2026 14:25:47 -0500 Subject: [PATCH 3/7] change default body size to 4mb to reflect spec --- .../exporter/internal/grpc/GrpcExporterBuilder.java | 5 +++-- .../exporter/internal/http/HttpExporterBuilder.java | 5 +++-- .../trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java | 4 ++-- .../io/opentelemetry/sdk/common/export/GrpcSenderConfig.java | 4 ++-- .../io/opentelemetry/sdk/common/export/HttpSenderConfig.java | 4 ++-- 5 files changed, 12 insertions(+), 10 deletions(-) diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java index c9ddfdcf18e..867cf897c3f 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java @@ -231,8 +231,9 @@ public GrpcExporter build() { isPlainHttp ? null : tlsConfigHelper.getTrustManager(), executorService, grpcChannel, - // 1kb since don't do anything with responses today - 1024L)); + // 4mb to align with spec guidance - even though we don't do anything with the + // response today, we will so better to have future-looking memory profile + 4 * 1024L * 1024L)); LOGGER.log(Level.FINE, "Using GrpcSender: " + grpcSender.getClass().getName()); return new GrpcExporter( diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java index f5168f68927..8937acbb474 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java @@ -246,8 +246,9 @@ public HttpExporter build() { isPlainHttp ? null : tlsConfigHelper.getSslContext(), isPlainHttp ? null : tlsConfigHelper.getTrustManager(), executorService, - // 1kb since don't do anything with responses today - 1024L)); + // 4mb to align with spec guidance - even though we don't do anything with the + // response today, we will so better to have future-looking memory profile + 4 * 1024L * 1024L)); LOGGER.log(Level.FINE, "Using HttpSender: " + httpSender.getClass().getName()); return new HttpExporter( diff --git a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java index abec4b5992e..ba5429d827a 100644 --- a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java +++ b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java @@ -42,7 +42,7 @@ public final class JaegerRemoteSamplerBuilder { private static final Sampler INITIAL_SAMPLER = Sampler.parentBased(Sampler.traceIdRatioBased(0.001)); private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(10); - private static final long DEFAULT_MAX_RESPONSE_BODY_SIZE = 1024L * 1024L; + private static final long DEFAULT_MAX_RESPONSE_BODY_SIZE = 4 * 1024L * 1024L; private URI endpoint = DEFAULT_ENDPOINT; private Sampler initialSampler = INITIAL_SAMPLER; @@ -151,7 +151,7 @@ public JaegerRemoteSamplerBuilder setInitialSampler(Sampler initialSampler) { /** * Sets the maximum number of bytes to read from a sampling strategy response body. If unset, - * defaults to 1 MiB. Must be positive. + * defaults to 4 MiB. Must be positive. */ public JaegerRemoteSamplerBuilder setMaxSamplingStrategyResponseBodySize(long bytes) { Utils.checkArgument(bytes > 0, "maxSamplingStrategyResponseBodySize must be positive"); diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSenderConfig.java b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSenderConfig.java index 811983843c7..35e02e09b4c 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSenderConfig.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSenderConfig.java @@ -90,12 +90,12 @@ public interface GrpcSenderConfig { ExecutorService getExecutorService(); /** - * The maximum number of bytes to read from a response body. Defaults to 1 MiB. + * The maximum number of bytes to read from a response body. Defaults to 4 MiB. * *

Warning: setting a high or unbounded limit allows a malicious or misconfigured server to * cause unbounded heap allocation, potentially leading to out-of-memory errors. */ default long getMaxResponseBodySize() { - return 1024L * 1024L; + return 4 * 1024L * 1024L; } } diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSenderConfig.java b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSenderConfig.java index e3c1cb74239..83d83b9f7b0 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSenderConfig.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSenderConfig.java @@ -88,12 +88,12 @@ public interface HttpSenderConfig { ExecutorService getExecutorService(); /** - * The maximum number of bytes to read from a response body. Defaults to 1 MiB. + * The maximum number of bytes to read from a response body. Defaults to 4 MiB. * *

Warning: setting a high or unbounded limit allows a malicious or misconfigured server to * cause unbounded heap allocation, potentially leading to out-of-memory errors. */ default long getMaxResponseBodySize() { - return 1024L * 1024L; + return 4 * 1024L * 1024L; } } From 094bde38184199e00042863552fd7d9ef14d80c5 Mon Sep 17 00:00:00 2001 From: Jack Berg <34418638+jack-berg@users.noreply.github.com> Date: Thu, 2 Apr 2026 15:31:13 -0500 Subject: [PATCH 4/7] Add tests, prevent overflow, follow anticipated guidance regarding error handling --- .../profiles/OtlpGrpcProfileExporterTest.java | 2 +- .../AbstractGrpcTelemetryExporterTest.java | 142 ++++++++++++------ .../AbstractHttpTelemetryExporterTest.java | 73 ++++++--- ...anagedChannelTelemetryExporterBuilder.java | 8 + .../testing/internal/TelemetryExporter.java | 64 ++++++++ .../sender/jdk/internal/JdkHttpSender.java | 28 +++- .../okhttp/internal/OkHttpGrpcSender.java | 116 +++++++++----- .../okhttp/internal/OkHttpHttpSender.java | 90 ++++++----- 8 files changed, 377 insertions(+), 146 deletions(-) diff --git a/exporters/otlp/profiles/src/test/java/io/opentelemetry/exporter/otlp/profiles/OtlpGrpcProfileExporterTest.java b/exporters/otlp/profiles/src/test/java/io/opentelemetry/exporter/otlp/profiles/OtlpGrpcProfileExporterTest.java index 2ac958b47ab..3af11db2d6b 100644 --- a/exporters/otlp/profiles/src/test/java/io/opentelemetry/exporter/otlp/profiles/OtlpGrpcProfileExporterTest.java +++ b/exporters/otlp/profiles/src/test/java/io/opentelemetry/exporter/otlp/profiles/OtlpGrpcProfileExporterTest.java @@ -42,7 +42,7 @@ void usingOkHttp() throws Exception { @Override // whilst profile signal type is in development it uses a different error message @SuppressLogger(GrpcExporter.class) protected void testExport_Unimplemented() { - addGrpcError(GrpcStatusCode.UNIMPLEMENTED, "UNIMPLEMENTED"); + addGrpcResponse(GrpcStatusCode.UNIMPLEMENTED, "UNIMPLEMENTED"); TelemetryExporter exporter = nonRetryingExporter(); diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java index 664e839314e..139baf09850 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java @@ -7,6 +7,7 @@ import static io.opentelemetry.api.common.AttributeKey.stringKey; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static java.util.Objects.requireNonNull; import static org.assertj.core.api.Assertions.as; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; @@ -15,7 +16,9 @@ import static org.junit.jupiter.api.Named.named; import static org.junit.jupiter.params.provider.Arguments.arguments; +import com.google.protobuf.AbstractMessageLite; import com.google.protobuf.Message; +import com.google.protobuf.Parser; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.TlsKeyPair; @@ -111,9 +114,11 @@ public abstract class AbstractGrpcTelemetryExporterTest { private static final ConcurrentLinkedQueue exportedResourceTelemetry = new ConcurrentLinkedQueue<>(); - private static final ConcurrentLinkedQueue grpcErrors = + private static final ConcurrentLinkedQueue grpcResponses = new ConcurrentLinkedQueue<>(); + private static volatile byte[] defaultResponseBytes = new byte[0]; + private static final AtomicInteger attempts = new AtomicInteger(); private static final ConcurrentLinkedQueue httpRequests = @@ -143,13 +148,7 @@ public void export( ExportLogsServiceRequest request, StreamObserver responseObserver) { exportedResourceTelemetry.addAll(request.getResourceLogsList()); - ArmeriaStatusException grpcError = grpcErrors.poll(); - if (grpcError != null) { - responseObserver.onError(grpcError); - } else { - responseObserver.onNext(ExportLogsServiceResponse.getDefaultInstance()); - responseObserver.onCompleted(); - } + handleExport(responseObserver, ExportLogsServiceResponse.parser()); } }) .addService( @@ -159,14 +158,7 @@ public void export( ExportMetricsServiceRequest request, StreamObserver responseObserver) { exportedResourceTelemetry.addAll(request.getResourceMetricsList()); - ArmeriaStatusException grpcError = grpcErrors.poll(); - if (grpcError != null) { - responseObserver.onError(grpcError); - } else { - responseObserver.onNext( - ExportMetricsServiceResponse.getDefaultInstance()); - responseObserver.onCompleted(); - } + handleExport(responseObserver, ExportMetricsServiceResponse.parser()); } }) .addService( @@ -176,14 +168,7 @@ public void export( ExportTraceServiceRequest request, StreamObserver responseObserver) { exportedResourceTelemetry.addAll(request.getResourceSpansList()); - ArmeriaStatusException grpcError = grpcErrors.poll(); - if (grpcError != null) { - responseObserver.onError(grpcError); - } else { - responseObserver.onNext( - ExportTraceServiceResponse.getDefaultInstance()); - responseObserver.onCompleted(); - } + handleExport(responseObserver, ExportTraceServiceResponse.parser()); } }) .addService( @@ -193,14 +178,7 @@ public void export( ExportProfilesServiceRequest request, StreamObserver responseObserver) { exportedResourceTelemetry.addAll(request.getResourceProfilesList()); - ArmeriaStatusException grpcError = grpcErrors.poll(); - if (grpcError != null) { - responseObserver.onError(grpcError); - } else { - responseObserver.onNext( - ExportProfilesServiceResponse.getDefaultInstance()); - responseObserver.onCompleted(); - } + handleExport(responseObserver, ExportProfilesServiceResponse.parser()); } }) .decompressorRegistry( @@ -239,6 +217,25 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) } }; + private static void handleExport( + StreamObserver responseObserver, Parser parser) { + GrpcServerResponse grpcResponse = grpcResponses.poll(); + if (grpcResponse != null && grpcResponse.error != null) { + responseObserver.onError(grpcResponse.error); + } else { + try { + byte[] responseBytes = + grpcResponse != null && grpcResponse.responseBytes != null + ? grpcResponse.responseBytes + : defaultResponseBytes; + responseObserver.onNext(parser.parseFrom(responseBytes)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + responseObserver.onCompleted(); + } + } + @RegisterExtension protected LogCapturer logs = LogCapturer.create().captureForType(GrpcExporter.class); @@ -265,6 +262,7 @@ void setUp() { .setInitialBackoff(Duration.ofMillis(1)) .build()) .build(); + defaultResponseBytes = exporter.exportResponse(0).toByteArray(); // Sanity check that TLS files are in PEM format. assertThat(certificate.certificateFile()) @@ -293,7 +291,7 @@ void tearDown() { @AfterEach void reset() { exportedResourceTelemetry.clear(); - grpcErrors.clear(); + grpcResponses.clear(); attempts.set(0); httpRequests.clear(); } @@ -332,6 +330,40 @@ void export() { .matches("OTel-OTLP-Exporter-Java/1\\..*")); } + @Test + // @SuppressLogger(HttpExporter.class) + void responseBodyBounds() { + // We have a 4mb hardcoded response body limit. Responses <= 4mb succeed. Responses >= 4mb + // succeed. We can't test payloads exactly at 4mb because protobuf message lengths are finicky - + // its hard to create a message with an exact size. + + // Body below the limit, succeeds + addGrpcResponse(GrpcStatusCode.OK, null, exporter.exportResponse(100)); + CompletableResultCode result = + exporter + .export(Collections.singletonList(generateFakeTelemetry())) + .join(10, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isTrue(); + + // Body over the limit, fails with RESOURCE_EXHAUSTED + addGrpcResponse(GrpcStatusCode.OK, null, exporter.exportResponse(4 * 1024 * 1024 + 1)); + result = + exporter + .export(Collections.singletonList(generateFakeTelemetry())) + .join(10, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isFalse(); + assertThat(result.getFailureThrowable()) + .isInstanceOfSatisfying( + FailedExportException.GrpcExportException.class, + ex -> + assertThat(requireNonNull(ex.getResponse())) + .isNotNull() + .satisfies( + grpcResponse -> + assertThat(grpcResponse.getStatusCode()) + .isEqualTo(GrpcStatusCode.RESOURCE_EXHAUSTED))); + } + @Test void multipleItems() { List telemetry = new ArrayList<>(); @@ -603,7 +635,7 @@ void doubleShutdown() { @SuppressLogger(GrpcExporter.class) void error() { GrpcStatusCode statusCode = GrpcStatusCode.INTERNAL; - addGrpcError(statusCode, null); + addGrpcResponse(statusCode, null); try (TelemetryExporter exporter = nonRetryingExporter()) { CompletableResultCode result = @@ -639,7 +671,7 @@ void error() { @Test @SuppressLogger(GrpcExporter.class) void errorWithUnknownError() { - addGrpcError(GrpcStatusCode.UNKNOWN, null); + addGrpcResponse(GrpcStatusCode.UNKNOWN, null); try (TelemetryExporter exporter = nonRetryingExporter()) { assertThat( @@ -662,7 +694,7 @@ void errorWithUnknownError() { @Test @SuppressLogger(GrpcExporter.class) void errorWithMessage() { - addGrpcError(GrpcStatusCode.RESOURCE_EXHAUSTED, "out of quota"); + addGrpcResponse(GrpcStatusCode.RESOURCE_EXHAUSTED, "out of quota"); try (TelemetryExporter exporter = nonRetryingExporter()) { assertThat( @@ -683,7 +715,7 @@ void errorWithMessage() { @Test @SuppressLogger(GrpcExporter.class) void errorWithEscapedMessage() { - addGrpcError(GrpcStatusCode.NOT_FOUND, "クマ🐻"); + addGrpcResponse(GrpcStatusCode.NOT_FOUND, "クマ🐻"); try (TelemetryExporter exporter = nonRetryingExporter()) { assertThat( @@ -704,7 +736,7 @@ void errorWithEscapedMessage() { @Test @SuppressLogger(GrpcExporter.class) void testExport_Unavailable() { - addGrpcError(GrpcStatusCode.UNAVAILABLE, null); + addGrpcResponse(GrpcStatusCode.UNAVAILABLE, null); try (TelemetryExporter exporter = nonRetryingExporter()) { assertThat( @@ -726,7 +758,7 @@ void testExport_Unavailable() { @Test @SuppressLogger(GrpcExporter.class) protected void testExport_Unimplemented() { - addGrpcError(GrpcStatusCode.UNIMPLEMENTED, "UNIMPLEMENTED"); + addGrpcResponse(GrpcStatusCode.UNIMPLEMENTED, "UNIMPLEMENTED"); try (TelemetryExporter exporter = nonRetryingExporter()) { assertThat( @@ -772,7 +804,7 @@ protected void testExport_Unimplemented() { @ValueSource(ints = {1, 4, 8, 10, 11, 14, 15}) @SuppressLogger(GrpcExporter.class) void retryableError(int code) { - addGrpcError(GrpcStatusCode.fromValue(code), null); + addGrpcResponse(GrpcStatusCode.fromValue(code), null); assertThat( exporter @@ -787,8 +819,8 @@ void retryableError(int code) { @Test @SuppressLogger(GrpcExporter.class) void retryableError_tooManyAttempts() { - addGrpcError(GrpcStatusCode.CANCELLED, null); - addGrpcError(GrpcStatusCode.CANCELLED, null); + addGrpcResponse(GrpcStatusCode.CANCELLED, null); + addGrpcResponse(GrpcStatusCode.CANCELLED, null); assertThat( exporter @@ -804,7 +836,7 @@ void retryableError_tooManyAttempts() { @ValueSource(ints = {2, 3, 5, 6, 7, 9, 12, 13, 16}) @SuppressLogger(GrpcExporter.class) void nonRetryableError(int code) { - addGrpcError(GrpcStatusCode.fromValue(code), null); + addGrpcResponse(GrpcStatusCode.fromValue(code), null); assertThat( exporter @@ -1273,7 +1305,27 @@ protected TelemetryExporter nonRetryingExporter() { return exporterBuilder().setEndpoint(server.httpUri().toString()).setRetryPolicy(null).build(); } - protected static void addGrpcError(GrpcStatusCode code, @Nullable String message) { - grpcErrors.add(new ArmeriaStatusException(code.getValue(), message)); + protected static void addGrpcResponse(GrpcStatusCode code, @Nullable String message) { + addGrpcResponse(code, message, null); + } + + protected static void addGrpcResponse( + GrpcStatusCode code, + @Nullable String message, + @Nullable AbstractMessageLite bodyMessage) { + grpcResponses.add( + new GrpcServerResponse( + code == GrpcStatusCode.OK ? null : new ArmeriaStatusException(code.getValue(), message), + bodyMessage == null ? null : bodyMessage.toByteArray())); + } + + private static final class GrpcServerResponse { + @Nullable final ArmeriaStatusException error; + @Nullable final byte[] responseBytes; + + GrpcServerResponse(@Nullable ArmeriaStatusException error, @Nullable byte[] responseBytes) { + this.error = error; + this.responseBytes = responseBytes; + } } } diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java index 79927098035..ecacdd48dcb 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java @@ -13,6 +13,7 @@ import static org.junit.jupiter.api.Named.named; import static org.junit.jupiter.params.provider.Arguments.arguments; +import com.google.protobuf.AbstractMessageLite; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import com.linecorp.armeria.common.HttpRequest; @@ -34,11 +35,8 @@ import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; -import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; -import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; -import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.InternalTelemetryVersion; import io.opentelemetry.sdk.common.export.ProxyOptions; @@ -116,6 +114,8 @@ public abstract class AbstractHttpTelemetryExporterTest { private static final ConcurrentLinkedQueue httpRequests = new ConcurrentLinkedQueue<>(); + private static volatile byte[] successResponseBytes = new byte[0]; + @RegisterExtension @Order(1) static final SelfSignedCertificateExtension certificate = new SelfSignedCertificateExtension(); @@ -135,20 +135,17 @@ protected void configure(ServerBuilder sb) { "/v1/traces", new CollectorService<>( ExportTraceServiceRequest::parseFrom, - ExportTraceServiceRequest::getResourceSpansList, - ExportTraceServiceResponse.getDefaultInstance().toByteArray())); + ExportTraceServiceRequest::getResourceSpansList)); sb.service( "/v1/metrics", new CollectorService<>( ExportMetricsServiceRequest::parseFrom, - ExportMetricsServiceRequest::getResourceMetricsList, - ExportMetricsServiceResponse.getDefaultInstance().toByteArray())); + ExportMetricsServiceRequest::getResourceMetricsList)); sb.service( "/v1/logs", new CollectorService<>( ExportLogsServiceRequest::parseFrom, - ExportLogsServiceRequest::getResourceLogsList, - ExportLogsServiceResponse.getDefaultInstance().toByteArray())); + ExportLogsServiceRequest::getResourceLogsList)); sb.http(0); sb.https(0); @@ -162,15 +159,12 @@ protected void configure(ServerBuilder sb) { private static class CollectorService implements HttpService { private final ThrowingExtractor parse; private final Function> getResourceTelemetry; - private final byte[] successResponse; private CollectorService( ThrowingExtractor parse, - Function> getResourceTelemetry, - byte[] successResponse) { + Function> getResourceTelemetry) { this.parse = parse; this.getResourceTelemetry = getResourceTelemetry; - this.successResponse = successResponse; } @Override @@ -195,7 +189,7 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) { : HttpResponse.of( HttpStatus.OK, MediaType.parse("application/x-protobuf"), - successResponse); + successResponseBytes); }); return HttpResponse.of(responseFuture); } @@ -236,7 +230,6 @@ protected AbstractHttpTelemetryExporterTest( @BeforeAll void setUp() { - // exporter = exporterBuilder() .setEndpoint(server.httpUri() + path) @@ -248,6 +241,7 @@ void setUp() { .setInitialBackoff(Duration.ofMillis(1)) .build()) .build(); + successResponseBytes = exporter.exportResponse(0).toByteArray(); // Sanity check that TLS files are in PEM format. assertThat(certificate.certificateFile()) @@ -553,7 +547,7 @@ void doubleShutdown() { @SuppressLogger(HttpExporter.class) void error() { int statusCode = 500; - addHttpError(statusCode); + addHttpResponse(statusCode); CompletableResultCode result = exporter .export(Collections.singletonList(generateFakeTelemetry())) @@ -590,10 +584,41 @@ void error() { assertThat(log.getLevel()).isEqualTo(Level.WARN); } + @Test + @SuppressLogger(HttpExporter.class) + void responseBodyBounds() { + // We have a 4mb hardcoded response body limit. Responses <= 4mb succeed. Responses >= 4mb + // succeed. We can't test payloads exactly at 4mb because protobuf message lengths are finicky - + // its hard to create a message with an exact size. + + // Body below the limit, succeeds + addHttpResponse(200, exporter.exportResponse(100)); + CompletableResultCode result = + exporter + .export(Collections.singletonList(generateFakeTelemetry())) + .join(10, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isTrue(); + + // Body over the limit, fails with RESOURCE_EXHAUSTED + addHttpResponse(200, exporter.exportResponse(4 * 1024 * 1024 + 1)); + result = + exporter + .export(Collections.singletonList(generateFakeTelemetry())) + .join(10, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isFalse(); + assertThat(result.getFailureThrowable()) + .isInstanceOfSatisfying( + FailedExportException.HttpExportException.class, + ex -> + assertThat(ex.getCause()) + .isNotNull() + .hasMessageContaining("HTTP response body exceeded limit of")); + } + @ParameterizedTest @ValueSource(ints = {429, 502, 503, 504}) void retryableError(int code) { - addHttpError(code); + addHttpResponse(code); assertThat( exporter @@ -608,8 +633,8 @@ void retryableError(int code) { @Test @SuppressLogger(HttpExporter.class) void retryableError_tooManyAttempts() { - addHttpError(502); - addHttpError(502); + addHttpResponse(502); + addHttpResponse(502); assertThat( exporter @@ -625,7 +650,7 @@ void retryableError_tooManyAttempts() { @SuppressLogger(HttpExporter.class) @ValueSource(ints = {400, 401, 403, 500, 501}) void nonRetryableError(int code) { - addHttpError(code); + addHttpResponse(code); assertThat( exporter @@ -1096,7 +1121,13 @@ private List toProto(List telemetry) { .collect(Collectors.toList()); } - private static void addHttpError(int code) { + private static void addHttpResponse(int code) { httpErrors.add(HttpResponse.of(code)); } + + private static void addHttpResponse(int code, AbstractMessageLite bodyMessage) { + httpErrors.add( + HttpResponse.of( + HttpStatus.valueOf(code), MediaType.PLAIN_TEXT_UTF_8, bodyMessage.toByteArray())); + } } diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java index cbd0168789b..ab9b4611844 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java @@ -7,6 +7,7 @@ import static java.util.Objects.requireNonNull; +import com.google.protobuf.AbstractMessageLite; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.netty.GrpcSslContexts; @@ -68,6 +69,8 @@ public TelemetryExporterBuilder setEndpoint(String endpoint) { // the User-Agent to be spec compliant they must manually set the user agent when building // their channel. channelBuilder.userAgent(OtlpUserAgent.getUserAgent()); + // Its user's responsibility to set the max inbound message size when building their channel. + channelBuilder.maxInboundMessageSize(4 * 1024 * 1024); return this; } @@ -230,6 +233,11 @@ public CompletableResultCode shutdown() { shutdownCallback.run(); return delegateExporter.shutdown(); } + + @Override + public AbstractMessageLite exportResponse(int minimumSize) { + return delegateExporter.exportResponse(minimumSize); + } }; } diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/TelemetryExporter.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/TelemetryExporter.java index 65a37f2650f..6bb4c3132cc 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/TelemetryExporter.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/TelemetryExporter.java @@ -5,8 +5,18 @@ package io.opentelemetry.exporter.otlp.testing.internal; +import com.google.common.base.Strings; +import com.google.protobuf.AbstractMessageLite; import io.opentelemetry.exporter.otlp.profiles.ProfileData; import io.opentelemetry.exporter.otlp.profiles.ProfileExporter; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsPartialSuccess; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsPartialSuccess; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; +import io.opentelemetry.proto.collector.profiles.v1development.ExportProfilesPartialSuccess; +import io.opentelemetry.proto.collector.profiles.v1development.ExportProfilesServiceResponse; +import io.opentelemetry.proto.collector.trace.v1.ExportTracePartialSuccess; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.logs.export.LogRecordExporter; @@ -36,6 +46,19 @@ public CompletableResultCode export(Collection items) { public CompletableResultCode shutdown() { return exporter.shutdown(); } + + @Override + public AbstractMessageLite exportResponse(int minimumSize) { + if (minimumSize == 0) { + return ExportTraceServiceResponse.getDefaultInstance(); + } + return ExportTraceServiceResponse.newBuilder() + .setPartialSuccess( + ExportTracePartialSuccess.newBuilder() + .setErrorMessage(Strings.repeat("x", minimumSize)) + .build()) + .build(); + } }; } @@ -56,6 +79,19 @@ public CompletableResultCode export(Collection items) { public CompletableResultCode shutdown() { return exporter.shutdown(); } + + @Override + public AbstractMessageLite exportResponse(int minimumSize) { + if (minimumSize == 0) { + return ExportMetricsServiceResponse.getDefaultInstance(); + } + return ExportMetricsServiceResponse.newBuilder() + .setPartialSuccess( + ExportMetricsPartialSuccess.newBuilder() + .setErrorMessage(Strings.repeat("x", minimumSize)) + .build()) + .build(); + } }; } @@ -76,6 +112,19 @@ public CompletableResultCode export(Collection items) { public CompletableResultCode shutdown() { return exporter.shutdown(); } + + @Override + public AbstractMessageLite exportResponse(int minimumSize) { + if (minimumSize == 0) { + return ExportLogsServiceResponse.getDefaultInstance(); + } + return ExportLogsServiceResponse.newBuilder() + .setPartialSuccess( + ExportLogsPartialSuccess.newBuilder() + .setErrorMessage(Strings.repeat("x", minimumSize)) + .build()) + .build(); + } }; } @@ -96,6 +145,19 @@ public CompletableResultCode export(Collection items) { public CompletableResultCode shutdown() { return exporter.shutdown(); } + + @Override + public AbstractMessageLite exportResponse(int minimumSize) { + if (minimumSize == 0) { + return ExportProfilesServiceResponse.getDefaultInstance(); + } + return ExportProfilesServiceResponse.newBuilder() + .setPartialSuccess( + ExportProfilesPartialSuccess.newBuilder() + .setErrorMessage(Strings.repeat("x", minimumSize)) + .build()) + .build(); + } }; } @@ -105,6 +167,8 @@ public CompletableResultCode shutdown() { CompletableResultCode shutdown(); + AbstractMessageLite exportResponse(int minimumSize); + @Override default void close() { shutdown().join(10, TimeUnit.SECONDS); diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java index 156dfafb9ed..03c0cf2fa2f 100644 --- a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java @@ -303,7 +303,14 @@ private static boolean isRetryableException(IOException throwable) { // Known retryable HttpTimeoutException messages: "request timed out" // Known retryable HttpConnectTimeoutException messages: "HTTP connect timed // out" - return !(throwable instanceof SSLException); + return !(throwable instanceof SSLException) + && !(throwable instanceof ResponseBodyTooLargeException); + } + + private static final class ResponseBodyTooLargeException extends IOException { + ResponseBodyTooLargeException(String message) { + super(message); + } } private static class NoCopyByteArrayOutputStream extends ByteArrayOutputStream { @@ -316,15 +323,30 @@ private byte[] buf() { } } - private HttpResponse toHttpResponse(java.net.http.HttpResponse response) { + private HttpResponse toHttpResponse(java.net.http.HttpResponse response) + throws IOException { int statusCode = response.statusCode(); + // Read up to maxResponseBodySize + 1 bytes. Reading exactly one byte more than the limit + // lets us detect overflow: if we read more than maxResponseBodySize bytes, the body exceeded + // the limit. A body exactly at the limit will read no further (EOF is reached first). + // If maxResponseBodySize is >= Integer.MAX_VALUE, adding 1 would overflow (long) or exceed + // what readNBytes accepts (int). In that case read Integer.MAX_VALUE bytes — the overflow + // check can never trigger for such a large limit. + int readUpTo = + maxResponseBodySize >= Integer.MAX_VALUE + ? Integer.MAX_VALUE + : (int) (maxResponseBodySize + 1); byte[] bodyBytes; try (InputStream is = response.body()) { - bodyBytes = is.readNBytes((int) Math.min(maxResponseBodySize, Integer.MAX_VALUE)); + bodyBytes = is.readNBytes(readUpTo); } catch (IOException e) { bodyBytes = new byte[0]; logger.log(Level.WARNING, "Failed to read response body", e); } + if (bodyBytes.length > maxResponseBodySize) { + throw new ResponseBodyTooLargeException( + "HTTP response body exceeded limit of " + maxResponseBodySize + " bytes"); + } byte[] body = bodyBytes; return new HttpResponse() { @Override diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java index 2baab1a9598..a3410432339 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java @@ -168,48 +168,88 @@ public void onFailure(Call call, IOException e) { @Override public void onResponse(Call call, Response response) { - try (ResponseBody body = response.body()) { - // Must consume body before accessing trailers - byte[] bodyBytes = null; - try { - Buffer buffer = new Buffer(); - while (buffer.size() < maxResponseBodySize) { - long n = - body.source().read(buffer, maxResponseBodySize - buffer.size()); - if (n == -1L) { - break; - } - } - bodyBytes = getResponseMessageBytes(buffer.readByteArray()); - } catch (IOException e) { - bodyBytes = new byte[0]; - logger.log(Level.FINE, "Failed to read response body", e); - } - byte[] resolvedBodyBytes = bodyBytes; - GrpcStatusCode status = grpcStatus(response); - String description = grpcMessage(response); - onResponse.accept( - new GrpcResponse() { - @Override - public GrpcStatusCode getStatusCode() { - return status; - } - - @Override - public String getStatusDescription() { - return description; - } - - @Override - public byte[] getResponseMessage() { - return resolvedBodyBytes; - } - }); - } + handleResponse(response, onResponse); } })); } + private void handleResponse(Response response, Consumer onResponse) { + try (ResponseBody body = response.body()) { + // Read up to maxResponseBodySize + 1 bytes. Reading exactly one byte more than the limit + // lets us detect overflow: if the buffer ends up larger than maxResponseBodySize, the body + // exceeded the limit. A body exactly at the limit will only fill the buffer to + // maxResponseBodySize (EOF is reached before the extra byte is read). + // If maxResponseBodySize is Long.MAX_VALUE, adding 1 would overflow. In that case use + // Long.MAX_VALUE directly — the overflow check can never trigger for such a large limit. + long readUpTo = + maxResponseBodySize == Long.MAX_VALUE ? Long.MAX_VALUE : maxResponseBodySize + 1; + Buffer buffer = new Buffer(); + try { + while (buffer.size() <= maxResponseBodySize) { + long n = body.source().read(buffer, readUpTo - buffer.size()); + if (n == -1L) { + break; + } + } + } catch (IOException e) { + logger.log(Level.FINE, "Failed to read response body", e); + } + + if (buffer.size() > maxResponseBodySize) { + onResponse.accept(responseMessageTooLarge(maxResponseBodySize)); + return; + } + + // Must consume body before accessing trailers + byte[] bodyBytes; + try { + bodyBytes = getResponseMessageBytes(buffer.readByteArray()); + } catch (IOException e) { + bodyBytes = new byte[0]; + logger.log(Level.FINE, "Failed to read response body", e); + } + byte[] resolvedBodyBytes = bodyBytes; + GrpcStatusCode status = grpcStatus(response); + String description = grpcMessage(response); + onResponse.accept( + new GrpcResponse() { + @Override + public GrpcStatusCode getStatusCode() { + return status; + } + + @Override + public String getStatusDescription() { + return description; + } + + @Override + public byte[] getResponseMessage() { + return resolvedBodyBytes; + } + }); + } + } + + private static GrpcResponse responseMessageTooLarge(long maxResponseBodySize) { + return new GrpcResponse() { + @Override + public GrpcStatusCode getStatusCode() { + return GrpcStatusCode.RESOURCE_EXHAUSTED; + } + + @Override + public String getStatusDescription() { + return "gRPC response body exceeded limit of " + maxResponseBodySize + " bytes"; + } + + @Override + public byte[] getResponseMessage() { + return new byte[0]; + } + }; + } + private static byte[] getResponseMessageBytes(byte[] bodyBytes) throws IOException { if (bodyBytes.length >= 5) { ByteArrayInputStream bodyStream = new ByteArrayInputStream(bodyBytes); diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java index 200f1f1e514..9305814b1ee 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java @@ -145,48 +145,62 @@ public void onFailure(Call call, IOException e) { @Override public void onResponse(Call call, Response response) { - try (ResponseBody body = response.body()) { - onResponse.accept( - new HttpResponse() { - @Nullable private byte[] bodyBytes; - - @Override - public int getStatusCode() { - return response.code(); - } - - @Override - public String getStatusMessage() { - return response.message(); - } - - @Override - public byte[] getResponseBody() { - if (bodyBytes == null) { - try { - Buffer buffer = new Buffer(); - while (buffer.size() < maxResponseBodySize) { - long n = - body.source() - .read(buffer, maxResponseBodySize - buffer.size()); - if (n == -1L) { - break; - } - } - bodyBytes = buffer.readByteArray(); - } catch (IOException e) { - bodyBytes = new byte[0]; - logger.log(Level.WARNING, "Failed to read response body", e); - } - } - return bodyBytes; - } - }); - } + handleResponse(response, onResponse, onError); } })); } + private void handleResponse( + Response response, Consumer onResponse, Consumer onError) { + try (ResponseBody body = response.body()) { + // Read up to maxResponseBodySize + 1 bytes. Reading exactly one byte more than the limit + // lets us detect overflow: if the buffer ends up larger than maxResponseBodySize, the body + // exceeded the limit. A body exactly at the limit will only fill the buffer to + // maxResponseBodySize (EOF is reached before the extra byte is read). + // If maxResponseBodySize is Long.MAX_VALUE, adding 1 would overflow. In that case use + // Long.MAX_VALUE directly — the overflow check can never trigger for such a large limit. + long readUpTo = + maxResponseBodySize == Long.MAX_VALUE ? Long.MAX_VALUE : maxResponseBodySize + 1; + Buffer buffer = new Buffer(); + try { + while (buffer.size() <= maxResponseBodySize) { + long n = body.source().read(buffer, readUpTo - buffer.size()); + if (n == -1L) { + break; + } + } + } catch (IOException e) { + logger.log(Level.WARNING, "Failed to read response body", e); + } + + if (buffer.size() > maxResponseBodySize) { + onError.accept( + new IOException( + "HTTP response body exceeded limit of " + maxResponseBodySize + " bytes")); + return; + } + + byte[] bodyBytes = buffer.readByteArray(); + onResponse.accept( + new HttpResponse() { + @Override + public int getStatusCode() { + return response.code(); + } + + @Override + public String getStatusMessage() { + return response.message(); + } + + @Override + public byte[] getResponseBody() { + return bodyBytes; + } + }); + } + } + @Override public CompletableResultCode shutdown() { client.dispatcher().cancelAll(); From 1cfb972fdd8f0734875389b7ab193543d83602b0 Mon Sep 17 00:00:00 2001 From: Jack Berg <34418638+jack-berg@users.noreply.github.com> Date: Thu, 2 Apr 2026 16:20:45 -0500 Subject: [PATCH 5/7] Fix jdk java 11 bug by reading incrementally instead of via readNBytes --- .../exporter/sender/jdk/internal/JdkHttpSender.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java index 03c0cf2fa2f..e4c87516ce5 100644 --- a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java @@ -330,15 +330,22 @@ private HttpResponse toHttpResponse(java.net.http.HttpResponse resp // lets us detect overflow: if we read more than maxResponseBodySize bytes, the body exceeded // the limit. A body exactly at the limit will read no further (EOF is reached first). // If maxResponseBodySize is >= Integer.MAX_VALUE, adding 1 would overflow (long) or exceed - // what readNBytes accepts (int). In that case read Integer.MAX_VALUE bytes — the overflow - // check can never trigger for such a large limit. + // what an int can hold. In that case use Integer.MAX_VALUE — the overflow check can never + // trigger for such a large limit. int readUpTo = maxResponseBodySize >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) (maxResponseBodySize + 1); byte[] bodyBytes; try (InputStream is = response.body()) { - bodyBytes = is.readNBytes(readUpTo); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] buf = new byte[4 * 0x400]; // 4KB + int n; + while (baos.size() < readUpTo + && (n = is.read(buf, 0, Math.min(buf.length, readUpTo - baos.size()))) != -1) { + baos.write(buf, 0, n); + } + bodyBytes = baos.toByteArray(); } catch (IOException e) { bodyBytes = new byte[0]; logger.log(Level.WARNING, "Failed to read response body", e); From 3aebc41bd4cfac3409f0a13d9496c4e16a0716bb Mon Sep 17 00:00:00 2001 From: Jack Berg <34418638+jack-berg@users.noreply.github.com> Date: Fri, 3 Apr 2026 09:19:56 -0500 Subject: [PATCH 6/7] Enforce response body size limit on decompressed bytes across all OTLP senders --- .../AbstractGrpcTelemetryExporterTest.java | 97 +++++++++++++++++++ .../AbstractHttpTelemetryExporterTest.java | 63 ++++++++++++ .../sender/jdk/internal/JdkHttpSender.java | 26 ++++- .../okhttp/internal/OkHttpGrpcSender.java | 84 ++++++++++------ .../okhttp/internal/OkHttpHttpSender.java | 15 ++- 5 files changed, 255 insertions(+), 30 deletions(-) diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java index 139baf09850..f5292973203 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java @@ -19,8 +19,12 @@ import com.google.protobuf.AbstractMessageLite; import com.google.protobuf.Message; import com.google.protobuf.Parser; +import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.MediaType; +import com.linecorp.armeria.common.ResponseHeaders; import com.linecorp.armeria.common.TlsKeyPair; import com.linecorp.armeria.common.grpc.protocol.ArmeriaStatusException; import com.linecorp.armeria.server.ServerBuilder; @@ -83,6 +87,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.zip.GZIPOutputStream; import javax.annotation.Nullable; import javax.net.ssl.KeyManager; import javax.net.ssl.SSLContext; @@ -124,6 +129,8 @@ public abstract class AbstractGrpcTelemetryExporterTest { private static final ConcurrentLinkedQueue httpRequests = new ConcurrentLinkedQueue<>(); + private static final AtomicInteger grpcEncodingServerAttempts = new AtomicInteger(); + @RegisterExtension @Order(1) static final SelfSignedCertificateExtension certificate = new SelfSignedCertificateExtension(); @@ -236,6 +243,33 @@ private static void handleExport( } } + // A minimal server that returns grpc-encoding: brotli to test unsupported encoding handling. + // Both OkHttpGrpcSender (our code) and UpstreamGrpcSender (grpc-java framework) reject unknown + // encodings and fail the export with INTERNAL status. + @RegisterExtension + @Order(4) + static final ServerExtension grpcEncodingServer = + new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + sb.service( + "prefix:/", + (ctx, req) -> { + grpcEncodingServerAttempts.incrementAndGet(); + // gRPC frame: compressed flag=1, declared length=0 (no payload bytes follow) + byte[] frame = {1, 0, 0, 0, 0}; + return HttpResponse.of( + ResponseHeaders.builder(HttpStatus.OK) + .contentType(MediaType.parse("application/grpc+proto")) + .add("grpc-encoding", "brotli") + .add("grpc-status", "0") + .build(), + HttpData.wrap(frame)); + }); + sb.http(0); + } + }; + @RegisterExtension protected LogCapturer logs = LogCapturer.create().captureForType(GrpcExporter.class); @@ -294,6 +328,7 @@ void reset() { grpcResponses.clear(); attempts.set(0); httpRequests.clear(); + grpcEncodingServerAttempts.set(0); } @Test @@ -364,6 +399,68 @@ void responseBodyBounds() { .isEqualTo(GrpcStatusCode.RESOURCE_EXHAUSTED))); } + @Test + @SuppressLogger(GrpcExporter.class) + void responseBodyBoundsGzipCompressed() throws IOException { + // Verify the body size limit is applied to the decompressed bytes, not the compressed wire + // bytes. The server compresses the response with gzip (because the sender advertises + // grpc-accept-encoding: gzip), so the wire bytes are well under 4MB while the decompressed + // bytes exceed it. + AbstractMessageLite responseMsg = exporter.exportResponse(4 * 1024 * 1024 + 1); + byte[] responseBytes = responseMsg.toByteArray(); + // Sanity: confirm payload is over the limit uncompressed but compresses well under the limit + assertThat(responseBytes).hasSizeGreaterThan(4 * 1024 * 1024); + ByteArrayOutputStream compressedBaos = new ByteArrayOutputStream(); + try (GZIPOutputStream gzip = new GZIPOutputStream(compressedBaos)) { + gzip.write(responseBytes); + } + assertThat(compressedBaos.toByteArray()).hasSizeLessThan(4 * 1024 * 1024); + + addGrpcResponse(GrpcStatusCode.OK, null, responseMsg); + CompletableResultCode result = + exporter + .export(Collections.singletonList(generateFakeTelemetry())) + .join(10, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isFalse(); + assertThat(result.getFailureThrowable()) + .isInstanceOfSatisfying( + FailedExportException.GrpcExportException.class, + ex -> + assertThat(requireNonNull(ex.getResponse())) + .satisfies( + grpcResponse -> + assertThat(grpcResponse.getStatusCode()) + .isEqualTo(GrpcStatusCode.RESOURCE_EXHAUSTED))); + } + + // Verifies that a response with an unrecognized grpc-encoding fails with INTERNAL rather than + // attempting to decompress with an unsupported algorithm. + @Test + @SuppressLogger(GrpcExporter.class) + void unsupportedGrpcMessageEncoding() { + try (TelemetryExporter testExporter = + exporterBuilder() + .setEndpoint(grpcEncodingServer.httpUri().toString()) + .setRetryPolicy(null) + .build()) { + CompletableResultCode result = + testExporter + .export(Collections.singletonList(generateFakeTelemetry())) + .join(10, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isFalse(); + assertThat(grpcEncodingServerAttempts).hasValue(1); + assertThat(result.getFailureThrowable()) + .isInstanceOfSatisfying( + FailedExportException.GrpcExportException.class, + ex -> + assertThat(requireNonNull(ex.getResponse())) + .satisfies( + grpcResponse -> + assertThat(grpcResponse.getStatusCode()) + .isEqualTo(GrpcStatusCode.INTERNAL))); + } + } + @Test void multipleItems() { List telemetry = new ArrayList<>(); diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java index ecacdd48dcb..3adcd0c8901 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java @@ -16,11 +16,13 @@ import com.google.protobuf.AbstractMessageLite; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; +import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.RequestHeaders; +import com.linecorp.armeria.common.ResponseHeaders; import com.linecorp.armeria.common.TlsKeyPair; import com.linecorp.armeria.server.HttpService; import com.linecorp.armeria.server.ServerBuilder; @@ -70,6 +72,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.zip.GZIPOutputStream; import javax.net.ssl.KeyManager; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; @@ -615,6 +618,66 @@ void responseBodyBounds() { .hasMessageContaining("HTTP response body exceeded limit of")); } + @Test + @SuppressLogger(HttpExporter.class) + void responseBodyBoundsGzipCompressed() throws IOException { + // Verify the body size limit is applied to the decompressed bytes, not the compressed wire + // bytes. The compressed body is well below the 4MB limit but decompresses to just over it. + byte[] payload = new byte[4 * 1024 * 1024 + 1]; + ByteArrayOutputStream compressedBaos = new ByteArrayOutputStream(); + try (GZIPOutputStream gzip = new GZIPOutputStream(compressedBaos)) { + gzip.write(payload); + } + byte[] compressedBody = compressedBaos.toByteArray(); + + httpErrors.add( + HttpResponse.of( + ResponseHeaders.builder(HttpStatus.OK) + .contentType(MediaType.PLAIN_TEXT_UTF_8) + .add("Content-Encoding", "gzip") + .build(), + HttpData.wrap(compressedBody))); + CompletableResultCode result = + exporter + .export(Collections.singletonList(generateFakeTelemetry())) + .join(10, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isFalse(); + assertThat(result.getFailureThrowable()) + .isInstanceOfSatisfying( + FailedExportException.HttpExportException.class, + ex -> + assertThat(ex.getCause()) + .isNotNull() + .hasMessageContaining("HTTP response body exceeded limit of")); + } + + @Test + @SuppressLogger(HttpExporter.class) + void unsupportedResponseContentEncoding() { + // Server responds with a Content-Encoding that we don't support. The export should fail + // without retry. + httpErrors.add( + HttpResponse.of( + ResponseHeaders.builder(HttpStatus.OK) + .contentType(MediaType.PLAIN_TEXT_UTF_8) + .add("Content-Encoding", "brotli") + .build(), + HttpData.wrap(new byte[10]))); + CompletableResultCode result = + exporter + .export(Collections.singletonList(generateFakeTelemetry())) + .join(10, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isFalse(); + assertThat(attempts).hasValue(1); + assertThat(result.getFailureThrowable()) + .isInstanceOfSatisfying( + FailedExportException.HttpExportException.class, + ex -> + assertThat(ex.getCause()) + .isNotNull() + .hasMessageContaining("Unsupported Content-Encoding")); + } + @ParameterizedTest @ValueSource(ints = {429, 502, 503, 504}) void retryableError(int code) { diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java index e4c87516ce5..4376a381f5a 100644 --- a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java @@ -40,6 +40,7 @@ import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.zip.GZIPInputStream; import javax.annotation.Nullable; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLException; @@ -185,6 +186,8 @@ HttpResponse sendInternal(MessageWriter requestBodyWriter) throws IOException { headers.forEach((key, values) -> values.forEach(value -> requestBuilder.header(key, value))); } requestBuilder.header("Content-Type", contentType); + // Advertise gzip and identity response encoding support. + requestBuilder.header("Accept-Encoding", "gzip, identity"); NoCopyByteArrayOutputStream os = threadLocalBaos.get(); os.reset(); @@ -303,8 +306,11 @@ private static boolean isRetryableException(IOException throwable) { // Known retryable HttpTimeoutException messages: "request timed out" // Known retryable HttpConnectTimeoutException messages: "HTTP connect timed // out" + // ResponseBodyTooLargeException and UnsupportedContentEncodingException are permanent errors: + // a larger body or unsupported encoding will not resolve on retry. return !(throwable instanceof SSLException) - && !(throwable instanceof ResponseBodyTooLargeException); + && !(throwable instanceof ResponseBodyTooLargeException) + && !(throwable instanceof UnsupportedContentEncodingException); } private static final class ResponseBodyTooLargeException extends IOException { @@ -313,6 +319,12 @@ private static final class ResponseBodyTooLargeException extends IOException { } } + private static final class UnsupportedContentEncodingException extends IOException { + UnsupportedContentEncodingException(String message) { + super(message); + } + } + private static class NoCopyByteArrayOutputStream extends ByteArrayOutputStream { NoCopyByteArrayOutputStream() { super(retryableStatusCodes.size()); @@ -336,8 +348,18 @@ private HttpResponse toHttpResponse(java.net.http.HttpResponse resp maxResponseBodySize >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) (maxResponseBodySize + 1); + + String contentEncoding = response.headers().firstValue("Content-Encoding").orElse(null); + if (contentEncoding != null && !"gzip".equalsIgnoreCase(contentEncoding)) { + throw new UnsupportedContentEncodingException( + "Unsupported Content-Encoding: " + contentEncoding); + } + boolean decompress = "gzip".equalsIgnoreCase(contentEncoding); + byte[] bodyBytes; - try (InputStream is = response.body()) { + try (InputStream rawIs = response.body()) { + // The limit is applied to the decompressed bytes. + InputStream is = decompress ? new GZIPInputStream(rawIs) : rawIs; ByteArrayOutputStream baos = new ByteArrayOutputStream(); byte[] buf = new byte[4 * 0x400]; // 4KB int n; diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java index a3410432339..87efbdafed4 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java @@ -32,7 +32,6 @@ import io.opentelemetry.sdk.common.export.GrpcStatusCode; import io.opentelemetry.sdk.common.export.MessageWriter; import io.opentelemetry.sdk.common.export.RetryPolicy; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -63,7 +62,6 @@ import okhttp3.ResponseBody; import okio.Buffer; import okio.GzipSource; -import okio.Okio; /** * A {@link GrpcSender} which uses OkHttp instead of grpc-java. @@ -149,6 +147,8 @@ public void send( (key, values) -> values.forEach(value -> requestBuilder.addHeader(key, value))); } requestBuilder.addHeader("te", "trailers"); + // Mirror grpc-java's default DecompressorRegistry which advertises identity and gzip. + requestBuilder.addHeader("grpc-accept-encoding", "identity,gzip"); if (compressor != null) { requestBuilder.addHeader("grpc-encoding", compressor.getEncoding()); } @@ -183,10 +183,10 @@ private void handleResponse(Response response, Consumer onResponse // Long.MAX_VALUE directly — the overflow check can never trigger for such a large limit. long readUpTo = maxResponseBodySize == Long.MAX_VALUE ? Long.MAX_VALUE : maxResponseBodySize + 1; - Buffer buffer = new Buffer(); + Buffer wireBuffer = new Buffer(); try { - while (buffer.size() <= maxResponseBodySize) { - long n = body.source().read(buffer, readUpTo - buffer.size()); + while (wireBuffer.size() <= maxResponseBodySize) { + long n = body.source().read(wireBuffer, readUpTo - wireBuffer.size()); if (n == -1L) { break; } @@ -195,18 +195,47 @@ private void handleResponse(Response response, Consumer onResponse logger.log(Level.FINE, "Failed to read response body", e); } - if (buffer.size() > maxResponseBodySize) { + if (wireBuffer.size() > maxResponseBodySize) { onResponse.accept(responseMessageTooLarge(maxResponseBodySize)); return; } // Must consume body before accessing trailers - byte[] bodyBytes; - try { - bodyBytes = getResponseMessageBytes(buffer.readByteArray()); - } catch (IOException e) { - bodyBytes = new byte[0]; - logger.log(Level.FINE, "Failed to read response body", e); + byte[] bodyBytes = new byte[0]; + byte[] wireBytes = wireBuffer.readByteArray(); + if (wireBytes.length >= 5) { + if (wireBytes[0] == 0) { + // Not compressed: slice the message payload from the gRPC frame + bodyBytes = Arrays.copyOfRange(wireBytes, 5, wireBytes.length); + } else { + // Compressed: validate the encoding and decompress with a post-decompression size limit + String encoding = response.header("grpc-encoding"); + if (!"gzip".equalsIgnoreCase(encoding)) { + onResponse.accept(responseUnsupportedGrpcEncoding(encoding)); + return; + } + try { + Buffer compressedBuffer = new Buffer(); + compressedBuffer.write(wireBytes, 5, wireBytes.length - 5); + GzipSource gzipSource = new GzipSource(compressedBuffer); + Buffer decompressedBuffer = new Buffer(); + while (decompressedBuffer.size() <= maxResponseBodySize) { + long n = gzipSource.read(decompressedBuffer, readUpTo - decompressedBuffer.size()); + if (n == -1L) { + break; + } + } + if (decompressedBuffer.size() > maxResponseBodySize) { + onResponse.accept(responseMessageTooLarge(maxResponseBodySize)); + return; + } + bodyBytes = decompressedBuffer.readByteArray(); + } catch (IOException e) { + logger.log(Level.FINE, "Failed to decompress response body", e); + } + } + } else { + logger.log(Level.FINE, "Invalid gRPC response frame"); } byte[] resolvedBodyBytes = bodyBytes; GrpcStatusCode status = grpcStatus(response); @@ -250,22 +279,23 @@ public byte[] getResponseMessage() { }; } - private static byte[] getResponseMessageBytes(byte[] bodyBytes) throws IOException { - if (bodyBytes.length >= 5) { - ByteArrayInputStream bodyStream = new ByteArrayInputStream(bodyBytes); - bodyStream.skip(5); - if (bodyBytes[0] == 1) { - Buffer buffer = new Buffer(); - buffer.readFrom(bodyStream); - GzipSource gzipSource = new GzipSource(buffer); - bodyBytes = Okio.buffer(gzipSource).readByteArray(); - } else { - bodyBytes = Arrays.copyOfRange(bodyBytes, 5, bodyBytes.length); + private static GrpcResponse responseUnsupportedGrpcEncoding(@Nullable String encoding) { + return new GrpcResponse() { + @Override + public GrpcStatusCode getStatusCode() { + return GrpcStatusCode.INTERNAL; } - return bodyBytes; - } else { - throw new IOException("Invalid response"); - } + + @Override + public String getStatusDescription() { + return "Unsupported gRPC message encoding: " + encoding; + } + + @Override + public byte[] getResponseMessage() { + return new byte[0]; + } + }; } private static GrpcStatusCode grpcStatus(Response response) { diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java index 9305814b1ee..d413f2ed226 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java @@ -41,7 +41,9 @@ import okhttp3.ResponseBody; import okio.Buffer; import okio.BufferedSink; +import okio.GzipSource; import okio.Okio; +import okio.Source; /** * {@link HttpSender} which is backed by OkHttp. @@ -130,6 +132,10 @@ public void send( if (compressor != null) { requestBuilder.addHeader("Content-Encoding", compressor.getEncoding()); } + // Explicitly advertise gzip and identity encoding support. Because we set Accept-Encoding + // ourselves, OkHttp's BridgeInterceptor will not transparently decompress gzip responses + // (it only does so when it added the header), so we handle decompression ourselves below. + requestBuilder.addHeader("Accept-Encoding", "gzip, identity"); requestBuilder.post(new RequestBodyImpl(messageWriter, compressor, mediaType)); InstrumentationUtil.suppressInstrumentation( @@ -153,6 +159,12 @@ public void onResponse(Call call, Response response) { private void handleResponse( Response response, Consumer onResponse, Consumer onError) { try (ResponseBody body = response.body()) { + String contentEncoding = response.header("Content-Encoding"); + if (contentEncoding != null && !"gzip".equalsIgnoreCase(contentEncoding)) { + onError.accept(new IOException("Unsupported Content-Encoding: " + contentEncoding)); + return; + } + boolean decompress = "gzip".equalsIgnoreCase(contentEncoding); // Read up to maxResponseBodySize + 1 bytes. Reading exactly one byte more than the limit // lets us detect overflow: if the buffer ends up larger than maxResponseBodySize, the body // exceeded the limit. A body exactly at the limit will only fill the buffer to @@ -163,8 +175,9 @@ private void handleResponse( maxResponseBodySize == Long.MAX_VALUE ? Long.MAX_VALUE : maxResponseBodySize + 1; Buffer buffer = new Buffer(); try { + Source source = decompress ? new GzipSource(body.source()) : body.source(); while (buffer.size() <= maxResponseBodySize) { - long n = body.source().read(buffer, readUpTo - buffer.size()); + long n = source.read(buffer, readUpTo - buffer.size()); if (n == -1L) { break; } From 95ba71fcac8f68723ea8ae3588f41695c1d635d7 Mon Sep 17 00:00:00 2001 From: Jack Berg <34418638+jack-berg@users.noreply.github.com> Date: Fri, 3 Apr 2026 09:53:30 -0500 Subject: [PATCH 7/7] Move away from anonymous inner classes by giving each sender an autovalue GrpcResponse, HttpResponse --- .../internal/grpc/GrpcExporterTest.java | 26 ++++++++- .../grpc-managed-channel/build.gradle.kts | 2 + .../internal}/ImmutableGrpcResponse.java | 16 ++--- .../internal/UpstreamGrpcSender.java | 1 - exporters/sender/jdk/build.gradle.kts | 2 + .../jdk/internal/ImmutableHttpResponse.java | 21 +++++++ .../sender/jdk/internal/JdkHttpSender.java | 18 +----- exporters/sender/okhttp/build.gradle.kts | 2 + .../internal/ImmutableGrpcResponse.java | 24 ++++++++ .../internal/ImmutableHttpResponse.java | 21 +++++++ .../okhttp/internal/OkHttpGrpcSender.java | 58 +++---------------- .../okhttp/internal/OkHttpHttpSender.java | 17 +----- 12 files changed, 109 insertions(+), 99 deletions(-) rename exporters/{common/src/main/java/io/opentelemetry/exporter/internal/grpc => sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal}/ImmutableGrpcResponse.java (56%) create mode 100644 exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/ImmutableHttpResponse.java create mode 100644 exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/ImmutableGrpcResponse.java create mode 100644 exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/ImmutableHttpResponse.java diff --git a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java index 0dcfdfbc0c9..02bf091c0d2 100644 --- a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java +++ b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java @@ -28,6 +28,7 @@ import java.net.URI; import java.time.Duration; import java.util.function.Consumer; +import javax.annotation.Nullable; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -120,8 +121,7 @@ void testInternalTelemetry(StandardComponentId.ExporterType exporterType) { pa.hasAttributes(expectedAttributes) .hasValue(42)))); - onResponse.accept( - ImmutableGrpcResponse.create(GrpcStatusCode.OK, null, new byte[0])); + onResponse.accept(grpcResponse(GrpcStatusCode.OK)); return null; }) @@ -133,7 +133,7 @@ void testInternalTelemetry(StandardComponentId.ExporterType exporterType) { doAnswer( invoc -> { Consumer onResponse = invoc.getArgument(1); - onResponse.accept(ImmutableGrpcResponse.create(UNAVAILABLE, null, new byte[0])); + onResponse.accept(grpcResponse(UNAVAILABLE)); return null; }) @@ -224,4 +224,24 @@ void testInternalTelemetry(StandardComponentId.ExporterType exporterType) { .hasBucketCounts(1)))); } } + + private static GrpcResponse grpcResponse(GrpcStatusCode statusCode) { + return new GrpcResponse() { + @Override + public GrpcStatusCode getStatusCode() { + return statusCode; + } + + @Override + @Nullable + public String getStatusDescription() { + return null; + } + + @Override + public byte[] getResponseMessage() { + return new byte[0]; + } + }; + } } diff --git a/exporters/sender/grpc-managed-channel/build.gradle.kts b/exporters/sender/grpc-managed-channel/build.gradle.kts index d6ceea653c8..9ee493fb7a1 100644 --- a/exporters/sender/grpc-managed-channel/build.gradle.kts +++ b/exporters/sender/grpc-managed-channel/build.gradle.kts @@ -9,6 +9,8 @@ description = "OpenTelemetry gRPC Upstream Sender" otelJava.moduleName.set("io.opentelemetry.exporter.sender.grpc.managedchannel.internal") dependencies { + annotationProcessor("com.google.auto.value:auto-value") + implementation(project(":exporters:common")) implementation(project(":sdk:common")) diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcResponse.java b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/ImmutableGrpcResponse.java similarity index 56% rename from exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcResponse.java rename to exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/ImmutableGrpcResponse.java index 1fe4c1c62f6..6bd7315f87c 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcResponse.java +++ b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/ImmutableGrpcResponse.java @@ -3,25 +3,19 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.exporter.internal.grpc; +package io.opentelemetry.exporter.sender.grpc.managedchannel.internal; import com.google.auto.value.AutoValue; import io.opentelemetry.sdk.common.export.GrpcResponse; import io.opentelemetry.sdk.common.export.GrpcStatusCode; import javax.annotation.Nullable; -/** - * Auto value implementation of {@link GrpcResponse}. - * - *

This class is internal and is hence not for public use. Its APIs are unstable and can change - * at any time. - */ @AutoValue -public abstract class ImmutableGrpcResponse implements GrpcResponse { +abstract class ImmutableGrpcResponse implements GrpcResponse { - public static ImmutableGrpcResponse create( - GrpcStatusCode statusCode, @Nullable String statusDescription, byte[] responseBody) { - return new AutoValue_ImmutableGrpcResponse(statusCode, statusDescription, responseBody); + static ImmutableGrpcResponse create( + GrpcStatusCode statusCode, @Nullable String statusDescription, byte[] responseMessage) { + return new AutoValue_ImmutableGrpcResponse(statusCode, statusDescription, responseMessage); } @SuppressWarnings("mutable") diff --git a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java index 0ccf7f4d00e..3a755be609f 100644 --- a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java +++ b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java @@ -21,7 +21,6 @@ import io.grpc.StatusRuntimeException; import io.grpc.stub.ClientCalls; import io.grpc.stub.MetadataUtils; -import io.opentelemetry.exporter.internal.grpc.ImmutableGrpcResponse; import io.opentelemetry.exporter.internal.grpc.MarshalerInputStream; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.GrpcResponse; diff --git a/exporters/sender/jdk/build.gradle.kts b/exporters/sender/jdk/build.gradle.kts index 7f848b7708b..2bff4e6b2cb 100644 --- a/exporters/sender/jdk/build.gradle.kts +++ b/exporters/sender/jdk/build.gradle.kts @@ -10,6 +10,8 @@ otelJava.moduleName.set("io.opentelemetry.exporter.sender.jdk.internal") otelJava.minJavaVersionSupported.set(JavaVersion.VERSION_11) dependencies { + annotationProcessor("com.google.auto.value:auto-value") + implementation(project(":exporters:common")) implementation(project(":sdk:common")) diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/ImmutableHttpResponse.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/ImmutableHttpResponse.java new file mode 100644 index 00000000000..4af860c696b --- /dev/null +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/ImmutableHttpResponse.java @@ -0,0 +1,21 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.jdk.internal; + +import com.google.auto.value.AutoValue; +import io.opentelemetry.sdk.common.export.HttpResponse; + +@AutoValue +abstract class ImmutableHttpResponse implements HttpResponse { + + static ImmutableHttpResponse create(int statusCode, String statusMessage, byte[] responseBody) { + return new AutoValue_ImmutableHttpResponse(statusCode, statusMessage, responseBody); + } + + @SuppressWarnings("mutable") + @Override + public abstract byte[] getResponseBody(); +} diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java index 4376a381f5a..2ffb296de9c 100644 --- a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java @@ -376,23 +376,7 @@ private HttpResponse toHttpResponse(java.net.http.HttpResponse resp throw new ResponseBodyTooLargeException( "HTTP response body exceeded limit of " + maxResponseBodySize + " bytes"); } - byte[] body = bodyBytes; - return new HttpResponse() { - @Override - public int getStatusCode() { - return statusCode; - } - - @Override - public String getStatusMessage() { - return String.valueOf(statusCode); - } - - @Override - public byte[] getResponseBody() { - return body; - } - }; + return ImmutableHttpResponse.create(statusCode, String.valueOf(statusCode), bodyBytes); } private static class ByteBufferPool { diff --git a/exporters/sender/okhttp/build.gradle.kts b/exporters/sender/okhttp/build.gradle.kts index 107270e9ada..a004fea9034 100644 --- a/exporters/sender/okhttp/build.gradle.kts +++ b/exporters/sender/okhttp/build.gradle.kts @@ -12,6 +12,8 @@ dependencies { implementation(project(":exporters:common")) implementation(project(":sdk:common")) + annotationProcessor("com.google.auto.value:auto-value") + implementation("com.squareup.okhttp3:okhttp") compileOnly("io.grpc:grpc-stub") diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/ImmutableGrpcResponse.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/ImmutableGrpcResponse.java new file mode 100644 index 00000000000..cb229c2a6a2 --- /dev/null +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/ImmutableGrpcResponse.java @@ -0,0 +1,24 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp.internal; + +import com.google.auto.value.AutoValue; +import io.opentelemetry.sdk.common.export.GrpcResponse; +import io.opentelemetry.sdk.common.export.GrpcStatusCode; +import javax.annotation.Nullable; + +@AutoValue +abstract class ImmutableGrpcResponse implements GrpcResponse { + + static ImmutableGrpcResponse create( + GrpcStatusCode statusCode, @Nullable String statusDescription, byte[] responseMessage) { + return new AutoValue_ImmutableGrpcResponse(statusCode, statusDescription, responseMessage); + } + + @SuppressWarnings("mutable") + @Override + public abstract byte[] getResponseMessage(); +} diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/ImmutableHttpResponse.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/ImmutableHttpResponse.java new file mode 100644 index 00000000000..80fafddd833 --- /dev/null +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/ImmutableHttpResponse.java @@ -0,0 +1,21 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp.internal; + +import com.google.auto.value.AutoValue; +import io.opentelemetry.sdk.common.export.HttpResponse; + +@AutoValue +abstract class ImmutableHttpResponse implements HttpResponse { + + static ImmutableHttpResponse create(int statusCode, String statusMessage, byte[] responseBody) { + return new AutoValue_ImmutableHttpResponse(statusCode, statusMessage, responseBody); + } + + @SuppressWarnings("mutable") + @Override + public abstract byte[] getResponseBody(); +} diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java index 87efbdafed4..90dc3c698c5 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java @@ -237,65 +237,21 @@ private void handleResponse(Response response, Consumer onResponse } else { logger.log(Level.FINE, "Invalid gRPC response frame"); } - byte[] resolvedBodyBytes = bodyBytes; - GrpcStatusCode status = grpcStatus(response); - String description = grpcMessage(response); onResponse.accept( - new GrpcResponse() { - @Override - public GrpcStatusCode getStatusCode() { - return status; - } - - @Override - public String getStatusDescription() { - return description; - } - - @Override - public byte[] getResponseMessage() { - return resolvedBodyBytes; - } - }); + ImmutableGrpcResponse.create(grpcStatus(response), grpcMessage(response), bodyBytes)); } } private static GrpcResponse responseMessageTooLarge(long maxResponseBodySize) { - return new GrpcResponse() { - @Override - public GrpcStatusCode getStatusCode() { - return GrpcStatusCode.RESOURCE_EXHAUSTED; - } - - @Override - public String getStatusDescription() { - return "gRPC response body exceeded limit of " + maxResponseBodySize + " bytes"; - } - - @Override - public byte[] getResponseMessage() { - return new byte[0]; - } - }; + return ImmutableGrpcResponse.create( + GrpcStatusCode.RESOURCE_EXHAUSTED, + "gRPC response body exceeded limit of " + maxResponseBodySize + " bytes", + new byte[0]); } private static GrpcResponse responseUnsupportedGrpcEncoding(@Nullable String encoding) { - return new GrpcResponse() { - @Override - public GrpcStatusCode getStatusCode() { - return GrpcStatusCode.INTERNAL; - } - - @Override - public String getStatusDescription() { - return "Unsupported gRPC message encoding: " + encoding; - } - - @Override - public byte[] getResponseMessage() { - return new byte[0]; - } - }; + return ImmutableGrpcResponse.create( + GrpcStatusCode.INTERNAL, "Unsupported gRPC message encoding: " + encoding, new byte[0]); } private static GrpcStatusCode grpcStatus(Response response) { diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java index d413f2ed226..49c7fc10821 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java @@ -195,22 +195,7 @@ private void handleResponse( byte[] bodyBytes = buffer.readByteArray(); onResponse.accept( - new HttpResponse() { - @Override - public int getStatusCode() { - return response.code(); - } - - @Override - public String getStatusMessage() { - return response.message(); - } - - @Override - public byte[] getResponseBody() { - return bodyBytes; - } - }); + ImmutableHttpResponse.create(response.code(), response.message(), bodyBytes)); } }