Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions openapi/openapiv2.json
Original file line number Diff line number Diff line change
Expand Up @@ -14588,6 +14588,30 @@
"pollerScalingDecision": {
"$ref": "#/definitions/v1PollerScalingDecision",
"description": "Server-advised information the SDK may use to adjust its poller count."
},
"pollerGroupId": {
"type": "string",
"description": "This poller group ID identifies the owner of the workflow task awaiting for query response.\nCorresponding RespondQueryTaskCompleted should pass this value for proper routing."
},
"pollerGroupInfos": {
"type": "array",
"items": {
"type": "object",
"$ref": "#/definitions/v1PollerGroupInfo"
},
"description": "The weighted list of poller groups IDs that client should use for future polls to this task\nqueue. Client is expected to:\n 1. Maintain minimum number of pollers no less than the number of groups.\n 2. Try to assign the next poll to a group without any pending polls,\n 3. If every group has some pending polls, assign the next poll to a group randomly\n according to the weights."
}
}
},
"v1PollerGroupInfo": {
"type": "object",
"properties": {
"id": {
"type": "string"
},
"weight": {
"type": "number",
"format": "float"
}
}
},
Expand Down
24 changes: 24 additions & 0 deletions openapi/openapiv3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11647,6 +11647,30 @@ components:
allOf:
- $ref: '#/components/schemas/PollerScalingDecision'
description: Server-advised information the SDK may use to adjust its poller count.
pollerGroupId:
type: string
description: |-
This poller group ID identifies the owner of the workflow task awaiting for query response.
Corresponding RespondQueryTaskCompleted should pass this value for proper routing.
pollerGroupInfos:
type: array
items:
$ref: '#/components/schemas/PollerGroupInfo'
description: |-
The weighted list of poller groups IDs that client should use for future polls to this task
queue. Client is expected to:
1. Maintain minimum number of pollers no less than the number of groups.
2. Try to assign the next poll to a group without any pending polls,
3. If every group has some pending polls, assign the next poll to a group randomly
according to the weights.
PollerGroupInfo:
type: object
properties:
id:
type: string
weight:
type: number
format: float
PollerInfo:
type: object
properties:
Expand Down
5 changes: 5 additions & 0 deletions temporal/api/taskqueue/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,11 @@ message TimestampedCompatibleBuildIdRedirectRule {
google.protobuf.Timestamp create_time = 2;
}

message PollerGroupInfo {
string id = 1;
float weight = 2;
}

