Skip to content

Commit f4a43ca

Browse files
THardy98tconley1428claude
authored
Upgrade on CAN (#1311)
* upgrade on CAN (needs test) * fix CAN overloads (add initial_version_behavior), add test * Add PR suggestions, add test to check CAN suggested reasons persist across WFTs * add docs to UNSPECIFIED enums for suggested CAN reasons and initial versioning behavior * remove unused _to_proto (casting instead) * formattign * add missing proto.nexus import * fix lint - remove misc print * add param to v2, add unused param suppression * get again formatting * Update to use new version upgraded flag * Formatting * Remove continue-as-new reasons exposure in favor of new bool API The continue-as-new reasons exposure functionality is no longer needed with the new bool API. This commit removes: - get_suggested_continue_as_new_reasons() method from workflow.Info - SuggestContinueAsNewReason enum class - Related implementation in _workflow_instance.py - Abstract method declaration from _Runtime Keeps is_target_worker_deployment_version_changed() functionality for upgrade-on-continue-as-new feature. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com> * Revert protobuf files to main branch versions Reset all pb2.py and pb2.pyi files to their main branch versions to remove continue-as-new reason exposures that are no longer needed with the new bool API. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com> * Remove invalid test --------- Co-authored-by: Tim Conley <timothy.conley@temporal.io> Co-authored-by: Claude <noreply@anthropic.com>
1 parent e33fc8d commit f4a43ca

4 files changed

Lines changed: 238 additions & 1 deletion

File tree

temporalio/worker/_interceptor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import temporalio.nexus
2222
import temporalio.nexus._util
2323
import temporalio.workflow
24-
from temporalio.workflow import VersioningIntent
24+
from temporalio.workflow import ContinueAsNewVersioningBehavior, VersioningIntent
2525

2626

2727
class Interceptor:
@@ -172,6 +172,7 @@ class ContinueAsNewInput:
172172
)
173173
headers: Mapping[str, temporalio.api.common.v1.Payload]
174174
versioning_intent: VersioningIntent | None
175+
initial_versioning_behavior: ContinueAsNewVersioningBehavior | None
175176
# The types may be absent
176177
arg_types: list[type] | None
177178

temporalio/worker/_workflow_instance.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
251251
self._current_history_length = 0
252252
self._current_history_size = 0
253253
self._continue_as_new_suggested = False
254+
self._target_worker_deployment_version_changed = False
254255
# Lazily loaded
255256
self._untyped_converted_memo: MutableMapping[str, Any] | None = None
256257
# Handles which are ready to run on the next event loop iteration
@@ -405,6 +406,9 @@ def activate(
405406
self._current_history_length = act.history_length
406407
self._current_history_size = act.history_size_bytes
407408
self._continue_as_new_suggested = act.continue_as_new_suggested
409+
self._target_worker_deployment_version_changed = (
410+
act.target_worker_deployment_version_changed
411+
)
408412
self._time_ns = act.timestamp.ToNanoseconds()
409413
self._is_replaying = act.is_replaying
410414
self._current_thread_id = threading.get_ident()
@@ -1131,6 +1135,8 @@ def workflow_continue_as_new(
11311135
temporalio.common.SearchAttributes | temporalio.common.TypedSearchAttributes
11321136
),
11331137
versioning_intent: temporalio.workflow.VersioningIntent | None,
1138+
initial_versioning_behavior: temporalio.workflow.ContinueAsNewVersioningBehavior
1139+
| None,
11341140
) -> NoReturn:
11351141
self._assert_not_read_only("continue as new")
11361142
# Use definition if callable
@@ -1158,6 +1164,7 @@ def workflow_continue_as_new(
11581164
headers={},
11591165
arg_types=arg_types,
11601166
versioning_intent=versioning_intent,
1167+
initial_versioning_behavior=initial_versioning_behavior,
11611168
)
11621169
)
11631170

@@ -1227,6 +1234,9 @@ def workflow_instance(self) -> Any:
12271234
def workflow_is_continue_as_new_suggested(self) -> bool:
12281235
return self._continue_as_new_suggested
12291236

1237+
def workflow_is_target_worker_deployment_version_changed(self) -> bool:
1238+
return self._target_worker_deployment_version_changed
1239+
12301240
def workflow_is_replaying(self) -> bool:
12311241
return self._is_replaying
12321242

@@ -3428,6 +3438,11 @@ def _apply_command(self) -> None:
34283438
)
34293439
if self._input.versioning_intent:
34303440
v.versioning_intent = self._input.versioning_intent._to_proto()
3441+
if self._input.initial_versioning_behavior:
3442+
v.initial_versioning_behavior = cast(
3443+
"temporalio.api.enums.v1.ContinueAsNewVersioningBehavior.ValueType",
3444+
int(self._input.initial_versioning_behavior),
3445+
)
34313446

