Pull request overview
This pull request implements the Workload-ID feature for Azure Cosmos DB Java SDK, enabling customers to tag their database requests with a numeric identifier (1-50) that flows through to Azure Monitor metrics. This provides a lightweight alternative to diagnostic logs for monitoring the source of database requests.
Changes:
- Added `customHeaders()` method to CosmosClientBuilder for setting client-level custom headers
- Made `setHeader()` methods public on all request options classes (CosmosItemRequestOptions, CosmosBatchRequestOptions, CosmosBulkExecutionOptions, CosmosChangeFeedRequestOptions, CosmosQueryRequestOptions) for per-request header overrides
- Implemented workload-id header support in the RNTBD (Direct mode) protocol layer with proper error handling
- Integrated custom headers feature into Spark connector with JSON-based configuration
- Added comprehensive unit tests and E2E integration tests
Reviewed changes
Copilot reviewed 23 out of 23 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| CosmosClientBuilder.java | Added customHeaders() method for client-level custom headers configuration |
| CosmosItemRequestOptions.java | Made setHeader() public with updated documentation |
| CosmosQueryRequestOptions.java | Added new public setHeader() method that delegates to internal implementation |
| CosmosChangeFeedRequestOptions.java | Made setHeader() public with updated documentation |
| CosmosBulkExecutionOptions.java | Made setHeader() public with updated documentation |
| CosmosBatchRequestOptions.java | Made setHeader() public with updated documentation |
| RxDocumentClientImpl.java | Added constructor overload accepting customHeaders and merged them into request headers |
| AsyncDocumentClient.java | Added Builder support for customHeaders parameter |
| CosmosAsyncClient.java | Passed customHeaders from builder to AsyncDocumentClient |
| RntbdRequestHeaders.java | Added addWorkloadId() method with error handling for parsing and byte conversion |
| RntbdConstants.java | Added WorkloadId enum entry with ID 0x00DC as a Byte type token |
| HttpConstants.java | Added WORKLOAD_ID header constant definition |
| WorkloadIdE2ETests.java | Comprehensive E2E tests covering CRUD, query, and regression scenarios |
| RntbdWorkloadIdTests.java | Unit tests verifying RNTBD header definition and valid/invalid value handling |
| CustomHeadersTests.java | Unit tests for CosmosClientBuilder and request options public API surface |
| CosmosConfig.scala | Added CustomHeaders config entry with JSON parsing for Spark connector |
| CosmosClientConfiguration.scala | Added customHeaders field to configuration case class |
| CosmosClientCache.scala | Applied customHeaders to CosmosClientBuilder and included in cache key |
| SparkE2EWorkloadIdITest.scala | E2E tests for Spark connector read/write with custom headers |
| CosmosClientConfigurationSpec.scala | Unit tests for JSON parsing of customHeaders configuration |
| Multiple test files | Updated test fixtures to include customHeaders: None for backward compatibility |
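The RntbdRequestHeaders.java and RntbdConstants.java rows above mention parsing the workload-id value and converting it to a Byte-type token. As a rough illustration only, the sketch below shows one way such a conversion could guard against bad input. The class `WorkloadIdByteSketch`, the method `toByteToken`, and the choice to return an empty `Optional` instead of throwing are all assumptions made for this example; none of them are taken from the SDK source.

```java
import java.util.Optional;

// Hypothetical helper for illustration; not part of the Azure Cosmos DB SDK.
final class WorkloadIdByteSketch {
    // Parses a decimal header value and narrows it to a single byte.
    // Returns Optional.empty() when the value is missing, non-numeric,
    // or does not fit into a signed byte.
    static Optional<Byte> toByteToken(String headerValue) {
        if (headerValue == null) {
            return Optional.empty();
        }
        try {
            int parsed = Integer.parseInt(headerValue.trim());
            if (parsed < Byte.MIN_VALUE || parsed > Byte.MAX_VALUE) {
                return Optional.empty();
            }
            return Optional.of((byte) parsed);
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(toByteToken("15").isPresent());
        System.out.println(toByteToken("abc").isPresent());
    }
}
```

Dropping an unparseable value rather than failing the whole request is only one possible policy; a real implementation might log or reject instead.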
Comments suppressed due to low confidence (1)
sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/WorkloadIdE2ETests.java:327
- Test coverage is missing for batch and bulk operations with workload-id headers. Consider adding E2E tests similar to the existing CRUD tests that verify:
  - CosmosBatchRequestOptions.setHeader() works correctly with batch operations
  - CosmosBulkExecutionOptions.setHeader() works correctly with bulk operations

  This would ensure the workload-id feature works end-to-end for all operation types, not just point operations and queries.
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.rx;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.TestObject;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.TestConfigurations;
import com.azure.cosmos.models.CosmosBulkExecutionOptions;
import com.azure.cosmos.models.CosmosBulkOperations;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.PartitionKeyDefinition;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * End-to-end integration tests for the custom headers / workload-id feature.
 * <p>
 * Test type: EMULATOR INTEGRATION TEST. Requires the Cosmos DB Emulator to be running locally.
 */
public class WorkloadIdE2ETests extends TestSuiteBase {
    private static final String DATABASE_ID = "workloadIdTestDb-" + UUID.randomUUID();
    private static final String CONTAINER_ID = "workloadIdTestContainer-" + UUID.randomUUID();

    private CosmosAsyncClient clientWithWorkloadId;
    private CosmosAsyncDatabase database;
    private CosmosAsyncContainer container;

    public WorkloadIdE2ETests() {
        super(new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY));
    }

    @BeforeClass(groups = { "emulator" }, timeOut = SETUP_TIMEOUT)
    public void beforeClass() {
        Map<String, String> headers = new HashMap<>();
        headers.put(HttpConstants.HttpHeaders.WORKLOAD_ID, "15");

        clientWithWorkloadId = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .customHeaders(headers)
            .buildAsyncClient();

        database = createDatabase(clientWithWorkloadId, DATABASE_ID);

        PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
        ArrayList<String> paths = new ArrayList<>();
        paths.add("/mypk");
        partitionKeyDef.setPaths(paths);

        CosmosContainerProperties containerProperties = new CosmosContainerProperties(CONTAINER_ID, partitionKeyDef);
        database.createContainer(containerProperties).block();
        container = database.getContainer(CONTAINER_ID);
    }

    /**
     * Verifies that a create (POST) operation succeeds when the client
     * has a workload-id custom header set at the builder level. Confirms the header
     * flows through the request pipeline without causing errors.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void createItemWithClientLevelWorkloadId() {
        TestObject doc = TestObject.create();

        CosmosItemResponse<TestObject> response = container
            .createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
            .block();

        assertThat(response).isNotNull();
        assertThat(response.getStatusCode()).isEqualTo(201);
    }

    /**
     * Verifies that a read (GET) operation succeeds with the client-level workload-id
     * header and that the correct document is returned. Ensures the header does not
     * interfere with normal read semantics.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void readItemWithClientLevelWorkloadId() {
        // Verify read operation succeeds with workload-id header
        TestObject doc = TestObject.create();
        container.createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions()).block();

        CosmosItemResponse<TestObject> response = container
            .readItem(doc.getId(), new PartitionKey(doc.getMypk()), TestObject.class)
            .block();

        assertThat(response).isNotNull();
        assertThat(response.getStatusCode()).isEqualTo(200);
        assertThat(response.getItem().getId()).isEqualTo(doc.getId());
    }

    /**
     * Verifies that a replace (PUT) operation succeeds with the client-level workload-id
     * header. Confirms the header propagates correctly for update operations.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void replaceItemWithClientLevelWorkloadId() {
        // Verify replace operation succeeds with workload-id header
        TestObject doc = TestObject.create();
        container.createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions()).block();

        doc.setStringProp("updated-" + UUID.randomUUID());
        CosmosItemResponse<TestObject> response = container
            .replaceItem(doc, doc.getId(), new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
            .block();

        assertThat(response).isNotNull();
        assertThat(response.getStatusCode()).isEqualTo(200);
    }

    /**
     * Verifies that a delete operation succeeds with the client-level workload-id header
     * and returns the expected 204 No Content status code.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void deleteItemWithClientLevelWorkloadId() {
        // Verify delete operation succeeds with workload-id header
        TestObject doc = TestObject.create();
        container.createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions()).block();

        CosmosItemResponse<Object> response = container
            .deleteItem(doc.getId(), new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
            .block();

        assertThat(response).isNotNull();
        assertThat(response.getStatusCode()).isEqualTo(204);
    }

    /**
     * Verifies that a per-request workload-id header override via
     * {@code CosmosItemRequestOptions.setHeader()} works. The request-level header
     * (value "30") should take precedence over the client-level default (value "15").
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void createItemWithRequestLevelWorkloadIdOverride() {
        // Verify per-request header override works; request-level should take precedence
        TestObject doc = TestObject.create();

        CosmosItemRequestOptions options = new CosmosItemRequestOptions()
            .setHeader(HttpConstants.HttpHeaders.WORKLOAD_ID, "30");

        CosmosItemResponse<TestObject> response = container
            .createItem(doc, new PartitionKey(doc.getMypk()), options)
            .block();

        assertThat(response).isNotNull();
        assertThat(response.getStatusCode()).isEqualTo(201);
    }

    /**
     * Verifies that a cross-partition query operation succeeds when the client has a
     * workload-id custom header. Confirms the header flows correctly through the
     * query pipeline and does not affect result correctness.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void queryItemsWithClientLevelWorkloadId() {
        // Verify query operation succeeds with workload-id header
        TestObject doc = TestObject.create();
        container.createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions()).block();

        CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
        long count = container
            .queryItems("SELECT * FROM c WHERE c.id = '" + doc.getId() + "'", queryOptions, TestObject.class)
            .collectList()
            .block()
            .size();

        assertThat(count).isGreaterThanOrEqualTo(1);
    }

    /**
     * Verifies that a per-request workload-id header override on
     * {@code CosmosQueryRequestOptions.setHeader()} works for query operations.
     * The request-level header (value "42") should take precedence over the
     * client-level default.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void queryItemsWithRequestLevelWorkloadIdOverride() {
        // Verify per-request header override on query options works
        TestObject doc = TestObject.create();
        container.createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions()).block();

        CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions()
            .setHeader(HttpConstants.HttpHeaders.WORKLOAD_ID, "42");

        long count = container
            .queryItems("SELECT * FROM c WHERE c.id = '" + doc.getId() + "'", queryOptions, TestObject.class)
            .collectList()
            .block()
            .size();

        assertThat(count).isGreaterThanOrEqualTo(1);
    }

    /**
     * Regression test: verifies that a client created without any custom headers
     * continues to work normally. Ensures the custom headers feature does not
     * introduce regressions for clients that do not use it.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void clientWithNoCustomHeadersStillWorks() {
        // Verify that a client without custom headers works normally (no regression)
        CosmosAsyncClient clientWithoutHeaders = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .buildAsyncClient();

        try {
            CosmosAsyncContainer c = clientWithoutHeaders
                .getDatabase(DATABASE_ID)
                .getContainer(CONTAINER_ID);

            TestObject doc = TestObject.create();
            CosmosItemResponse<TestObject> response = c
                .createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
                .block();

            assertThat(response).isNotNull();
            assertThat(response.getStatusCode()).isEqualTo(201);
        } finally {
            safeClose(clientWithoutHeaders);
        }
    }

    /**
     * Verifies that a client created with an empty custom headers map works normally.
     * An empty map should behave identically to no custom headers: no errors,
     * no unexpected behavior.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void clientWithEmptyCustomHeaders() {
        // Verify that a client with empty custom headers map works normally
        CosmosAsyncClient clientWithEmptyHeaders = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .customHeaders(new HashMap<>())
            .buildAsyncClient();

        try {
            CosmosAsyncContainer c = clientWithEmptyHeaders
                .getDatabase(DATABASE_ID)
                .getContainer(CONTAINER_ID);

            TestObject doc = TestObject.create();
            CosmosItemResponse<TestObject> response = c
                .createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
                .block();

            assertThat(response).isNotNull();
            assertThat(response.getStatusCode()).isEqualTo(201);
        } finally {
            safeClose(clientWithEmptyHeaders);
        }
    }

    /**
     * Verifies that a client can be configured with multiple custom headers simultaneously
     * (workload-id plus an additional custom header). Confirms that all headers flow
     * through the pipeline without interfering with each other.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void clientWithMultipleCustomHeaders() {
        // Verify that multiple custom headers can be set simultaneously
        Map<String, String> headers = new HashMap<>();
        headers.put(HttpConstants.HttpHeaders.WORKLOAD_ID, "20");
        headers.put("x-ms-custom-test-header", "test-value");

        CosmosAsyncClient clientWithMultipleHeaders = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .customHeaders(headers)
            .buildAsyncClient();

        try {
            CosmosAsyncContainer c = clientWithMultipleHeaders
                .getDatabase(DATABASE_ID)
                .getContainer(CONTAINER_ID);

            TestObject doc = TestObject.create();
            CosmosItemResponse<TestObject> response = c
                .createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
                .block();

            assertThat(response).isNotNull();
            assertThat(response.getStatusCode()).isEqualTo(201);
        } finally {
            safeClose(clientWithMultipleHeaders);
        }
    }

    @AfterClass(groups = { "emulator" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
    public void afterClass() {
        safeDeleteDatabase(database);
        safeClose(clientWithWorkloadId);
    }
}
API Change Check
APIView identified API level changes in this PR and created the following API reviews
| * <p> | ||
| * If the same header is also set on request options (e.g., | ||
| * {@code CosmosItemRequestOptions.setHeader(String, String)}), | ||
| * the request-level value takes precedence over the client-level value. |
We should also call out that customHeaders take lower precedence (in javadoc) than headers set through targeted APIs (see: setConsistencyLevel, setReadConsistencyStrategy etc.) and also validate this.
I think this is a good question: what do we want to do when the same header can be set from multiple places, and which one should take priority?
I am wondering whether we should start by allowing only the workload-id header. In Direct mode, if the SDK does not know a header, we are going to drop it anyway. And I feel that allowing customers to set important headers through customHeaders could cause a lot of confusion in the future.
Added an allowlist in CosmosClientBuilder, as no option can provide true cross-mode flexibility because of how RNTBD works.
My 2 cents: change the name to additionalHeaders and make it a Map of CosmosHeaderName to String or so, with CosmosHeaderName as an enum. That way we control which headers can be set and manage expectations early.
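The precedence and allowlist ideas raised in this thread can be sketched in a few lines. This is only an illustration under stated assumptions: `HeaderPrecedenceSketch` and `effectiveHeaders` are hypothetical names, the allowlist contains just the `x-ms-cosmos-workload-id` header named in the changelog entry, and the merge order (client-level first, request-level on top) follows the javadoc quoted at the start of the thread rather than the SDK's actual code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration; not the SDK's implementation.
final class HeaderPrecedenceSketch {
    // Assumed allowlist with a single entry.
    static final Set<String> ALLOWED = Collections.singleton("x-ms-cosmos-workload-id");

    // Merges client-level and request-level headers; request-level values win.
    static Map<String, String> effectiveHeaders(Map<String, String> clientLevel,
                                                Map<String, String> requestLevel) {
        requireAllowed(clientLevel);
        requireAllowed(requestLevel);
        Map<String, String> merged = new HashMap<>(clientLevel); // client-level defaults
        merged.putAll(requestLevel);                             // per-request overrides
        return merged;
    }

    // Rejects any header name that is not on the allowlist.
    private static void requireAllowed(Map<String, String> headers) {
        for (String name : headers.keySet()) {
            if (!ALLOWED.contains(name)) {
                throw new IllegalArgumentException("Header not in allowlist: " + name);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> client = Collections.singletonMap("x-ms-cosmos-workload-id", "15");
        Map<String, String> request = Collections.singletonMap("x-ms-cosmos-workload-id", "30");
        System.out.println(effectiveHeaders(client, request));
    }
}
```

Failing on unknown names at configuration time is one way to avoid the confusion discussed above, where a header would otherwise be silently dropped in Direct mode.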
| * @return the CosmosBatchRequestOptions. | ||
| */ | ||
| CosmosBatchRequestOptions setHeader(String name, String value) { | ||
| public CosmosBatchRequestOptions setHeader(String name, String value) { |
We can add such setter for CosmosReadManyRequestOptions too.
@dibahlfi - rethinking this, I feel adding a public setCustomHeaders() on request options which takes a map is more consistent with customHeaders() added to CosmosClientBuilder. This way setHeader can continue being package-private and you can also plug in header-filter logic at the request options level.
Thoughts @xinlian12 / @FabianMeiswinkel ?
I think it makes sense to be consistent. Fixed.
| key = CosmosConfigNames.CustomHeaders, | ||
| mandatory = false, | ||
| parseFromStringFunction = headersJson => { | ||
| val mapper = new com.fasterxml.jackson.databind.ObjectMapper() |
Is there a reusable instance of ObjectMapper and how are we handling format issues?
maybe we can just use Utils.simpleObjectMapperAllowingDuplicatedProperties
/azp run java - cosmos - tests
Azure Pipelines will not run the associated pipelines, because the pull request was updated after the run command was issued. Review the pull request again and issue a new run command.
/azp run java - cosmos - tests
Azure Pipelines will not run the associated pipelines, because the pull request was updated after the run command was issued. Review the pull request again and issue a new run command.
/azp run java - cosmos - tests
Azure Pipelines successfully started running 1 pipeline(s).
/azp run java - cosmos - spark
Azure Pipelines successfully started running 1 pipeline(s).
| ### 4.45.0-beta.1 (Unreleased) | ||
|
|
||
| #### Features Added | ||
| * Added `customHeaders` support to allow setting custom HTTP headers (e.g., `x-ms-cosmos-workload-id`) that are sent with every request. - See [PR 48128](https://github.com/Azure/azure-sdk-for-java/pull/48128) |
nit: x-ms-cosmos-workload-id is an RNTBD header too.
fixed the changelog description
FabianMeiswinkel
left a comment
I think the public API needs to express more clearly that this, unlike in many other SDKs, is not a random list of headers. Your allow-list hints at that already, but instead of failing at runtime, why not use a strongly typed contract, like an enum that can be used to identify the allowed header names? Note that the enum should be non-exhaustive. We have a few of these cases already, like CosmosMetricName; normal enums in Java would make adding new values a breaking change (technically).
|
|
||
| // Apply client-level custom headers (e.g., workload-id) to metadata requests | ||
| // Use putIfAbsent to ensure SDK system headers (USER_AGENT, VERSION, etc.) are not overwritten | ||
| if (customHeaders != null && !customHeaders.isEmpty()) { |
If the intention is to send custom headers to other metadata requests, it can help to modify RxGatewayStoreModel which is the transport layer for metadata requests (address requests being the exception as these are made from GatewayAddressCache itself).
fixed the comment wording as the code is already doing it.
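The code comment in the hunk above says `putIfAbsent` is used so that SDK system headers are not overwritten by custom headers. A minimal sketch of that behavior follows, assuming a hypothetical helper `SystemHeaderGuardSketch.withCustomHeaders` and illustrative header values; it is not the code from RxDocumentClientImpl. For simplicity it treats header names as case-sensitive, which real HTTP header handling would not.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not the SDK's implementation.
final class SystemHeaderGuardSketch {
    // Returns a new map holding the SDK-owned headers plus any custom header
    // whose name the SDK has not already set.
    static Map<String, String> withCustomHeaders(Map<String, String> systemHeaders,
                                                 Map<String, String> customHeaders) {
        Map<String, String> result = new HashMap<>(systemHeaders);
        if (customHeaders != null && !customHeaders.isEmpty()) {
            for (Map.Entry<String, String> entry : customHeaders.entrySet()) {
                // putIfAbsent keeps the existing (system) value when names collide.
                result.putIfAbsent(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> system = new HashMap<>();
        system.put("User-Agent", "sdk-agent");
        Map<String, String> custom = new HashMap<>();
        custom.put("User-Agent", "spoofed");
        custom.put("x-ms-cosmos-workload-id", "15");
        System.out.println(withCustomHeaders(system, custom));
    }
}
```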
| * @throws IllegalArgumentException if any header key is not in the allowlist, or if the | ||
| * workload-id value is not a valid integer | ||
| */ | ||
| public CosmosClientBuilder customHeaders(Map<String, String> customHeaders) { |
An in-general best practice here would be to create a deep-copy of customHeaders.
fixed by not storing the caller's reference
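The defensive-copy practice mentioned here can be shown with a small stand-in class. `HeaderHolderSketch` is a hypothetical name and is not the builder from the PR; the sketch only demonstrates why copying the caller's map matters. Because `String` keys and values are immutable, copying the entries is effectively a deep copy for a `Map<String, String>`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a builder that accepts a header map.
final class HeaderHolderSketch {
    private final Map<String, String> headers;

    HeaderHolderSketch(Map<String, String> callerHeaders) {
        if (callerHeaders == null) {
            this.headers = Collections.emptyMap();
        } else {
            // Copy the entries so later changes to the caller's map do not leak in,
            // and wrap the copy so callers cannot mutate it through the getter.
            this.headers = Collections.unmodifiableMap(new HashMap<>(callerHeaders));
        }
    }

    Map<String, String> headers() {
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> caller = new HashMap<>();
        caller.put("x-ms-cosmos-workload-id", "15");
        HeaderHolderSketch holder = new HeaderHolderSketch(caller);
        caller.put("x-ms-cosmos-workload-id", "99"); // does not affect the holder
        System.out.println(holder.headers());
    }
}
```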
| * Only headers with RNTBD encoding support are included in this enum, ensuring consistent | ||
| * behavior across both Gateway mode (HTTP) and Direct mode (RNTBD binary protocol). | ||
| */ | ||
| public enum CosmosHeaderName { |
The issue with using an enum is binary compatibility — when new constants are added to CosmosHeaderName in a future release, pre-compiled customer code that switches over the enum values can behave unexpectedly (the compiler generates a synthetic ordinal lookup table baked into the customer's .class file). The Azure SDK Java Guidelines recommend against using Java enum for extensible value sets. CosmosMetricName, CosmosMetricCategory, and CosmosMetricTagName all use the non-exhaustive final class pattern for this reason — CosmosHeaderName should follow the same pattern. See - https://azure.github.io/azure-sdk/java_introduction.html#enumerations
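For readers unfamiliar with the non-exhaustive final class pattern referred to here, a minimal sketch is shown below. It is written from the description in this comment, not copied from CosmosMetricName or from the PR; the class name `HeaderNameSketch`, the registry map, and the case-insensitive lookup are all assumptions. Since the type is a plain final class rather than an `enum`, callers cannot write an enum `switch` over it, so adding constants later does not touch any compiler-generated lookup table in already compiled customer code.

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of an extensible "enum-like" final class.
final class HeaderNameSketch {
    // Registry of known values, keyed by lower-cased header name.
    private static final Map<String, HeaderNameSketch> KNOWN = new ConcurrentHashMap<>();

    // New constants can be added here in later releases.
    static final HeaderNameSketch WORKLOAD_ID = register("x-ms-cosmos-workload-id");

    private final String name;

    private HeaderNameSketch(String name) {
        this.name = name;
    }

    private static HeaderNameSketch register(String name) {
        HeaderNameSketch value = new HeaderNameSketch(name);
        KNOWN.put(name.toLowerCase(Locale.ROOT), value);
        return value;
    }

    // Looks up a known header name; the case-insensitive match is an assumption.
    static HeaderNameSketch fromString(String name) {
        HeaderNameSketch value = name == null ? null : KNOWN.get(name.toLowerCase(Locale.ROOT));
        if (value == null) {
            throw new IllegalArgumentException("Unknown header: " + name);
        }
        return value;
    }

    @Override
    public String toString() {
        return name;
    }

    public static void main(String[] args) {
        System.out.println(fromString("X-MS-Cosmos-Workload-Id") == WORKLOAD_ID);
    }
}
```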
| * @return the matching {@link CosmosHeaderName} | ||
| * @throws IllegalArgumentException if the header name does not match any known enum value | ||
| */ | ||
| public static CosmosHeaderName fromString(String headerName) { |
Does fromString and validateAdditionalHeaders need to be public API? Can ImplementationBridgeHelper be used?
I reviewed both methods:
- fromString: This must stay public. It's called from the Spark connector, which is a separate module (azure-cosmos-spark_3). The Spark connector parses header names as strings from user JSON configuration and needs fromString to convert them to CosmosHeaderName enum values before calling builder.additionalHeaders(enumHeaders).
- validateAdditionalHeaders: You are right that this is only called internally. It could technically be made non-public via ImplementationBridgeHelper. However, I would lean toward keeping it public:
  - The ImplementationBridgeHelper pattern is designed for instance methods on classes. Using it for a static validation method on an enum adds significant boilerplate for minimal encapsulation benefit.
  - The method is a pure validation utility: it's idempotent, stateless, and safe for anyone to call. Exposing it doesn't leak internal state or create a backward-compatibility burden.

That said, if you feel strongly about minimizing the public API surface, I can make the change. Let me know your preference.
| httpClient, | ||
| ApiType.SQL); | ||
| ApiType.SQL, | ||
| null); |
nit: additionalHeaders can be wired in ThinClientStoreModel's constructor as well (nit because I'm assuming that, for data-plane requests, every such request already has additionalHeaders wiring). Is this the right understanding?
Deep Review Summary: PR #48128 (Workload-ID Feature)
Overall: Solid feature implementation with excellent test coverage and well-designed header precedence. However, there are important gaps: a missing field in the builder clone utility, incomplete
Existing comments: 30+ from reviewers. 2 of my findings reinforce unresolved feedback from @jeet1995.
Findings (11 total)
Details posted as inline comments below.
| * @return current CosmosClientBuilder | ||
| * @throws IllegalArgumentException if the workload-id value is not a valid integer | ||
| */ | ||
| public CosmosClientBuilder additionalHeaders(Map<CosmosHeaderName, String> additionalHeaders) { |
🔴 Blocking · Correctness: Missing Field in Clone
CosmosBridgeInternal.cloneCosmosClientBuilder() does not copy additionalHeaders
CosmosBridgeInternal.cloneCosmosClientBuilder() copies endpoint, key, credentials, connection configs, consistency, preferred regions, throttling, etc. — but does NOT copy additionalHeaders. Any code path that clones a builder with workload-id configured will silently lose the header.
The method is called by TestSuiteBase.copyCosmosClientBuilder() (used in this PR's own tests) and could be used by production code paths. The test verifyNoWorkloadIdHeaderWhenNotConfigured passes by accident because the base builder from the data provider never has additionalHeaders set.
Fix: Add this to CosmosBridgeInternal.cloneCosmosClientBuilder():
    if (builder.getAdditionalHeadersRaw() != null) {
        copy.additionalHeaders(builder.getAdditionalHeadersRaw());
    }
| * The SDK validates that the value is a valid integer but does not enforce range limits — | ||
| * range validation is the backend's responsibility. | ||
| */ | ||
| WORKLOAD_ID(HttpConstants.HttpHeaders.WORKLOAD_ID); |
🟡 Recommendation · API Design: Enum vs Final Class
[Already posted by @jeet1995 — Unresolved]
CosmosHeaderName should use the non-exhaustive final class pattern, not a Java enum
The Azure SDK Java Guidelines recommend against Java enums for extensible value sets. When new enum constants are added in future releases, pre-compiled customer code that uses switch over the enum values will break (the compiler generates a synthetic ordinal lookup table baked into the customer's .class file).
Other Cosmos types — CosmosMetricName, CosmosMetricCategory, CosmosMetricTagName — all use the non-exhaustive final class pattern for exactly this reason. CosmosHeaderName is a public API that ships to customers and cannot be changed from enum to final class once released without a breaking change.
@jeet1995 raised this concern earlier. The author's response about binary compatibility actually supports using a final class (enums are the ones with the switch-table problem). This should be resolved before merge.
| globalEndpointManager, | ||
| httpClient, | ||
| ApiType.SQL); | ||
| ApiType.SQL, |
🟡 Recommendation · Correctness: Missing Feature Wiring
[Already posted by @jeet1995 — Unresolved]
ThinClientStoreModel passes null for additionalHeaders, silently dropping workload-id
The constructor passes null to RxGatewayStoreModel for additionalHeaders. The createThinProxy() method in RxDocumentClientImpl does not pass this.additionalHeaders either — it doesn't take the parameter.
Customers using thin client mode who configure workload-id on CosmosClientBuilder will have it silently dropped on all thin client requests. Either wire additionalHeaders through or document this as a known limitation.
|
🔴 Blocking · Completeness: Missing
The PR adds
But it is missing on these classes:
Client-level Per-item options ( |
| parseFromStringFunction = headersJson => { | ||
| try { | ||
| val typeRef = new com.fasterxml.jackson.core.`type`.TypeReference[java.util.Map[String, String]]() {} | ||
| Utils.getSimpleObjectMapperWithAllowDuplicates.readValue(headersJson, typeRef).asScala.toMap |
🔴 Blocking · Correctness: Deferred Validation
Unknown header names in spark.cosmos.additionalHeaders are accepted at parse time but blow up at runtime
The AdditionalHeadersConfig parser uses Jackson to deserialize the JSON string into a Map[String, String]. This succeeds for ANY valid JSON key-value pairs — no header name validation happens here.
The actual validation occurs later in CosmosClientCache.scala when CosmosHeaderName.fromString(key) is called during client creation:
    for ((key, value) <- cosmosClientConfiguration.additionalHeaders.get) {
      enumHeaders.put(CosmosHeaderName.fromString(key), value) // throws here
    }

This means a Spark job with `spark.cosmos.additionalHeaders = {"x-bad-header": "value"}` will:
- Start successfully and allocate cluster resources
- Set up partitioning and begin task scheduling
- Fail at runtime on the first data operation with `IllegalArgumentException: Unknown header`

The test `reject unknown additional headers` is misleadingly named: it actually asserts that parsing succeeds and notes in a comment that validation happens later.
Instead of blocking, this should probably be a recommendation comment.
| // Address resolution is the one metadata path that bypasses RxGatewayStoreModel (which | ||
| // handles all other metadata requests) — GatewayAddressCache calls httpClient.send() directly. | ||
| // Use putIfAbsent to ensure SDK system headers (USER_AGENT, VERSION, etc.) are not overwritten. | ||
| if (additionalHeaders != null && !additionalHeaders.isEmpty()) { |
I guess this is a weird question, because not all requests will trigger the address refresh flow. But what if the workload-id header is configured on the request options: should we also pass it down to the address refresh?
IMO, address resolution results are cached and shared across requests:
- Request A with workload-id=15 triggers address resolution -> cached
- Request B with workload-id=20 reuses the cached result, so there is no address call at all
- The address call attribution would therefore be inconsistent and misleading
The client-level workload-id is the correct attribution. For Jarvis monitoring, the purpose of workload-id on metadata calls is to identify which client/application is generating excessive PKRange or collection-read traffic. The client-level header already serves this purpose perfectly — it identifies the client instance causing the address resolution load.
But what if the workload-id is not configured at the client level? It is optional there.
Yes. If the customer deliberately chose not to set workload-id at the client level, that's an explicit decision. They're saying "my client doesn't have a default workload identity."
IMO, the per-request override is for the user's actual operation, not for internal SDK plumbing. Address resolution is SDK-internal: the customer didn't ask for it and doesn't control when it happens.
The same caching argument still applies: address resolution is cached and shared across requests. Tagging the cache-fill with one caller's per-request header would be misleading.
What do you think?
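The caching argument in this thread can be made concrete with a toy cache. The sketch below is purely illustrative: `AddressCacheSketch`, `resolve`, and the recorded tag list are hypothetical and bear no relation to GatewayAddressCache internals. It shows that when results are cached, only the request that happens to fill the cache would be attributed to the backend call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical toy cache; not the SDK's GatewayAddressCache.
final class AddressCacheSketch {
    private final Map<String, String> cache = new ConcurrentHashMap<>();

    // Workload ids that would have been attached to real backend address calls.
    final List<String> backendCallTags = new ArrayList<>();

    // Resolves an address, calling the "backend" only on a cache miss.
    String resolve(String partition, String requestWorkloadId) {
        return cache.computeIfAbsent(partition, p -> {
            backendCallTags.add(requestWorkloadId); // only the cache-filling caller is recorded
            return "address-of-" + p;
        });
    }

    public static void main(String[] args) {
        AddressCacheSketch addresses = new AddressCacheSketch();
        addresses.resolve("pk-range-0", "15"); // request A fills the cache
        addresses.resolve("pk-range-0", "20"); // request B reuses the cached result
        System.out.println(addresses.backendCallTags);
    }
}
```

In this toy model the backend sees a single call tagged with request A's value, even though request B also depended on the result, which is the attribution problem described above.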
/azp run java - cosmos - tests
Azure Pipelines successfully started running 1 pipeline(s).
@sdkReviewAgent
sdkReviewAgent | Status: ⏳ Queued Review requested by @xinlian12. I'll start shortly.
sdkReviewAgent | Status: 🔍 Reviewing I'm reviewing this PR now. I'll post my findings as comments when done.
| // Without this, unknown headers like {"x-bad-header": "value"} would parse successfully | ||
| // and only blow up at runtime in CosmosClientCache when CosmosHeaderName.fromString() is called. | ||
| for (key <- parsed.keys) { | ||
| CosmosHeaderName.fromString(key) // throws IllegalArgumentException for unknown headers |
🟢 Suggestion · Fail-Fast Gap: Header values not validated at Spark config parse time
The AdditionalHeadersConfig parser validates header names at parse time (via CosmosHeaderName.fromString(key)) but does not validate header values. A Spark config like {"x-ms-cosmos-workload-id": "abc"} will pass parsing successfully, start the job, allocate executors, and only fail when CosmosClientBuilder.additionalHeaders() calls CosmosHeaderName.validateAdditionalHeaders() on the executor — at which point cluster resources are already allocated.
The code comment above explicitly states the fail-fast goal: "validate every header name is a known CosmosHeaderName at parse time … rather than at runtime during client creation." The same reasoning applies to values.
Suggested fix — after the name validation loop, convert and validate values too:
    for (key <- parsed.keys) {
      CosmosHeaderName.fromString(key)
    }
    // Also validate values at parse time to complete fail-fast coverage
    val headerMap = new java.util.HashMap[CosmosHeaderName, String]()
    for ((key, value) <- parsed) {
      headerMap.put(CosmosHeaderName.fromString(key), value)
    }
    CosmosHeaderName.validateAdditionalHeaders(headerMap)
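The same fail-fast idea, checking both names and values as soon as the configuration is parsed, can be sketched in plain Java without the SDK types. `HeaderConfigValidatorSketch` and `validate` are hypothetical names, the single known header is assumed to be `x-ms-cosmos-workload-id`, and the value check mirrors the javadoc quoted earlier (valid integer, no range enforcement).

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical illustration of parse-time validation; not the connector's code.
final class HeaderConfigValidatorSketch {
    static final String WORKLOAD_ID = "x-ms-cosmos-workload-id";

    // Throws IllegalArgumentException for an unknown header name or a
    // workload-id value that is not a valid integer.
    static void validate(Map<String, String> parsed) {
        for (Map.Entry<String, String> entry : parsed.entrySet()) {
            if (!WORKLOAD_ID.equalsIgnoreCase(entry.getKey())) {
                throw new IllegalArgumentException("Unknown header: " + entry.getKey());
            }
            try {
                Integer.parseInt(entry.getValue());
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException(
                    "Value for " + entry.getKey() + " is not a valid integer: " + entry.getValue(), e);
            }
        }
    }

    public static void main(String[] args) {
        validate(Collections.singletonMap(WORKLOAD_ID, "15")); // passes silently
        try {
            validate(Collections.singletonMap(WORKLOAD_ID, "abc"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected at parse time: " + e.getMessage());
        }
    }
}
```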
/azp run java - cosmos - tests
Azure Pipelines successfully started running 1 pipeline(s).
The Workload ID feature allows customers to tag their Cosmos DB requests with a numeric identifier (1-50) that flows through to Azure Monitor metrics. This enables customers to identify and monitor the source of their database requests without needing to enable expensive diagnostic logs.