-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Workload-ID feature #48128
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Workload-ID feature #48128
Changes from all commits
f894bf4
a75ab7b
1bbf6b9
7aeb8b4
6d00d49
e2f1e36
c218ae3
b13d0e9
a79808d
9ce5d98
289d321
57ae651
7703485
2369f4c
4b99a7e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,10 +4,10 @@ | |
| package com.azure.cosmos.spark | ||
|
|
||
| import com.azure.core.management.AzureEnvironment | ||
| import com.azure.cosmos.{CosmosAsyncClient, CosmosClientBuilder, ReadConsistencyStrategy, spark} | ||
| import com.azure.cosmos.{CosmosAsyncClient, CosmosClientBuilder, CosmosHeaderName, ReadConsistencyStrategy, spark} | ||
| import com.azure.cosmos.implementation.batch.BatchRequestResponseConstants | ||
| import com.azure.cosmos.implementation.routing.LocationHelper | ||
| import com.azure.cosmos.implementation.{Configs, SparkBridgeImplementationInternal, Strings} | ||
| import com.azure.cosmos.implementation.{Configs, SparkBridgeImplementationInternal, Strings, Utils} | ||
| import com.azure.cosmos.models.{CosmosChangeFeedRequestOptions, CosmosContainerIdentity, CosmosParameterizedQuery, DedicatedGatewayRequestOptions, FeedRange, PartitionKeyDefinition} | ||
| import com.azure.cosmos.spark.ChangeFeedModes.ChangeFeedMode | ||
| import com.azure.cosmos.spark.ChangeFeedStartFromModes.{ChangeFeedStartFromMode, PointInTime} | ||
|
|
@@ -34,6 +34,7 @@ import java.time.format.DateTimeFormatter | |
| import java.time.{Duration, Instant} | ||
| import java.util | ||
| import java.util.{Locale, ServiceLoader} | ||
| import scala.collection.JavaConverters._ // scalastyle:ignore underscore.import | ||
| import scala.collection.concurrent.TrieMap | ||
| import scala.collection.immutable.{HashSet, List, Map} | ||
| import scala.collection.mutable | ||
|
|
@@ -151,6 +152,10 @@ private[spark] object CosmosConfigNames { | |
| val ThroughputControlTargetThroughputThreshold = "spark.cosmos.throughputControl.targetThroughputThreshold" | ||
| val ThroughputControlPriorityLevel = "spark.cosmos.throughputControl.priorityLevel" | ||
| val ThroughputControlThroughputBucket = "spark.cosmos.throughputControl.throughputBucket" | ||
| // Additional HTTP headers to attach to all Cosmos DB requests (e.g., workload-id for resource governance). | ||
| // Value is a JSON string like: {"x-ms-cosmos-workload-id": "15"} | ||
| // Flows through to CosmosClientBuilder.additionalHeaders(). | ||
| val AdditionalHeaders = "spark.cosmos.additionalHeaders" | ||
| val ThroughputControlGlobalControlDatabase = "spark.cosmos.throughputControl.globalControl.database" | ||
| val ThroughputControlGlobalControlContainer = "spark.cosmos.throughputControl.globalControl.container" | ||
| val ThroughputControlGlobalControlRenewalIntervalInMS = | ||
|
|
@@ -297,7 +302,8 @@ private[spark] object CosmosConfigNames { | |
| WriteOnRetryCommitInterceptor, | ||
| WriteFlushCloseIntervalInSeconds, | ||
| WriteMaxNoProgressIntervalInSeconds, | ||
| WriteMaxRetryNoProgressIntervalInSeconds | ||
| WriteMaxRetryNoProgressIntervalInSeconds, | ||
| AdditionalHeaders | ||
| ) | ||
|
|
||
| def validateConfigName(name: String): Unit = { | ||
|
|
@@ -540,7 +546,10 @@ private case class CosmosAccountConfig(endpoint: String, | |
| resourceGroupName: Option[String], | ||
| azureEnvironmentEndpoints: java.util.Map[String, String], | ||
| clientBuilderInterceptors: Option[List[CosmosClientBuilder => CosmosClientBuilder]], | ||
| clientInterceptors: Option[List[CosmosAsyncClient => CosmosAsyncClient]], | ||
| clientInterceptors: Option[List[CosmosAsyncClient => CosmosAsyncClient]], | ||
| // Optional additional HTTP headers (e.g., workload-id) parsed from | ||
| // spark.cosmos.additionalHeaders JSON config, passed to CosmosClientBuilder.additionalHeaders() | ||
| additionalHeaders: Option[Map[String, String]] | ||
| ) | ||
|
|
||
| private object CosmosAccountConfig extends BasicLoggingTrait { | ||
|
|
@@ -727,6 +736,40 @@ private object CosmosAccountConfig extends BasicLoggingTrait { | |
| parseFromStringFunction = clientInterceptorFQDN => clientInterceptorFQDN, | ||
| helpMessage = "CosmosAsyncClient interceptors (comma separated) - FQDNs of the service implementing the 'CosmosClientInterceptor' trait.") | ||
|
|
||
| // Config entry for custom HTTP headers (e.g., workload-id). Parses a JSON string like | ||
| // {"x-ms-cosmos-workload-id": "15"} into a Scala Map[String, String] using Jackson. | ||
| // These headers are converted to Map[CosmosHeaderName, String] and passed to | ||
| // CosmosClientBuilder.additionalHeaders() in CosmosClientCache. | ||
| // | ||
| // Validation: After JSON parsing, every header name is validated via CosmosHeaderName.fromString() | ||
| // to fail fast at config-parse time rather than at runtime during client creation. | ||
| // This prevents Spark jobs from starting, allocating cluster resources, and only failing | ||
| // later when CosmosClientCache tries to convert String keys to CosmosHeaderName instances. | ||
| private val AdditionalHeadersConfig = CosmosConfigEntry[Map[String, String]]( | ||
| key = CosmosConfigNames.AdditionalHeaders, | ||
| mandatory = false, | ||
| parseFromStringFunction = headersJson => { | ||
| try { | ||
| val typeRef = new com.fasterxml.jackson.core.`type`.TypeReference[java.util.Map[String, String]]() {} | ||
| val parsed = Utils.getSimpleObjectMapperWithAllowDuplicates.readValue(headersJson, typeRef).asScala.toMap | ||
|
|
||
| // Fail fast: validate every header name is a known CosmosHeaderName at parse time. | ||
| // Without this, unknown headers like {"x-bad-header": "value"} would parse successfully | ||
| // and only blow up at runtime in CosmosClientCache when CosmosHeaderName.fromString() is called. | ||
| for (key <- parsed.keys) { | ||
| CosmosHeaderName.fromString(key) // throws IllegalArgumentException for unknown headers | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

🟢 Suggestion · Fail-Fast Gap: Header values not validated at Spark config parse time

The code comment above explicitly states the fail-fast goal: "validate every header name is a known CosmosHeaderName at parse time … rather than at runtime during client creation." The same reasoning applies to values.

Suggested fix — after the name validation loop, convert and validate values too:

for (key <- parsed.keys) {
CosmosHeaderName.fromString(key)
}
// Also validate values at parse time to complete fail-fast coverage
val headerMap = new java.util.HashMap[CosmosHeaderName, String]()
for ((key, value) <- parsed) {
headerMap.put(CosmosHeaderName.fromString(key), value)
}
CosmosHeaderName.validateAdditionalHeaders(headerMap) |
||
| } | ||
|
|
||
| parsed | ||
| } catch { | ||
| case e: IllegalArgumentException => throw e | ||
| case e: Exception => throw new IllegalArgumentException( | ||
| s"Invalid JSON for '${CosmosConfigNames.AdditionalHeaders}': '$headersJson'. " + | ||
| "Expected format: {\"x-ms-cosmos-workload-id\": \"15\"}", e) | ||
| } | ||
| }, | ||
| helpMessage = "Optional additional headers as JSON map. Example: {\"x-ms-cosmos-workload-id\": \"15\"}") | ||
|
|
||
| private[spark] def parseProactiveConnectionInitConfigs(config: String): java.util.List[CosmosContainerIdentity] = { | ||
| val result = new java.util.ArrayList[CosmosContainerIdentity] | ||
| try { | ||
|
|
@@ -761,6 +804,8 @@ private object CosmosAccountConfig extends BasicLoggingTrait { | |
| val tenantIdOpt = CosmosConfigEntry.parse(cfg, TenantId) | ||
| val clientBuilderInterceptors = CosmosConfigEntry.parse(cfg, ClientBuilderInterceptors) | ||
| val clientInterceptors = CosmosConfigEntry.parse(cfg, ClientInterceptors) | ||
| // Parse optional additional HTTP headers from JSON config (e.g., {"x-ms-cosmos-workload-id": "15"}) | ||
| val additionalHeaders = CosmosConfigEntry.parse(cfg, AdditionalHeadersConfig) | ||
|
|
||
| val disableTcpConnectionEndpointRediscovery = CosmosConfigEntry.parse(cfg, DisableTcpConnectionEndpointRediscovery) | ||
| val preferredRegionsListOpt = CosmosConfigEntry.parse(cfg, PreferredRegionsList) | ||
|
|
@@ -880,7 +925,8 @@ private object CosmosAccountConfig extends BasicLoggingTrait { | |
| resourceGroupNameOpt, | ||
| azureEnvironmentOpt.get, | ||
| if (clientBuilderInterceptorsList.nonEmpty) { Some(clientBuilderInterceptorsList.toList) } else { None }, | ||
| if (clientInterceptorsList.nonEmpty) { Some(clientInterceptorsList.toList) } else { None }) | ||
| if (clientInterceptorsList.nonEmpty) { Some(clientInterceptorsList.toList) } else { None }, | ||
| additionalHeaders) | ||
| } | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.