34323447

34333448
def _encode_search_attributes(

temporalio/workflow.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
)
5151

5252
import temporalio.api.common.v1
53+
import temporalio.api.enums
54+
import temporalio.api.enums.v1
5355
import temporalio.bridge.proto.child_workflow
5456
import temporalio.bridge.proto.common
5557
import temporalio.bridge.proto.nexus
@@ -601,6 +603,16 @@ def is_continue_as_new_suggested(self) -> bool:
601603
"""
602604
return _Runtime.current().workflow_is_continue_as_new_suggested()
603605

606+
def is_target_worker_deployment_version_changed(self) -> bool:
607+
"""Check whether the target worker deployment version has changed.
608+
609+
Note: Upgrade-on-Continue-as-New is currently experimental.
610+
611+
Returns:
612+
True if the target worker deployment version has changed.
613+
"""
614+
return _Runtime.current().workflow_is_target_worker_deployment_version_changed()
615+
604616

605617
@dataclass(frozen=True)
606618
class ParentInfo:
@@ -690,6 +702,7 @@ def workflow_continue_as_new(
690702
temporalio.common.SearchAttributes | temporalio.common.TypedSearchAttributes
691703
),
692704
versioning_intent: VersioningIntent | None,
705+
initial_versioning_behavior: ContinueAsNewVersioningBehavior | None,
693706
) -> NoReturn: ...
694707

695708
@abstractmethod
@@ -735,6 +748,9 @@ def workflow_instance(self) -> Any: ...
735748
@abstractmethod
736749
def workflow_is_continue_as_new_suggested(self) -> bool: ...
737750

751+
@abstractmethod
752+
def workflow_is_target_worker_deployment_version_changed(self) -> bool: ...
753+
738754
@abstractmethod
739755
def workflow_is_replaying(self) -> bool: ...
740756

@@ -4800,6 +4816,7 @@ def continue_as_new(
48004816
temporalio.common.SearchAttributes | temporalio.common.TypedSearchAttributes
48014817
) = None,
48024818
versioning_intent: VersioningIntent | None = None,
4819+
initial_versioning_behavior: ContinueAsNewVersioningBehavior | None = None,
48034820
) -> NoReturn: ...
48044821

48054822

@@ -4818,6 +4835,7 @@ def continue_as_new(
48184835
temporalio.common.SearchAttributes | temporalio.common.TypedSearchAttributes
48194836
) = None,
48204837
versioning_intent: VersioningIntent | None = None,
4838+
initial_versioning_behavior: ContinueAsNewVersioningBehavior | None = None,
48214839
) -> NoReturn: ...
48224840

48234841

@@ -4837,6 +4855,7 @@ def continue_as_new(
48374855
temporalio.common.SearchAttributes | temporalio.common.TypedSearchAttributes
48384856
) = None,
48394857
versioning_intent: VersioningIntent | None = None,
4858+
initial_versioning_behavior: ContinueAsNewVersioningBehavior | None = None,
48404859
) -> NoReturn: ...
48414860

48424861

@@ -4856,6 +4875,7 @@ def continue_as_new(
48564875
temporalio.common.SearchAttributes | temporalio.common.TypedSearchAttributes
48574876
) = None,
48584877
versioning_intent: VersioningIntent | None = None,
4878+
initial_versioning_behavior: ContinueAsNewVersioningBehavior | None = None,
48594879
) -> NoReturn: ...
48604880

48614881

@@ -4875,6 +4895,7 @@ def continue_as_new(
48754895
temporalio.common.SearchAttributes | temporalio.common.TypedSearchAttributes
48764896
) = None,
48774897
versioning_intent: VersioningIntent | None = None,
4898+
initial_versioning_behavior: ContinueAsNewVersioningBehavior | None = None,
48784899
) -> NoReturn: ...
48794900

48804901

@@ -4893,6 +4914,7 @@ def continue_as_new(
48934914
temporalio.common.SearchAttributes | temporalio.common.TypedSearchAttributes
48944915
) = None,
48954916
versioning_intent: VersioningIntent | None = None,
4917+
initial_versioning_behavior: ContinueAsNewVersioningBehavior | None = None,
48964918
) -> NoReturn:
48974919
"""Stop the workflow immediately and continue as new.
48984920
@@ -4934,6 +4956,7 @@ def continue_as_new(
49344956
memo=memo,
49354957
search_attributes=search_attributes,
49364958
versioning_intent=versioning_intent,
4959+
initial_versioning_behavior=initial_versioning_behavior,
49374960
)
49384961

49394962

