Workload-ID feature #48128

Open
dibahlfi wants to merge 15 commits into main from users/dibahl/workload-id-feature

@dibahlfi
Member

The Workload ID feature allows customers to tag their Cosmos DB requests with a numeric identifier (1-50) that flows through to Azure Monitor metrics. This enables customers to identify and monitor the source of their database requests without needing to enable expensive diagnostic logs.

Copilot AI review requested due to automatic review settings February 26, 2026 01:00
@dibahlfi dibahlfi requested review from a team and kirankumarkolli as code owners February 26, 2026 01:00
Contributor

Copilot AI left a comment

Pull request overview

This pull request implements the Workload-ID feature for Azure Cosmos DB Java SDK, enabling customers to tag their database requests with a numeric identifier (1-50) that flows through to Azure Monitor metrics. This provides a lightweight alternative to diagnostic logs for monitoring the source of database requests.

Changes:

  • Added customHeaders() method to CosmosClientBuilder for setting client-level custom headers
  • Made setHeader() methods public on all request options classes (CosmosItemRequestOptions, CosmosBatchRequestOptions, CosmosBulkExecutionOptions, CosmosChangeFeedRequestOptions, CosmosQueryRequestOptions) for per-request header overrides
  • Implemented workload-id header support in the RNTBD (Direct mode) protocol layer with proper error handling
  • Integrated custom headers feature into Spark connector with JSON-based configuration
  • Added comprehensive unit tests and E2E integration tests

Reviewed changes

Copilot reviewed 23 out of 23 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| CosmosClientBuilder.java | Added customHeaders() method for client-level custom headers configuration |
| CosmosItemRequestOptions.java | Made setHeader() public with updated documentation |
| CosmosQueryRequestOptions.java | Added new public setHeader() method that delegates to internal implementation |
| CosmosChangeFeedRequestOptions.java | Made setHeader() public with updated documentation |
| CosmosBulkExecutionOptions.java | Made setHeader() public with updated documentation |
| CosmosBatchRequestOptions.java | Made setHeader() public with updated documentation |
| RxDocumentClientImpl.java | Added constructor overload accepting customHeaders and merged them into request headers |
| AsyncDocumentClient.java | Added Builder support for customHeaders parameter |
| CosmosAsyncClient.java | Passed customHeaders from builder to AsyncDocumentClient |
| RntbdRequestHeaders.java | Added addWorkloadId() method with error handling for parsing and byte conversion |
| RntbdConstants.java | Added WorkloadId enum entry with ID 0x00DC as a Byte type token |
| HttpConstants.java | Added WORKLOAD_ID header constant definition |
| WorkloadIdE2ETests.java | Comprehensive E2E tests covering CRUD, query, and regression scenarios |
| RntbdWorkloadIdTests.java | Unit tests verifying RNTBD header definition and valid/invalid value handling |
| CustomHeadersTests.java | Unit tests for CosmosClientBuilder and request options public API surface |
| CosmosConfig.scala | Added CustomHeaders config entry with JSON parsing for Spark connector |
| CosmosClientConfiguration.scala | Added customHeaders field to configuration case class |
| CosmosClientCache.scala | Applied customHeaders to CosmosClientBuilder and included in cache key |
| SparkE2EWorkloadIdITest.scala | E2E tests for Spark connector read/write with custom headers |
| CosmosClientConfigurationSpec.scala | Unit tests for JSON parsing of customHeaders configuration |
| Multiple test files | Updated test fixtures to include customHeaders: None for backward compatibility |
Comments suppressed due to low confidence (1)

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/WorkloadIdE2ETests.java:327

  • Test coverage is missing for batch and bulk operations with workload-id headers. Consider adding E2E tests similar to the existing CRUD tests that verify:
  1. CosmosBatchRequestOptions.setHeader() works correctly with batch operations
  2. CosmosBulkExecutionOptions.setHeader() works correctly with bulk operations

This would ensure the workload-id feature works end-to-end for all operation types, not just point operations and queries.

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.rx;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.TestObject;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.TestConfigurations;
import com.azure.cosmos.models.CosmosBulkExecutionOptions;
import com.azure.cosmos.models.CosmosBulkOperations;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.PartitionKeyDefinition;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * End-to-end integration tests for the custom headers / workload-id feature.
 * <p>
 * Test type: EMULATOR INTEGRATION TEST — requires the Cosmos DB Emulator to be running locally.
 */
public class WorkloadIdE2ETests extends TestSuiteBase {

    private static final String DATABASE_ID = "workloadIdTestDb-" + UUID.randomUUID();
    private static final String CONTAINER_ID = "workloadIdTestContainer-" + UUID.randomUUID();

    private CosmosAsyncClient clientWithWorkloadId;
    private CosmosAsyncDatabase database;
    private CosmosAsyncContainer container;

    public WorkloadIdE2ETests() {
        super(new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY));
    }

    @BeforeClass(groups = { "emulator" }, timeOut = SETUP_TIMEOUT)
    public void beforeClass() {
        Map<String, String> headers = new HashMap<>();
        headers.put(HttpConstants.HttpHeaders.WORKLOAD_ID, "15");

        clientWithWorkloadId = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .customHeaders(headers)
            .buildAsyncClient();

        database = createDatabase(clientWithWorkloadId, DATABASE_ID);

        PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
        ArrayList<String> paths = new ArrayList<>();
        paths.add("/mypk");
        partitionKeyDef.setPaths(paths);
        CosmosContainerProperties containerProperties = new CosmosContainerProperties(CONTAINER_ID, partitionKeyDef);
        database.createContainer(containerProperties).block();
        container = database.getContainer(CONTAINER_ID);
    }

    /**
     * Verifies that a create (POST) operation succeeds when the client
     * has a workload-id custom header set at the builder level. Confirms the header
     * flows through the request pipeline without causing errors.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void createItemWithClientLevelWorkloadId() {
        TestObject doc = TestObject.create();

        CosmosItemResponse<TestObject> response = container
            .createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
            .block();

        assertThat(response).isNotNull();
        assertThat(response.getStatusCode()).isEqualTo(201);
    }

    /**
     * Verifies that a read (GET) operation succeeds with the client-level workload-id
     * header and that the correct document is returned. Ensures the header does not
     * interfere with normal read semantics.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void readItemWithClientLevelWorkloadId() {
        // Verify read operation succeeds with workload-id header
        TestObject doc = TestObject.create();
        container.createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions()).block();

        CosmosItemResponse<TestObject> response = container
            .readItem(doc.getId(), new PartitionKey(doc.getMypk()), TestObject.class)
            .block();

        assertThat(response).isNotNull();
        assertThat(response.getStatusCode()).isEqualTo(200);
        assertThat(response.getItem().getId()).isEqualTo(doc.getId());
    }

    /**
     * Verifies that a replace (PUT) operation succeeds with the client-level workload-id
     * header. Confirms the header propagates correctly for update operations.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void replaceItemWithClientLevelWorkloadId() {
        // Verify replace operation succeeds with workload-id header
        TestObject doc = TestObject.create();
        container.createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions()).block();

        doc.setStringProp("updated-" + UUID.randomUUID());
        CosmosItemResponse<TestObject> response = container
            .replaceItem(doc, doc.getId(), new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
            .block();

        assertThat(response).isNotNull();
        assertThat(response.getStatusCode()).isEqualTo(200);
    }

    /**
     * Verifies that a delete operation succeeds with the client-level workload-id header
     * and returns the expected 204 No Content status code.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void deleteItemWithClientLevelWorkloadId() {
        // Verify delete operation succeeds with workload-id header
        TestObject doc = TestObject.create();
        container.createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions()).block();

        CosmosItemResponse<Object> response = container
            .deleteItem(doc.getId(), new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
            .block();

        assertThat(response).isNotNull();
        assertThat(response.getStatusCode()).isEqualTo(204);
    }

    /**
     * Verifies that a per-request workload-id header override via
     * {@code CosmosItemRequestOptions.setHeader()} works. The request-level header
     * (value "30") should take precedence over the client-level default (value "15").
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void createItemWithRequestLevelWorkloadIdOverride() {
        // Verify per-request header override works — request-level should take precedence
        TestObject doc = TestObject.create();

        CosmosItemRequestOptions options = new CosmosItemRequestOptions()
            .setHeader(HttpConstants.HttpHeaders.WORKLOAD_ID, "30");

        CosmosItemResponse<TestObject> response = container
            .createItem(doc, new PartitionKey(doc.getMypk()), options)
            .block();

        assertThat(response).isNotNull();
        assertThat(response.getStatusCode()).isEqualTo(201);
    }

    /**
     * Verifies that a cross-partition query operation succeeds when the client has a
     * workload-id custom header. Confirms the header flows correctly through the
     * query pipeline and does not affect result correctness.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void queryItemsWithClientLevelWorkloadId() {
        // Verify query operation succeeds with workload-id header
        TestObject doc = TestObject.create();
        container.createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions()).block();

        CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
        long count = container
            .queryItems("SELECT * FROM c WHERE c.id = '" + doc.getId() + "'", queryOptions, TestObject.class)
            .collectList()
            .block()
            .size();

        assertThat(count).isGreaterThanOrEqualTo(1);
    }

    /**
     * Verifies that a per-request workload-id header override on
     * {@code CosmosQueryRequestOptions.setHeader()} works for query operations.
     * The request-level header (value "42") should take precedence over the
     * client-level default.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void queryItemsWithRequestLevelWorkloadIdOverride() {
        // Verify per-request header override on query options works
        TestObject doc = TestObject.create();
        container.createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions()).block();

        CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions()
            .setHeader(HttpConstants.HttpHeaders.WORKLOAD_ID, "42");

        long count = container
            .queryItems("SELECT * FROM c WHERE c.id = '" + doc.getId() + "'", queryOptions, TestObject.class)
            .collectList()
            .block()
            .size();

        assertThat(count).isGreaterThanOrEqualTo(1);
    }

    /**
     * Regression test: verifies that a client created without any custom headers
     * continues to work normally. Ensures the custom headers feature does not
     * introduce regressions for clients that do not use it.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void clientWithNoCustomHeadersStillWorks() {
        // Verify that a client without custom headers works normally (no regression)
        CosmosAsyncClient clientWithoutHeaders = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .buildAsyncClient();

        try {
            CosmosAsyncContainer c = clientWithoutHeaders
                .getDatabase(DATABASE_ID)
                .getContainer(CONTAINER_ID);

            TestObject doc = TestObject.create();
            CosmosItemResponse<TestObject> response = c
                .createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
                .block();

            assertThat(response).isNotNull();
            assertThat(response.getStatusCode()).isEqualTo(201);
        } finally {
            safeClose(clientWithoutHeaders);
        }
    }

    /**
     * Verifies that a client created with an empty custom headers map works normally.
     * An empty map should behave identically to no custom headers — no errors,
     * no unexpected behavior.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void clientWithEmptyCustomHeaders() {
        // Verify that a client with empty custom headers map works normally
        CosmosAsyncClient clientWithEmptyHeaders = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .customHeaders(new HashMap<>())
            .buildAsyncClient();

        try {
            CosmosAsyncContainer c = clientWithEmptyHeaders
                .getDatabase(DATABASE_ID)
                .getContainer(CONTAINER_ID);

            TestObject doc = TestObject.create();
            CosmosItemResponse<TestObject> response = c
                .createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
                .block();

            assertThat(response).isNotNull();
            assertThat(response.getStatusCode()).isEqualTo(201);
        } finally {
            safeClose(clientWithEmptyHeaders);
        }
    }

    /**
     * Verifies that a client can be configured with multiple custom headers simultaneously
     * (workload-id plus an additional custom header). Confirms that all headers flow
     * through the pipeline without interfering with each other.
     */
    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
    public void clientWithMultipleCustomHeaders() {
        // Verify that multiple custom headers can be set simultaneously
        Map<String, String> headers = new HashMap<>();
        headers.put(HttpConstants.HttpHeaders.WORKLOAD_ID, "20");
        headers.put("x-ms-custom-test-header", "test-value");

        CosmosAsyncClient clientWithMultipleHeaders = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .customHeaders(headers)
            .buildAsyncClient();

        try {
            CosmosAsyncContainer c = clientWithMultipleHeaders
                .getDatabase(DATABASE_ID)
                .getContainer(CONTAINER_ID);

            TestObject doc = TestObject.create();
            CosmosItemResponse<TestObject> response = c
                .createItem(doc, new PartitionKey(doc.getMypk()), new CosmosItemRequestOptions())
                .block();

            assertThat(response).isNotNull();
            assertThat(response.getStatusCode()).isEqualTo(201);
        } finally {
            safeClose(clientWithMultipleHeaders);
        }
    }

    @AfterClass(groups = { "emulator" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
    public void afterClass() {
        safeDeleteDatabase(database);
        safeClose(clientWithWorkloadId);
    }
}


@github-actions
Contributor

github-actions bot commented Feb 26, 2026

API Change Check

APIView identified API level changes in this PR and created the following API reviews

com.azure:azure-cosmos

* <p>
* If the same header is also set on request options (e.g.,
* {@code CosmosItemRequestOptions.setHeader(String, String)}),
* the request-level value takes precedence over the client-level value.
Member

@jeet1995 jeet1995 Feb 26, 2026

We should also call out in the Javadoc that customHeaders take lower precedence than headers set through targeted APIs (see setConsistencyLevel, setReadConsistencyStrategy, etc.), and we should validate this.

Member

@xinlian12 xinlian12 Feb 27, 2026

I think this is a good question: when the same header can be set from multiple places, which one should take priority?

I am wondering whether we should start by allowing only the workload-id header. In Direct mode, if the SDK does not know a header, it is dropped anyway. Allowing customers to set important headers through customHeaders could, I feel, cause a lot of confusion in the future.

Member Author

@dibahlfi dibahlfi Mar 2, 2026

Added an allowlist in CosmosClientBuilder, since no option can provide true cross-mode flexibility because of how RNTBD works.
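
One way to picture the allowlist described in this reply is the sketch below. It is an illustration under stated assumptions, not the code added in the PR: `HeaderAllowlist` and `validate` are hypothetical names, and the only allowlisted entry is the `x-ms-cosmos-workload-id` header named in this PR's changelog entry. Unknown header names and non-integer workload-id values are rejected with `IllegalArgumentException`, which is the behavior the builder Javadoc quoted in this review describes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of the SDK: rejects header names outside a fixed allowlist.
final class HeaderAllowlist {
    // Header name taken from the changelog entry in this PR.
    static final String WORKLOAD_ID = "x-ms-cosmos-workload-id";

    private static final Set<String> ALLOWED =
        Collections.unmodifiableSet(new HashSet<>(Collections.singletonList(WORKLOAD_ID)));

    private HeaderAllowlist() {
    }

    // Returns an independent, unmodifiable copy of the headers once every entry has passed validation.
    static Map<String, String> validate(Map<String, String> headers) {
        Map<String, String> validated = new HashMap<>();
        if (headers == null) {
            return Collections.unmodifiableMap(validated);
        }
        for (Map.Entry<String, String> entry : headers.entrySet()) {
            String name = entry.getKey();
            String value = entry.getValue();
            if (!ALLOWED.contains(name)) {
                throw new IllegalArgumentException("Header '" + name + "' is not in the allowlist");
            }
            if (WORKLOAD_ID.equals(name)) {
                try {
                    // Only checks that the value is an integer; no range is enforced here.
                    Integer.parseInt(value);
                } catch (NumberFormatException e) {
                    throw new IllegalArgumentException("Workload-id must be a valid integer: " + value, e);
                }
            }
            validated.put(name, value);
        }
        return Collections.unmodifiableMap(validated);
    }
}
```

Validating eagerly, at configuration time, surfaces a rejected header as an exception the caller can see, rather than letting the Direct mode transport drop it silently.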

Member

My 2 cents: change the name to additionalHeaders and make it a Map of CosmosHeaderName to String or so, with CosmosHeaderName being an enum. That way we control which headers can be set and manage expectations early.

Member Author

fixed.

* @return the CosmosBatchRequestOptions.
*/
- CosmosBatchRequestOptions setHeader(String name, String value) {
+ public CosmosBatchRequestOptions setHeader(String name, String value) {
Member

@jeet1995 jeet1995 Feb 26, 2026

We can add such a setter for CosmosReadManyRequestOptions too.

Member Author

done

Member

@jeet1995 jeet1995 Mar 4, 2026

@dibahlfi - rethinking this, I feel that adding a public setCustomHeaders() on request options, which takes a map, is more consistent with the customHeaders() method added to CosmosClientBuilder. This way setHeader can remain package-private, and you can also plug in header-filter logic at the request options level.

Thoughts, @xinlian12 / @FabianMeiswinkel?

Member Author

I think it makes sense to be consistent. Fixed.

key = CosmosConfigNames.CustomHeaders,
mandatory = false,
parseFromStringFunction = headersJson => {
val mapper = new com.fasterxml.jackson.databind.ObjectMapper()
Member

@jeet1995 jeet1995 Feb 26, 2026

Is there a reusable instance of ObjectMapper and how are we handling format issues?

Member

maybe we can just use Utils.simpleObjectMapperAllowingDuplicatedProperties

Member Author

done

@dibahlfi
Member Author

dibahlfi commented Mar 1, 2026

/azp run java - cosmos - tests

@azure-pipelines

Azure Pipelines will not run the associated pipelines, because the pull request was updated after the run command was issued. Review the pull request again and issue a new run command.

@dibahlfi
Member Author

dibahlfi commented Mar 1, 2026

/azp run java - cosmos - tests

@azure-pipelines

Azure Pipelines will not run the associated pipelines, because the pull request was updated after the run command was issued. Review the pull request again and issue a new run command.

@dibahlfi
Member Author

dibahlfi commented Mar 2, 2026

/azp run java - cosmos - tests

@azure-pipelines

Azure Pipelines successfully started running 1 pipeline(s).

@dibahlfi
Member Author

dibahlfi commented Mar 2, 2026

/azp run java - cosmos - spark

@azure-pipelines

Azure Pipelines successfully started running 1 pipeline(s).

### 4.45.0-beta.1 (Unreleased)

#### Features Added
* Added `customHeaders` support to allow setting custom HTTP headers (e.g., `x-ms-cosmos-workload-id`) that are sent with every request. - See [PR 48128](https://github.com/Azure/azure-sdk-for-java/pull/48128)
Member

nit: x-ms-cosmos-workload-id is an RNTBD header too.

Member Author

fixed the changelog description

Member

@FabianMeiswinkel FabianMeiswinkel left a comment

I think the public API needs to express more clearly that this, unlike in many other SDKs, is not a random list of headers. Your allowlist hints at that already, but instead of failing at runtime, why not use a strongly typed contract, like an enum that can be used to identify the allowed header names? Note that the enum should be non-exhaustive. We have a few of these cases already, such as CosmosMetricName; normal enums in Java would (technically) make adding new values a breaking change.


// Apply client-level custom headers (e.g., workload-id) to metadata requests
// Use putIfAbsent to ensure SDK system headers (USER_AGENT, VERSION, etc.) are not overwritten
if (customHeaders != null && !customHeaders.isEmpty()) {
Member

If the intention is to send custom headers to other metadata requests, it can help to modify RxGatewayStoreModel which is the transport layer for metadata requests (address requests being the exception as these are made from GatewayAddressCache itself).

Member Author

fixed the comment wording as the code is already doing it.
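
The merge order described in the code comment under review (SDK system headers and request-level values win, client-level headers only fill gaps) can be sketched with plain maps. This is a simplified illustration, not the SDK's implementation: `HeaderMergeSketch` and `applyClientLevelHeaders` are hypothetical names, and the sketch assumes header names are compared case-sensitively.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not the SDK's implementation.
final class HeaderMergeSketch {
    private HeaderMergeSketch() {
    }

    // requestHeaders is assumed to already hold SDK system headers and any request-level overrides.
    static Map<String, String> applyClientLevelHeaders(
            Map<String, String> requestHeaders,
            Map<String, String> clientLevelHeaders) {
        Map<String, String> merged = new HashMap<>(requestHeaders);
        if (clientLevelHeaders != null && !clientLevelHeaders.isEmpty()) {
            for (Map.Entry<String, String> entry : clientLevelHeaders.entrySet()) {
                // putIfAbsent keeps an existing non-null value, so client-level defaults never overwrite it.
                merged.putIfAbsent(entry.getKey(), entry.getValue());
            }
        }
        return merged;
    }
}
```

With this ordering, a request that sets workload-id "30" keeps that value even when the client default is "15", and a client-level entry cannot replace a header the SDK has already set.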

* @throws IllegalArgumentException if any header key is not in the allowlist, or if the
* workload-id value is not a valid integer
*/
public CosmosClientBuilder customHeaders(Map<String, String> customHeaders) {
Member

A general best practice here would be to create a deep copy of customHeaders.

Member Author

fixed by not storing the caller's reference
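
A minimal sketch of the defensive-copy idea, using a hypothetical `CopyingBuilderSketch` in place of the real builder (this is not the PR's code). Because `String` keys and values are immutable, copying the map entries is enough to isolate the builder from later changes to the caller's map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical builder standing in for the real one.
final class CopyingBuilderSketch {
    private Map<String, String> customHeaders = Collections.emptyMap();

    CopyingBuilderSketch customHeaders(Map<String, String> headers) {
        if (headers == null) {
            this.customHeaders = Collections.emptyMap();
        } else {
            // Copy the entries so later changes to the caller's map are not observed by the builder.
            this.customHeaders = Collections.unmodifiableMap(new HashMap<>(headers));
        }
        return this;
    }

    // The returned view is unmodifiable, so callers cannot change builder state through it.
    Map<String, String> getCustomHeaders() {
        return customHeaders;
    }
}
```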

* Only headers with RNTBD encoding support are included in this enum, ensuring consistent
* behavior across both Gateway mode (HTTP) and Direct mode (RNTBD binary protocol).
*/
public enum CosmosHeaderName {
Member

@jeet1995 jeet1995 Mar 16, 2026

The issue with using an enum is binary compatibility — when new constants are added to CosmosHeaderName in a future release, pre-compiled customer code that switches over the enum values can behave unexpectedly (the compiler generates a synthetic ordinal lookup table baked into the customer's .class file). The Azure SDK Java Guidelines recommend against using Java enum for extensible value sets. CosmosMetricName, CosmosMetricCategory, and CosmosMetricTagName all use the non-exhaustive final class pattern for this reason — CosmosHeaderName should follow the same pattern. See - https://azure.github.io/azure-sdk/java_introduction.html#enumerations
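
For readers unfamiliar with the pattern, a non-exhaustive final class could look roughly like the sketch below. `HeaderNameSketch` is a hypothetical stand-in, not the PR's `CosmosHeaderName`; the case-insensitive lookup is an assumption, and the header string comes from this PR's changelog entry. Known values are exposed as static constants, and `fromString` resolves names through a static map.

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a non-exhaustive, enum-like final class; not the PR's code.
final class HeaderNameSketch {
    // Declared before the constants so it is initialized when register() runs.
    private static final Map<String, HeaderNameSketch> KNOWN = new ConcurrentHashMap<>();

    // Header name taken from the changelog entry in this PR.
    static final HeaderNameSketch WORKLOAD_ID = register("x-ms-cosmos-workload-id");

    private final String headerName;

    private HeaderNameSketch(String headerName) {
        this.headerName = headerName;
    }

    private static HeaderNameSketch register(String headerName) {
        HeaderNameSketch value = new HeaderNameSketch(headerName);
        KNOWN.put(headerName.toLowerCase(Locale.ROOT), value);
        return value;
    }

    // Map lookup instead of a linear scan; unknown names are rejected.
    static HeaderNameSketch fromString(String headerName) {
        HeaderNameSketch value =
            headerName == null ? null : KNOWN.get(headerName.toLowerCase(Locale.ROOT));
        if (value == null) {
            throw new IllegalArgumentException("Unknown header name: " + headerName);
        }
        return value;
    }

    String getHeaderName() {
        return headerName;
    }

    @Override
    public String toString() {
        return headerName;
    }
}
```

Because this type is an ordinary class rather than an `enum`, callers compare instances instead of relying on enum ordinals, so adding another constant in a later release does not change anything compiled into existing caller code.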

Member Author

fixed

* @return the matching {@link CosmosHeaderName}
* @throws IllegalArgumentException if the header name does not match any known enum value
*/
public static CosmosHeaderName fromString(String headerName) {
Member

@jeet1995 jeet1995 Mar 16, 2026

Does fromString and validateAdditionalHeaders need to be public API? Can ImplementationBridgeHelper be used?

Member Author

I reviewed both methods:

fromString: This must stay public. It's called from the Spark connector, which is a separate module (azure-cosmos-spark_3). The Spark connector parses header names as strings from user JSON configuration and needs fromString to convert them to CosmosHeaderName enum values before calling builder.additionalHeaders(enumHeaders).

validateAdditionalHeaders: You are right that this is only called internally. It could technically be made non-public via ImplementationBridgeHelper. However, I would lean toward keeping it public:

  • The ImplementationBridgeHelper pattern is designed for instance methods on classes. Using it for a static validation method on an enum adds significant boilerplate for minimal encapsulation benefit.
  • The method is a pure validation utility: it's idempotent, stateless, and safe for anyone to call. Exposing it doesn't leak internal state or create a backward-compatibility burden.

That said, if you feel strongly about minimizing the public API surface, I can make the change. Let me know your preference.

httpClient,
- ApiType.SQL);
+ ApiType.SQL,
+ null);
Member

@jeet1995 jeet1995 Mar 16, 2026

nit: additionalHeaders can be wired into ThinClientStoreModel's constructor as well. (It's a nit because I'm assuming that every data-plane request already has additionalHeaders wired in. Is this the right understanding?)

Member Author

done

Member Author

done

@xinlian12
Member

Deep Review Summary — PR #48128 (Workload-ID Feature)

Overall: Solid feature implementation with excellent test coverage and well-designed header precedence. However, there are important gaps — a missing field in the builder clone utility, incomplete setAdditionalHeaders coverage across request options classes, and an unresolved API design question about enum vs. final-class pattern.

Existing comments: 30+ from reviewers. 2 of my findings reinforce unresolved feedback from @jeet1995.

Findings (11 total)

| # | Severity | Finding |
| --- | --- | --- |
| 1 | 🔴 Blocking | CosmosBridgeInternal.cloneCosmosClientBuilder() doesn't copy additionalHeaders |
| 2 | 🟡 Recommendation | CosmosHeaderName should be final class, not Java enum (reinforces @jeet1995) |
| 3 | 🟡 Recommendation | ThinClientStoreModel passes null for additionalHeaders (reinforces @jeet1995) |
| 4 | 🟡 Recommendation | RNTBD byte truncation without validation for values > 127 |
| 5 | 🟡 Recommendation | Constructor parameter explosion in RxDocumentClientImpl |
| 6 | 🟢 Suggestion | Weak test assertions: setAdditionalHeaders tests only check isNotNull() |
| 7 | 🟢 Suggestion | fromString() linear scan could use static map |
| 8 | 🟢 Suggestion | Over-commenting self-explanatory code |
| 9 | 💬 Observation | Excellent test coverage across all modes |
| 10 | 💬 Observation | Header precedence design is correct |
| 11 | 🔴 Blocking | setAdditionalHeaders missing on control-plane and stored-procedure request options |

Details posted as inline comments below.
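
Finding 4 concerns Java's narrowing cast: `(byte) 200` evaluates to `-56`, so an unchecked cast silently changes the value. One possible guard is sketched below under stated assumptions: `WorkloadIdByteSketch` and `toByte` are hypothetical names, and rejecting everything outside the signed byte range is an assumed policy, not what the PR's `addWorkloadId()` does.

```java
// Hypothetical helper, not the PR's addWorkloadId() implementation.
final class WorkloadIdByteSketch {
    private WorkloadIdByteSketch() {
    }

    // Parses the header value and refuses anything that would not survive a cast to byte.
    static byte toByte(String headerValue) {
        if (headerValue == null) {
            throw new IllegalArgumentException("Workload-id must not be null");
        }
        int parsed;
        try {
            parsed = Integer.parseInt(headerValue.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Workload-id must be a valid integer: " + headerValue, e);
        }
        // Assumed policy: fail fast instead of letting the narrowing cast wrap around.
        if (parsed < Byte.MIN_VALUE || parsed > Byte.MAX_VALUE) {
            throw new IllegalArgumentException("Workload-id does not fit in a single byte: " + parsed);
        }
        return (byte) parsed;
    }
}
```

Whether the check belongs in the SDK or is left to the backend is the design question the finding raises; the sketch only shows how the truncation could be detected before encoding.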


⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

* @return current CosmosClientBuilder
* @throws IllegalArgumentException if the workload-id value is not a valid integer
*/
public CosmosClientBuilder additionalHeaders(Map<CosmosHeaderName, String> additionalHeaders) {
Member

🔴 Blocking · Correctness: Missing Field in Clone

CosmosBridgeInternal.cloneCosmosClientBuilder() does not copy additionalHeaders

CosmosBridgeInternal.cloneCosmosClientBuilder() copies endpoint, key, credentials, connection configs, consistency, preferred regions, throttling, etc. — but does NOT copy additionalHeaders. Any code path that clones a builder with workload-id configured will silently lose the header.

The method is called by TestSuiteBase.copyCosmosClientBuilder() (used in this PR's own tests) and could be used by production code paths. The test verifyNoWorkloadIdHeaderWhenNotConfigured passes by accident because the base builder from the data provider never has additionalHeaders set.

Fix: Add this to CosmosBridgeInternal.cloneCosmosClientBuilder():

```java
if (builder.getAdditionalHeadersRaw() != null) {
    copy.additionalHeaders(builder.getAdditionalHeadersRaw());
}
```
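The failure mode can be reproduced in isolation. The sketch below is only an illustration: `SketchBuilder`, `cloneWithoutHeaders`, and `cloneWithHeaders` are hypothetical names, not the real `CosmosClientBuilder` or `CosmosBridgeInternal` code, and the header map is simplified to `Map<String, String>`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a client builder; NOT the real CosmosClientBuilder.
final class SketchBuilder {
    private String endpoint;
    private Map<String, String> additionalHeaders; // null when not configured

    SketchBuilder endpoint(String endpoint) {
        this.endpoint = endpoint;
        return this;
    }

    SketchBuilder additionalHeaders(Map<String, String> headers) {
        // Defensive copy so later changes to the caller's map do not leak in.
        this.additionalHeaders = new HashMap<>(headers);
        return this;
    }

    String getEndpoint() {
        return endpoint;
    }

    Map<String, String> getAdditionalHeadersRaw() {
        return additionalHeaders;
    }

    // Buggy clone: copies the endpoint but forgets additionalHeaders.
    static SketchBuilder cloneWithoutHeaders(SketchBuilder builder) {
        return new SketchBuilder().endpoint(builder.getEndpoint());
    }

    // Fixed clone: also copies the headers when they were configured.
    static SketchBuilder cloneWithHeaders(SketchBuilder builder) {
        SketchBuilder copy = new SketchBuilder().endpoint(builder.getEndpoint());
        if (builder.getAdditionalHeadersRaw() != null) {
            copy.additionalHeaders(builder.getAdditionalHeadersRaw());
        }
        return copy;
    }
}
```

A test that clones a builder with workload-id set and then asserts on the clone's headers would catch the regression; a test that only clones an unconfigured builder would not.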


⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

Member Author

done

* The SDK validates that the value is a valid integer but does not enforce range limits —
* range validation is the backend's responsibility.
*/
WORKLOAD_ID(HttpConstants.HttpHeaders.WORKLOAD_ID);
Member

🟡 Recommendation · API Design: Enum vs Final Class

[Already posted by @jeet1995 — Unresolved]

CosmosHeaderName should use the non-exhaustive final class pattern, not a Java enum

The Azure SDK Java Guidelines recommend against Java enums for extensible value sets. When new enum constants are added in future releases, pre-compiled customer code that uses switch over the enum values will break (the compiler generates a synthetic ordinal lookup table baked into the customer's .class file).

Other Cosmos types — CosmosMetricName, CosmosMetricCategory, CosmosMetricTagName — all use the non-exhaustive final class pattern for exactly this reason. CosmosHeaderName is a public API that ships to customers and cannot be changed from enum to final class once released without a breaking change.

@jeet1995 raised this concern earlier. The author's response about binary compatibility actually supports using a final class (enums are the ones with the switch-table problem). This should be resolved before merge.
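For reference, a minimal sketch of the non-exhaustive final class pattern, assuming a static lookup map for `fromString()`. `HeaderNameSketch` is a hypothetical name, not the SDK's `CosmosHeaderName`; the case-insensitive lookup is an assumption based on HTTP header names being case-insensitive.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch of a non-exhaustive value set; NOT the SDK's CosmosHeaderName.
final class HeaderNameSketch {
    // Declared before the constants so it is initialized before register() runs.
    private static final Map<String, HeaderNameSketch> BY_NAME = new HashMap<>();

    public static final HeaderNameSketch WORKLOAD_ID = register("x-ms-cosmos-workload-id");

    private final String headerName;

    private HeaderNameSketch(String headerName) {
        this.headerName = headerName;
    }

    private static HeaderNameSketch register(String headerName) {
        HeaderNameSketch value = new HeaderNameSketch(headerName);
        BY_NAME.put(headerName.toLowerCase(Locale.ROOT), value);
        return value;
    }

    // Constant-time lookup instead of a linear scan over all known values.
    public static HeaderNameSketch fromString(String name) {
        HeaderNameSketch value = name == null ? null : BY_NAME.get(name.toLowerCase(Locale.ROOT));
        if (value == null) {
            throw new IllegalArgumentException("Unknown header: " + name);
        }
        return value;
    }

    public String getHeaderName() {
        return headerName;
    }

    @Override
    public String toString() {
        return headerName;
    }
}
```

Because the type is a plain class, callers compare values with `==` or `equals` rather than an enum `switch`, so adding a new constant later does not touch any compiler-generated lookup table in customer code.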


⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

Member Author

done

globalEndpointManager,
httpClient,
ApiType.SQL);
ApiType.SQL,
Member

🟡 Recommendation · Correctness: Missing Feature Wiring

[Already posted by @jeet1995 — Unresolved]

ThinClientStoreModel passes null for additionalHeaders, silently dropping workload-id

The constructor passes null to RxGatewayStoreModel for additionalHeaders. The createThinProxy() method in RxDocumentClientImpl does not pass this.additionalHeaders either — it doesn't take the parameter.

Customers using thin client mode who configure workload-id on CosmosClientBuilder will have it silently dropped on all thin client requests. Either wire additionalHeaders through or document this as a known limitation.


⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

Member Author

done

@xinlian12
Member

🔴 Blocking · Completeness: Missing setAdditionalHeaders on Request Options Classes

setAdditionalHeaders() is not exposed on control-plane and stored-procedure request options

The PR adds setAdditionalHeaders(Map<CosmosHeaderName, String>) to 6 request options classes:

  • CosmosItemRequestOptions (also inherited by CosmosPatchItemRequestOptions)
  • CosmosBatchRequestOptions
  • CosmosBulkExecutionOptions
  • CosmosChangeFeedRequestOptions
  • CosmosQueryRequestOptions
  • CosmosReadManyRequestOptions

But it is missing on these classes:

  • CosmosStoredProcedureRequestOptions: stored procedure execution is a data-plane operation where per-request workload-id tagging is relevant
  • CosmosContainerRequestOptions — container CRUD
  • CosmosDatabaseRequestOptions — database CRUD
  • CosmosPermissionRequestOptions — permission management
  • CosmosConflictRequestOptions — conflict resolution

Client-level additionalHeaders still flow to ALL operations via getRequestHeaders(), so the workload-id IS sent. But customers cannot override per-request on these operation types. CosmosStoredProcedureRequestOptions is the most impactful gap since sproc execution is a data-plane operation where different stored procs may represent different workloads.

Per-item options (CosmosBatchItemRequestOptions, CosmosBulkItemRequestOptions) are correctly excluded — workload-id is per-request, not per-item.
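The precedence being described (client-level headers as defaults, per-request headers as overrides) can be sketched as a simple map merge. `HeaderPrecedenceSketch` and `effectiveHeaders` are hypothetical names for illustration and are not taken from the PR; header names are simplified to strings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of client-level vs. per-request header precedence.
final class HeaderPrecedenceSketch {
    private HeaderPrecedenceSketch() {
    }

    static Map<String, String> effectiveHeaders(Map<String, String> clientLevel,
                                                 Map<String, String> perRequest) {
        Map<String, String> merged = new HashMap<>();
        if (clientLevel != null) {
            merged.putAll(clientLevel); // defaults that apply to every operation
        }
        if (perRequest != null) {
            merged.putAll(perRequest);  // per-request values win on conflict
        }
        return merged;
    }
}
```

For request options classes without `setAdditionalHeaders()`, the second argument is effectively always `null`, so only the client-level value can be sent.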


⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

parseFromStringFunction = headersJson => {
try {
val typeRef = new com.fasterxml.jackson.core.`type`.TypeReference[java.util.Map[String, String]]() {}
Utils.getSimpleObjectMapperWithAllowDuplicates.readValue(headersJson, typeRef).asScala.toMap
Member

@xinlian12 Mar 17, 2026

🔴 Blocking · Correctness: Deferred Validation

Unknown header names in spark.cosmos.additionalHeaders are accepted at parse time but blow up at runtime

The AdditionalHeadersConfig parser uses Jackson to deserialize the JSON string into a Map[String, String]. This succeeds for ANY valid JSON key-value pairs — no header name validation happens here.

The actual validation occurs later in CosmosClientCache.scala when CosmosHeaderName.fromString(key) is called during client creation:

```scala
for ((key, value) <- cosmosClientConfiguration.additionalHeaders.get) {
  enumHeaders.put(CosmosHeaderName.fromString(key), value) // throws here
}
```

This means a Spark job with `spark.cosmos.additionalHeaders = {"x-bad-header": "value"}` will:

  1. Start successfully and allocate cluster resources
  2. Set up partitioning and begin task scheduling
  3. Fail at runtime on the first data operation with IllegalArgumentException: Unknown header

The test reject unknown additional headers is misleadingly named — it actually asserts parsing succeeds and notes in a comment that validation happens later.


⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

Member

Instead of blocking, this should probably be a recommendation comment.

Member Author

fixed.

// Address resolution is the one metadata path that bypasses RxGatewayStoreModel (which
// handles all other metadata requests) — GatewayAddressCache calls httpClient.send() directly.
// Use putIfAbsent to ensure SDK system headers (USER_AGENT, VERSION, etc.) are not overwritten.
if (additionalHeaders != null && !additionalHeaders.isEmpty()) {
Member

I guess this is a weird question, because not all requests will trigger the address refresh flow. But if the workload-id header is configured on the request options, should we also pass it down to the address refresh?

Member Author

IMO, address resolution results are cached and shared across requests:

  • Request A with workload-id=15 triggers address resolution, and the result is cached.
  • Request B with workload-id=20 reuses the cached result, with no address call at all.

Attributing the address call to a per-request workload-id would therefore be inconsistent and misleading.

The client-level workload-id is the correct attribution. For Jarvis monitoring, the purpose of workload-id on metadata calls is to identify which client/application is generating excessive PKRange or collection-read traffic. The client-level header already serves this purpose perfectly — it identifies the client instance causing the address resolution load.

Member

But what if the workload-id is not configured at the client level? It is optional there.

Member Author

Yes. If the customer deliberately chose not to set workload-id at the client level, that's an explicit decision: they're saying "my client doesn't have a default workload identity."
IMO, the per-request override is for the user's actual operation, not for internal SDK plumbing. Address resolution is SDK-internal; the customer didn't ask for it and doesn't control when it happens.
The same caching argument still applies: address resolution is cached and shared across requests, so tagging the cache fill with one caller's per-request header would be misleading.
What do you think?
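To make the caching argument concrete, here is a toy sketch, assuming a cache keyed by partition key range. `AddressCacheSketch` and its methods are hypothetical and do not reflect how `GatewayAddressCache` is actually implemented; the sketch only shows that the backend call is tagged with whichever request happened to miss the cache first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical toy cache; NOT the SDK's GatewayAddressCache.
final class AddressCacheSketch {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    // Records the workload-id that each simulated backend call would carry.
    private final List<String> backendCallTags = new CopyOnWriteArrayList<>();

    String resolve(String partitionKeyRange, String requestWorkloadId) {
        return cache.computeIfAbsent(partitionKeyRange, key -> {
            // Only a cache miss reaches the backend, so only the first
            // caller's workload-id would ever be attached to this call.
            backendCallTags.add(requestWorkloadId);
            return "address-of-" + key;
        });
    }

    List<String> backendCallTags() {
        return new ArrayList<>(backendCallTags);
    }
}
```

Calling `resolve("range-0", "15")` and then `resolve("range-0", "20")` records a single backend call tagged `15`; the second request is served from the cache and leaves no trace.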

@dibahlfi
Member Author

/azp run java - cosmos - tests

@azure-pipelines

Azure Pipelines successfully started running 1 pipeline(s).

@xinlian12
Member

@sdkReviewAgent

@xinlian12
Member

sdkReviewAgent | Status: ⏳ Queued

Review requested by @xinlian12. I'll start shortly.

@xinlian12
Member

sdkReviewAgent | Status: 🔍 Reviewing

I'm reviewing this PR now. I'll post my findings as comments when done.

// Without this, unknown headers like {"x-bad-header": "value"} would parse successfully
// and only blow up at runtime in CosmosClientCache when CosmosHeaderName.fromString() is called.
for (key <- parsed.keys) {
CosmosHeaderName.fromString(key) // throws IllegalArgumentException for unknown headers
Member

🟢 Suggestion · Fail-Fast Gap: Header values not validated at Spark config parse time

The AdditionalHeadersConfig parser validates header names at parse time (via CosmosHeaderName.fromString(key)) but does not validate header values. A Spark config like {"x-ms-cosmos-workload-id": "abc"} will pass parsing successfully, start the job, allocate executors, and only fail when CosmosClientBuilder.additionalHeaders() calls CosmosHeaderName.validateAdditionalHeaders() on the executor — at which point cluster resources are already allocated.

The code comment above explicitly states the fail-fast goal: "validate every header name is a known CosmosHeaderName at parse time … rather than at runtime during client creation." The same reasoning applies to values.

Suggested fix — after the name validation loop, convert and validate values too:

```scala
for (key <- parsed.keys) {
  CosmosHeaderName.fromString(key)
}
// Also validate values at parse time to complete fail-fast coverage
val headerMap = new java.util.HashMap[CosmosHeaderName, String]()
for ((key, value) <- parsed) {
  headerMap.put(CosmosHeaderName.fromString(key), value)
}
CosmosHeaderName.validateAdditionalHeaders(headerMap)
```
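The same fail-fast idea, expressed as a standalone Java sketch with the value check spelled out. `HeaderConfigValidationSketch` is a hypothetical helper, and the integer-only rule is an assumption taken from the SDK doc comment quoted earlier in this thread (valid integer, no range enforcement); the real `validateAdditionalHeaders()` may check more or less than this.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical parse-time validator; NOT the SDK's validateAdditionalHeaders().
final class HeaderConfigValidationSketch {
    private static final String WORKLOAD_ID = "x-ms-cosmos-workload-id";
    private static final Set<String> KNOWN_HEADERS = Set.of(WORKLOAD_ID);

    private HeaderConfigValidationSketch() {
    }

    static void validate(Map<String, String> parsed) {
        for (Map.Entry<String, String> entry : parsed.entrySet()) {
            // Name check: reject anything that is not a known header.
            if (!KNOWN_HEADERS.contains(entry.getKey())) {
                throw new IllegalArgumentException("Unknown header: " + entry.getKey());
            }
            // Value check: workload-id must parse as an integer (range is left to the backend).
            if (WORKLOAD_ID.equals(entry.getKey())) {
                try {
                    Integer.parseInt(entry.getValue());
                } catch (NumberFormatException e) {
                    throw new IllegalArgumentException(
                        "Invalid value for " + WORKLOAD_ID + ": " + entry.getValue(), e);
                }
            }
        }
    }
}
```

Running both checks on the driver while the config is parsed means a bad name or a value such as `"abc"` fails before any executors are allocated.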

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

@dibahlfi
Member Author

/azp run java - cosmos - tests

@azure-pipelines

Azure Pipelines successfully started running 1 pipeline(s).

5 participants