diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-common.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-common.txt index 6f0e244ea08..11f2ba69c13 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-common.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-common.txt @@ -1,2 +1,7 @@ Comparing source compatibility of opentelemetry-sdk-common-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-common-1.60.1.jar -No changes. \ No newline at end of file +*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.common.export.GrpcSenderConfig (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) long getMaxResponseBodySize() +*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.common.export.HttpSenderConfig (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) long getMaxResponseBodySize() diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-jaeger-remote-sampler.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-jaeger-remote-sampler.txt index f0ba1adf8a7..c641fffcf40 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-jaeger-remote-sampler.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-extension-jaeger-remote-sampler.txt @@ -1,2 +1,4 @@ Comparing source compatibility of opentelemetry-sdk-extension-jaeger-remote-sampler-1.61.0-SNAPSHOT.jar against opentelemetry-sdk-extension-jaeger-remote-sampler-1.60.1.jar -No changes. \ No newline at end of file +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSamplerBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSamplerBuilder setMaxSamplingStrategyResponseBodySize(long) diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java index 53a3d5c7c5e..867cf897c3f 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java @@ -230,7 +230,10 @@ public GrpcExporter build() { isPlainHttp ? null : tlsConfigHelper.getSslContext(), isPlainHttp ? null : tlsConfigHelper.getTrustManager(), executorService, - grpcChannel)); + grpcChannel, + // 4mb to align with spec guidance - even though we don't do anything with the + // response today, we will so better to have future-looking memory profile + 4 * 1024L * 1024L)); LOGGER.log(Level.FINE, "Using GrpcSender: " + grpcSender.getClass().getName()); return new GrpcExporter( diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcSenderConfig.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcSenderConfig.java index 66a3a2ad5f0..33a430ae920 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcSenderConfig.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcSenderConfig.java @@ -37,7 +37,8 @@ public static ImmutableGrpcSenderConfig create( @Nullable SSLContext sslContext, @Nullable X509TrustManager trustManager, @Nullable ExecutorService executorService, - @Nullable Object managedChannel) { + @Nullable Object managedChannel, + long maxResponseBodySize) { return new AutoValue_ImmutableGrpcSenderConfig( endpoint, fullMethodName, @@ -49,6 +50,10 @@ public static ImmutableGrpcSenderConfig create( sslContext, trustManager, executorService, - managedChannel); + managedChannel, + maxResponseBodySize); } + + @Override + public abstract long getMaxResponseBodySize(); } diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java index 2e06f8caa26..8937acbb474 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java @@ -245,7 +245,10 @@ public HttpExporter build() { retryPolicy, isPlainHttp ? null : tlsConfigHelper.getSslContext(), isPlainHttp ? null : tlsConfigHelper.getTrustManager(), - executorService)); + executorService, + // 4mb to align with spec guidance - even though we don't do anything with the + // response today, we will so better to have future-looking memory profile + 4 * 1024L * 1024L)); LOGGER.log(Level.FINE, "Using HttpSender: " + httpSender.getClass().getName()); return new HttpExporter( diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/ImmutableHttpSenderConfig.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/ImmutableHttpSenderConfig.java index 24e5a9bb1bc..805d48e2638 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/ImmutableHttpSenderConfig.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/ImmutableHttpSenderConfig.java @@ -35,7 +35,8 @@ static HttpSenderConfig create( @Nullable RetryPolicy retryPolicy, @Nullable SSLContext sslContext, @Nullable X509TrustManager trustManager, - @Nullable ExecutorService executorService) { + @Nullable ExecutorService executorService, + long maxResponseBodySize) { return new AutoValue_ImmutableHttpSenderConfig( endpoint, contentType, @@ -47,6 +48,10 @@ static HttpSenderConfig create( retryPolicy, sslContext, trustManager, - executorService); + executorService, + maxResponseBodySize); } + + @Override + public abstract long getMaxResponseBodySize(); } diff --git a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java index 0dcfdfbc0c9..02bf091c0d2 100644 --- a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java +++ b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java @@ -28,6 +28,7 @@ import java.net.URI; import java.time.Duration; import java.util.function.Consumer; +import javax.annotation.Nullable; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -120,8 +121,7 @@ void testInternalTelemetry(StandardComponentId.ExporterType exporterType) { pa.hasAttributes(expectedAttributes) .hasValue(42)))); - onResponse.accept( - ImmutableGrpcResponse.create(GrpcStatusCode.OK, null, new byte[0])); + onResponse.accept(grpcResponse(GrpcStatusCode.OK)); return null; }) @@ -133,7 +133,7 @@ void testInternalTelemetry(StandardComponentId.ExporterType exporterType) { doAnswer( invoc -> { Consumer onResponse = invoc.getArgument(1); - onResponse.accept(ImmutableGrpcResponse.create(UNAVAILABLE, null, new byte[0])); + onResponse.accept(grpcResponse(UNAVAILABLE)); return null; }) @@ -224,4 +224,24 @@ void testInternalTelemetry(StandardComponentId.ExporterType exporterType) { .hasBucketCounts(1)))); } } + + private static GrpcResponse grpcResponse(GrpcStatusCode statusCode) { + return new GrpcResponse() { + @Override + public GrpcStatusCode getStatusCode() { + return statusCode; + } + + @Override + @Nullable + public String getStatusDescription() { + return null; + } + + @Override + public byte[] getResponseMessage() { + return new byte[0]; + } + }; + } } diff --git a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java index 22271bc271e..5dbc8f42ba9 100644 --- a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java +++ b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java @@ -110,7 +110,8 @@ public void setUp() { null, null, null, - null), + null, + Long.MAX_VALUE), InternalTelemetryVersion.LATEST, ComponentId.generateLazy(StandardComponentId.ExporterType.OTLP_GRPC_SPAN_EXPORTER), MeterProvider::noop, diff --git a/exporters/otlp/profiles/src/test/java/io/opentelemetry/exporter/otlp/profiles/OtlpGrpcProfileExporterTest.java b/exporters/otlp/profiles/src/test/java/io/opentelemetry/exporter/otlp/profiles/OtlpGrpcProfileExporterTest.java index 2ac958b47ab..3af11db2d6b 100644 --- a/exporters/otlp/profiles/src/test/java/io/opentelemetry/exporter/otlp/profiles/OtlpGrpcProfileExporterTest.java +++ b/exporters/otlp/profiles/src/test/java/io/opentelemetry/exporter/otlp/profiles/OtlpGrpcProfileExporterTest.java @@ -42,7 +42,7 @@ void usingOkHttp() throws Exception { @Override // whilst profile signal type is in development it uses a different error message @SuppressLogger(GrpcExporter.class) protected void testExport_Unimplemented() { - addGrpcError(GrpcStatusCode.UNIMPLEMENTED, "UNIMPLEMENTED"); + addGrpcResponse(GrpcStatusCode.UNIMPLEMENTED, "UNIMPLEMENTED"); TelemetryExporter exporter = nonRetryingExporter(); diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java index 664e839314e..f5292973203 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java @@ -7,6 +7,7 @@ import static io.opentelemetry.api.common.AttributeKey.stringKey; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static java.util.Objects.requireNonNull; import static org.assertj.core.api.Assertions.as; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; @@ -15,9 +16,15 @@ import static org.junit.jupiter.api.Named.named; import static org.junit.jupiter.params.provider.Arguments.arguments; +import com.google.protobuf.AbstractMessageLite; import com.google.protobuf.Message; +import com.google.protobuf.Parser; +import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.MediaType; +import com.linecorp.armeria.common.ResponseHeaders; import com.linecorp.armeria.common.TlsKeyPair; import com.linecorp.armeria.common.grpc.protocol.ArmeriaStatusException; import com.linecorp.armeria.server.ServerBuilder; @@ -80,6 +87,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.zip.GZIPOutputStream; import javax.annotation.Nullable; import javax.net.ssl.KeyManager; import javax.net.ssl.SSLContext; @@ -111,14 +119,18 @@ public abstract class AbstractGrpcTelemetryExporterTest { private static final ConcurrentLinkedQueue exportedResourceTelemetry = new ConcurrentLinkedQueue<>(); - private static final ConcurrentLinkedQueue grpcErrors = + private static final ConcurrentLinkedQueue grpcResponses = new ConcurrentLinkedQueue<>(); + private static volatile byte[] defaultResponseBytes = new byte[0]; + private static final AtomicInteger attempts = new AtomicInteger(); private static final ConcurrentLinkedQueue httpRequests = new ConcurrentLinkedQueue<>(); + private static final AtomicInteger grpcEncodingServerAttempts = new AtomicInteger(); + @RegisterExtension @Order(1) static final SelfSignedCertificateExtension certificate = new SelfSignedCertificateExtension(); @@ -143,13 +155,7 @@ public void export( ExportLogsServiceRequest request, StreamObserver responseObserver) { exportedResourceTelemetry.addAll(request.getResourceLogsList()); - ArmeriaStatusException grpcError = grpcErrors.poll(); - if (grpcError != null) { - responseObserver.onError(grpcError); - } else { - responseObserver.onNext(ExportLogsServiceResponse.getDefaultInstance()); - responseObserver.onCompleted(); - } + handleExport(responseObserver, ExportLogsServiceResponse.parser()); } }) .addService( @@ -159,14 +165,7 @@ public void export( ExportMetricsServiceRequest request, StreamObserver responseObserver) { exportedResourceTelemetry.addAll(request.getResourceMetricsList()); - ArmeriaStatusException grpcError = grpcErrors.poll(); - if (grpcError != null) { - responseObserver.onError(grpcError); - } else { - responseObserver.onNext( - ExportMetricsServiceResponse.getDefaultInstance()); - responseObserver.onCompleted(); - } + handleExport(responseObserver, ExportMetricsServiceResponse.parser()); } }) .addService( @@ -176,14 +175,7 @@ public void export( ExportTraceServiceRequest request, StreamObserver responseObserver) { exportedResourceTelemetry.addAll(request.getResourceSpansList()); - ArmeriaStatusException grpcError = grpcErrors.poll(); - if (grpcError != null) { - responseObserver.onError(grpcError); - } else { - responseObserver.onNext( - ExportTraceServiceResponse.getDefaultInstance()); - responseObserver.onCompleted(); - } + handleExport(responseObserver, ExportTraceServiceResponse.parser()); } }) .addService( @@ -193,14 +185,7 @@ public void export( ExportProfilesServiceRequest request, StreamObserver responseObserver) { exportedResourceTelemetry.addAll(request.getResourceProfilesList()); - ArmeriaStatusException grpcError = grpcErrors.poll(); - if (grpcError != null) { - responseObserver.onError(grpcError); - } else { - responseObserver.onNext( - ExportProfilesServiceResponse.getDefaultInstance()); - responseObserver.onCompleted(); - } + handleExport(responseObserver, ExportProfilesServiceResponse.parser()); } }) .decompressorRegistry( @@ -239,6 +224,52 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) } }; + private static void handleExport( + StreamObserver responseObserver, Parser parser) { + GrpcServerResponse grpcResponse = grpcResponses.poll(); + if (grpcResponse != null && grpcResponse.error != null) { + responseObserver.onError(grpcResponse.error); + } else { + try { + byte[] responseBytes = + grpcResponse != null && grpcResponse.responseBytes != null + ? grpcResponse.responseBytes + : defaultResponseBytes; + responseObserver.onNext(parser.parseFrom(responseBytes)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + responseObserver.onCompleted(); + } + } + + // A minimal server that returns grpc-encoding: brotli to test unsupported encoding handling. + // Both OkHttpGrpcSender (our code) and UpstreamGrpcSender (grpc-java framework) reject unknown + // encodings and fail the export with INTERNAL status. + @RegisterExtension + @Order(4) + static final ServerExtension grpcEncodingServer = + new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + sb.service( + "prefix:/", + (ctx, req) -> { + grpcEncodingServerAttempts.incrementAndGet(); + // gRPC frame: compressed flag=1, declared length=0 (no payload bytes follow) + byte[] frame = {1, 0, 0, 0, 0}; + return HttpResponse.of( + ResponseHeaders.builder(HttpStatus.OK) + .contentType(MediaType.parse("application/grpc+proto")) + .add("grpc-encoding", "brotli") + .add("grpc-status", "0") + .build(), + HttpData.wrap(frame)); + }); + sb.http(0); + } + }; + @RegisterExtension protected LogCapturer logs = LogCapturer.create().captureForType(GrpcExporter.class); @@ -265,6 +296,7 @@ void setUp() { .setInitialBackoff(Duration.ofMillis(1)) .build()) .build(); + defaultResponseBytes = exporter.exportResponse(0).toByteArray(); // Sanity check that TLS files are in PEM format. assertThat(certificate.certificateFile()) @@ -293,9 +325,10 @@ void tearDown() { @AfterEach void reset() { exportedResourceTelemetry.clear(); - grpcErrors.clear(); + grpcResponses.clear(); attempts.set(0); httpRequests.clear(); + grpcEncodingServerAttempts.set(0); } @Test @@ -332,6 +365,102 @@ void export() { .matches("OTel-OTLP-Exporter-Java/1\\..*")); } + @Test + // @SuppressLogger(HttpExporter.class) + void responseBodyBounds() { + // We have a 4mb hardcoded response body limit. Responses <= 4mb succeed. Responses >= 4mb + // succeed. We can't test payloads exactly at 4mb because protobuf message lengths are finicky - + // its hard to create a message with an exact size. + + // Body below the limit, succeeds + addGrpcResponse(GrpcStatusCode.OK, null, exporter.exportResponse(100)); + CompletableResultCode result = + exporter + .export(Collections.singletonList(generateFakeTelemetry())) + .join(10, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isTrue(); + + // Body over the limit, fails with RESOURCE_EXHAUSTED + addGrpcResponse(GrpcStatusCode.OK, null, exporter.exportResponse(4 * 1024 * 1024 + 1)); + result = + exporter + .export(Collections.singletonList(generateFakeTelemetry())) + .join(10, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isFalse(); + assertThat(result.getFailureThrowable()) + .isInstanceOfSatisfying( + FailedExportException.GrpcExportException.class, + ex -> + assertThat(requireNonNull(ex.getResponse())) + .isNotNull() + .satisfies( + grpcResponse -> + assertThat(grpcResponse.getStatusCode()) + .isEqualTo(GrpcStatusCode.RESOURCE_EXHAUSTED))); + } + + @Test + @SuppressLogger(GrpcExporter.class) + void responseBodyBoundsGzipCompressed() throws IOException { + // Verify the body size limit is applied to the decompressed bytes, not the compressed wire + // bytes. The server compresses the response with gzip (because the sender advertises + // grpc-accept-encoding: gzip), so the wire bytes are well under 4MB while the decompressed + // bytes exceed it. + AbstractMessageLite responseMsg = exporter.exportResponse(4 * 1024 * 1024 + 1); + byte[] responseBytes = responseMsg.toByteArray(); + // Sanity: confirm payload is over the limit uncompressed but compresses well under the limit + assertThat(responseBytes).hasSizeGreaterThan(4 * 1024 * 1024); + ByteArrayOutputStream compressedBaos = new ByteArrayOutputStream(); + try (GZIPOutputStream gzip = new GZIPOutputStream(compressedBaos)) { + gzip.write(responseBytes); + } + assertThat(compressedBaos.toByteArray()).hasSizeLessThan(4 * 1024 * 1024); + + addGrpcResponse(GrpcStatusCode.OK, null, responseMsg); + CompletableResultCode result = + exporter + .export(Collections.singletonList(generateFakeTelemetry())) + .join(10, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isFalse(); + assertThat(result.getFailureThrowable()) + .isInstanceOfSatisfying( + FailedExportException.GrpcExportException.class, + ex -> + assertThat(requireNonNull(ex.getResponse())) + .satisfies( + grpcResponse -> + assertThat(grpcResponse.getStatusCode()) + .isEqualTo(GrpcStatusCode.RESOURCE_EXHAUSTED))); + } + + // Verifies that a response with an unrecognized grpc-encoding fails with INTERNAL rather than + // attempting to decompress with an unsupported algorithm. + @Test + @SuppressLogger(GrpcExporter.class) + void unsupportedGrpcMessageEncoding() { + try (TelemetryExporter testExporter = + exporterBuilder() + .setEndpoint(grpcEncodingServer.httpUri().toString()) + .setRetryPolicy(null) + .build()) { + CompletableResultCode result = + testExporter + .export(Collections.singletonList(generateFakeTelemetry())) + .join(10, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isFalse(); + assertThat(grpcEncodingServerAttempts).hasValue(1); + assertThat(result.getFailureThrowable()) + .isInstanceOfSatisfying( + FailedExportException.GrpcExportException.class, + ex -> + assertThat(requireNonNull(ex.getResponse())) + .satisfies( + grpcResponse -> + assertThat(grpcResponse.getStatusCode()) + .isEqualTo(GrpcStatusCode.INTERNAL))); + } + } + @Test void multipleItems() { List telemetry = new ArrayList<>(); @@ -603,7 +732,7 @@ void doubleShutdown() { @SuppressLogger(GrpcExporter.class) void error() { GrpcStatusCode statusCode = GrpcStatusCode.INTERNAL; - addGrpcError(statusCode, null); + addGrpcResponse(statusCode, null); try (TelemetryExporter exporter = nonRetryingExporter()) { CompletableResultCode result = @@ -639,7 +768,7 @@ void error() { @Test @SuppressLogger(GrpcExporter.class) void errorWithUnknownError() { - addGrpcError(GrpcStatusCode.UNKNOWN, null); + addGrpcResponse(GrpcStatusCode.UNKNOWN, null); try (TelemetryExporter exporter = nonRetryingExporter()) { assertThat( @@ -662,7 +791,7 @@ void errorWithUnknownError() { @Test @SuppressLogger(GrpcExporter.class) void errorWithMessage() { - addGrpcError(GrpcStatusCode.RESOURCE_EXHAUSTED, "out of quota"); + addGrpcResponse(GrpcStatusCode.RESOURCE_EXHAUSTED, "out of quota"); try (TelemetryExporter exporter = nonRetryingExporter()) { assertThat( @@ -683,7 +812,7 @@ void errorWithMessage() { @Test @SuppressLogger(GrpcExporter.class) void errorWithEscapedMessage() { - addGrpcError(GrpcStatusCode.NOT_FOUND, "クマ🐻"); + addGrpcResponse(GrpcStatusCode.NOT_FOUND, "クマ🐻"); try (TelemetryExporter exporter = nonRetryingExporter()) { assertThat( @@ -704,7 +833,7 @@ void errorWithEscapedMessage() { @Test @SuppressLogger(GrpcExporter.class) void testExport_Unavailable() { - addGrpcError(GrpcStatusCode.UNAVAILABLE, null); + addGrpcResponse(GrpcStatusCode.UNAVAILABLE, null); try (TelemetryExporter exporter = nonRetryingExporter()) { assertThat( @@ -726,7 +855,7 @@ void testExport_Unavailable() { @Test @SuppressLogger(GrpcExporter.class) protected void testExport_Unimplemented() { - addGrpcError(GrpcStatusCode.UNIMPLEMENTED, "UNIMPLEMENTED"); + addGrpcResponse(GrpcStatusCode.UNIMPLEMENTED, "UNIMPLEMENTED"); try (TelemetryExporter exporter = nonRetryingExporter()) { assertThat( @@ -772,7 +901,7 @@ protected void testExport_Unimplemented() { @ValueSource(ints = {1, 4, 8, 10, 11, 14, 15}) @SuppressLogger(GrpcExporter.class) void retryableError(int code) { - addGrpcError(GrpcStatusCode.fromValue(code), null); + addGrpcResponse(GrpcStatusCode.fromValue(code), null); assertThat( exporter @@ -787,8 +916,8 @@ void retryableError(int code) { @Test @SuppressLogger(GrpcExporter.class) void retryableError_tooManyAttempts() { - addGrpcError(GrpcStatusCode.CANCELLED, null); - addGrpcError(GrpcStatusCode.CANCELLED, null); + addGrpcResponse(GrpcStatusCode.CANCELLED, null); + addGrpcResponse(GrpcStatusCode.CANCELLED, null); assertThat( exporter @@ -804,7 +933,7 @@ void retryableError_tooManyAttempts() { @ValueSource(ints = {2, 3, 5, 6, 7, 9, 12, 13, 16}) @SuppressLogger(GrpcExporter.class) void nonRetryableError(int code) { - addGrpcError(GrpcStatusCode.fromValue(code), null); + addGrpcResponse(GrpcStatusCode.fromValue(code), null); assertThat( exporter @@ -1273,7 +1402,27 @@ protected TelemetryExporter nonRetryingExporter() { return exporterBuilder().setEndpoint(server.httpUri().toString()).setRetryPolicy(null).build(); } - protected static void addGrpcError(GrpcStatusCode code, @Nullable String message) { - grpcErrors.add(new ArmeriaStatusException(code.getValue(), message)); + protected static void addGrpcResponse(GrpcStatusCode code, @Nullable String message) { + addGrpcResponse(code, message, null); + } + + protected static void addGrpcResponse( + GrpcStatusCode code, + @Nullable String message, + @Nullable AbstractMessageLite bodyMessage) { + grpcResponses.add( + new GrpcServerResponse( + code == GrpcStatusCode.OK ? null : new ArmeriaStatusException(code.getValue(), message), + bodyMessage == null ? null : bodyMessage.toByteArray())); + } + + private static final class GrpcServerResponse { + @Nullable final ArmeriaStatusException error; + @Nullable final byte[] responseBytes; + + GrpcServerResponse(@Nullable ArmeriaStatusException error, @Nullable byte[] responseBytes) { + this.error = error; + this.responseBytes = responseBytes; + } } } diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java index 79927098035..3adcd0c8901 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java @@ -13,13 +13,16 @@ import static org.junit.jupiter.api.Named.named; import static org.junit.jupiter.params.provider.Arguments.arguments; +import com.google.protobuf.AbstractMessageLite; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; +import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.RequestHeaders; +import com.linecorp.armeria.common.ResponseHeaders; import com.linecorp.armeria.common.TlsKeyPair; import com.linecorp.armeria.server.HttpService; import com.linecorp.armeria.server.ServerBuilder; @@ -34,11 +37,8 @@ import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; -import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; -import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; -import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.InternalTelemetryVersion; import io.opentelemetry.sdk.common.export.ProxyOptions; @@ -72,6 +72,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.zip.GZIPOutputStream; import javax.net.ssl.KeyManager; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; @@ -116,6 +117,8 @@ public abstract class AbstractHttpTelemetryExporterTest { private static final ConcurrentLinkedQueue httpRequests = new ConcurrentLinkedQueue<>(); + private static volatile byte[] successResponseBytes = new byte[0]; + @RegisterExtension @Order(1) static final SelfSignedCertificateExtension certificate = new SelfSignedCertificateExtension(); @@ -135,20 +138,17 @@ protected void configure(ServerBuilder sb) { "/v1/traces", new CollectorService<>( ExportTraceServiceRequest::parseFrom, - ExportTraceServiceRequest::getResourceSpansList, - ExportTraceServiceResponse.getDefaultInstance().toByteArray())); + ExportTraceServiceRequest::getResourceSpansList)); sb.service( "/v1/metrics", new CollectorService<>( ExportMetricsServiceRequest::parseFrom, - ExportMetricsServiceRequest::getResourceMetricsList, - ExportMetricsServiceResponse.getDefaultInstance().toByteArray())); + ExportMetricsServiceRequest::getResourceMetricsList)); sb.service( "/v1/logs", new CollectorService<>( ExportLogsServiceRequest::parseFrom, - ExportLogsServiceRequest::getResourceLogsList, - ExportLogsServiceResponse.getDefaultInstance().toByteArray())); + ExportLogsServiceRequest::getResourceLogsList)); sb.http(0); sb.https(0); @@ -162,15 +162,12 @@ protected void configure(ServerBuilder sb) { private static class CollectorService implements HttpService { private final ThrowingExtractor parse; private final Function> getResourceTelemetry; - private final byte[] successResponse; private CollectorService( ThrowingExtractor parse, - Function> getResourceTelemetry, - byte[] successResponse) { + Function> getResourceTelemetry) { this.parse = parse; this.getResourceTelemetry = getResourceTelemetry; - this.successResponse = successResponse; } @Override @@ -195,7 +192,7 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) { : HttpResponse.of( HttpStatus.OK, MediaType.parse("application/x-protobuf"), - successResponse); + successResponseBytes); }); return HttpResponse.of(responseFuture); } @@ -236,7 +233,6 @@ protected AbstractHttpTelemetryExporterTest( @BeforeAll void setUp() { - // exporter = exporterBuilder() .setEndpoint(server.httpUri() + path) @@ -248,6 +244,7 @@ void setUp() { .setInitialBackoff(Duration.ofMillis(1)) .build()) .build(); + successResponseBytes = exporter.exportResponse(0).toByteArray(); // Sanity check that TLS files are in PEM format. assertThat(certificate.certificateFile()) @@ -553,7 +550,7 @@ void doubleShutdown() { @SuppressLogger(HttpExporter.class) void error() { int statusCode = 500; - addHttpError(statusCode); + addHttpResponse(statusCode); CompletableResultCode result = exporter .export(Collections.singletonList(generateFakeTelemetry())) @@ -590,10 +587,101 @@ void error() { assertThat(log.getLevel()).isEqualTo(Level.WARN); } + @Test + @SuppressLogger(HttpExporter.class) + void responseBodyBounds() { + // We have a 4mb hardcoded response body limit. Responses <= 4mb succeed. Responses >= 4mb + // succeed. We can't test payloads exactly at 4mb because protobuf message lengths are finicky - + // its hard to create a message with an exact size. + + // Body below the limit, succeeds + addHttpResponse(200, exporter.exportResponse(100)); + CompletableResultCode result = + exporter + .export(Collections.singletonList(generateFakeTelemetry())) + .join(10, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isTrue(); + + // Body over the limit, fails with RESOURCE_EXHAUSTED + addHttpResponse(200, exporter.exportResponse(4 * 1024 * 1024 + 1)); + result = + exporter + .export(Collections.singletonList(generateFakeTelemetry())) + .join(10, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isFalse(); + assertThat(result.getFailureThrowable()) + .isInstanceOfSatisfying( + FailedExportException.HttpExportException.class, + ex -> + assertThat(ex.getCause()) + .isNotNull() + .hasMessageContaining("HTTP response body exceeded limit of")); + } + + @Test + @SuppressLogger(HttpExporter.class) + void responseBodyBoundsGzipCompressed() throws IOException { + // Verify the body size limit is applied to the decompressed bytes, not the compressed wire + // bytes. The compressed body is well below the 4MB limit but decompresses to just over it. + byte[] payload = new byte[4 * 1024 * 1024 + 1]; + ByteArrayOutputStream compressedBaos = new ByteArrayOutputStream(); + try (GZIPOutputStream gzip = new GZIPOutputStream(compressedBaos)) { + gzip.write(payload); + } + byte[] compressedBody = compressedBaos.toByteArray(); + + httpErrors.add( + HttpResponse.of( + ResponseHeaders.builder(HttpStatus.OK) + .contentType(MediaType.PLAIN_TEXT_UTF_8) + .add("Content-Encoding", "gzip") + .build(), + HttpData.wrap(compressedBody))); + CompletableResultCode result = + exporter + .export(Collections.singletonList(generateFakeTelemetry())) + .join(10, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isFalse(); + assertThat(result.getFailureThrowable()) + .isInstanceOfSatisfying( + FailedExportException.HttpExportException.class, + ex -> + assertThat(ex.getCause()) + .isNotNull() + .hasMessageContaining("HTTP response body exceeded limit of")); + } + + @Test + @SuppressLogger(HttpExporter.class) + void unsupportedResponseContentEncoding() { + // Server responds with a Content-Encoding that we don't support. The export should fail + // without retry. + httpErrors.add( + HttpResponse.of( + ResponseHeaders.builder(HttpStatus.OK) + .contentType(MediaType.PLAIN_TEXT_UTF_8) + .add("Content-Encoding", "brotli") + .build(), + HttpData.wrap(new byte[10]))); + CompletableResultCode result = + exporter + .export(Collections.singletonList(generateFakeTelemetry())) + .join(10, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isFalse(); + assertThat(attempts).hasValue(1); + assertThat(result.getFailureThrowable()) + .isInstanceOfSatisfying( + FailedExportException.HttpExportException.class, + ex -> + assertThat(ex.getCause()) + .isNotNull() + .hasMessageContaining("Unsupported Content-Encoding")); + } + @ParameterizedTest @ValueSource(ints = {429, 502, 503, 504}) void retryableError(int code) { - addHttpError(code); + addHttpResponse(code); assertThat( exporter @@ -608,8 +696,8 @@ void retryableError(int code) { @Test @SuppressLogger(HttpExporter.class) void retryableError_tooManyAttempts() { - addHttpError(502); - addHttpError(502); + addHttpResponse(502); + addHttpResponse(502); assertThat( exporter @@ -625,7 +713,7 @@ void retryableError_tooManyAttempts() { @SuppressLogger(HttpExporter.class) @ValueSource(ints = {400, 401, 403, 500, 501}) void nonRetryableError(int code) { - addHttpError(code); + addHttpResponse(code); assertThat( exporter @@ -1096,7 +1184,13 @@ private List toProto(List telemetry) { .collect(Collectors.toList()); } - private static void addHttpError(int code) { + private static void addHttpResponse(int code) { httpErrors.add(HttpResponse.of(code)); } + + private static void addHttpResponse(int code, AbstractMessageLite bodyMessage) { + httpErrors.add( + HttpResponse.of( + HttpStatus.valueOf(code), MediaType.PLAIN_TEXT_UTF_8, bodyMessage.toByteArray())); + } } diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java index cbd0168789b..ab9b4611844 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java @@ -7,6 +7,7 @@ import static java.util.Objects.requireNonNull; +import com.google.protobuf.AbstractMessageLite; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.netty.GrpcSslContexts; @@ -68,6 +69,8 @@ public TelemetryExporterBuilder setEndpoint(String endpoint) { // the User-Agent to be spec compliant they must manually set the user agent when building // their channel. channelBuilder.userAgent(OtlpUserAgent.getUserAgent()); + // Its user's responsibility to set the max inbound message size when building their channel. + channelBuilder.maxInboundMessageSize(4 * 1024 * 1024); return this; } @@ -230,6 +233,11 @@ public CompletableResultCode shutdown() { shutdownCallback.run(); return delegateExporter.shutdown(); } + + @Override + public AbstractMessageLite exportResponse(int minimumSize) { + return delegateExporter.exportResponse(minimumSize); + } }; } diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/TelemetryExporter.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/TelemetryExporter.java index 65a37f2650f..6bb4c3132cc 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/TelemetryExporter.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/TelemetryExporter.java @@ -5,8 +5,18 @@ package io.opentelemetry.exporter.otlp.testing.internal; +import com.google.common.base.Strings; +import com.google.protobuf.AbstractMessageLite; import io.opentelemetry.exporter.otlp.profiles.ProfileData; import io.opentelemetry.exporter.otlp.profiles.ProfileExporter; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsPartialSuccess; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsPartialSuccess; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; +import io.opentelemetry.proto.collector.profiles.v1development.ExportProfilesPartialSuccess; +import io.opentelemetry.proto.collector.profiles.v1development.ExportProfilesServiceResponse; +import io.opentelemetry.proto.collector.trace.v1.ExportTracePartialSuccess; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.logs.export.LogRecordExporter; @@ -36,6 +46,19 @@ public CompletableResultCode export(Collection items) { public CompletableResultCode shutdown() { return exporter.shutdown(); } + + @Override + public AbstractMessageLite exportResponse(int minimumSize) { + if (minimumSize == 0) { + return ExportTraceServiceResponse.getDefaultInstance(); + } + return ExportTraceServiceResponse.newBuilder() + .setPartialSuccess( + ExportTracePartialSuccess.newBuilder() + .setErrorMessage(Strings.repeat("x", minimumSize)) + .build()) + .build(); + } }; } @@ -56,6 +79,19 @@ public CompletableResultCode export(Collection items) { public CompletableResultCode shutdown() { return exporter.shutdown(); } + + @Override + public AbstractMessageLite exportResponse(int minimumSize) { + if (minimumSize == 0) { + return ExportMetricsServiceResponse.getDefaultInstance(); + } + return ExportMetricsServiceResponse.newBuilder() + .setPartialSuccess( + ExportMetricsPartialSuccess.newBuilder() + .setErrorMessage(Strings.repeat("x", minimumSize)) + .build()) + .build(); + } }; } @@ -76,6 +112,19 @@ public CompletableResultCode export(Collection items) { public CompletableResultCode shutdown() { return exporter.shutdown(); } + + @Override + public AbstractMessageLite exportResponse(int minimumSize) { + if (minimumSize == 0) { + return ExportLogsServiceResponse.getDefaultInstance(); + } + return ExportLogsServiceResponse.newBuilder() + .setPartialSuccess( + ExportLogsPartialSuccess.newBuilder() + .setErrorMessage(Strings.repeat("x", minimumSize)) + .build()) + .build(); + } }; } @@ -96,6 +145,19 @@ public CompletableResultCode export(Collection items) { public CompletableResultCode shutdown() { return exporter.shutdown(); } + + @Override + public AbstractMessageLite exportResponse(int minimumSize) { + if (minimumSize == 0) { + return ExportProfilesServiceResponse.getDefaultInstance(); + } + return ExportProfilesServiceResponse.newBuilder() + .setPartialSuccess( + ExportProfilesPartialSuccess.newBuilder() + .setErrorMessage(Strings.repeat("x", minimumSize)) + .build()) + .build(); + } }; } @@ -105,6 +167,8 @@ public CompletableResultCode shutdown() { CompletableResultCode shutdown(); + AbstractMessageLite exportResponse(int minimumSize); + @Override default void close() { shutdown().join(10, TimeUnit.SECONDS); diff --git a/exporters/sender/grpc-managed-channel/build.gradle.kts b/exporters/sender/grpc-managed-channel/build.gradle.kts index d6ceea653c8..9ee493fb7a1 100644 --- a/exporters/sender/grpc-managed-channel/build.gradle.kts +++ b/exporters/sender/grpc-managed-channel/build.gradle.kts @@ -9,6 +9,8 @@ description = "OpenTelemetry gRPC Upstream Sender" otelJava.moduleName.set("io.opentelemetry.exporter.sender.grpc.managedchannel.internal") dependencies { + annotationProcessor("com.google.auto.value:auto-value") + implementation(project(":exporters:common")) implementation(project(":sdk:common")) diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcResponse.java b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/ImmutableGrpcResponse.java similarity index 56% rename from exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcResponse.java rename to exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/ImmutableGrpcResponse.java index 1fe4c1c62f6..6bd7315f87c 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ImmutableGrpcResponse.java +++ b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/ImmutableGrpcResponse.java @@ -3,25 +3,19 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.exporter.internal.grpc; +package io.opentelemetry.exporter.sender.grpc.managedchannel.internal; import com.google.auto.value.AutoValue; import io.opentelemetry.sdk.common.export.GrpcResponse; import io.opentelemetry.sdk.common.export.GrpcStatusCode; import javax.annotation.Nullable; -/** - * Auto value implementation of {@link GrpcResponse}. - * - *

