Skip to content

Commit 595e88b

Browse files
committed
fix: signal when dt reader stream is ready within wf client start call (dapr#901)
* fix: signal when dt reader stream is ready within wf client start call Signed-off-by: Samantha Coyle <sam@diagrid.io> * style: appease linter Signed-off-by: Samantha Coyle <sam@diagrid.io> * fix: enable configurability + lint fixes Signed-off-by: Samantha Coyle <sam@diagrid.io> * fix(build): add tests and fix bug Signed-off-by: Samantha Coyle <sam@diagrid.io> * fix(build): add more tests Signed-off-by: Samantha Coyle <sam@diagrid.io> * style: appease linter Signed-off-by: Samantha Coyle <sam@diagrid.io> * fix(build): add another test Signed-off-by: Samantha Coyle <sam@diagrid.io> * test: add even more tests for build to pass lol Signed-off-by: Samantha Coyle <sam@diagrid.io> --------- Signed-off-by: Samantha Coyle <sam@diagrid.io>
1 parent bfacebe commit 595e88b

3 files changed

Lines changed: 229 additions & 11 deletions

File tree

ext/dapr-ext-workflow/dapr/ext/workflow/logger/logger.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,8 @@ def warning(self, msg, *args, **kwargs):
3232
def error(self, msg, *args, **kwargs):
3333
self._logger.error(msg, *args, **kwargs)
3434

35+
def exception(self, msg, *args, **kwargs):
36+
self._logger.exception(msg, *args, **kwargs)
37+
3538
def critical(self, msg, *args, **kwargs):
3639
self._logger.critical(msg, *args, **kwargs)

ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

Lines changed: 99 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
"""
1515

1616
import inspect
17+
import time
1718
from functools import wraps
1819
from typing import Optional, Sequence, TypeVar, Union
1920

@@ -54,8 +55,11 @@ def __init__(
5455
maximum_concurrent_activity_work_items: Optional[int] = None,
5556
maximum_concurrent_orchestration_work_items: Optional[int] = None,
5657
maximum_thread_pool_workers: Optional[int] = None,
58+
worker_ready_timeout: Optional[float] = None,
5759
):
5860
self._logger = Logger('WorkflowRuntime', logger_options)
61+
self._worker_ready_timeout = 30.0 if worker_ready_timeout is None else worker_ready_timeout
62+
5963
metadata = tuple()
6064
if settings.DAPR_API_TOKEN:
6165
metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),)
@@ -86,10 +90,20 @@ def register_workflow(self, fn: Workflow, *, name: Optional[str] = None):
8690

8791
def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None):
8892
"""Responsible to call Workflow function in orchestrationWrapper"""
89-
daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options())
90-
if inp is None:
91-
return fn(daprWfContext)
92-
return fn(daprWfContext, inp)
93+
instance_id = getattr(ctx, 'instance_id', 'unknown')
94+
95+
try:
96+
daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options())
97+
if inp is None:
98+
result = fn(daprWfContext)
99+
else:
100+
result = fn(daprWfContext, inp)
101+
return result
102+
except Exception as e:
103+
self._logger.exception(
104+
f'Workflow execution failed - instance_id: {instance_id}, error: {e}'
105+
)
106+
raise
93107

94108
if hasattr(fn, '_workflow_registered'):
95109
# whenever a workflow is registered, it has a _dapr_alternate_name attribute
@@ -152,10 +166,20 @@ def register_activity(self, fn: Activity, *, name: Optional[str] = None):
152166

153167
def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None):
154168
"""Responsible to call Activity function in activityWrapper"""
155-
wfActivityContext = WorkflowActivityContext(ctx)
156-
if inp is None:
157-
return fn(wfActivityContext)
158-
return fn(wfActivityContext, inp)
169+
activity_id = getattr(ctx, 'task_id', 'unknown')
170+
171+
try:
172+
wfActivityContext = WorkflowActivityContext(ctx)
173+
if inp is None:
174+
result = fn(wfActivityContext)
175+
else:
176+
result = fn(wfActivityContext, inp)
177+
return result
178+
except Exception as e:
179+
self._logger.exception(
180+
f'Activity execution failed - task_id: {activity_id}, error: {e}'
181+
)
182+
raise
159183

160184
if hasattr(fn, '_activity_registered'):
161185
# whenever an activity is registered, it has a _dapr_alternate_name attribute
@@ -174,13 +198,77 @@ def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None):
174198
)
175199
fn.__dict__['_activity_registered'] = True
176200

201+
def wait_for_worker_ready(self, timeout: float = 30.0) -> bool:
202+
"""
203+
Wait for the worker's gRPC stream to become ready to receive work items.
204+
This method polls the worker's is_worker_ready() method until it returns True
205+
or the timeout is reached.
206+
207+
Args:
208+
timeout: Maximum time in seconds to wait for the worker to be ready.
209+
Defaults to 30 seconds.
210+
211+
Returns:
212+
True if the worker's gRPC stream is ready to receive work items, False if timeout.
213+
"""
214+
if not hasattr(self.__worker, 'is_worker_ready'):
215+
return False
216+
217+
elapsed = 0.0
218+
poll_interval = 0.1 # 100ms
219+
220+
while elapsed < timeout:
221+
if self.__worker.is_worker_ready():
222+
return True
223+
time.sleep(poll_interval)
224+
elapsed += poll_interval
225+
226+
self._logger.warning(
227+
f'WorkflowRuntime worker readiness check timed out after {timeout} seconds'
228+
)
229+
return False
230+
177231
def start(self):
178-
"""Starts the listening for work items on a background thread."""
179-
self.__worker.start()
232+
"""Starts the listening for work items on a background thread.
233+
This method waits for the worker's gRPC stream to be fully initialized
234+
before returning, ensuring that workflows can be scheduled immediately
235+
after start() completes.
236+
"""
237+
try:
238+
try:
239+
self.__worker.start()
240+
except Exception as start_error:
241+
self._logger.exception(f'WorkflowRuntime worker did not start: {start_error}')
242+
raise
243+
244+
# Verify the worker and its stream reader are ready
245+
if hasattr(self.__worker, 'is_worker_ready'):
246+
try:
247+
is_ready = self.wait_for_worker_ready(timeout=self._worker_ready_timeout)
248+
if not is_ready:
249+
raise RuntimeError('WorkflowRuntime worker and its stream are not ready')
250+
else:
251+
self._logger.debug(
252+
'WorkflowRuntime worker is ready and its stream can receive work items'
253+
)
254+
except Exception as ready_error:
255+
self._logger.exception(
256+
f'WorkflowRuntime wait_for_worker_ready() raised exception: {ready_error}'
257+
)
258+
raise ready_error
259+
else:
260+
self._logger.warning(
261+
'Unable to verify stream readiness. Workflows scheduled immediately may not be received.'
262+
)
263+
except Exception:
264+
raise
180265

181266
def shutdown(self):
182267
"""Stops the listening for work items on a background thread."""
183-
self.__worker.stop()
268+
try:
269+
self.__worker.stop()
270+
except Exception:
271+
raise
184272

185273
def versioned_workflow(
186274
self,

ext/dapr-ext-workflow/tests/test_workflow_runtime.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,17 @@
2626

2727

2828
class FakeTaskHubGrpcWorker:
29+
def __init__(self):
30+
self._orchestrator_fns = {}
31+
self._activity_fns = {}
32+
2933
def add_named_orchestrator(self, name: str, fn):
3034
listOrchestrators.append(name)
35+
self._orchestrator_fns[name] = fn
3136

3237
def add_named_activity(self, name: str, fn):
3338
listActivities.append(name)
39+
self._activity_fns[name] = fn
3440

3541

3642
class WorkflowRuntimeTest(unittest.TestCase):
@@ -171,3 +177,124 @@ def test_decorator_register_optinal_name(self):
171177
wanted_activity = ['test_act']
172178
assert listActivities == wanted_activity
173179
assert client_act._dapr_alternate_name == 'test_act'
180+
181+
182+
class WorkflowRuntimeWorkerReadyTest(unittest.TestCase):
183+
"""Tests for wait_for_worker_ready() and start() stream readiness."""
184+
185+
def setUp(self):
186+
listActivities.clear()
187+
listOrchestrators.clear()
188+
mock.patch('durabletask.worker._Registry', return_value=FakeTaskHubGrpcWorker()).start()
189+
self.runtime = WorkflowRuntime()
190+
191+
def test_wait_for_worker_ready_returns_false_when_no_is_worker_ready(self):
192+
mock_worker = mock.MagicMock(spec=['start', 'stop', '_registry'])
193+
del mock_worker.is_worker_ready
194+
self.runtime._WorkflowRuntime__worker = mock_worker
195+
self.assertFalse(self.runtime.wait_for_worker_ready(timeout=0.1))
196+
197+
def test_wait_for_worker_ready_returns_true_when_ready(self):
198+
mock_worker = mock.MagicMock()
199+
mock_worker.is_worker_ready.return_value = True
200+
self.runtime._WorkflowRuntime__worker = mock_worker
201+
self.assertTrue(self.runtime.wait_for_worker_ready(timeout=1.0))
202+
mock_worker.is_worker_ready.assert_called()
203+
204+
def test_wait_for_worker_ready_returns_true_after_poll(self):
205+
"""Worker becomes ready on second poll."""
206+
mock_worker = mock.MagicMock()
207+
mock_worker.is_worker_ready.side_effect = [False, True]
208+
self.runtime._WorkflowRuntime__worker = mock_worker
209+
self.assertTrue(self.runtime.wait_for_worker_ready(timeout=1.0))
210+
self.assertEqual(mock_worker.is_worker_ready.call_count, 2)
211+
212+
def test_wait_for_worker_ready_returns_false_on_timeout(self):
213+
mock_worker = mock.MagicMock()
214+
mock_worker.is_worker_ready.return_value = False
215+
self.runtime._WorkflowRuntime__worker = mock_worker
216+
self.assertFalse(self.runtime.wait_for_worker_ready(timeout=0.2))
217+
218+
def test_start_succeeds_when_worker_ready(self):
219+
mock_worker = mock.MagicMock()
220+
mock_worker.is_worker_ready.return_value = True
221+
self.runtime._WorkflowRuntime__worker = mock_worker
222+
self.runtime.start()
223+
mock_worker.start.assert_called_once()
224+
mock_worker.is_worker_ready.assert_called()
225+
226+
def test_start_logs_debug_when_worker_stream_ready(self):
227+
"""start() logs at debug when worker and stream are ready."""
228+
mock_worker = mock.MagicMock()
229+
mock_worker.is_worker_ready.return_value = True
230+
self.runtime._WorkflowRuntime__worker = mock_worker
231+
with mock.patch.object(self.runtime._logger, 'debug') as mock_debug:
232+
self.runtime.start()
233+
mock_debug.assert_called_once()
234+
call_args = mock_debug.call_args[0][0]
235+
self.assertIn('ready', call_args)
236+
self.assertIn('stream', call_args)
237+
238+
def test_start_logs_exception_when_worker_start_fails(self):
239+
"""start() logs exception when worker.start() raises."""
240+
mock_worker = mock.MagicMock()
241+
mock_worker.start.side_effect = RuntimeError('start failed')
242+
self.runtime._WorkflowRuntime__worker = mock_worker
243+
with mock.patch.object(self.runtime._logger, 'exception') as mock_exception:
244+
with self.assertRaises(RuntimeError):
245+
self.runtime.start()
246+
mock_exception.assert_called_once()
247+
self.assertIn('did not start', mock_exception.call_args[0][0])
248+
249+
def test_start_raises_when_worker_not_ready(self):
250+
listActivities.clear()
251+
listOrchestrators.clear()
252+
mock.patch('durabletask.worker._Registry', return_value=FakeTaskHubGrpcWorker()).start()
253+
runtime = WorkflowRuntime(worker_ready_timeout=0.2)
254+
mock_worker = mock.MagicMock()
255+
mock_worker.is_worker_ready.return_value = False
256+
runtime._WorkflowRuntime__worker = mock_worker
257+
with self.assertRaises(RuntimeError) as ctx:
258+
runtime.start()
259+
self.assertIn('not ready', str(ctx.exception))
260+
261+
def test_start_logs_warning_when_no_is_worker_ready(self):
262+
mock_worker = mock.MagicMock(spec=['start', 'stop', '_registry'])
263+
del mock_worker.is_worker_ready
264+
self.runtime._WorkflowRuntime__worker = mock_worker
265+
self.runtime.start()
266+
mock_worker.start.assert_called_once()
267+
268+
def test_worker_ready_timeout_init(self):
269+
listActivities.clear()
270+
listOrchestrators.clear()
271+
mock.patch('durabletask.worker._Registry', return_value=FakeTaskHubGrpcWorker()).start()
272+
rt = WorkflowRuntime(worker_ready_timeout=15.0)
273+
self.assertEqual(rt._worker_ready_timeout, 15.0)
274+
275+
def test_start_raises_when_worker_start_fails(self):
276+
mock_worker = mock.MagicMock()
277+
mock_worker.is_worker_ready.return_value = True
278+
mock_worker.start.side_effect = RuntimeError('start failed')
279+
self.runtime._WorkflowRuntime__worker = mock_worker
280+
with self.assertRaises(RuntimeError) as ctx:
281+
self.runtime.start()
282+
self.assertIn('start failed', str(ctx.exception))
283+
mock_worker.start.assert_called_once()
284+
285+
def test_start_raises_when_wait_for_worker_ready_raises(self):
286+
mock_worker = mock.MagicMock()
287+
mock_worker.start.return_value = None
288+
mock_worker.is_worker_ready.side_effect = ValueError('ready check failed')
289+
self.runtime._WorkflowRuntime__worker = mock_worker
290+
with self.assertRaises(ValueError) as ctx:
291+
self.runtime.start()
292+
self.assertIn('ready check failed', str(ctx.exception))
293+
294+
def test_shutdown_raises_when_worker_stop_fails(self):
295+
mock_worker = mock.MagicMock()
296+
mock_worker.stop.side_effect = RuntimeError('stop failed')
297+
self.runtime._WorkflowRuntime__worker = mock_worker
298+
with self.assertRaises(RuntimeError) as ctx:
299+
self.runtime.shutdown()
300+
self.assertIn('stop failed', str(ctx.exception))

0 commit comments

Comments
 (0)