Skip to content

Commit 8156690

Browse files
committed
Add FunctionInvocationId gRPC metadata propagation for Azure Functions
Adds support for propagating the Azure Functions invocation ID to the Durable Functions host via gRPC metadata, enabling correlation between worker-side function invocations and host-side orchestration events. - Added interceptor support to DurableTaskGrpcClientBuilder - Created FunctionInvocationIdInterceptor in azurefunctions module - Updated DurableClientContext to configure the interceptor automatically - Added unit tests for the interceptor Related to Azure/azure-functions-durable-extension#3317
1 parent 9793693 commit 8156690

6 files changed

Lines changed: 207 additions & 1 deletion

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
## Unreleased
2+
* Add client operation correlation logging: `FunctionInvocationId` is now propagated via gRPC metadata to the host for client operations, enabling correlation with host logs.
23

34
## v1.6.2
45
* Fixing gRPC channel shutdown ([#249](https://github.com/microsoft/durabletask-java/pull/249))

azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/DurableClientContext.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.microsoft.durabletask.DurableTaskGrpcClientBuilder;
1212
import com.microsoft.durabletask.OrchestrationMetadata;
1313
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
14+
import com.microsoft.durabletask.azurefunctions.internal.FunctionInvocationIdInterceptor;
1415

1516
import java.io.UnsupportedEncodingException;
1617
import java.net.MalformedURLException;
@@ -29,6 +30,7 @@ public class DurableClientContext {
2930
private String taskHubName;
3031
private String requiredQueryStringParameters;
3132
private DurableTaskClient client;
33+
private String functionInvocationId;
3234

3335
/**
3436
* Gets the name of the client binding's task hub.
@@ -39,6 +41,18 @@ public String getTaskHubName() {
3941
return this.taskHubName;
4042
}
4143

44+
/**
45+
* Sets the function invocation ID for correlation with host-side logs.
46+
* <p>
47+
* Call this method before calling {@link #getClient()} to enable correlation
48+
* between client operations and host-side logs.
49+
*
50+
* @param invocationId the Azure Functions invocation ID
51+
*/
52+
public void setFunctionInvocationId(String invocationId) {
53+
this.functionInvocationId = invocationId;
54+
}
55+
4256
/**
4357
* Gets the durable task client associated with the current function invocation.
4458
*
@@ -56,7 +70,14 @@ public DurableTaskClient getClient() {
5670
throw new IllegalStateException("The client context RPC base URL was invalid!", ex);
5771
}
5872

59-
this.client = new DurableTaskGrpcClientBuilder().port(rpcURL.getPort()).build();
73+
DurableTaskGrpcClientBuilder builder = new DurableTaskGrpcClientBuilder().port(rpcURL.getPort());
74+
75+
// Add interceptor for function invocation ID correlation if set
76+
if (this.functionInvocationId != null && !this.functionInvocationId.isEmpty()) {
77+
builder.addInterceptor(new FunctionInvocationIdInterceptor(this.functionInvocationId));
78+
}
79+
80+
this.client = builder.build();
6081
return this.client;
6182
}
6283

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.microsoft.durabletask.azurefunctions.internal;
5+
6+
import io.grpc.*;
7+
8+
/**
9+
* A gRPC client interceptor that adds the Azure Functions invocation ID to outgoing calls
10+
* for correlation with host-side logs.
11+
*/
12+
public final class FunctionInvocationIdInterceptor implements ClientInterceptor {
13+
private static final String INVOCATION_ID_METADATA_KEY_NAME = "x-azure-functions-invocationid";
14+
private static final Metadata.Key<String> INVOCATION_ID_KEY =
15+
Metadata.Key.of(INVOCATION_ID_METADATA_KEY_NAME, Metadata.ASCII_STRING_MARSHALLER);
16+
17+
private final String invocationId;
18+
19+
/**
20+
* Creates a new interceptor that will add the specified invocation ID to all gRPC calls.
21+
*
22+
* @param invocationId the Azure Functions invocation ID to add to calls
23+
*/
24+
public FunctionInvocationIdInterceptor(String invocationId) {
25+
this.invocationId = invocationId;
26+
}
27+
28+
@Override
29+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
30+
MethodDescriptor<ReqT, RespT> method,
31+
CallOptions callOptions,
32+
Channel next) {
33+
34+
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
35+
next.newCall(method, callOptions)) {
36+
@Override
37+
public void start(Listener<RespT> responseListener, Metadata headers) {
38+
if (invocationId != null && !invocationId.isEmpty()) {
39+
headers.put(INVOCATION_ID_KEY, invocationId);
40+
}
41+
super.start(responseListener, headers);
42+
}
43+
};
44+
}
45+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.microsoft.durabletask.azurefunctions.internal;
4+
5+
import io.grpc.*;
6+
import org.junit.jupiter.api.Test;
7+
import static org.junit.jupiter.api.Assertions.*;
8+
import static org.mockito.Mockito.*;
9+
10+
/**
11+
* Tests for FunctionInvocationIdInterceptor.
12+
*/
13+
public class FunctionInvocationIdInterceptorTests {
14+
15+
private static final Metadata.Key<String> INVOCATION_ID_KEY =
16+
Metadata.Key.of("x-azure-functions-invocationid", Metadata.ASCII_STRING_MARSHALLER);
17+
18+
@Test
19+
public void interceptCall_addsInvocationIdToMetadata() {
20+
// Arrange
21+
String testInvocationId = "test-invocation-id-123";
22+
FunctionInvocationIdInterceptor interceptor = new FunctionInvocationIdInterceptor(testInvocationId);
23+
24+
Channel mockChannel = mock(Channel.class);
25+
ClientCall<Object, Object> mockCall = mock(ClientCall.class);
26+
MethodDescriptor<Object, Object> mockMethod = mock(MethodDescriptor.class);
27+
CallOptions callOptions = CallOptions.DEFAULT;
28+
29+
when(mockChannel.newCall(any(), any())).thenReturn(mockCall);
30+
31+
// Act
32+
ClientCall<Object, Object> interceptedCall = interceptor.interceptCall(mockMethod, callOptions, mockChannel);
33+
34+
// Assert - Start the call to trigger the metadata modification
35+
Metadata headers = new Metadata();
36+
interceptedCall.start(mock(ClientCall.Listener.class), headers);
37+
38+
// Verify the invocation ID was added to the headers
39+
assertEquals(testInvocationId, headers.get(INVOCATION_ID_KEY));
40+
}
41+
42+
@Test
43+
public void interceptCall_withNullInvocationId_doesNotAddHeader() {
44+
// Arrange
45+
FunctionInvocationIdInterceptor interceptor = new FunctionInvocationIdInterceptor(null);
46+
47+
Channel mockChannel = mock(Channel.class);
48+
ClientCall<Object, Object> mockCall = mock(ClientCall.class);
49+
MethodDescriptor<Object, Object> mockMethod = mock(MethodDescriptor.class);
50+
CallOptions callOptions = CallOptions.DEFAULT;
51+
52+
when(mockChannel.newCall(any(), any())).thenReturn(mockCall);
53+
54+
// Act
55+
ClientCall<Object, Object> interceptedCall = interceptor.interceptCall(mockMethod, callOptions, mockChannel);
56+
57+
// Assert - Start the call to trigger the metadata modification
58+
Metadata headers = new Metadata();
59+
interceptedCall.start(mock(ClientCall.Listener.class), headers);
60+
61+
// Verify no invocation ID was added
62+
assertNull(headers.get(INVOCATION_ID_KEY));
63+
}
64+
65+
@Test
66+
public void interceptCall_withEmptyInvocationId_doesNotAddHeader() {
67+
// Arrange
68+
FunctionInvocationIdInterceptor interceptor = new FunctionInvocationIdInterceptor("");
69+
70+
Channel mockChannel = mock(Channel.class);
71+
ClientCall<Object, Object> mockCall = mock(ClientCall.class);
72+
MethodDescriptor<Object, Object> mockMethod = mock(MethodDescriptor.class);
73+
CallOptions callOptions = CallOptions.DEFAULT;
74+
75+
when(mockChannel.newCall(any(), any())).thenReturn(mockCall);
76+
77+
// Act
78+
ClientCall<Object, Object> interceptedCall = interceptor.interceptCall(mockMethod, callOptions, mockChannel);
79+
80+
// Assert - Start the call to trigger the metadata modification
81+
Metadata headers = new Metadata();
82+
interceptedCall.start(mock(ClientCall.Listener.class), headers);
83+
84+
// Verify no invocation ID was added
85+
assertNull(headers.get(INVOCATION_ID_KEY));
86+
}
87+
88+
@Test
89+
public void constructor_acceptsValidInvocationId() {
90+
// Act & Assert - no exception should be thrown
91+
FunctionInvocationIdInterceptor interceptor = new FunctionInvocationIdInterceptor("valid-id");
92+
assertNotNull(interceptor);
93+
}
94+
95+
@Test
96+
public void constructor_acceptsNull() {
97+
// Act & Assert - no exception should be thrown
98+
FunctionInvocationIdInterceptor interceptor = new FunctionInvocationIdInterceptor(null);
99+
assertNotNull(interceptor);
100+
}
101+
}

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ public final class DurableTaskGrpcClient extends DurableTaskClient {
5858
sidecarGrpcChannel = this.managedSidecarChannel;
5959
}
6060

61+
// Apply any interceptors that were configured in the builder
62+
List<ClientInterceptor> interceptors = builder.getInterceptors();
63+
if (!interceptors.isEmpty()) {
64+
sidecarGrpcChannel = ClientInterceptors.intercept(sidecarGrpcChannel, interceptors);
65+
}
66+
6167
this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
6268
}
6369

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClientBuilder.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@
33
package com.microsoft.durabletask;
44

55
import io.grpc.Channel;
6+
import io.grpc.ClientInterceptor;
7+
8+
import java.util.ArrayList;
9+
import java.util.Collections;
10+
import java.util.List;
611

712
/**
813
* Builder class for constructing new {@link DurableTaskClient} objects that communicate with a sidecar process
@@ -13,6 +18,7 @@ public final class DurableTaskGrpcClientBuilder {
1318
int port;
1419
Channel channel;
1520
String defaultVersion;
21+
List<ClientInterceptor> interceptors = new ArrayList<>();
1622

1723
/**
1824
* Sets the {@link DataConverter} to use for converting serializable data payloads.
@@ -65,6 +71,32 @@ public DurableTaskGrpcClientBuilder defaultVersion(String defaultVersion) {
6571
return this;
6672
}
6773

74+
/**
75+
* Adds a gRPC client interceptor to be applied to all gRPC calls made by the client.
76+
* <p>
77+
* Interceptors can be used to add custom headers, logging, or other cross-cutting concerns
78+
* to gRPC calls. Multiple interceptors can be added and will be applied in the order they
79+
* were added.
80+
*
81+
* @param interceptor the gRPC client interceptor to add
82+
* @return this builder object
83+
*/
84+
public DurableTaskGrpcClientBuilder addInterceptor(ClientInterceptor interceptor) {
85+
if (interceptor != null) {
86+
this.interceptors.add(interceptor);
87+
}
88+
return this;
89+
}
90+
91+
/**
92+
* Gets the list of interceptors that have been added to this builder.
93+
*
94+
* @return an unmodifiable list of interceptors
95+
*/
96+
List<ClientInterceptor> getInterceptors() {
97+
return Collections.unmodifiableList(this.interceptors);
98+
}
99+
68100
/**
69101
* Initializes a new {@link DurableTaskClient} object with the settings specified in the current builder object.
70102
* @return a new {@link DurableTaskClient} object

0 commit comments

Comments
 (0)