This class is internal and is hence not for public use. Its APIs are unstable and can change - * at any time. - */ @AutoValue -public abstract class ImmutableGrpcResponse implements GrpcResponse { +abstract class ImmutableGrpcResponse implements GrpcResponse { - public static ImmutableGrpcResponse create( - GrpcStatusCode statusCode, @Nullable String statusDescription, byte[] responseBody) { - return new AutoValue_ImmutableGrpcResponse(statusCode, statusDescription, responseBody); + static ImmutableGrpcResponse create( + GrpcStatusCode statusCode, @Nullable String statusDescription, byte[] responseMessage) { + return new AutoValue_ImmutableGrpcResponse(statusCode, statusDescription, responseMessage); } @SuppressWarnings("mutable") diff --git a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java index 0ccf7f4d00e..3a755be609f 100644 --- a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java +++ b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java @@ -21,7 +21,6 @@ import io.grpc.StatusRuntimeException; import io.grpc.stub.ClientCalls; import io.grpc.stub.MetadataUtils; -import io.opentelemetry.exporter.internal.grpc.ImmutableGrpcResponse; import io.opentelemetry.exporter.internal.grpc.MarshalerInputStream; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.GrpcResponse; diff --git a/exporters/sender/jdk/build.gradle.kts b/exporters/sender/jdk/build.gradle.kts index 7f848b7708b..2bff4e6b2cb 100644 --- a/exporters/sender/jdk/build.gradle.kts +++ b/exporters/sender/jdk/build.gradle.kts @@ -10,6 +10,8 @@ otelJava.moduleName.set("io.opentelemetry.exporter.sender.jdk.internal") otelJava.minJavaVersionSupported.set(JavaVersion.VERSION_11) dependencies { + annotationProcessor("com.google.auto.value:auto-value") + implementation(project(":exporters:common")) implementation(project(":sdk:common")) diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/ImmutableHttpResponse.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/ImmutableHttpResponse.java new file mode 100644 index 00000000000..4af860c696b --- /dev/null +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/ImmutableHttpResponse.java @@ -0,0 +1,21 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.jdk.internal; + +import com.google.auto.value.AutoValue; +import io.opentelemetry.sdk.common.export.HttpResponse; + +@AutoValue +abstract class ImmutableHttpResponse implements HttpResponse { + + static ImmutableHttpResponse create(int statusCode, String statusMessage, byte[] responseBody) { + return new AutoValue_ImmutableHttpResponse(statusCode, statusMessage, responseBody); + } + + @SuppressWarnings("mutable") + @Override + public abstract byte[] getResponseBody(); +} diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java index aee4990c651..2ffb296de9c 100644 --- a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java @@ -5,10 +5,9 @@ package io.opentelemetry.exporter.sender.jdk.internal; -import static java.util.stream.Collectors.joining; - import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.Compressor; +import io.opentelemetry.sdk.common.export.HttpResponse; import io.opentelemetry.sdk.common.export.HttpSender; import io.opentelemetry.sdk.common.export.MessageWriter; import io.opentelemetry.sdk.common.export.ProxyOptions; @@ -16,19 +15,19 @@ import io.opentelemetry.sdk.common.internal.DaemonThreadFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.io.UncheckedIOException; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; -import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandlers; import java.nio.ByteBuffer; import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; @@ -41,6 +40,7 @@ import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.zip.GZIPInputStream; import javax.annotation.Nullable; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLException; @@ -72,6 +72,7 @@ public final class JdkHttpSender implements HttpSender { private final Supplier>> headerSupplier; @Nullable private final RetryPolicy retryPolicy; private final Predicate retryExceptionPredicate; + private final long maxResponseBodySize; // Visible for testing JdkHttpSender( @@ -82,7 +83,8 @@ public final class JdkHttpSender implements HttpSender { Duration timeout, Supplier>> headerSupplier, @Nullable RetryPolicy retryPolicy, - @Nullable ExecutorService executorService) { + @Nullable ExecutorService executorService, + long maxResponseBodySize) { this.client = client; this.endpoint = endpoint; this.contentType = contentType; @@ -101,6 +103,7 @@ public final class JdkHttpSender implements HttpSender { this.executorService = executorService; this.managedExecutor = false; } + this.maxResponseBodySize = maxResponseBodySize; } JdkHttpSender( @@ -113,7 +116,8 @@ public final class JdkHttpSender implements HttpSender { @Nullable RetryPolicy retryPolicy, @Nullable ProxyOptions proxyOptions, @Nullable SSLContext sslContext, - @Nullable ExecutorService executorService) { + @Nullable ExecutorService executorService, + long maxResponseBodySize) { this( configureClient(sslContext, connectTimeout, proxyOptions), endpoint, @@ -122,7 +126,8 @@ public final class JdkHttpSender implements HttpSender { timeout, headerSupplier, retryPolicy, - executorService); + executorService, + maxResponseBodySize); } private static ExecutorService newExecutor() { @@ -151,10 +156,8 @@ private static HttpClient configureClient( @Override public void send( - MessageWriter messageWriter, - Consumer onResponse, - Consumer onError) { - CompletableFuture> unused = + MessageWriter messageWriter, Consumer onResponse, Consumer onError) { + CompletableFuture unused = CompletableFuture.supplyAsync( () -> { try { @@ -170,12 +173,12 @@ public void send( onError.accept(throwable); return; } - onResponse.accept(toHttpResponse(httpResponse)); + onResponse.accept(httpResponse); }); } // Visible for testing - HttpResponse sendInternal(MessageWriter requestBodyWriter) throws IOException { + HttpResponse sendInternal(MessageWriter requestBodyWriter) throws IOException { long startTimeNanos = System.nanoTime(); HttpRequest.Builder requestBuilder = HttpRequest.newBuilder().uri(endpoint).timeout(timeout); Map> headers = headerSupplier.get(); @@ -183,6 +186,8 @@ HttpResponse sendInternal(MessageWriter requestBodyWriter) throws IOExce headers.forEach((key, values) -> values.forEach(value -> requestBuilder.header(key, value))); } requestBuilder.header("Content-Type", contentType); + // Advertise gzip and identity response encoding support. + requestBuilder.header("Accept-Encoding", "gzip, identity"); NoCopyByteArrayOutputStream os = threadLocalBaos.get(); os.reset(); @@ -207,7 +212,7 @@ HttpResponse sendInternal(MessageWriter requestBodyWriter) throws IOExce long attempt = 0; long nextBackoffNanos = retryPolicy.getInitialBackoff().toNanos(); - HttpResponse httpResponse = null; + HttpResponse httpResponse = null; IOException exception = null; do { if (attempt > 0) { @@ -234,7 +239,7 @@ HttpResponse sendInternal(MessageWriter requestBodyWriter) throws IOExce requestBuilder.timeout(timeout.minusNanos(System.nanoTime() - startTimeNanos)); try { httpResponse = sendRequest(requestBuilder, byteBufferPool); - boolean retryable = retryableStatusCodes.contains(httpResponse.statusCode()); + boolean retryable = retryableStatusCodes.contains(httpResponse.getStatusCode()); if (logger.isLoggable(Level.FINER)) { logger.log( Level.FINER, @@ -273,21 +278,16 @@ HttpResponse sendInternal(MessageWriter requestBodyWriter) throws IOExce throw exception; } - private static String responseStringRepresentation(HttpResponse response) { - StringJoiner joiner = new StringJoiner(",", "HttpResponse{", "}"); - joiner.add("code=" + response.statusCode()); - joiner.add( - "headers=" - + response.headers().map().entrySet().stream() - .map(entry -> entry.getKey() + "=" + String.join(",", entry.getValue())) - .collect(joining(",", "[", "]"))); - return joiner.toString(); + private static String responseStringRepresentation(HttpResponse response) { + return "HttpResponse{code=" + response.getStatusCode() + "}"; } - private HttpResponse sendRequest( + private HttpResponse sendRequest( HttpRequest.Builder requestBuilder, ByteBufferPool byteBufferPool) throws IOException { try { - return client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofByteArray()); + java.net.http.HttpResponse response = + client.send(requestBuilder.build(), BodyHandlers.ofInputStream()); + return toHttpResponse(response); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(e); @@ -306,7 +306,23 @@ private static boolean isRetryableException(IOException throwable) { // Known retryable HttpTimeoutException messages: "request timed out" // Known retryable HttpConnectTimeoutException messages: "HTTP connect timed // out" - return !(throwable instanceof SSLException); + // ResponseBodyTooLargeException and UnsupportedContentEncodingException are permanent errors: + // a larger body or unsupported encoding will not resolve on retry. + return !(throwable instanceof SSLException) + && !(throwable instanceof ResponseBodyTooLargeException) + && !(throwable instanceof UnsupportedContentEncodingException); + } + + private static final class ResponseBodyTooLargeException extends IOException { + ResponseBodyTooLargeException(String message) { + super(message); + } + } + + private static final class UnsupportedContentEncodingException extends IOException { + UnsupportedContentEncodingException(String message) { + super(message); + } } private static class NoCopyByteArrayOutputStream extends ByteArrayOutputStream { @@ -319,24 +335,48 @@ private byte[] buf() { } } - private static io.opentelemetry.sdk.common.export.HttpResponse toHttpResponse( - HttpResponse response) { - return new io.opentelemetry.sdk.common.export.HttpResponse() { - @Override - public int getStatusCode() { - return response.statusCode(); - } - - @Override - public String getStatusMessage() { - return String.valueOf(response.statusCode()); - } - - @Override - public byte[] getResponseBody() { - return response.body(); + private HttpResponse toHttpResponse(java.net.http.HttpResponse response) + throws IOException { + int statusCode = response.statusCode(); + // Read up to maxResponseBodySize + 1 bytes. Reading exactly one byte more than the limit + // lets us detect overflow: if we read more than maxResponseBodySize bytes, the body exceeded + // the limit. A body exactly at the limit will read no further (EOF is reached first). + // If maxResponseBodySize is >= Integer.MAX_VALUE, adding 1 would overflow (long) or exceed + // what an int can hold. In that case use Integer.MAX_VALUE — the overflow check can never + // trigger for such a large limit. + int readUpTo = + maxResponseBodySize >= Integer.MAX_VALUE + ? Integer.MAX_VALUE + : (int) (maxResponseBodySize + 1); + + String contentEncoding = response.headers().firstValue("Content-Encoding").orElse(null); + if (contentEncoding != null && !"gzip".equalsIgnoreCase(contentEncoding)) { + throw new UnsupportedContentEncodingException( + "Unsupported Content-Encoding: " + contentEncoding); + } + boolean decompress = "gzip".equalsIgnoreCase(contentEncoding); + + byte[] bodyBytes; + try (InputStream rawIs = response.body()) { + // The limit is applied to the decompressed bytes. + InputStream is = decompress ? new GZIPInputStream(rawIs) : rawIs; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] buf = new byte[4 * 0x400]; // 4KB + int n; + while (baos.size() < readUpTo + && (n = is.read(buf, 0, Math.min(buf.length, readUpTo - baos.size()))) != -1) { + baos.write(buf, 0, n); } - }; + bodyBytes = baos.toByteArray(); + } catch (IOException e) { + bodyBytes = new byte[0]; + logger.log(Level.WARNING, "Failed to read response body", e); + } + if (bodyBytes.length > maxResponseBodySize) { + throw new ResponseBodyTooLargeException( + "HTTP response body exceeded limit of " + maxResponseBodySize + " bytes"); + } + return ImmutableHttpResponse.create(statusCode, String.valueOf(statusCode), bodyBytes); } private static class ByteBufferPool { diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java index 0ece6a9978d..fcd9d64e0fe 100644 --- a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java @@ -29,6 +29,7 @@ public HttpSender createSender(HttpSenderConfig httpSenderConfig) { httpSenderConfig.getRetryPolicy(), httpSenderConfig.getProxyOptions(), httpSenderConfig.getSslContext(), - httpSenderConfig.getExecutorService()); + httpSenderConfig.getExecutorService(), + httpSenderConfig.getMaxResponseBodySize()); } } diff --git a/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java b/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java index c5bddc79fb1..24f764f3857 100644 --- a/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java +++ b/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java @@ -66,7 +66,8 @@ void setup() throws IOException, InterruptedException { Duration.ofSeconds(10), Collections::emptyMap, RetryPolicy.builder().setMaxAttempts(2).setInitialBackoff(Duration.ofMillis(1)).build(), - null); + null, + Long.MAX_VALUE); } @Test @@ -121,7 +122,8 @@ void sendInternal_RetryableConnectException() throws IOException, InterruptedExc Duration.ofSeconds(10), Collections::emptyMap, RetryPolicy.builder().setMaxAttempts(2).setInitialBackoff(Duration.ofMillis(1)).build(), - null); + null, + Long.MAX_VALUE); assertThatThrownBy(() -> sender.sendInternal(new NoOpRequestBodyWriter())) .satisfies( @@ -177,7 +179,8 @@ void connectTimeout() { null, null, null, - null); + null, + Long.MAX_VALUE); assertThat(sender) .extracting("client", as(InstanceOfAssertFactories.type(HttpClient.class))) diff --git a/exporters/sender/okhttp/build.gradle.kts b/exporters/sender/okhttp/build.gradle.kts index 107270e9ada..a004fea9034 100644 --- a/exporters/sender/okhttp/build.gradle.kts +++ b/exporters/sender/okhttp/build.gradle.kts @@ -12,6 +12,8 @@ dependencies { implementation(project(":exporters:common")) implementation(project(":sdk:common")) + annotationProcessor("com.google.auto.value:auto-value") + implementation("com.squareup.okhttp3:okhttp") compileOnly("io.grpc:grpc-stub") diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/ImmutableGrpcResponse.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/ImmutableGrpcResponse.java new file mode 100644 index 00000000000..cb229c2a6a2 --- /dev/null +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/ImmutableGrpcResponse.java @@ -0,0 +1,24 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp.internal; + +import com.google.auto.value.AutoValue; +import io.opentelemetry.sdk.common.export.GrpcResponse; +import io.opentelemetry.sdk.common.export.GrpcStatusCode; +import javax.annotation.Nullable; + +@AutoValue +abstract class ImmutableGrpcResponse implements GrpcResponse { + + static ImmutableGrpcResponse create( + GrpcStatusCode statusCode, @Nullable String statusDescription, byte[] responseMessage) { + return new AutoValue_ImmutableGrpcResponse(statusCode, statusDescription, responseMessage); + } + + @SuppressWarnings("mutable") + @Override + public abstract byte[] getResponseMessage(); +} diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/ImmutableHttpResponse.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/ImmutableHttpResponse.java new file mode 100644 index 00000000000..80fafddd833 --- /dev/null +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/ImmutableHttpResponse.java @@ -0,0 +1,21 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp.internal; + +import com.google.auto.value.AutoValue; +import io.opentelemetry.sdk.common.export.HttpResponse; + +@AutoValue +abstract class ImmutableHttpResponse implements HttpResponse { + + static ImmutableHttpResponse create(int statusCode, String statusMessage, byte[] responseBody) { + return new AutoValue_ImmutableHttpResponse(statusCode, statusMessage, responseBody); + } + + @SuppressWarnings("mutable") + @Override + public abstract byte[] getResponseBody(); +} diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java index 82b4a169270..90dc3c698c5 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java @@ -32,7 +32,6 @@ import io.opentelemetry.sdk.common.export.GrpcStatusCode; import io.opentelemetry.sdk.common.export.MessageWriter; import io.opentelemetry.sdk.common.export.RetryPolicy; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -63,7 +62,6 @@ import okhttp3.ResponseBody; import okio.Buffer; import okio.GzipSource; -import okio.Okio; /** * A {@link GrpcSender} which uses OkHttp instead of grpc-java. @@ -83,6 +81,7 @@ public final class OkHttpGrpcSender implements GrpcSender { private final HttpUrl url; @Nullable private final Compressor compressor; private final Supplier>> headersSupplier; + private final long maxResponseBodySize; /** Creates a new {@link OkHttpGrpcSender}. */ @SuppressWarnings("TooManyParameters") @@ -95,7 +94,8 @@ public OkHttpGrpcSender( @Nullable RetryPolicy retryPolicy, @Nullable SSLContext sslContext, @Nullable X509TrustManager trustManager, - @Nullable ExecutorService executorService) { + @Nullable ExecutorService executorService, + long maxResponseBodySize) { int callTimeoutMillis = (int) Math.min(timeout.toMillis(), Integer.MAX_VALUE); int connectTimeoutMillis = (int) Math.min(connectTimeout.toMillis(), Integer.MAX_VALUE); @@ -133,6 +133,7 @@ public OkHttpGrpcSender( this.compressor = compressor; this.headersSupplier = headersSupplier; this.url = HttpUrl.get(endpoint); + this.maxResponseBodySize = maxResponseBodySize; } @Override @@ -146,6 +147,8 @@ public void send( (key, values) -> values.forEach(value -> requestBuilder.addHeader(key, value))); } requestBuilder.addHeader("te", "trailers"); + // Mirror grpc-java's default DecompressorRegistry which advertises identity and gzip. + requestBuilder.addHeader("grpc-accept-encoding", "identity,gzip"); if (compressor != null) { requestBuilder.addHeader("grpc-encoding", compressor.getEncoding()); } @@ -165,58 +168,92 @@ public void onFailure(Call call, IOException e) { @Override public void onResponse(Call call, Response response) { - try (ResponseBody body = response.body()) { - // Must consume body before accessing trailers - byte[] bodyBytes = null; - try { - bodyBytes = getResponseMessageBytes(body.bytes()); - } catch (IOException e) { - bodyBytes = new byte[0]; - logger.log(Level.FINE, "Failed to read response body", e); - } - byte[] resolvedBodyBytes = bodyBytes; - GrpcStatusCode status = grpcStatus(response); - String description = grpcMessage(response); - onResponse.accept( - new GrpcResponse() { - @Override - public GrpcStatusCode getStatusCode() { - return status; - } - - @Override - public String getStatusDescription() { - return description; - } - - @Override - public byte[] getResponseMessage() { - return resolvedBodyBytes; - } - }); - } + handleResponse(response, onResponse); } })); } - private static byte[] getResponseMessageBytes(byte[] bodyBytes) throws IOException { - if (bodyBytes.length >= 5) { - ByteArrayInputStream bodyStream = new ByteArrayInputStream(bodyBytes); - bodyStream.skip(5); - if (bodyBytes[0] == 1) { - Buffer buffer = new Buffer(); - buffer.readFrom(bodyStream); - GzipSource gzipSource = new GzipSource(buffer); - bodyBytes = Okio.buffer(gzipSource).readByteArray(); + private void handleResponse(Response response, Consumer onResponse) { + try (ResponseBody body = response.body()) { + // Read up to maxResponseBodySize + 1 bytes. Reading exactly one byte more than the limit + // lets us detect overflow: if the buffer ends up larger than maxResponseBodySize, the body + // exceeded the limit. A body exactly at the limit will only fill the buffer to + // maxResponseBodySize (EOF is reached before the extra byte is read). + // If maxResponseBodySize is Long.MAX_VALUE, adding 1 would overflow. In that case use + // Long.MAX_VALUE directly — the overflow check can never trigger for such a large limit. + long readUpTo = + maxResponseBodySize == Long.MAX_VALUE ? Long.MAX_VALUE : maxResponseBodySize + 1; + Buffer wireBuffer = new Buffer(); + try { + while (wireBuffer.size() <= maxResponseBodySize) { + long n = body.source().read(wireBuffer, readUpTo - wireBuffer.size()); + if (n == -1L) { + break; + } + } + } catch (IOException e) { + logger.log(Level.FINE, "Failed to read response body", e); + } + + if (wireBuffer.size() > maxResponseBodySize) { + onResponse.accept(responseMessageTooLarge(maxResponseBodySize)); + return; + } + + // Must consume body before accessing trailers + byte[] bodyBytes = new byte[0]; + byte[] wireBytes = wireBuffer.readByteArray(); + if (wireBytes.length >= 5) { + if (wireBytes[0] == 0) { + // Not compressed: slice the message payload from the gRPC frame + bodyBytes = Arrays.copyOfRange(wireBytes, 5, wireBytes.length); + } else { + // Compressed: validate the encoding and decompress with a post-decompression size limit + String encoding = response.header("grpc-encoding"); + if (!"gzip".equalsIgnoreCase(encoding)) { + onResponse.accept(responseUnsupportedGrpcEncoding(encoding)); + return; + } + try { + Buffer compressedBuffer = new Buffer(); + compressedBuffer.write(wireBytes, 5, wireBytes.length - 5); + GzipSource gzipSource = new GzipSource(compressedBuffer); + Buffer decompressedBuffer = new Buffer(); + while (decompressedBuffer.size() <= maxResponseBodySize) { + long n = gzipSource.read(decompressedBuffer, readUpTo - decompressedBuffer.size()); + if (n == -1L) { + break; + } + } + if (decompressedBuffer.size() > maxResponseBodySize) { + onResponse.accept(responseMessageTooLarge(maxResponseBodySize)); + return; + } + bodyBytes = decompressedBuffer.readByteArray(); + } catch (IOException e) { + logger.log(Level.FINE, "Failed to decompress response body", e); + } + } } else { - bodyBytes = Arrays.copyOfRange(bodyBytes, 5, bodyBytes.length); + logger.log(Level.FINE, "Invalid gRPC response frame"); } - return bodyBytes; - } else { - throw new IOException("Invalid response"); + onResponse.accept( + ImmutableGrpcResponse.create(grpcStatus(response), grpcMessage(response), bodyBytes)); } } + private static GrpcResponse responseMessageTooLarge(long maxResponseBodySize) { + return ImmutableGrpcResponse.create( + GrpcStatusCode.RESOURCE_EXHAUSTED, + "gRPC response body exceeded limit of " + maxResponseBodySize + " bytes", + new byte[0]); + } + + private static GrpcResponse responseUnsupportedGrpcEncoding(@Nullable String encoding) { + return ImmutableGrpcResponse.create( + GrpcStatusCode.INTERNAL, "Unsupported gRPC message encoding: " + encoding, new byte[0]); + } + private static GrpcStatusCode grpcStatus(Response response) { // Status can either be in the headers or trailers depending on error String grpcStatus = response.header(GRPC_STATUS); diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java index 09bb6eefe4f..fb4b275444f 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java @@ -31,6 +31,7 @@ public GrpcSender createSender(GrpcSenderConfig grpcSenderConfig) { grpcSenderConfig.getRetryPolicy(), grpcSenderConfig.getSslContext(), grpcSenderConfig.getTrustManager(), - grpcSenderConfig.getExecutorService()); + grpcSenderConfig.getExecutorService(), + grpcSenderConfig.getMaxResponseBodySize()); } } diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java index 16b5d90b7f4..49c7fc10821 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java @@ -39,8 +39,11 @@ import okhttp3.RequestBody; import okhttp3.Response; import okhttp3.ResponseBody; +import okio.Buffer; import okio.BufferedSink; +import okio.GzipSource; import okio.Okio; +import okio.Source; /** * {@link HttpSender} which is backed by OkHttp. @@ -58,6 +61,7 @@ public final class OkHttpHttpSender implements HttpSender { private final Supplier>> headerSupplier; private final MediaType mediaType; @Nullable private final Compressor compressor; + private final long maxResponseBodySize; /** Create a sender. */ @SuppressWarnings("TooManyParameters") @@ -72,7 +76,8 @@ public OkHttpHttpSender( @Nullable RetryPolicy retryPolicy, @Nullable SSLContext sslContext, @Nullable X509TrustManager trustManager, - @Nullable ExecutorService executorService) { + @Nullable ExecutorService executorService, + long maxResponseBodySize) { int callTimeoutMillis = (int) Math.min(timeout.toMillis(), Integer.MAX_VALUE); int connectTimeoutMillis = (int) Math.min(connectTimeout.toMillis(), Integer.MAX_VALUE); @@ -111,6 +116,7 @@ public OkHttpHttpSender( this.mediaType = MediaType.parse(contentType); this.compressor = compressor; this.headerSupplier = headerSupplier; + this.maxResponseBodySize = maxResponseBodySize; } @Override @@ -126,6 +132,10 @@ public void send( if (compressor != null) { requestBuilder.addHeader("Content-Encoding", compressor.getEncoding()); } + // Explicitly advertise gzip and identity encoding support. Because we set Accept-Encoding + // ourselves, OkHttp's BridgeInterceptor will not transparently decompress gzip responses + // (it only does so when it added the header), so we handle decompression ourselves below. + requestBuilder.addHeader("Accept-Encoding", "gzip, identity"); requestBuilder.post(new RequestBodyImpl(messageWriter, compressor, mediaType)); InstrumentationUtil.suppressInstrumentation( @@ -141,39 +151,54 @@ public void onFailure(Call call, IOException e) { @Override public void onResponse(Call call, Response response) { - try (ResponseBody body = response.body()) { - onResponse.accept( - new HttpResponse() { - @Nullable private byte[] bodyBytes; - - @Override - public int getStatusCode() { - return response.code(); - } - - @Override - public String getStatusMessage() { - return response.message(); - } - - @Override - public byte[] getResponseBody() { - if (bodyBytes == null) { - try { - bodyBytes = body.bytes(); - } catch (IOException e) { - bodyBytes = new byte[0]; - logger.log(Level.WARNING, "Failed to read response body", e); - } - } - return bodyBytes; - } - }); - } + handleResponse(response, onResponse, onError); } })); } + private void handleResponse( + Response response, Consumer onResponse, Consumer onError) { + try (ResponseBody body = response.body()) { + String contentEncoding = response.header("Content-Encoding"); + if (contentEncoding != null && !"gzip".equalsIgnoreCase(contentEncoding)) { + onError.accept(new IOException("Unsupported Content-Encoding: " + contentEncoding)); + return; + } + boolean decompress = "gzip".equalsIgnoreCase(contentEncoding); + // Read up to maxResponseBodySize + 1 bytes. Reading exactly one byte more than the limit + // lets us detect overflow: if the buffer ends up larger than maxResponseBodySize, the body + // exceeded the limit. A body exactly at the limit will only fill the buffer to + // maxResponseBodySize (EOF is reached before the extra byte is read). + // If maxResponseBodySize is Long.MAX_VALUE, adding 1 would overflow. In that case use + // Long.MAX_VALUE directly — the overflow check can never trigger for such a large limit. + long readUpTo = + maxResponseBodySize == Long.MAX_VALUE ? Long.MAX_VALUE : maxResponseBodySize + 1; + Buffer buffer = new Buffer(); + try { + Source source = decompress ? new GzipSource(body.source()) : body.source(); + while (buffer.size() <= maxResponseBodySize) { + long n = source.read(buffer, readUpTo - buffer.size()); + if (n == -1L) { + break; + } + } + } catch (IOException e) { + logger.log(Level.WARNING, "Failed to read response body", e); + } + + if (buffer.size() > maxResponseBodySize) { + onError.accept( + new IOException( + "HTTP response body exceeded limit of " + maxResponseBodySize + " bytes")); + return; + } + + byte[] bodyBytes = buffer.readByteArray(); + onResponse.accept( + ImmutableHttpResponse.create(response.code(), response.message(), bodyBytes)); + } + } + @Override public CompletableResultCode shutdown() { client.dispatcher().cancelAll(); diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java index 0fabe6e3bdb..581cf9bb549 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java @@ -30,6 +30,7 @@ public HttpSender createSender(HttpSenderConfig httpSenderConfig) { httpSenderConfig.getRetryPolicy(), httpSenderConfig.getSslContext(), httpSenderConfig.getTrustManager(), - httpSenderConfig.getExecutorService()); + httpSenderConfig.getExecutorService(), + httpSenderConfig.getMaxResponseBodySize()); } } diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderTest.java index 2127f92a072..a7f314c93b5 100644 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderTest.java @@ -89,7 +89,8 @@ void shutdown_CompletableResultCodeShouldWaitForThreads() throws Exception { null, null, null, - null); + null, + Long.MAX_VALUE); CompletableResultCode sendResult = new CompletableResultCode(); sender.send( @@ -132,7 +133,8 @@ void shutdown_NonManagedExecutor_ReturnsImmediately() { null, null, null, - customExecutor); // Pass custom executor -> managedExecutor = false + customExecutor, // Pass custom executor -> managedExecutor = false + Long.MAX_VALUE); CompletableResultCode shutdownResult = sender.shutdown(); @@ -169,7 +171,8 @@ void shutdown_ExecutorDoesNotTerminateInTime_LogsWarningButSucceeds() throws Exc null, null, null, - null); // null executor = managed + null, // null executor = managed + Long.MAX_VALUE); // Start multiple requests to ensure threads are busy CountDownLatch blockCallbacks = new CountDownLatch(1); @@ -231,7 +234,8 @@ void shutdown_InterruptedWhileWaiting_StillSucceeds() throws Exception { null, null, null, - null); + null, + Long.MAX_VALUE); // Trigger some activity sender.send(new TestMessageWriter(), response -> {}, error -> {}); diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java index 705f96b601d..3bc73283db7 100644 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java @@ -39,6 +39,7 @@ OkHttpGrpcSender createSender(String endpoint) { null, null, null, - null); + null, + Long.MAX_VALUE); } } diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java index 95699b5aa51..c64247a9b36 100644 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java @@ -46,6 +46,7 @@ OkHttpHttpSender createSender(String endpoint) { null, null, null, - null); + null, + Long.MAX_VALUE); } } diff --git a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java index 66bdb787cdb..ba5429d827a 100644 --- a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java +++ b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java @@ -42,12 +42,14 @@ public final class JaegerRemoteSamplerBuilder { private static final Sampler INITIAL_SAMPLER = Sampler.parentBased(Sampler.traceIdRatioBased(0.001)); private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(10); + private static final long DEFAULT_MAX_RESPONSE_BODY_SIZE = 4 * 1024L * 1024L; private URI endpoint = DEFAULT_ENDPOINT; private Sampler initialSampler = INITIAL_SAMPLER; private int pollingIntervalMillis = DEFAULT_POLLING_INTERVAL_MILLIS; private final TlsConfigHelper tlsConfigHelper = new TlsConfigHelper(); private Supplier> headerSupplier = Collections::emptyMap; + private long maxResponseBodySize = DEFAULT_MAX_RESPONSE_BODY_SIZE; @Nullable private String serviceName; @@ -147,6 +149,16 @@ public JaegerRemoteSamplerBuilder setInitialSampler(Sampler initialSampler) { return this; } + /** + * Sets the maximum number of bytes to read from a sampling strategy response body. If unset, + * defaults to 4 MiB. Must be positive. + */ + public JaegerRemoteSamplerBuilder setMaxSamplingStrategyResponseBodySize(long bytes) { + Utils.checkArgument(bytes > 0, "maxSamplingStrategyResponseBodySize must be positive"); + this.maxResponseBodySize = bytes; + return this; + } + /** * Sets the managed channel to use when communicating with the backend. Takes precedence over * {@link #setEndpoint(String)} if both are called. @@ -195,7 +207,8 @@ private GrpcSender resolveGrpcSender() { tlsConfigHelper.getSslContext(), tlsConfigHelper.getTrustManager(), null, - grpcChannel); + grpcChannel, + maxResponseBodySize); return grpcSenderProvider.createSender(grpcSenderConfig); } } diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSender.java b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSender.java index 500b12ad147..047d99eccec 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSender.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSender.java @@ -27,6 +27,9 @@ public interface GrpcSender { * called when the call could not be executed due to cancellation, connectivity problems, or * timeout. * + *

The byte array returned by {@link GrpcResponse#getResponseMessage()} should contain at most + * {@link GrpcSenderConfig#getMaxResponseBodySize()} bytes. + * * @param messageWriter the message writer * @param onResponse the callback to invoke with the gRPC response * @param onError the callback to invoke when the gRPC call could not be executed diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSenderConfig.java b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSenderConfig.java index 53fd2eb556f..35e02e09b4c 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSenderConfig.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/GrpcSenderConfig.java @@ -88,4 +88,14 @@ public interface GrpcSenderConfig { */ @Nullable ExecutorService getExecutorService(); + + /** + * The maximum number of bytes to read from a response body. Defaults to 4 MiB. + * + *

Warning: setting a high or unbounded limit allows a malicious or misconfigured server to + * cause unbounded heap allocation, potentially leading to out-of-memory errors. + */ + default long getMaxResponseBodySize() { + return 4 * 1024L * 1024L; + } } diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSender.java b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSender.java index a039d4ad6d0..bf0d75524f1 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSender.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSender.java @@ -27,6 +27,9 @@ public interface HttpSender { * called when the request could not be executed due to cancellation, connectivity problems, or * timeout. * + *

The byte array returned by {@link HttpResponse#getResponseBody()} should contain at most + * {@link HttpSenderConfig#getMaxResponseBodySize()} bytes. + * * @param messageWriter the request body message writer * @param onResponse the callback to invoke with the HTTP response * @param onError the callback to invoke when the HTTP request could not be executed diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSenderConfig.java b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSenderConfig.java index d67a71968a7..83d83b9f7b0 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSenderConfig.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/HttpSenderConfig.java @@ -86,4 +86,14 @@ public interface HttpSenderConfig { */ @Nullable ExecutorService getExecutorService(); + + /** + * The maximum number of bytes to read from a response body. Defaults to 4 MiB. + * + *

Warning: setting a high or unbounded limit allows a malicious or misconfigured server to + * cause unbounded heap allocation, potentially leading to out-of-memory errors. + */ + default long getMaxResponseBodySize() { + return 4 * 1024L * 1024L; + } }