Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions temporalio/lib/temporalio/common_enums.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,35 @@ module WorkflowIDConflictPolicy
TERMINATE_EXISTING = Api::Enums::V1::WorkflowIdConflictPolicy::WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING
end

# Specifies the versioning behavior for the first task of a new run after continue-as-new. This is currently
# experimental.
module ContinueAsNewVersioningBehavior
# Unspecified. Follow existing continue-as-new inheritance semantics.
UNSPECIFIED =
Api::Enums::V1::ContinueAsNewVersioningBehavior::CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_UNSPECIFIED
# Start the new run with AutoUpgrade behavior. Use the Target Version of the workflow's task queue at start-time.
# After the first workflow task completes, use whatever Versioning Behavior the workflow is annotated with in the
# workflow code.
AUTO_UPGRADE =
Api::Enums::V1::ContinueAsNewVersioningBehavior::CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_AUTO_UPGRADE
end

# Specifies why the server suggests continue-as-new. This is currently experimental.
module SuggestContinueAsNewReason
# Unspecified.
UNSPECIFIED =
Api::Enums::V1::SuggestContinueAsNewReason::SUGGEST_CONTINUE_AS_NEW_REASON_UNSPECIFIED
# Workflow History size is getting too large.
HISTORY_SIZE_TOO_LARGE =
Api::Enums::V1::SuggestContinueAsNewReason::SUGGEST_CONTINUE_AS_NEW_REASON_HISTORY_SIZE_TOO_LARGE
# Workflow History event count is getting too large.
TOO_MANY_HISTORY_EVENTS =
Api::Enums::V1::SuggestContinueAsNewReason::SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_HISTORY_EVENTS
# Workflow's count of completed plus in-flight updates is too large.
TOO_MANY_UPDATES =
Api::Enums::V1::SuggestContinueAsNewReason::SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_UPDATES
end

# Specifies when a workflow might move from a worker of one Build Id to another.
module VersioningBehavior
# Unspecified versioning behavior. By default, workers opting into worker versioning will
Expand Down
11 changes: 9 additions & 2 deletions temporalio/lib/temporalio/internal/worker/workflow_instance.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ def self.new_completion_with_failure(run_id:, error:, failure_converter:, payloa
:pending_timers, :pending_child_workflow_starts, :pending_child_workflows,
:pending_nexus_operation_starts, :pending_nexus_operations,
:pending_external_signals, :pending_external_cancels, :in_progress_handlers, :payload_converter,
:failure_converter, :cancellation, :continue_as_new_suggested, :current_deployment_version,
:failure_converter, :cancellation, :continue_as_new_suggested,
:suggest_continue_as_new_reasons, :target_worker_deployment_version_changed,
:current_deployment_version,
:current_history_length, :current_history_size, :replaying, :random,
:signal_handlers, :query_handlers, :update_handlers, :context_frozen, :assert_valid_local_activity,
:in_query_or_validator
Expand Down Expand Up @@ -92,6 +94,8 @@ def initialize(details)
@interceptors = details.interceptors
@cancellation, @cancellation_proc = Cancellation.new
@continue_as_new_suggested = false
@suggest_continue_as_new_reasons = []
@target_worker_deployment_version_changed = false
@current_history_length = 0
@current_history_size = 0
@replaying = false
Expand Down Expand Up @@ -176,6 +180,8 @@ def activate(activation)
@commands = []
@current_activation_error = nil
@continue_as_new_suggested = activation.continue_as_new_suggested
@suggest_continue_as_new_reasons = activation.suggest_continue_as_new_reasons.map(&:to_i)
@target_worker_deployment_version_changed = activation.target_worker_deployment_version_changed
@current_deployment_version = WorkerDeploymentVersion._from_bridge(
activation.deployment_version_for_current_task
)
Expand Down Expand Up @@ -639,7 +645,8 @@ def on_top_level_exception(err)
memo: ProtoUtils.memo_to_proto_hash(err.memo, payload_converter),
headers: ProtoUtils.headers_to_proto_hash(err.headers, payload_converter),
search_attributes: err.search_attributes&._to_proto,
retry_policy: err.retry_policy&._to_proto
retry_policy: err.retry_policy&._to_proto,
initial_versioning_behavior: err.initial_versioning_behavior || 0
)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ def create_nexus_client(endpoint:, service:)
NexusClient.new(endpoint:, service:, outbound: @outbound)
end

