diff --git a/CHANGELOG.md b/CHANGELOG.md index 37978503..50769a30 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## 1.9.0 [unreleased] +### Features + +1. [#360](https://github.com/InfluxCommunity/influxdb3-java/pull/360): Support passing interceptors to the Flight client. + ## 1.8.0 [2026-02-19] ### Features diff --git a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java index 7e0e8ac7..4326f3a5 100644 --- a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java +++ b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java @@ -28,6 +28,7 @@ import java.time.Duration; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Properties; @@ -36,6 +37,8 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import io.grpc.ClientInterceptor; + import com.influxdb.v3.client.write.WriteOptions; import com.influxdb.v3.client.write.WritePrecision; @@ -68,6 +71,7 @@ *
  • headers - headers to be added to requests
  • *
  • sslRootsFilePath - path to the stored certificates file in PEM format
  • *
  • disableGRPCCompression - disables the default gRPC compression header
  • + *
  • interceptors - list of client interceptors to be used in the query API
  • * *

    * If you want to create a client with custom configuration, you can use following code: @@ -115,6 +119,7 @@ public final class ClientConfig { private final Map headers; private final String sslRootsFilePath; private final boolean disableGRPCCompression; + private final List interceptors; /** * Deprecated use {@link #proxyUrl}. @@ -329,6 +334,16 @@ public boolean getDisableGRPCCompression() { return disableGRPCCompression; } + /** + * Gets a list of client interceptors. + * + * @return a list of client interceptors. + */ + @Nullable + public List getInterceptors() { + return interceptors; + } + /** * Validates the configuration properties. */ @@ -366,7 +381,8 @@ public boolean equals(final Object o) { && Objects.equals(authenticator, that.authenticator) && Objects.equals(headers, that.headers) && Objects.equals(sslRootsFilePath, that.sslRootsFilePath) - && disableGRPCCompression == that.disableGRPCCompression; + && disableGRPCCompression == that.disableGRPCCompression + && Objects.equals(interceptors, that.interceptors); } @Override @@ -375,7 +391,7 @@ public int hashCode() { database, writePrecision, gzipThreshold, writeNoSync, timeout, writeTimeout, queryTimeout, allowHttpRedirects, disableServerCertificateValidation, proxy, proxyUrl, authenticator, headers, - defaultTags, sslRootsFilePath, disableGRPCCompression); + defaultTags, sslRootsFilePath, disableGRPCCompression, interceptors); } @Override @@ -429,6 +445,7 @@ public static final class Builder { private Map headers; private String sslRootsFilePath; private boolean disableGRPCCompression; + private List interceptors; /** * Sets the URL of the InfluxDB server. @@ -723,6 +740,18 @@ public Builder disableGRPCCompression(final boolean disableGRPCCompression) { return this; } + /** + * Sets a list of interceptors to be used for the query API. + * + * @param interceptors a list of ClientInterceptor + * @return this + */ + @Nonnull + public Builder interceptors(@Nullable final List interceptors) { + this.interceptors = interceptors; + return this; + } + /** * Build an instance of {@code ClientConfig}. * @@ -897,5 +926,6 @@ private ClientConfig(@Nonnull final Builder builder) { headers = builder.headers; sslRootsFilePath = builder.sslRootsFilePath; disableGRPCCompression = builder.disableGRPCCompression; + interceptors = builder.interceptors; } } diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java index d80c7e87..9880e7f2 100644 --- a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -176,6 +176,10 @@ private FlightClient createFlightClient(@Nonnull final ClientConfig config) { .with(Codec.Identity.NONE, false)); } + if (config.getInterceptors() != null) { + nettyChannelBuilder.intercept(config.getInterceptors()); + } + return FlightGrpcUtils.createFlightClient(new RootAllocator(Long.MAX_VALUE), nettyChannelBuilder.build()); } diff --git a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java index a6873c56..ab326c90 100644 --- a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java @@ -24,11 +24,19 @@ import java.net.InetSocketAddress; import java.net.URISyntaxException; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.stream.IntStream; import java.util.stream.Stream; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; import io.grpc.HttpConnectProxiedSocketAddress; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; import io.grpc.ProxyDetector; import io.grpc.internal.GrpcUtil; import org.apache.arrow.flight.CallHeaders; @@ -108,6 +116,44 @@ public void callHeaders() throws Exception { } } + @Test + public void setHeaderInInterceptor() throws Exception { + ClientInterceptor interceptor = new ClientInterceptor() { + @Override + public ClientCall interceptCall( + final MethodDescriptor method, + final CallOptions callOptions, + final Channel next + ) { + ClientCall call = next.newCall(method, callOptions); + return new ForwardingClientCall.SimpleForwardingClientCall<>(call) { + @Override + public void start(final Listener responseListener, final Metadata headers) { + Metadata.Key key = Metadata.Key.of( + "some-header", + Metadata.ASCII_STRING_MARSHALLER + ); + headers.put(key, "This is from interceptor"); + super.start(responseListener, headers); + } + }; + } + }; + + ClientConfig clientConfig = new ClientConfig.Builder() + .host(server.getLocation().getUri().toString()) + .token("my-token".toCharArray()) + .interceptors(List.of(interceptor)) + .build(); + + try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig); + var data = executeQuery(flightSqlClient, Map.of(), Map.of())) { + Assertions.assertThat(data.count()).isEqualTo(rowCount); + final Map receivedHeaders = headerFactory.getLastInstance().getHeaders(); + Assertions.assertThat(receivedHeaders.get("some-header")).isEqualTo("This is from interceptor"); + } + } + @Test public void callHeadersWithoutToken() throws Exception { ClientConfig clientConfig = new ClientConfig.Builder()