@@ -5350,6 +5373,34 @@ def _to_proto(self) -> temporalio.bridge.proto.common.VersioningIntent.ValueType
53505373
return temporalio.bridge.proto.common.VersioningIntent.UNSPECIFIED
53515374

53525375

5376+
class ContinueAsNewVersioningBehavior(IntEnum):
5377+
"""Experimental. Optionally decide the versioning behavior that the first task of the new run should use.
5378+
For example, choose to AutoUpgrade on continue-as-new instead of inheriting the pinned version
5379+
of the previous run.
5380+
"""
5381+
5382+
UNSPECIFIED = int(
5383+
temporalio.api.enums.v1.ContinueAsNewVersioningBehavior.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_UNSPECIFIED
5384+
)
5385+
"""An initial versioning behavior is not set, follow the existing continue-as-new inheritance semantics.
5386+
See https://docs.temporal.io/worker-versioning#inheritance-semantics for more detail.
5387+
"""
5388+
5389+
AUTO_UPGRADE = int(
5390+
temporalio.api.enums.v1.ContinueAsNewVersioningBehavior.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_AUTO_UPGRADE
5391+
)
5392+
"""Start the new run with AutoUpgrade behavior. Use the Target Version of the workflow's task queue at
5393+
start-time, as AutoUpgrade workflows do. After the first workflow task completes, use whatever
5394+
Versioning Behavior the workflow is annotated with in the workflow code.
5395+
5396+
Note that if the previous workflow had a Pinned override, that override will be inherited by the
5397+
new workflow run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new
5398+
command. If a Pinned override is inherited by the new run, and the new run starts with AutoUpgrade
5399+
behavior, the base version of the new run will be the Target Version as described above, but the
5400+
effective version will be whatever is specified by the Versioning Override until the override is removed.
5401+
"""
5402+
5403+
53535404
ServiceT = TypeVar("ServiceT")
53545405

53555406