def suggest_continue_as_new_reasons
@instance.suggest_continue_as_new_reasons
end

def target_worker_deployment_version_changed?
@instance.target_worker_deployment_version_changed
end

def current_details
@instance.current_details || ''
end
Expand Down
22 changes: 20 additions & 2 deletions temporalio/lib/temporalio/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@ def self.create_nexus_client(endpoint:, service:)
_current.create_nexus_client(endpoint:, service:)
end

# @return [Array<SuggestContinueAsNewReason::enum>] Reasons the server suggests continue-as-new. Empty if no
# suggestion. This is currently experimental.
def self.suggest_continue_as_new_reasons
_current.suggest_continue_as_new_reasons
end

# @return [Boolean] Whether the target worker deployment version has changed from the one this workflow is running
# on. This is currently experimental.
def self.target_worker_deployment_version_changed?
_current.target_worker_deployment_version_changed?
end

# Get current details for this workflow that may appear in UI/CLI. Unlike static details set at start, this value
# can be updated throughout the life of the workflow. This can be in Temporal markdown format and can span multiple
# lines. This is currently experimental.
Expand Down Expand Up @@ -634,7 +646,8 @@ def respond_to_missing?(name, include_all = false)
# Error that is raised by a workflow out of the primary workflow method to issue a continue-as-new.
class ContinueAsNewError < Error
attr_accessor :args, :workflow, :task_queue, :run_timeout, :task_timeout,
:retry_policy, :memo, :search_attributes, :arg_hints, :headers
:retry_policy, :memo, :search_attributes, :arg_hints, :headers,
:initial_versioning_behavior

# Create a continue as new error.
#
Expand All @@ -657,6 +670,9 @@ class ContinueAsNewError < Error
# workflow definition has arg hints, those are used by default.
# @param headers [Hash<String, Object>] Headers for the workflow. The default is _not_ carried over from the
# current workflow.
# @param initial_versioning_behavior [ContinueAsNewVersioningBehavior::enum, nil] Versioning behavior for the
# first task of the new run. Set to {ContinueAsNewVersioningBehavior::AUTO_UPGRADE} to upgrade a pinned workflow
# to the latest version on continue-as-new. This is currently experimental.
def initialize(
*args,
workflow: nil,
Expand All @@ -667,7 +683,8 @@ def initialize(
memo: nil,
search_attributes: nil,
arg_hints: nil,
headers: {}
headers: {},
initial_versioning_behavior: nil
)
super('Continue as new')
@args = args
Expand All @@ -680,6 +697,7 @@ def initialize(
@search_attributes = search_attributes
@arg_hints = arg_hints
@headers = headers
@initial_versioning_behavior = initial_versioning_behavior
Workflow._current.initialize_continue_as_new_error(self)
end
end
Expand Down
16 changes: 16 additions & 0 deletions temporalio/sig/temporalio/common_enums.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,22 @@ module Temporalio
TERMINATE_EXISTING: enum
end

module ContinueAsNewVersioningBehavior
type enum = Integer

UNSPECIFIED: enum
AUTO_UPGRADE: enum
end

module SuggestContinueAsNewReason
type enum = Integer

UNSPECIFIED: enum
HISTORY_SIZE_TOO_LARGE: enum
TOO_MANY_HISTORY_EVENTS: enum
TOO_MANY_UPDATES: enum
end

module VersioningBehavior
type enum = Integer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ module Temporalio
attr_reader failure_converter: Converters::FailureConverter
attr_reader cancellation: Cancellation
attr_reader continue_as_new_suggested: bool
attr_reader suggest_continue_as_new_reasons: Array[SuggestContinueAsNewReason::enum]
attr_reader target_worker_deployment_version_changed: bool
attr_reader current_history_length: Integer
attr_reader current_history_size: Integer
attr_reader replaying: bool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ module Temporalio

def create_nexus_client: (endpoint: Symbol | String, service: Symbol | String) -> Workflow::NexusClient

def suggest_continue_as_new_reasons: -> Array[SuggestContinueAsNewReason::enum]

def target_worker_deployment_version_changed?: -> bool

def current_details: -> String
def current_details=: (String? details) -> void

Expand Down
8 changes: 7 additions & 1 deletion temporalio/sig/temporalio/workflow.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ module Temporalio

def self.create_nexus_client: (endpoint: Symbol | String, service: Symbol | String) -> NexusClient

def self.suggest_continue_as_new_reasons: -> Array[SuggestContinueAsNewReason::enum]

def self.target_worker_deployment_version_changed?: -> bool

def self.current_details: -> String
def self.current_details=: (String? details) -> void

Expand Down Expand Up @@ -177,6 +181,7 @@ module Temporalio
attr_accessor search_attributes: SearchAttributes?
attr_accessor arg_hints: Array[Object]?
attr_accessor headers: Hash[String, Object?]
attr_accessor initial_versioning_behavior: ContinueAsNewVersioningBehavior::enum?

def initialize: (
*Object? args,
Expand All @@ -188,7 +193,8 @@ module Temporalio
?memo: Hash[String | Symbol, Object?]?,
?search_attributes: SearchAttributes?,
?arg_hints: Array[Object]?,
?headers: Hash[String, Object?]
?headers: Hash[String, Object?],
?initial_versioning_behavior: ContinueAsNewVersioningBehavior::enum?
) -> void
end

Expand Down
2 changes: 2 additions & 0 deletions temporalio/test/sig/worker_workflow_versioning_test.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ class WorkerWorkflowVersioningTest < Test
def wait_until_worker_deployment_visible: (untyped client, Temporalio::WorkerDeploymentVersion version) -> untyped
def set_current_deployment_version: (untyped client, String task_queue, Temporalio::WorkerDeploymentVersion version) -> untyped
def set_ramping_version: (untyped client, String task_queue, Temporalio::WorkerDeploymentVersion version, Float rate) -> untyped
def wait_for_workflow_running_on_version: (untyped handle, String expected_build_id) -> void
def wait_for_worker_deployment_routing_config_propagation: (untyped client, String deployment_name, String expected_current_build_id) -> void
end
2 changes: 1 addition & 1 deletion temporalio/test/test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def initialize
if target_host.empty?
@server = Temporalio::Testing::WorkflowEnvironment.start_local(
logger: Logger.new($stdout),
dev_server_download_version: 'v1.6.1-server-1.31.0-150.0',
dev_server_download_version: 'v1.6.2-server-1.31.0-151.6',
dev_server_extra_args: [
# Allow continue as new to be immediate
'--dynamic-config-value', 'history.workflowIdReuseMinimalInterval="0s"',
Expand Down
2 changes: 1 addition & 1 deletion temporalio/test/worker_workflow_nexus_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ def test_nexus_handler_error
assert result['handler_error']
assert_equal 'BAD_REQUEST', result['error_type']
assert_equal Temporalio::Error::NexusHandlerError::RetryBehavior::UNSPECIFIED, result['retry_behavior']
assert_includes result['message'], 'operation failed'
# TODO: re-add message assertion once nexus failure message is stabilized
end
end

Expand Down
141 changes: 141 additions & 0 deletions temporalio/test/worker_workflow_versioning_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -618,4 +618,145 @@ def test_workflows_can_use_versioning_override
assert(execution_started_event.workflow_execution_started_event_attributes.versioning_override)
end
end

# V1: Pinned. Loops with timer, checking target_worker_deployment_version_changed?.
# When detected, CAN with AUTO_UPGRADE. Guard: if attempt > 0, return "v1.0".
class CanVersionUpgradeWorkflowV1 < Temporalio::Workflow::Definition
workflow_name :ContinueAsNewWithVersionUpgrade
workflow_versioning_behavior Temporalio::VersioningBehavior::PINNED

def execute(attempt)
return 'v1.0' if attempt.positive?

loop do
Temporalio::Workflow.sleep(0.01)
next unless Temporalio::Workflow.target_worker_deployment_version_changed?

raise Temporalio::Workflow::ContinueAsNewError.new(
attempt + 1,
initial_versioning_behavior: Temporalio::ContinueAsNewVersioningBehavior::AUTO_UPGRADE
)
end
end
end

# V2: Pinned. Just returns "v2.0".
class CanVersionUpgradeWorkflowV2 < Temporalio::Workflow::Definition
workflow_name :ContinueAsNewWithVersionUpgrade
workflow_versioning_behavior Temporalio::VersioningBehavior::PINNED

def execute(_attempt)
'v2.0'
end
end

def test_continue_as_new_with_version_upgrade
deployment_name = "deployment-can-upgrade-#{SecureRandom.uuid}"
worker_v1 = Temporalio::WorkerDeploymentVersion.new(
deployment_name: deployment_name, build_id: '1.0'
)
worker_v2 = Temporalio::WorkerDeploymentVersion.new(
deployment_name: deployment_name, build_id: '2.0'
)

task_queue = "tq-#{SecureRandom.uuid}"

worker1 = Temporalio::Worker.new(
client: env.client,
task_queue: task_queue,
workflows: [CanVersionUpgradeWorkflowV1],
deployment_options: Temporalio::Worker::DeploymentOptions.new(
version: worker_v1,
use_worker_versioning: true
)
)

worker2 = Temporalio::Worker.new(
client: env.client,
task_queue: task_queue,
workflows: [CanVersionUpgradeWorkflowV2],
deployment_options: Temporalio::Worker::DeploymentOptions.new(
version: worker_v2,
use_worker_versioning: true
)
)

Temporalio::Worker.run_all(worker1, worker2) do
# Wait for v1 deployment to be visible and set as current
describe_resp = wait_until_worker_deployment_visible(env.client, worker_v1)
resp2 = set_current_deployment_version(env.client, describe_resp.conflict_token, worker_v1)

# Wait for routing config propagation
wait_for_worker_deployment_routing_config_propagation(env.client, deployment_name, worker_v1.build_id)

# Start workflow on v1
handle = env.client.start_workflow(
'ContinueAsNewWithVersionUpgrade',
0,
id: "test-can-version-upgrade-#{SecureRandom.uuid}",
task_queue: task_queue
)

# Wait for workflow to be running on v1
wait_for_workflow_running_on_version(handle, worker_v1.build_id)

# Wait for v2 deployment to be visible
wait_until_worker_deployment_visible(env.client, worker_v2)

# Set v2 as current
set_current_deployment_version(env.client, resp2.conflict_token, worker_v2)

# Wait for routing config propagation
wait_for_worker_deployment_routing_config_propagation(env.client, deployment_name, worker_v2.build_id)

# Expect workflow to CAN onto v2 and return "v2.0"
result = handle.result
assert_equal 'v2.0', result
end
end

def wait_for_workflow_running_on_version(handle, expected_build_id)
assert_eventually do
desc = handle.describe
assert_equal Temporalio::Client::WorkflowExecutionStatus::RUNNING, desc.status,
"workflow not yet running (status: #{desc.status})"

versioning_info = desc.raw_description.workflow_execution_info&.versioning_info
assert versioning_info.respond_to?(:deployment_version),
'versioning_info does not have deployment_version'

assert_equal expected_build_id, versioning_info.deployment_version&.build_id
end
end

def wait_for_worker_deployment_routing_config_propagation(
client, deployment_name, expected_current_build_id, expected_ramping_build_id = ''
)
assert_eventually do
res = client.workflow_service.describe_worker_deployment(
Temporalio::Api::WorkflowService::V1::DescribeWorkerDeploymentRequest.new(
namespace: client.namespace,
deployment_name: deployment_name
)
)
info = res.worker_deployment_info
routing_config = info&.routing_config
assert routing_config, 'routing config not yet available'

assert_equal expected_current_build_id,
routing_config.current_deployment_version&.build_id.to_s

assert_equal expected_ramping_build_id,
routing_config.ramping_deployment_version&.build_id.to_s

state = info.routing_config_update_state
assert(
state == :ROUTING_CONFIG_UPDATE_STATE_COMPLETED ||
state == :ROUTING_CONFIG_UPDATE_STATE_UNSPECIFIED,
"routing config propagation not complete (state: #{state})"
)
rescue Temporalio::Error::RPCError
assert false, 'RPC error during routing config check'
end
end
end
Loading