// Attached to task responses to give hints to the SDK about how it may adjust its number of
// pollers.
message PollerScalingDecision {
Expand Down
60 changes: 58 additions & 2 deletions temporal/api/workflowservice/v1/request_response.proto
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,11 @@ message GetWorkflowExecutionHistoryReverseResponse {
message PollWorkflowTaskQueueRequest {
string namespace = 1;
temporal.api.taskqueue.v1.TaskQueue task_queue = 2;
// Client must pass one of the poller group IDs received in `poller_group_infos` of the last
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we clarify that is expected to be unset the first time?

// the PollWorkflowTaskQueueResponse according to the instructions. If not set, the poll is
// routed randomly which can cause it being blocked without receiving a task while the queue
// actually has tasks in another server location.
string poller_group_id = 9;
// The identity of the worker/client who is polling this task queue
string identity = 3;
// A unique key for this worker instance, used for tracking worker lifecycle.
Expand All @@ -275,7 +280,7 @@ message PollWorkflowTaskQueueRequest {
temporal.api.deployment.v1.WorkerDeploymentOptions deployment_options = 6;

// Removed in 1.55.0; was temporal.api.worker.v1.WorkerHeartbeat worker_heartbeat
reserved 7;
reserved 7;
reserved "worker_heartbeat";
}

Expand Down Expand Up @@ -331,6 +336,16 @@ message PollWorkflowTaskQueueResponse {
repeated temporal.api.protocol.v1.Message messages = 15;
// Server-advised information the SDK may use to adjust its poller count.
temporal.api.taskqueue.v1.PollerScalingDecision poller_scaling_decision = 16;
// This poller group ID identifies the owner of the workflow task awaiting for query response.
// Corresponding RespondQueryTaskCompleted should pass this value for proper routing.
string poller_group_id = 17;
// The weighted list of poller groups IDs that client should use for future polls to this task
// queue. Client is expected to:
// 1. Maintain minimum number of pollers no less than the number of groups.
// 2. Try to assign the next poll to a group without any pending polls,
// 3. If every group has some pending polls, assign the next poll to a group randomly
// according to the weights.
repeated temporal.api.taskqueue.v1.PollerGroupInfo poller_group_infos = 18;
}

message RespondWorkflowTaskCompletedRequest {
Expand Down Expand Up @@ -443,6 +458,11 @@ message RespondWorkflowTaskFailedResponse {
message PollActivityTaskQueueRequest {
string namespace = 1;
temporal.api.taskqueue.v1.TaskQueue task_queue = 2;
// Client must pass one of the poller group IDs received in `poller_group_infos` of the last
// the PollActivityTaskQueueResponse according to the instructions. If not set, the poll is
// routed randomly which can cause it being blocked without receiving a task while the queue
// actually has tasks in another server location.
string poller_group_id = 9;
// The identity of the worker/client
string identity = 3;
// A unique key for this worker instance, used for tracking worker lifecycle.
Expand Down Expand Up @@ -516,6 +536,13 @@ message PollActivityTaskQueueResponse {
temporal.api.common.v1.Priority priority = 19;
// The run ID of the activity execution, only set for standalone activities.
string activity_run_id = 20;
// The weighted list of poller groups IDs that client should use for future polls to this task
// queue. Client is expected to:
// 1. Maintain minimum number of pollers no less than the number of groups.
// 2. Try to assign the next poll to a group without any pending polls,
// 3. If every group has some pending polls, assign the next poll to a group randomly
// according to the weights.
repeated temporal.api.taskqueue.v1.PollerGroupInfo poller_group_infos = 21;
}

message RecordActivityTaskHeartbeatRequest {
Expand Down Expand Up @@ -1028,6 +1055,9 @@ message RespondQueryTaskCompletedRequest {
// Why did the task fail? It's important to note that many of the variants in this enum cannot
// apply to worker responses. See the type's doc for more.
temporal.api.enums.v1.WorkflowTaskFailedCause cause = 8;
// Client must forward the poller_group_id received in PollWorkflowTaskQueueResponse for proper
// routing of the response.
string poller_group_id = 9;
}

message RespondQueryTaskCompletedResponse {
Expand Down Expand Up @@ -1868,12 +1898,17 @@ message PollWorkflowExecutionUpdateResponse {

message PollNexusTaskQueueRequest {
string namespace = 1;
temporal.api.taskqueue.v1.TaskQueue task_queue = 3;
// Client must pass one of the poller group IDs received in `poller_group_infos` of the last
// the PollNexusTaskQueueResponse according to the instructions. If not set, the poll is
// routed randomly which can cause it being blocked without receiving a task while the queue
// actually has tasks in another server location.
string poller_group_id = 9;
// The identity of the client who initiated this request.
string identity = 2;
// A unique key for this worker instance, used for tracking worker lifecycle.
// This is guaranteed to be unique, whereas identity is not guaranteed to be unique.
string worker_instance_key = 8;
temporal.api.taskqueue.v1.TaskQueue task_queue = 3;
// Information about this worker's build identifier and if it is choosing to use the versioning
// feature. See the `WorkerVersionCapabilities` docstring for more.
// Deprecated. Replaced by deployment_options.
Expand All @@ -1892,6 +1927,21 @@ message PollNexusTaskQueueResponse {
temporal.api.nexus.v1.Request request = 2;
// Server-advised information the SDK may use to adjust its poller count.
temporal.api.taskqueue.v1.PollerScalingDecision poller_scaling_decision = 3;
// Should be returned in RespondNexusTaskCompletedRequest and RespondNexusTaskFailedRequest
// for proper routing of handler's response.
string routing_key = 4;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Routing key is new to me. How does this differ from poller_group_id, or is it meant to have been replaced?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this shouldn't be here, deleting it.

// This poller group ID identifies the owner of the nexus task awaiting for synchronous
// response.
// Corresponding `RespondNexusTaskCompleted` and `RespondNexusTaskFailed` calls should pass this
// value for proper response routing.
string poller_group_id = 5;
// The weighted list of poller groups IDs that client should use for future polls to this task
// queue. Client is expected to:
// 1. Maintain minimum number of pollers no less than the number of groups.
// 2. Try to assign the next poll to a group without any pending polls,
// 3. If every group has some pending polls, assign the next poll to a group randomly
// according to the weights.
repeated temporal.api.taskqueue.v1.PollerGroupInfo poller_group_infos = 6;
}

message RespondNexusTaskCompletedRequest {
Expand All @@ -1902,6 +1952,9 @@ message RespondNexusTaskCompletedRequest {
bytes task_token = 3;
// Embedded response to be translated into a frontend response.
temporal.api.nexus.v1.Response response = 4;
// Client must forward the poller_group_id received in PollNexusTaskQueueResponse for proper
// routing of the response.
string poller_group_id = 5;
}

message RespondNexusTaskCompletedResponse {
Expand All @@ -1917,6 +1970,9 @@ message RespondNexusTaskFailedRequest {
temporal.api.nexus.v1.HandlerError error = 4 [deprecated = true];
// The error the handler failed with. Must contain a NexusHandlerFailureInfo object.
temporal.api.failure.v1.Failure failure = 5;
// Client must forward the poller_group_id received in PollNexusTaskQueueResponse for proper
// routing of the response.
string poller_group_id = 6;
}

message RespondNexusTaskFailedResponse {
Expand Down
27 changes: 26 additions & 1 deletion temporal/api/workflowservice/v1/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ service WorkflowService {
// (-- api-linter: core::0127::http-annotation=disabled
// aip.dev/not-precedent: We do not expose worker API to HTTP. --)
rpc PollWorkflowTaskQueue (PollWorkflowTaskQueueRequest) returns (PollWorkflowTaskQueueResponse) {
option (temporal.api.protometa.v1.request_header) = {
header: "temporal-resource-id"
value: "poller:{poller_group_id}"
};
}

// RespondWorkflowTaskCompleted is called by workers to successfully complete workflow tasks
Expand Down Expand Up @@ -219,6 +223,10 @@ service WorkflowService {
// (-- api-linter: core::0127::http-annotation=disabled
// aip.dev/not-precedent: We do not expose worker API to HTTP. --)
rpc PollActivityTaskQueue (PollActivityTaskQueueRequest) returns (PollActivityTaskQueueResponse) {
option (temporal.api.protometa.v1.request_header) = {
header: "temporal-resource-id"
value: "poller:{poller_group_id}"
};
}

// RecordActivityTaskHeartbeat is optionally called by workers while they execute activities.
Expand Down Expand Up @@ -618,7 +626,12 @@ service WorkflowService {
//
// (-- api-linter: core::0127::http-annotation=disabled
// aip.dev/not-precedent: We do not expose worker API to HTTP. --)
rpc RespondQueryTaskCompleted (RespondQueryTaskCompletedRequest) returns (RespondQueryTaskCompletedResponse) {}
rpc RespondQueryTaskCompleted (RespondQueryTaskCompletedRequest) returns (RespondQueryTaskCompletedResponse) {
option (temporal.api.protometa.v1.request_header) = {
header: "temporal-resource-id"
value: "poller:{poller_group_id}"
};
}

// ResetStickyTaskQueue resets the sticky task queue related information in the mutable state of
// a given workflow. This is prudent for workers to perform if a workflow has been paged out of
Expand Down Expand Up @@ -1240,18 +1253,30 @@ service WorkflowService {
// (-- api-linter: core::0127::http-annotation=disabled
// aip.dev/not-precedent: We do not expose worker API to HTTP. --)
rpc PollNexusTaskQueue(PollNexusTaskQueueRequest) returns (PollNexusTaskQueueResponse) {
option (temporal.api.protometa.v1.request_header) = {
header: "temporal-resource-id"
value: "poller:{poller_group_id}"
};
}

// RespondNexusTaskCompleted is called by workers to respond to Nexus tasks received via PollNexusTaskQueue.
// (-- api-linter: core::0127::http-annotation=disabled
// aip.dev/not-precedent: We do not expose worker API to HTTP. --)
rpc RespondNexusTaskCompleted(RespondNexusTaskCompletedRequest) returns (RespondNexusTaskCompletedResponse) {
option (temporal.api.protometa.v1.request_header) = {
header: "temporal-resource-id"
value: "poller:{poller_group_id}"
};
}

// RespondNexusTaskFailed is called by workers to fail Nexus tasks received via PollNexusTaskQueue.
// (-- api-linter: core::0127::http-annotation=disabled
// aip.dev/not-precedent: We do not expose worker API to HTTP. --)
rpc RespondNexusTaskFailed(RespondNexusTaskFailedRequest) returns (RespondNexusTaskFailedResponse) {
option (temporal.api.protometa.v1.request_header) = {
header: "temporal-resource-id"
value: "poller:{poller_group_id}"
};
}

// UpdateActivityOptions is called by the client to update the options of an activity by its ID or type.
Expand Down
Loading