tests/worker/test_worker.py

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
)
2828
from temporalio.client import (
2929
Client,
30+
WorkflowHandle,
3031
)
3132
from temporalio.common import PinnedVersioningOverride, RawValue, VersioningBehavior
3233
from temporalio.runtime import (
@@ -1162,6 +1163,54 @@ async def set_ramping_version(
11621163
return response
11631164

11641165

1166+
async def wait_for_worker_deployment_routing_config_propagation(
1167+
client: Client,
1168+
deployment_name: str,
1169+
expected_current_build_id: str,
1170+
expected_ramping_build_id: str = "",
1171+
) -> None:
1172+
"""Wait for routing config to be propagated to all task queues."""
1173+
import temporalio.api.enums.v1
1174+
1175+
async def check() -> bool:
1176+
resp = await client.workflow_service.describe_worker_deployment(
1177+
DescribeWorkerDeploymentRequest(
1178+
namespace=client.namespace,
1179+
deployment_name=deployment_name,
1180+
)
1181+
)
1182+
routing_config = resp.worker_deployment_info.routing_config
1183+
if (
1184+
routing_config.current_deployment_version.build_id
1185+
!= expected_current_build_id
1186+
):
1187+
return False
1188+
if (
1189+
routing_config.ramping_deployment_version.build_id
1190+
!= expected_ramping_build_id
1191+
):
1192+
return False
1193+
state = resp.worker_deployment_info.routing_config_update_state
1194+
if (
1195+
state
1196+
== temporalio.api.enums.v1.RoutingConfigUpdateState.ROUTING_CONFIG_UPDATE_STATE_COMPLETED
1197+
):
1198+
return True
1199+
if (
1200+
state
1201+
== temporalio.api.enums.v1.RoutingConfigUpdateState.ROUTING_CONFIG_UPDATE_STATE_UNSPECIFIED
1202+
):
1203+
return True # unimplemented
1204+
if (
1205+
state
1206+
== temporalio.api.enums.v1.RoutingConfigUpdateState.ROUTING_CONFIG_UPDATE_STATE_IN_PROGRESS
1207+
):
1208+
return False
1209+
return False
1210+
1211+
await assert_eventually(check)
1212+
1213+
11651214
def create_worker(
11661215
client: Client,
11671216
on_fatal_error: Callable[[BaseException], Awaitable[None]] | None = None,
@@ -1316,3 +1365,124 @@ async def capture_client_activity() -> None:
13161365
assert len(captured_clients) == 2
13171366
assert captured_clients[0] is client
13181367
assert captured_clients[1] is client2 # This will fail before the fix
1368+
1369+
1370+
@workflow.defn(
1371+
name="ContinueAsNewWithVersionUpgrade",
1372+
versioning_behavior=VersioningBehavior.PINNED,
1373+
)
1374+
class ContinueAsNewWithVersionUpgradeV1:
1375+
@workflow.run
1376+
async def run(self, attempt: int) -> str:
1377+
if attempt > 0:
1378+
return "v1.0"
1379+
1380+
# Loop waiting for CAN suggestion with version changed
1381+
while True:
1382+
# Trigger a WFT when timer expires, thereby refreshing the continue-as-new-suggested flag
1383+
await asyncio.sleep(0.01)
1384+
info = workflow.info()
1385+
if info.is_target_worker_deployment_version_changed():
1386+
workflow.continue_as_new(
1387+
arg=attempt + 1,
1388+
initial_versioning_behavior=workflow.ContinueAsNewVersioningBehavior.AUTO_UPGRADE,
1389+
)
1390+
1391+
1392+
@workflow.defn(
1393+
name="ContinueAsNewWithVersionUpgrade",
1394+
versioning_behavior=VersioningBehavior.PINNED,
1395+
)
1396+
class ContinueAsNewWithVersionUpgradeV2:
1397+
@workflow.run
1398+
async def run(self, attempt: int) -> str: # type:ignore[reportUnusedParameter]
1399+
return "v2.0"
1400+
1401+
1402+
async def wait_for_workflow_running_on_version(
1403+
handle: WorkflowHandle[Any, Any], expected_build_id: str
1404+
) -> None:
1405+
"""Wait until workflow is RUNNING with expected build ID."""
1406+
1407+
async def check() -> bool:
1408+
desc = await handle.describe()
1409+
if (
1410+
desc.status
1411+
!= temporalio.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1412+
):
1413+
return False
1414+
versioning_info = desc.raw_description.workflow_execution_info.versioning_info
1415+
if not versioning_info.HasField("deployment_version"):
1416+
return False
1417+
return versioning_info.deployment_version.build_id == expected_build_id
1418+
1419+
await assert_eventually(check)
1420+
1421+
1422+
async def test_continue_as_new_with_version_upgrade(
1423+
client: Client, env: WorkflowEnvironment
1424+
):
1425+
if env.supports_time_skipping:
1426+
pytest.skip("Test Server doesn't support worker deployments")
1427+
1428+
deployment_name = f"deployment-can-upgrade-{uuid.uuid4()}"
1429+
v1 = WorkerDeploymentVersion(deployment_name=deployment_name, build_id="1.0")
1430+
v2 = WorkerDeploymentVersion(deployment_name=deployment_name, build_id="2.0")
1431+
1432+
async with (
1433+
new_worker(
1434+
client,
1435+
ContinueAsNewWithVersionUpgradeV1,
1436+
deployment_config=WorkerDeploymentConfig(
1437+
version=v1,
1438+
use_worker_versioning=True,
1439+
),
1440+
) as w1,
1441+
new_worker(
1442+
client,
1443+
ContinueAsNewWithVersionUpgradeV2,
1444+
deployment_config=WorkerDeploymentConfig(
1445+
version=v2,
1446+
use_worker_versioning=True,
1447+
),
1448+
task_queue=w1.task_queue,
1449+
),
1450+
):
1451+
# Wait for the deployment to be ready
1452+
describe_resp = await wait_until_worker_deployment_visible(client, v1)
1453+
1454+
# Set version 1.0 as current
1455+
resp2 = await set_current_deployment_version(
1456+
client, describe_resp.conflict_token, v1
1457+
)
1458+
1459+
# Wait for v1.0-as-Current routing config to be propagated
1460+
await wait_for_worker_deployment_routing_config_propagation(
1461+
client, deployment_name, v1.build_id
1462+
)
1463+
1464+
# Start workflow with v1 as current
1465+
handle = await client.start_workflow(
1466+
"ContinueAsNewWithVersionUpgrade",
1467+
0,
1468+
id=f"test-can-version-upgrade-{uuid.uuid4()}",
1469+
task_queue=w1.task_queue,
1470+
)
1471+
1472+
# Wait for workflow to complete one WFT on v1.0
1473+
await wait_for_workflow_running_on_version(handle, v1.build_id)
1474+
1475+
# Wait for version 2.0 to be ready
1476+
await wait_until_worker_deployment_visible(client, v2)
1477+
1478+
# Set version 2.0 as current
1479+
await set_current_deployment_version(client, resp2.conflict_token, v2)
1480+
1481+
# Wait for v2.0-as-Current routing config to be propagated
1482+
await wait_for_worker_deployment_routing_config_propagation(
1483+
client, deployment_name, v2.build_id
1484+
)
1485+
1486+
# Expect workflow to return "v2.0", indicating that it continued-as-new and completed on v2
1487+
result = await handle.result()
1488+
assert result == "v2.0"

0 commit comments

Comments
 (0)