Skip to content

Commit 164096b

Browse files
authored
Misc ci fixes (#1380)
* Invert loop and try catch. Exception can rarely occur outside the catch * Add sleep before validating list activities * Add RPC cancelled retries to assert_eventually * Switch from sleep to assert_eventually * Format
1 parent f7da465 commit 164096b

3 files changed

Lines changed: 41 additions & 27 deletions

File tree

tests/helpers/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ async def assert_eventually(
7171
*,
7272
timeout: timedelta = timedelta(seconds=10),
7373
interval: timedelta = timedelta(milliseconds=200),
74+
retry_on_rpc_cancelled: bool = True,
7475
) -> T:
7576
start_sec = time.monotonic()
7677
while True:
@@ -80,6 +81,11 @@ async def assert_eventually(
8081
except AssertionError:
8182
if timedelta(seconds=time.monotonic() - start_sec) >= timeout:
8283
raise
84+
except RPCError as e:
85+
if retry_on_rpc_cancelled and e.status == RPCStatusCode.CANCELLED:
86+
continue
87+
else:
88+
raise
8389
await asyncio.sleep(interval.total_seconds())
8490

8591

tests/test_activity.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from temporalio.service import RPCError, RPCStatusCode
2929
from temporalio.testing import WorkflowEnvironment
3030
from temporalio.worker import Worker
31-
from tests.helpers import assert_eq_eventually
31+
from tests.helpers import assert_eq_eventually, assert_eventually
3232

3333

3434
@activity.defn
@@ -507,16 +507,21 @@ async def test_list_activities(client: Client, env: WorkflowEnvironment):
507507
start_to_close_timeout=timedelta(seconds=5),
508508
)
509509

510-
executions = [
511-
e async for e in client.list_activities(f'ActivityId = "{activity_id}"')
512-
]
513-
assert len(executions) == 1
514-
execution = executions[0]
515-
assert execution.activity_id == activity_id
516-
assert execution.activity_type == "increment"
517-
assert execution.task_queue == task_queue
518-
assert execution.status == ActivityExecutionStatus.RUNNING
519-
assert execution.state_transition_count is None # Not set until activity completes
510+
async def check_executions():
511+
executions = [
512+
e async for e in client.list_activities(f'ActivityId = "{activity_id}"')
513+
]
514+
assert len(executions) == 1
515+
execution = executions[0]
516+
assert execution.activity_id == activity_id
517+
assert execution.activity_type == "increment"
518+
assert execution.task_queue == task_queue
519+
assert execution.status == ActivityExecutionStatus.RUNNING
520+
assert (
521+
execution.state_transition_count is None
522+
) # Not set until activity completes
523+
524+
await assert_eventually(check_executions)
520525

521526

522527
async def test_count_activities(client: Client, env: WorkflowEnvironment):

tests/worker/test_workflow.py

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7725,38 +7725,38 @@ async def test_workflow_missing_local_activity_no_activities(client: Client):
77257725
async def heartbeat_activity(
77267726
catch_err: bool = True,
77277727
) -> temporalio.activity.ActivityCancellationDetails | None:
7728-
while True:
7729-
try:
7728+
try:
7729+
while True:
77307730
activity.heartbeat()
77317731
# If we have heartbeat details, we are on the second attempt, we have retried due to pause/unpause.
77327732
if activity.info().heartbeat_details:
77337733
return activity.cancellation_details()
77347734
await asyncio.sleep(0.1)
7735-
except (CancelledError, asyncio.CancelledError) as err:
7736-
if not catch_err:
7737-
raise err
7738-
return activity.cancellation_details()
7739-
finally:
7740-
activity.heartbeat("finally-complete")
7735+
except (CancelledError, asyncio.CancelledError) as err:
7736+
if not catch_err:
7737+
raise err
7738+
return activity.cancellation_details()
7739+
finally:
7740+
activity.heartbeat("finally-complete")
77417741

77427742

77437743
@activity.defn
77447744
def sync_heartbeat_activity(
77457745
catch_err: bool = True,
77467746
) -> temporalio.activity.ActivityCancellationDetails | None:
7747-
while True:
7748-
try:
7747+
try:
7748+
while True:
77497749
activity.heartbeat()
77507750
# If we have heartbeat details, we are on the second attempt, we have retried due to pause/unpause.
77517751
if activity.info().heartbeat_details:
77527752
return activity.cancellation_details()
77537753
time.sleep(0.1)
7754-
except (CancelledError, asyncio.CancelledError) as err:
7755-
if not catch_err:
7756-
raise err
7757-
return activity.cancellation_details()
7758-
finally:
7759-
activity.heartbeat("finally-complete")
7754+
except (CancelledError, asyncio.CancelledError) as err:
7755+
if not catch_err:
7756+
raise err
7757+
return activity.cancellation_details()
7758+
finally:
7759+
activity.heartbeat("finally-complete")
77607760

77617761

77627762
@workflow.defn
@@ -7769,6 +7769,7 @@ async def run(
77697769
result.append(
77707770
await workflow.execute_activity(
77717771
sync_heartbeat_activity,
7772+
True,
77727773
activity_id=activity_id,
77737774
start_to_close_timeout=timedelta(seconds=10),
77747775
heartbeat_timeout=timedelta(seconds=2),
@@ -7778,6 +7779,7 @@ async def run(
77787779
result.append(
77797780
await workflow.execute_activity(
77807781
heartbeat_activity,
7782+
True,
77817783
activity_id=f"{activity_id}-2",
77827784
start_to_close_timeout=timedelta(seconds=10),
77837785
heartbeat_timeout=timedelta(seconds=2),
@@ -8348,6 +8350,7 @@ async def test_previous_run_failure(client: Client):
83488350
task_queue=worker.task_queue,
83498351
retry_policy=RetryPolicy(
83508352
initial_interval=timedelta(milliseconds=10),
8353+
maximum_attempts=2,
83518354
),
83528355
)
83538356
result = await handle.result()

0 commit comments

Comments
 (0)