From 70ee047a03eee5e9aae9a25cd5aa08e7d205c90c Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Wed, 18 Mar 2026 17:07:56 -0700 Subject: [PATCH 1/7] Add Push Mode to Taskworker + Unit Tests --- clients/python/pyproject.toml | 2 +- clients/python/src/examples/cli.py | 8 +- .../python/src/taskbroker_client/metrics.py | 28 +++ .../src/taskbroker_client/worker/client.py | 14 +- .../src/taskbroker_client/worker/worker.py | 118 ++++++++++- clients/python/tests/worker/test_worker.py | 194 +++++++++++++++++- uv.lock | 8 +- 7 files changed, 353 insertions(+), 19 deletions(-) diff --git a/clients/python/pyproject.toml b/clients/python/pyproject.toml index 1dc56621..33d51c77 100644 --- a/clients/python/pyproject.toml +++ b/clients/python/pyproject.toml @@ -6,7 +6,7 @@ readme = "README.md" dependencies = [ "sentry-arroyo>=2.33.1", "sentry-sdk[http2]>=2.43.0", - "sentry-protos>=0.4.11", + "sentry-protos>=0.8.5", "confluent_kafka>=2.3.0", "cronsim>=2.6", "grpcio>=1.67.0", diff --git a/clients/python/src/examples/cli.py b/clients/python/src/examples/cli.py index 26cd8e01..d58761bc 100644 --- a/clients/python/src/examples/cli.py +++ b/clients/python/src/examples/cli.py @@ -73,7 +73,12 @@ def scheduler() -> None: help="The number of child processes to start.", default=2, ) -def worker(rpc_host: str, concurrency: int) -> None: +@click.option( + "--push-mode", + help="Whether to run in PUSH or PULL mode.", + default=False, +) +def worker(rpc_host: str, concurrency: int, push_mode: bool) -> None: from taskbroker_client.worker import TaskWorker click.echo("Starting worker") @@ -87,6 +92,7 @@ def worker(rpc_host: str, concurrency: int) -> None: rebalance_after=32, processing_pool_name="examples", process_type="forkserver", + push_mode=push_mode, ) exitcode = worker.start() raise SystemExit(exitcode) diff --git a/clients/python/src/taskbroker_client/metrics.py b/clients/python/src/taskbroker_client/metrics.py index f48fdf27..6324029a 100644 --- a/clients/python/src/taskbroker_client/metrics.py +++ b/clients/python/src/taskbroker_client/metrics.py @@ -14,6 +14,22 @@ class MetricsBackend(Protocol): An abstract class that defines the interface for metrics backends. """ + @abstractmethod + def gauge( + self, + key: str, + value: float, + instance: str | None = None, + tags: Tags | None = None, + sample_rate: float = 1, + unit: str | None = None, + stacklevel: int = 0, + ) -> None: + """ + Records a gauge metric (a point-in-time value). + """ + raise NotImplementedError + @abstractmethod def incr( self, @@ -71,6 +87,18 @@ class NoOpMetricsBackend(MetricsBackend): Default metrics backend that does not record anything. """ + def gauge( + self, + key: str, + value: float, + instance: str | None = None, + tags: Tags | None = None, + sample_rate: float = 1, + unit: str | None = None, + stacklevel: int = 0, + ) -> None: + pass + def incr( self, name: str, diff --git a/clients/python/src/taskbroker_client/worker/client.py b/clients/python/src/taskbroker_client/worker/client.py index 0670af9c..f19aeaf9 100644 --- a/clients/python/src/taskbroker_client/worker/client.py +++ b/clients/python/src/taskbroker_client/worker/client.py @@ -139,12 +139,16 @@ def __init__( health_check_settings: HealthCheckSettings | None = None, rpc_secret: str | None = None, grpc_config: str | None = None, + push_mode: bool = False, + grpc_port: int = 50052, ) -> None: assert len(hosts) > 0, "You must provide at least one RPC host to connect to" self._application = application self._hosts = hosts self._rpc_secret = rpc_secret self._metrics = metrics + self._push_mode = push_mode + self._grpc_port = grpc_port self._grpc_options: list[tuple[str, Any]] = [ ("grpc.max_receive_message_length", MAX_ACTIVATION_SIZE) @@ -157,6 +161,7 @@ def __init__( ) self._cur_host = random.choice(self._hosts) + self._host_to_stubs_lock = threading.Lock() self._host_to_stubs: dict[str, ConsumerServiceStub] = { self._cur_host: self._connect_to_host(self._cur_host) } @@ -200,6 +205,12 @@ def _connect_to_host(self, host: str) -> ConsumerServiceStub: channel = grpc.intercept_channel(channel, RequestSignatureInterceptor(secrets)) return ConsumerServiceStub(channel) + def _get_stub(self, host: str) -> ConsumerServiceStub: + with self._host_to_stubs_lock: + if host not in self._host_to_stubs: + self._host_to_stubs[host] = self._connect_to_host(host) + return self._host_to_stubs[host] + def _check_consecutive_unavailable_errors(self) -> None: if self._num_consecutive_unavailable_errors >= self._max_consecutive_unavailable_errors: self._temporary_unavailable_hosts[self._cur_host] = ( @@ -324,10 +335,11 @@ def update_task( f"Host: {processing_result.host} is temporarily unavailable" ) + stub = self._get_stub(processing_result.host) with self._metrics.timer( "taskworker.update_task.rpc", tags={"host": processing_result.host} ): - response = self._host_to_stubs[processing_result.host].SetTaskStatus(request) + response = stub.SetTaskStatus(request) except grpc.RpcError as err: self._metrics.incr( "taskworker.client.rpc_error", diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index ca05073f..6855668e 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -13,7 +13,12 @@ from typing import Any import grpc -from sentry_protos.taskbroker.v1.taskbroker_pb2 import FetchNextTask +from sentry_protos.taskbroker.v1 import taskbroker_pb2_grpc +from sentry_protos.taskbroker.v1.taskbroker_pb2 import ( + FetchNextTask, + PushTaskRequest, + PushTaskResponse, +) from taskbroker_client.app import import_app from taskbroker_client.constants import ( @@ -33,6 +38,34 @@ logger = logging.getLogger(__name__) +class WorkerServicer(taskbroker_pb2_grpc.WorkerServiceServicer): + """ + gRPC servicer that receives task activations pushed from the broker + """ + + def __init__(self, worker: TaskWorker) -> None: + self.worker = worker + + def PushTask( + self, + request: PushTaskRequest, + context: grpc.ServicerContext, + ) -> PushTaskResponse: + """Handle incoming task activation.""" + # Create `InflightTaskActivation` from the pushed task + inflight = InflightTaskActivation( + activation=request.task, + host=request.callback_url, + receive_timestamp=time.monotonic(), + ) + + # Push the task to the worker queue (wait at most 5 seconds) + if not self.worker.push_task(inflight, timeout=5): + context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, "worker busy") + + return PushTaskResponse() + + class TaskWorker: """ A TaskWorker fetches tasks from a taskworker RPC host and handles executing task activations. @@ -60,6 +93,8 @@ def __init__( process_type: str = "spawn", health_check_file_path: str | None = None, health_check_sec_per_touch: float = DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH, + push_mode: bool = False, + grpc_port: int = 50052, **kwargs: dict[str, Any], ) -> None: self.options = kwargs @@ -69,6 +104,11 @@ def __init__( self._concurrency = concurrency app = import_app(app_module) + if push_mode: + logger.info("Running in PUSH mode") + else: + logger.info("Running in PULL mode") + self.client = TaskbrokerClient( hosts=broker_hosts, application=app.name, @@ -81,6 +121,8 @@ def __init__( ), rpc_secret=app.config["rpc_secret"], grpc_config=app.config["grpc_config"], + push_mode=push_mode, + grpc_port=grpc_port, ) self._metrics = app.metrics @@ -110,12 +152,13 @@ def __init__( self._processing_pool_name: str = processing_pool_name or "unknown" + self._push_mode = push_mode + self._grpc_port = grpc_port + def start(self) -> int: """ - Run the worker main loop - - Once started a Worker will loop until it is killed, or - completes its max_task_count when it shuts down. + When in PULL mode, this starts a loop that runs until the worker completes its `max_task_count` or it is killed. + When in PUSH mode, this starts the worker gRPC server. """ self.start_result_thread() self.start_spawn_children_thread() @@ -128,12 +171,32 @@ def signal_handler(*args: Any) -> None: signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) - try: - while True: - self.run_once() - except KeyboardInterrupt: - self.shutdown() - raise + if self._push_mode: + try: + # Start gRPC server + server = grpc.server(ThreadPoolExecutor(max_workers=self._concurrency)) + taskbroker_pb2_grpc.add_WorkerServiceServicer_to_server( + WorkerServicer(self), server + ) + server.add_insecure_port(f"[::]:{self._grpc_port}") + server.start() + logger.info("taskworker.grpc_server.started", extra={"port": self._grpc_port}) + + # Wait for shutdown signal + server.wait_for_termination() + + except KeyboardInterrupt: + server.stop(grace=5) + self.shutdown() + else: + try: + while True: + self.run_once() + except KeyboardInterrupt: + self.shutdown() + raise + + return 0 def run_once(self) -> None: """Access point for tests to run a single worker loop""" @@ -172,6 +235,39 @@ def shutdown(self) -> None: logger.info("taskworker.worker.shutdown.complete") + def push_task(self, inflight: InflightTaskActivation, timeout: float | None = None) -> bool: + """ + Push a task to child tasks queue. + + When timeout is `None`, blocks until the queue has space. When timeout is + set (e.g. 5.0), waits at most that many seconds and returns `False` if the + queue is still full (worker busy). + """ + try: + self._metrics.gauge("taskworker.child_tasks.size", self._child_tasks.qsize()) + except Exception as e: + # `qsize` does not work on all machines + logger.warning(f"Could not report size of child tasks queue - {e}") + + start_time = time.monotonic() + try: + if timeout is None: + self._child_tasks.put(inflight) + else: + self._child_tasks.put(inflight, timeout=timeout) + except queue.Full: + self._metrics.incr( + "taskworker.worker.push_task.busy", + tags={"processing_pool": self._processing_pool_name}, + ) + return False + self._metrics.distribution( + "taskworker.worker.child_task.put.duration", + time.monotonic() - start_time, + tags={"processing_pool": self._processing_pool_name}, + ) + return True + def _add_task(self) -> bool: """ Add a task to child tasks queue. Returns False if no new task was fetched. diff --git a/clients/python/tests/worker/test_worker.py b/clients/python/tests/worker/test_worker.py index 8bfb35b2..84c27398 100644 --- a/clients/python/tests/worker/test_worker.py +++ b/clients/python/tests/worker/test_worker.py @@ -16,6 +16,8 @@ TASK_ACTIVATION_STATUS_COMPLETE, TASK_ACTIVATION_STATUS_FAILURE, TASK_ACTIVATION_STATUS_RETRY, + PushTaskRequest, + PushTaskResponse, RetryState, TaskActivation, ) @@ -25,7 +27,7 @@ from taskbroker_client.retry import NoRetriesRemainingError from taskbroker_client.state import current_task from taskbroker_client.types import InflightTaskActivation, ProcessingResult -from taskbroker_client.worker.worker import TaskWorker +from taskbroker_client.worker.worker import TaskWorker, WorkerServicer from taskbroker_client.worker.workerchild import ProcessingDeadlineExceeded, child_process SIMPLE_TASK = InflightTaskActivation( @@ -300,6 +302,91 @@ def get_task_response(*args: Any, **kwargs: Any) -> InflightTaskActivation | Non assert mock_client.get_task.called assert mock_client.update_task.call_count == 3 + def test_push_task_success_no_timeout(self) -> None: + taskworker = TaskWorker( + app_module="examples.app:app", + broker_hosts=["127.0.0.1:50051"], + max_child_task_count=100, + process_type="fork", + child_tasks_queue_maxsize=2, + ) + mock_metrics = mock.MagicMock() + taskworker._metrics = mock_metrics + mock_queue = mock.MagicMock() + mock_queue.full.return_value = False + taskworker._child_tasks = mock_queue + + result = taskworker.push_task(SIMPLE_TASK, timeout=None) + + self.assertTrue(result) + mock_metrics.gauge.assert_called_once_with("taskworker.child_tasks.size", mock.ANY) + mock_metrics.distribution.assert_called_once_with( + "taskworker.worker.child_task.put.duration", + mock.ANY, + tags={"processing_pool": "unknown"}, + ) + mock_queue.put.assert_called_once_with(SIMPLE_TASK) + + def test_push_task_success_with_timeout(self) -> None: + taskworker = TaskWorker( + app_module="examples.app:app", + broker_hosts=["127.0.0.1:50051"], + max_child_task_count=100, + process_type="fork", + child_tasks_queue_maxsize=2, + ) + taskworker._metrics = mock.MagicMock() + mock_queue = mock.MagicMock() + taskworker._child_tasks = mock_queue + + result = taskworker.push_task(SIMPLE_TASK, timeout=1.0) + + self.assertTrue(result) + mock_queue.put.assert_called_once_with(SIMPLE_TASK, timeout=1.0) + + def test_push_task_queue_full_returns_false_and_incr_busy(self) -> None: + taskworker = TaskWorker( + app_module="examples.app:app", + broker_hosts=["127.0.0.1:50051"], + max_child_task_count=100, + process_type="fork", + child_tasks_queue_maxsize=1, + ) + taskworker._metrics = mock.MagicMock() + mock_queue = mock.MagicMock() + mock_queue.qsize.return_value = 1 + mock_queue.put.side_effect = queue.Full + taskworker._child_tasks = mock_queue + + result = taskworker.push_task(RETRY_TASK, timeout=0.01) + + self.assertFalse(result) + taskworker._metrics.incr.assert_called_once_with( + "taskworker.worker.push_task.busy", + tags={"processing_pool": "unknown"}, + ) + self.assertEqual(mock_queue.put.call_count, 1) + + def test_push_task_gauge_exception_still_enqueues(self) -> None: + taskworker = TaskWorker( + app_module="examples.app:app", + broker_hosts=["127.0.0.1:50051"], + max_child_task_count=100, + process_type="fork", + child_tasks_queue_maxsize=2, + ) + mock_metrics = mock.MagicMock() + mock_metrics.gauge.side_effect = RuntimeError("qsize not supported") + taskworker._metrics = mock_metrics + mock_queue = mock.MagicMock() + taskworker._child_tasks = mock_queue + + result = taskworker.push_task(SIMPLE_TASK, timeout=None) + + self.assertTrue(result) + mock_queue.put.assert_called_once_with(SIMPLE_TASK) + mock_metrics.distribution.assert_called_once() + def test_run_once_current_task_state(self) -> None: # Run a task that uses retry_task() helper # to raise and catch a NoRetriesRemainingError @@ -348,6 +435,111 @@ def update_task_response(*args: Any, **kwargs: Any) -> None: assert redis.get("no-retries-remaining"), "key should exist if except block was hit" redis.delete("no-retries-remaining") + def test_constructor_push_mode_and_grpc_port_stored(self) -> None: + taskworker = TaskWorker( + app_module="examples.app:app", + broker_hosts=["127.0.0.1:50051"], + max_child_task_count=100, + process_type="fork", + push_mode=True, + grpc_port=50099, + ) + self.assertTrue(taskworker._push_mode) + self.assertEqual(taskworker._grpc_port, 50099) + self.assertTrue(taskworker.client._push_mode) + self.assertEqual(taskworker.client._grpc_port, 50099) + + def test_constructor_pull_mode_default(self) -> None: + taskworker = TaskWorker( + app_module="examples.app:app", + broker_hosts=["127.0.0.1:50051"], + max_child_task_count=100, + process_type="fork", + ) + self.assertFalse(taskworker._push_mode) + self.assertEqual(taskworker._grpc_port, 50052) + + def test_start_push_mode_server_creation_and_shutdown(self) -> None: + mock_server = mock.MagicMock() + mock_server.wait_for_termination.side_effect = KeyboardInterrupt() + + taskworker = TaskWorker( + app_module="examples.app:app", + broker_hosts=["127.0.0.1:50051"], + max_child_task_count=100, + process_type="fork", + push_mode=True, + grpc_port=50060, + ) + with mock.patch("taskbroker_client.worker.worker.grpc.server") as mock_grpc_server: + mock_grpc_server.return_value = mock_server + with mock.patch("taskbroker_client.worker.worker.ThreadPoolExecutor") as mock_tpe: + with mock.patch( + "taskbroker_client.worker.worker.taskbroker_pb2_grpc.add_WorkerServiceServicer_to_server" + ) as mock_add_servicer: + exitcode = taskworker.start() + + self.assertEqual(exitcode, 0) + mock_grpc_server.assert_called_once() + self.assertEqual(mock_tpe.call_args[1]["max_workers"], 1) + mock_add_servicer.assert_called_once() + self.assertIsInstance(mock_add_servicer.call_args[0][0], WorkerServicer) + self.assertEqual(mock_add_servicer.call_args[0][1], mock_server) + mock_server.add_insecure_port.assert_called_once_with("[::]:50060") + mock_server.start.assert_called_once() + mock_server.wait_for_termination.assert_called_once() + mock_server.stop.assert_called_once_with(grace=5) + self.assertTrue(taskworker._shutdown_event.is_set()) + + +class TestWorkerServicer(TestCase): + def test_push_task_success(self) -> None: + taskworker = TaskWorker( + app_module="examples.app:app", + broker_hosts=["127.0.0.1:50051"], + max_child_task_count=100, + process_type="fork", + push_mode=True, + ) + with mock.patch.object(taskworker, "push_task", return_value=True) as mock_push_task: + request = PushTaskRequest( + task=SIMPLE_TASK.activation, + callback_url="broker-host:50051", + ) + mock_context = mock.MagicMock() + servicer = WorkerServicer(taskworker) + + response = servicer.PushTask(request, mock_context) + + self.assertIsInstance(response, PushTaskResponse) + mock_context.abort.assert_not_called() + mock_push_task.assert_called_once_with(mock.ANY, timeout=5) + (inflight,) = mock_push_task.call_args[0] + self.assertEqual(inflight.activation.id, SIMPLE_TASK.activation.id) + self.assertEqual(inflight.host, "broker-host:50051") + + def test_push_task_worker_busy(self) -> None: + taskworker = TaskWorker( + app_module="examples.app:app", + broker_hosts=["127.0.0.1:50051"], + max_child_task_count=100, + process_type="fork", + child_tasks_queue_maxsize=1, + ) + with mock.patch.object(taskworker, "push_task", return_value=False): + request = PushTaskRequest( + task=SIMPLE_TASK.activation, + callback_url="broker-host:50051", + ) + mock_context = mock.MagicMock() + servicer = WorkerServicer(taskworker) + + servicer.PushTask(request, mock_context) + + mock_context.abort.assert_called_once_with( + grpc.StatusCode.RESOURCE_EXHAUSTED, "worker busy" + ) + @mock.patch("taskbroker_client.worker.workerchild.capture_checkin") def test_child_process_complete(mock_capture_checkin: mock.MagicMock) -> None: diff --git a/uv.lock b/uv.lock index fdf8f228..941ab117 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.11" resolution-markers = [ "sys_platform == 'darwin' or sys_platform == 'linux'", @@ -546,7 +546,7 @@ wheels = [ [[package]] name = "sentry-protos" -version = "0.4.11" +version = "0.8.6" source = { registry = "https://pypi.devinfra.sentry.io/simple" } dependencies = [ { name = "grpc-stubs", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -554,7 +554,7 @@ dependencies = [ { name = "protobuf", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] wheels = [ - { url = "https://pypi.devinfra.sentry.io/wheels/sentry_protos-0.4.11-py3-none-any.whl", hash = "sha256:d60709cd9989679fbe1ca1a9a02393a3af59292da333905d5b28beaa04220352" }, + { url = "https://pypi.devinfra.sentry.io/wheels/sentry_protos-0.8.6-py3-none-any.whl", hash = "sha256:bffd32fae9df928a1d4fc519c1ff02fa3ba8fac7bf8ba0ea6495b1eb353575ef" }, ] [[package]] @@ -698,7 +698,7 @@ requires-dist = [ { name = "redis", specifier = ">=3.4.1" }, { name = "redis-py-cluster", specifier = ">=2.1.0" }, { name = "sentry-arroyo", specifier = ">=2.33.1" }, - { name = "sentry-protos", specifier = ">=0.4.11" }, + { name = "sentry-protos", specifier = ">=0.8.5" }, { name = "sentry-sdk", extras = ["http2"], specifier = ">=2.43.0" }, { name = "setuptools", marker = "extra == 'examples'", specifier = ">=80.0" }, { name = "zstandard", specifier = ">=0.18.0" }, From 357099be684597368a0b5e0c1d0f1270bb53e6cc Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Wed, 18 Mar 2026 17:42:20 -0700 Subject: [PATCH 2/7] Fix Worker CLI Options --- clients/python/src/examples/cli.py | 16 ++++++++++++---- config.yaml | 2 ++ 2 files changed, 14 insertions(+), 4 deletions(-) create mode 100644 config.yaml diff --git a/clients/python/src/examples/cli.py b/clients/python/src/examples/cli.py index d58761bc..28088fb3 100644 --- a/clients/python/src/examples/cli.py +++ b/clients/python/src/examples/cli.py @@ -74,11 +74,18 @@ def scheduler() -> None: default=2, ) @click.option( - "--push-mode", - help="Whether to run in PUSH or PULL mode.", - default=False, + "--push-mode", help="Whether to run in PUSH or PULL mode.", default=False, is_flag=True ) -def worker(rpc_host: str, concurrency: int, push_mode: bool) -> None: +@click.option( + "--grpc-port", help="Whether to run in PUSH or PULL mode.", default=False, is_flag=True +) +@click.option( + "--grpc-port", + help="Port for the gRPC server to listen on.", + default=50052, + type=int, +) +def worker(rpc_host: str, concurrency: int, push_mode: bool, grpc_port: int) -> None: from taskbroker_client.worker import TaskWorker click.echo("Starting worker") @@ -93,6 +100,7 @@ def worker(rpc_host: str, concurrency: int, push_mode: bool) -> None: processing_pool_name="examples", process_type="forkserver", push_mode=push_mode, + grpc_port=grpc_port, ) exitcode = worker.start() raise SystemExit(exitcode) diff --git a/config.yaml b/config.yaml new file mode 100644 index 00000000..26d870f0 --- /dev/null +++ b/config.yaml @@ -0,0 +1,2 @@ +push_mode: true +log_filter: "debug,sqlx=debug,librdkafka=warn,h2=off" From eef475aa2d06f2c4e29e132d50bea386199c6031 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Wed, 18 Mar 2026 18:03:53 -0700 Subject: [PATCH 3/7] Fix gRPC Port CLI Option Duplicate --- clients/python/src/examples/cli.py | 3 --- config.yaml | 2 -- 2 files changed, 5 deletions(-) delete mode 100644 config.yaml diff --git a/clients/python/src/examples/cli.py b/clients/python/src/examples/cli.py index 28088fb3..02ce6e40 100644 --- a/clients/python/src/examples/cli.py +++ b/clients/python/src/examples/cli.py @@ -76,9 +76,6 @@ def scheduler() -> None: @click.option( "--push-mode", help="Whether to run in PUSH or PULL mode.", default=False, is_flag=True ) -@click.option( - "--grpc-port", help="Whether to run in PUSH or PULL mode.", default=False, is_flag=True -) @click.option( "--grpc-port", help="Port for the gRPC server to listen on.", diff --git a/config.yaml b/config.yaml deleted file mode 100644 index 26d870f0..00000000 --- a/config.yaml +++ /dev/null @@ -1,2 +0,0 @@ -push_mode: true -log_filter: "debug,sqlx=debug,librdkafka=warn,h2=off" From 1988ba9a97f2bd0b1c2b6a01b0c4f5ef7913463b Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Wed, 18 Mar 2026 18:49:28 -0700 Subject: [PATCH 4/7] Stop Server Only if Exists --- clients/python/src/taskbroker_client/worker/worker.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index 6855668e..9937b1d8 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -186,7 +186,10 @@ def signal_handler(*args: Any) -> None: server.wait_for_termination() except KeyboardInterrupt: - server.stop(grace=5) + # This may be triggered before the server is initialized + if server: + server.stop(grace=5) + self.shutdown() else: try: From 1df88aa6982f759aa8de3141114288ec94f8a298 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Wed, 18 Mar 2026 19:13:48 -0700 Subject: [PATCH 5/7] Make Sure Server Variable Exists --- clients/python/src/taskbroker_client/worker/worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index 9937b1d8..50e42cb3 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -172,6 +172,8 @@ def signal_handler(*args: Any) -> None: signal.signal(signal.SIGTERM, signal_handler) if self._push_mode: + server = None + try: # Start gRPC server server = grpc.server(ThreadPoolExecutor(max_workers=self._concurrency)) From 43ec7275d8a201f407863ebe6bbee52866f284d4 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Thu, 19 Mar 2026 11:21:18 -0700 Subject: [PATCH 6/7] Pass Timeout to Queue Put Directly --- clients/python/src/taskbroker_client/worker/worker.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index 50e42cb3..efd75e9b 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -251,15 +251,12 @@ def push_task(self, inflight: InflightTaskActivation, timeout: float | None = No try: self._metrics.gauge("taskworker.child_tasks.size", self._child_tasks.qsize()) except Exception as e: - # `qsize` does not work on all machines + # 'qsize' does not work on macOS logger.warning(f"Could not report size of child tasks queue - {e}") start_time = time.monotonic() try: - if timeout is None: - self._child_tasks.put(inflight) - else: - self._child_tasks.put(inflight, timeout=timeout) + self._child_tasks.put(inflight, timeout=timeout) except queue.Full: self._metrics.incr( "taskworker.worker.push_task.busy", From b61581ca193e6dd100cccf4b2e4ff392cad3af26 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Thu, 19 Mar 2026 13:47:06 -0700 Subject: [PATCH 7/7] Fix Python Client Tests --- clients/python/tests/worker/test_worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/python/tests/worker/test_worker.py b/clients/python/tests/worker/test_worker.py index 84c27398..3244c719 100644 --- a/clients/python/tests/worker/test_worker.py +++ b/clients/python/tests/worker/test_worker.py @@ -325,7 +325,7 @@ def test_push_task_success_no_timeout(self) -> None: mock.ANY, tags={"processing_pool": "unknown"}, ) - mock_queue.put.assert_called_once_with(SIMPLE_TASK) + mock_queue.put.assert_called_once_with(SIMPLE_TASK, timeout=None) def test_push_task_success_with_timeout(self) -> None: taskworker = TaskWorker( @@ -384,7 +384,7 @@ def test_push_task_gauge_exception_still_enqueues(self) -> None: result = taskworker.push_task(SIMPLE_TASK, timeout=None) self.assertTrue(result) - mock_queue.put.assert_called_once_with(SIMPLE_TASK) + mock_queue.put.assert_called_once_with(SIMPLE_TASK, timeout=None) mock_metrics.distribution.assert_called_once() def test_run_once_current_task_state(self) -> None: