Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions temporalio/lib/temporalio/client/schedule.rb
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,8 @@ def _to_proto
Policy = Data.define(
:overlap,
:catchup_window,
:pause_on_failure
:pause_on_failure,
:keep_original_workflow_id
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I am afraid this is a backwards incompatible change to add a required field. This is going to be a bit annoying, but you're going to have to make an initialize on the Policy class that accepts all of these kwargs, but defaults this new last one, and just call super within it. Can look at other initialize on Data.define classes in this file (e.g. Schedule and Backfill) to get an idea.

)

# Policies of a schedule.
Expand All @@ -728,6 +729,8 @@ def _to_proto
# @!attribute pause_on_failure
# @return [Boolean] Whether to pause the schedule if an action fails or times out. Note: For workflows, this
# only applies after all retries have been exhausted.
# @!attribute keep_original_workflow_id
# @return [Boolean] Whether to keep the original workflow ID without appending a timestamp for uniqueness.
class Policy
# @!visibility private
def self._from_proto(raw_policies)
Expand All @@ -736,7 +739,8 @@ def self._from_proto(raw_policies)
raw_policies.overlap_policy,
zero_means_nil: true),
catchup_window: Internal::ProtoUtils.duration_to_seconds(raw_policies.catchup_window) || raise, # Never nil
pause_on_failure: raw_policies.pause_on_failure
pause_on_failure: raw_policies.pause_on_failure,
keep_original_workflow_id: raw_policies.keep_original_workflow_id
)
end

Expand All @@ -747,10 +751,13 @@ def self._from_proto(raw_policies)
# missed actions.
# @param pause_on_failure [Boolean] Whether to pause the schedule if an action fails or times out. Note: For
# workflows, this only applies after all retries have been exhausted.
# @param keep_original_workflow_id [Boolean] Whether to keep the original workflow ID without appending a
# timestamp for uniqueness.
def initialize(
overlap: OverlapPolicy::SKIP,
catchup_window: 365 * 24 * 60 * 60.0,
pause_on_failure: false
pause_on_failure: false,
keep_original_workflow_id: false
)
super
end
Expand All @@ -760,7 +767,8 @@ def _to_proto
Api::Schedule::V1::SchedulePolicies.new(
overlap_policy: overlap,
catchup_window: Internal::ProtoUtils.seconds_to_duration(catchup_window),
pause_on_failure:
pause_on_failure:,
keep_original_workflow_id:
)
end
end
Expand Down
6 changes: 4 additions & 2 deletions temporalio/sig/temporalio/client/schedule.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,15 @@ module Temporalio
attr_reader overlap: OverlapPolicy::enum
attr_reader catchup_window: duration
attr_reader pause_on_failure: bool
attr_reader keep_original_workflow_id: bool

def self._from_proto: (untyped raw_policies) -> Policy

def initialize: (
?overlap: OverlapPolicy::enum,
?catchup_window: duration,
?pause_on_failure: bool
?pause_on_failure: bool,
?keep_original_workflow_id: bool
) -> void

def _to_proto: -> untyped
Expand Down Expand Up @@ -332,4 +334,4 @@ module Temporalio
end
end
end
end
end
73 changes: 72 additions & 1 deletion temporalio/test/client_schedule_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ def test_calendar_spec_defaults
assert_equal desc.info.next_action_times[i - 1] + (24 * 60 * 60), next_action_time unless i.zero?
end
ensure
delete_schedules(handle.id) if defined?(handle)
delete_schedules(handle.id) if defined?(handle) && handle
end

def test_trigger_immediately
Expand Down Expand Up @@ -342,4 +342,75 @@ def test_backfill
ensure
delete_schedules(handle.id) if defined?(handle)
end

def test_keep_original_workflow_id_policy_on_create
assert_no_schedules

task_queue = "tq-#{SecureRandom.uuid}"
handle = env.client.create_schedule(
"sched-#{SecureRandom.uuid}",
Temporalio::Client::Schedule.new(
action: Temporalio::Client::Schedule::Action::StartWorkflow.new(
'kitchen_sink',
{ actions: [{ result: { value: 'some-result' } }] },
id: "wf-#{SecureRandom.uuid}",
task_queue:
),
spec: Temporalio::Client::Schedule::Spec.new(
intervals: [Temporalio::Client::Schedule::Spec::Interval.new(every: 60.0)]
),
policy: Temporalio::Client::Schedule::Policy.new(keep_original_workflow_id: true),
state: Temporalio::Client::Schedule::State.new(paused: true)
)
)

desc = handle.describe
assert desc.schedule.policy.keep_original_workflow_id
assert desc.raw_description.schedule.policies.keep_original_workflow_id
ensure
delete_schedules(handle.id) if defined?(handle)
end

def test_keep_original_workflow_id_policy_on_update
assert_no_schedules

task_queue = "tq-#{SecureRandom.uuid}"
handle = env.client.create_schedule(
"sched-#{SecureRandom.uuid}",
Temporalio::Client::Schedule.new(
action: Temporalio::Client::Schedule::Action::StartWorkflow.new(
'kitchen_sink',
{ actions: [{ result: { value: 'some-result' } }] },
id: "wf-#{SecureRandom.uuid}",
task_queue:
),
spec: Temporalio::Client::Schedule::Spec.new(
intervals: [Temporalio::Client::Schedule::Spec::Interval.new(every: 60.0)]
),
state: Temporalio::Client::Schedule::State.new(paused: true)
)
)

desc = handle.describe
refute desc.schedule.policy.keep_original_workflow_id
refute desc.raw_description.schedule.policies.keep_original_workflow_id

handle.update do |input|
current_policy = input.description.schedule.policy
updated_schedule = input.description.schedule.with(
policy: Temporalio::Client::Schedule::Policy.new(
overlap: current_policy.overlap,
catchup_window: current_policy.catchup_window,
pause_on_failure: current_policy.pause_on_failure,
keep_original_workflow_id: true
)
)
Temporalio::Client::Schedule::Update.new(schedule: updated_schedule)
end
desc = handle.describe
assert desc.schedule.policy.keep_original_workflow_id
assert desc.raw_description.schedule.policies.keep_original_workflow_id
ensure
delete_schedules(handle.id) if defined?(handle) && handle
end
end
Loading