Skip to content

Commit 5ea6c2c

Browse files
feat: add support for gRPC client interceptors in ClientConfig (#360)
1 parent db93077 commit 5ea6c2c

4 files changed

Lines changed: 86 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
## 1.9.0 [unreleased]
22

3+
### Features
4+
5+
1. [#360](https://github.com/InfluxCommunity/influxdb3-java/pull/360): Support passing interceptors to the Flight client.
6+
37
## 1.8.0 [2026-02-19]
48

59
### Features

src/main/java/com/influxdb/v3/client/config/ClientConfig.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.time.Duration;
2929
import java.util.Arrays;
3030
import java.util.HashMap;
31+
import java.util.List;
3132
import java.util.Map;
3233
import java.util.Objects;
3334
import java.util.Properties;
@@ -36,6 +37,8 @@
3637
import javax.annotation.Nonnull;
3738
import javax.annotation.Nullable;
3839

40+
import io.grpc.ClientInterceptor;
41+
3942
import com.influxdb.v3.client.write.WriteOptions;
4043
import com.influxdb.v3.client.write.WritePrecision;
4144

@@ -68,6 +71,7 @@
6871
* <li><code>headers</code> - headers to be added to requests</li>
6972
* <li><code>sslRootsFilePath</code> - path to the stored certificates file in PEM format</li>
7073
* <li><code>disableGRPCCompression</code> - disables the default gRPC compression header</li>
74+
* <li><code>interceptors</code> - list of client interceptors to be used in the query API</li>
7175
* </ul>
7276
* <p>
7377
* If you want to create a client with custom configuration, you can use following code:
@@ -115,6 +119,7 @@ public final class ClientConfig {
115119
private final Map<String, String> headers;
116120
private final String sslRootsFilePath;
117121
private final boolean disableGRPCCompression;
122+
private final List<ClientInterceptor> interceptors;
118123

119124
/**
120125
* Deprecated use {@link #proxyUrl}.
@@ -329,6 +334,16 @@ public boolean getDisableGRPCCompression() {
329334
return disableGRPCCompression;
330335
}
331336

337+
/**
338+
* Gets a list of client interceptors.
339+
*
340+
* @return a list of client interceptors.
341+
*/
342+
@Nullable
343+
public List<ClientInterceptor> getInterceptors() {
344+
return interceptors;
345+
}
346+
332347
/**
333348
* Validates the configuration properties.
334349
*/
@@ -366,7 +381,8 @@ public boolean equals(final Object o) {
366381
&& Objects.equals(authenticator, that.authenticator)
367382
&& Objects.equals(headers, that.headers)
368383
&& Objects.equals(sslRootsFilePath, that.sslRootsFilePath)
369-
&& disableGRPCCompression == that.disableGRPCCompression;
384+
&& disableGRPCCompression == that.disableGRPCCompression
385+
&& Objects.equals(interceptors, that.interceptors);
370386
}
371387

372388
@Override
@@ -375,7 +391,7 @@ public int hashCode() {
375391
database, writePrecision, gzipThreshold, writeNoSync,
376392
timeout, writeTimeout, queryTimeout, allowHttpRedirects, disableServerCertificateValidation,
377393
proxy, proxyUrl, authenticator, headers,
378-
defaultTags, sslRootsFilePath, disableGRPCCompression);
394+
defaultTags, sslRootsFilePath, disableGRPCCompression, interceptors);
379395
}
380396

381397
@Override
@@ -429,6 +445,7 @@ public static final class Builder {
429445
private Map<String, String> headers;
430446
private String sslRootsFilePath;
431447
private boolean disableGRPCCompression;
448+
private List<ClientInterceptor> interceptors;
432449

433450
/**
434451
* Sets the URL of the InfluxDB server.
@@ -723,6 +740,18 @@ public Builder disableGRPCCompression(final boolean disableGRPCCompression) {
723740
return this;
724741
}
725742

743+
/**
744+
* Sets a list of interceptors to be used for the query API.
745+
*
746+
* @param interceptors a list of ClientInterceptor
747+
* @return this
748+
*/
749+
@Nonnull
750+
public Builder interceptors(@Nullable final List<ClientInterceptor> interceptors) {
751+
this.interceptors = interceptors;
752+
return this;
753+
}
754+
726755
/**
727756
* Build an instance of {@code ClientConfig}.
728757
*
@@ -897,5 +926,6 @@ private ClientConfig(@Nonnull final Builder builder) {
897926
headers = builder.headers;
898927
sslRootsFilePath = builder.sslRootsFilePath;
899928
disableGRPCCompression = builder.disableGRPCCompression;
929+
interceptors = builder.interceptors;
900930
}
901931
}

src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,10 @@ private FlightClient createFlightClient(@Nonnull final ClientConfig config) {
176176
.with(Codec.Identity.NONE, false));
177177
}
178178

179+
if (config.getInterceptors() != null) {
180+
nettyChannelBuilder.intercept(config.getInterceptors());
181+
}
182+
179183
return FlightGrpcUtils.createFlightClient(new RootAllocator(Long.MAX_VALUE), nettyChannelBuilder.build());
180184
}
181185

src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,19 @@
2424
import java.net.InetSocketAddress;
2525
import java.net.URISyntaxException;
2626
import java.util.HashMap;
27+
import java.util.List;
2728
import java.util.Map;
2829
import java.util.stream.IntStream;
2930
import java.util.stream.Stream;
3031

32+
import io.grpc.CallOptions;
33+
import io.grpc.Channel;
34+
import io.grpc.ClientCall;
35+
import io.grpc.ClientInterceptor;
36+
import io.grpc.ForwardingClientCall;
3137
import io.grpc.HttpConnectProxiedSocketAddress;
38+
import io.grpc.Metadata;
39+
import io.grpc.MethodDescriptor;
3240
import io.grpc.ProxyDetector;
3341
import io.grpc.internal.GrpcUtil;
3442
import org.apache.arrow.flight.CallHeaders;
@@ -108,6 +116,44 @@ public void callHeaders() throws Exception {
108116
}
109117
}
110118

119+
@Test
120+
public void setHeaderInInterceptor() throws Exception {
121+
ClientInterceptor interceptor = new ClientInterceptor() {
122+
@Override
123+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
124+
final MethodDescriptor<ReqT, RespT> method,
125+
final CallOptions callOptions,
126+
final Channel next
127+
) {
128+
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
129+
return new ForwardingClientCall.SimpleForwardingClientCall<>(call) {
130+
@Override
131+
public void start(final Listener<RespT> responseListener, final Metadata headers) {
132+
Metadata.Key<String> key = Metadata.Key.of(
133+
"some-header",
134+
Metadata.ASCII_STRING_MARSHALLER
135+
);
136+
headers.put(key, "This is from interceptor");
137+
super.start(responseListener, headers);
138+
}
139+
};
140+
}
141+
};
142+
143+
ClientConfig clientConfig = new ClientConfig.Builder()
144+
.host(server.getLocation().getUri().toString())
145+
.token("my-token".toCharArray())
146+
.interceptors(List.of(interceptor))
147+
.build();
148+
149+
try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig);
150+
var data = executeQuery(flightSqlClient, Map.of(), Map.of())) {
151+
Assertions.assertThat(data.count()).isEqualTo(rowCount);
152+
final Map<String, String> receivedHeaders = headerFactory.getLastInstance().getHeaders();
153+
Assertions.assertThat(receivedHeaders.get("some-header")).isEqualTo("This is from interceptor");
154+
}
155+
}
156+
111157
@Test
112158
public void callHeadersWithoutToken() throws Exception {
113159
ClientConfig clientConfig = new ClientConfig.Builder()

0 commit comments

Comments
 (0)