-
-
Notifications
You must be signed in to change notification settings - Fork 5
feat(taskworker): Add Push Mode to Taskworker #576
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
70ee047
357099b
eef475a
1988ba9
1df88aa
43ec727
b61581c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -139,12 +139,16 @@ def __init__( | |
| health_check_settings: HealthCheckSettings | None = None, | ||
| rpc_secret: str | None = None, | ||
| grpc_config: str | None = None, | ||
| push_mode: bool = False, | ||
| grpc_port: int = 50052, | ||
| ) -> None: | ||
| assert len(hosts) > 0, "You must provide at least one RPC host to connect to" | ||
| self._application = application | ||
| self._hosts = hosts | ||
| self._rpc_secret = rpc_secret | ||
| self._metrics = metrics | ||
| self._push_mode = push_mode | ||
| self._grpc_port = grpc_port | ||
|
|
||
| self._grpc_options: list[tuple[str, Any]] = [ | ||
| ("grpc.max_receive_message_length", MAX_ACTIVATION_SIZE) | ||
|
|
@@ -157,6 +161,7 @@ def __init__( | |
| ) | ||
|
|
||
| self._cur_host = random.choice(self._hosts) | ||
| self._host_to_stubs_lock = threading.Lock() | ||
| self._host_to_stubs: dict[str, ConsumerServiceStub] = { | ||
| self._cur_host: self._connect_to_host(self._cur_host) | ||
| } | ||
|
|
@@ -200,6 +205,12 @@ def _connect_to_host(self, host: str) -> ConsumerServiceStub: | |
| channel = grpc.intercept_channel(channel, RequestSignatureInterceptor(secrets)) | ||
| return ConsumerServiceStub(channel) | ||
|
|
||
| def _get_stub(self, host: str) -> ConsumerServiceStub: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Once we switch the taskbroker to be a deployment instead of a statefulset, all of this logic will go away; the workers don't need to keep track of individual hosts.
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Will the workers use the
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I actually don't think we will even need that; we can just have one service that gets configured that all the workers talk to.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. One good thing about this is that you can still use push mode with SQLite if you'd like. If we drop In our case,
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
If we're not going to be running sqlite long term, then we should remove it (after providing a way to transition to postgres/in memory)
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, that's true — we want to leave it while we do the migration. Once everything is migrated we can point self-hosted to the existing postgres DB. |
||
| with self._host_to_stubs_lock: | ||
| if host not in self._host_to_stubs: | ||
| self._host_to_stubs[host] = self._connect_to_host(host) | ||
| return self._host_to_stubs[host] | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Inconsistent lock usage on
|
||
|
|
||
| def _check_consecutive_unavailable_errors(self) -> None: | ||
| if self._num_consecutive_unavailable_errors >= self._max_consecutive_unavailable_errors: | ||
| self._temporary_unavailable_hosts[self._cur_host] = ( | ||
|
|
@@ -324,10 +335,11 @@ def update_task( | |
| f"Host: {processing_result.host} is temporarily unavailable" | ||
| ) | ||
|
|
||
| stub = self._get_stub(processing_result.host) | ||
| with self._metrics.timer( | ||
| "taskworker.update_task.rpc", tags={"host": processing_result.host} | ||
| ): | ||
| response = self._host_to_stubs[processing_result.host].SetTaskStatus(request) | ||
| response = stub.SetTaskStatus(request) | ||
| except grpc.RpcError as err: | ||
| self._metrics.incr( | ||
| "taskworker.client.rpc_error", | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,7 +13,12 @@ | |
| from typing import Any | ||
|
|
||
| import grpc | ||
| from sentry_protos.taskbroker.v1.taskbroker_pb2 import FetchNextTask | ||
| from sentry_protos.taskbroker.v1 import taskbroker_pb2_grpc | ||
| from sentry_protos.taskbroker.v1.taskbroker_pb2 import ( | ||
| FetchNextTask, | ||
| PushTaskRequest, | ||
| PushTaskResponse, | ||
| ) | ||
|
|
||
| from taskbroker_client.app import import_app | ||
| from taskbroker_client.constants import ( | ||
|
|
@@ -33,6 +38,34 @@ | |
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class WorkerServicer(taskbroker_pb2_grpc.WorkerServiceServicer): | ||
| """ | ||
| gRPC servicer that receives task activations pushed from the broker | ||
| """ | ||
|
|
||
| def __init__(self, worker: TaskWorker) -> None: | ||
| self.worker = worker | ||
|
|
||
| def PushTask( | ||
| self, | ||
| request: PushTaskRequest, | ||
| context: grpc.ServicerContext, | ||
| ) -> PushTaskResponse: | ||
| """Handle incoming task activation.""" | ||
| # Create `InflightTaskActivation` from the pushed task | ||
| inflight = InflightTaskActivation( | ||
| activation=request.task, | ||
| host=request.callback_url, | ||
| receive_timestamp=time.monotonic(), | ||
| ) | ||
|
|
||
| # Push the task to the worker queue (wait at most 5 seconds) | ||
| if not self.worker.push_task(inflight, timeout=5): | ||
| context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, "worker busy") | ||
markstory marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| return PushTaskResponse() | ||
james-mcnulty marked this conversation as resolved.
Show resolved
Hide resolved
james-mcnulty marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
| class TaskWorker: | ||
| """ | ||
| A TaskWorker fetches tasks from a taskworker RPC host and handles executing task activations. | ||
|
|
@@ -60,6 +93,8 @@ def __init__( | |
| process_type: str = "spawn", | ||
| health_check_file_path: str | None = None, | ||
| health_check_sec_per_touch: float = DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH, | ||
| push_mode: bool = False, | ||
| grpc_port: int = 50052, | ||
| **kwargs: dict[str, Any], | ||
| ) -> None: | ||
| self.options = kwargs | ||
|
|
@@ -69,6 +104,11 @@ def __init__( | |
| self._concurrency = concurrency | ||
| app = import_app(app_module) | ||
|
|
||
| if push_mode: | ||
| logger.info("Running in PUSH mode") | ||
| else: | ||
| logger.info("Running in PULL mode") | ||
|
|
||
| self.client = TaskbrokerClient( | ||
| hosts=broker_hosts, | ||
| application=app.name, | ||
|
|
@@ -81,6 +121,8 @@ def __init__( | |
| ), | ||
| rpc_secret=app.config["rpc_secret"], | ||
| grpc_config=app.config["grpc_config"], | ||
| push_mode=push_mode, | ||
| grpc_port=grpc_port, | ||
| ) | ||
| self._metrics = app.metrics | ||
|
|
||
|
|
@@ -110,12 +152,13 @@ def __init__( | |
|
|
||
| self._processing_pool_name: str = processing_pool_name or "unknown" | ||
|
|
||
| self._push_mode = push_mode | ||
| self._grpc_port = grpc_port | ||
|
|
||
| def start(self) -> int: | ||
| """ | ||
| Run the worker main loop | ||
|
|
||
| Once started a Worker will loop until it is killed, or | ||
| completes its max_task_count when it shuts down. | ||
| When in PULL mode, this starts a loop that runs until the worker completes its `max_task_count` or it is killed. | ||
| When in PUSH mode, this starts the worker gRPC server. | ||
| """ | ||
| self.start_result_thread() | ||
| self.start_spawn_children_thread() | ||
|
|
@@ -128,12 +171,37 @@ def signal_handler(*args: Any) -> None: | |
| signal.signal(signal.SIGINT, signal_handler) | ||
| signal.signal(signal.SIGTERM, signal_handler) | ||
|
|
||
| try: | ||
| while True: | ||
| self.run_once() | ||
| except KeyboardInterrupt: | ||
| self.shutdown() | ||
| raise | ||
| if self._push_mode: | ||
| server = None | ||
|
|
||
| try: | ||
| # Start gRPC server | ||
james-mcnulty marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| server = grpc.server(ThreadPoolExecutor(max_workers=self._concurrency)) | ||
| taskbroker_pb2_grpc.add_WorkerServiceServicer_to_server( | ||
| WorkerServicer(self), server | ||
| ) | ||
| server.add_insecure_port(f"[::]:{self._grpc_port}") | ||
| server.start() | ||
| logger.info("taskworker.grpc_server.started", extra={"port": self._grpc_port}) | ||
|
|
||
| # Wait for shutdown signal | ||
| server.wait_for_termination() | ||
|
|
||
| except KeyboardInterrupt: | ||
| # This may be triggered before the server is initialized | ||
| if server: | ||
| server.stop(grace=5) | ||
sentry[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| self.shutdown() | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| else: | ||
|
Comment on lines
+193
to
+196
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Bug: In push mode, Suggested Fix: Wrap the Prompt for AI Agent |
||
| try: | ||
| while True: | ||
| self.run_once() | ||
| except KeyboardInterrupt: | ||
| self.shutdown() | ||
| raise | ||
|
|
||
| return 0 | ||
|
|
||
| def run_once(self) -> None: | ||
| """Access point for tests to run a single worker loop""" | ||
|
|
@@ -172,6 +240,36 @@ def shutdown(self) -> None: | |
|
|
||
| logger.info("taskworker.worker.shutdown.complete") | ||
|
|
||
| def push_task(self, inflight: InflightTaskActivation, timeout: float | None = None) -> bool: | ||
| """ | ||
| Push a task to child tasks queue. | ||
|
|
||
| When timeout is `None`, blocks until the queue has space. When timeout is | ||
| set (e.g. 5.0), waits at most that many seconds and returns `False` if the | ||
| queue is still full (worker busy). | ||
| """ | ||
| try: | ||
| self._metrics.gauge("taskworker.child_tasks.size", self._child_tasks.qsize()) | ||
| except Exception as e: | ||
| # 'qsize' does not work on macOS | ||
| logger.warning(f"Could not report size of child tasks queue - {e}") | ||
|
|
||
| start_time = time.monotonic() | ||
| try: | ||
| self._child_tasks.put(inflight, timeout=timeout) | ||
| except queue.Full: | ||
| self._metrics.incr( | ||
| "taskworker.worker.push_task.busy", | ||
| tags={"processing_pool": self._processing_pool_name}, | ||
| ) | ||
| return False | ||
| self._metrics.distribution( | ||
| "taskworker.worker.child_task.put.duration", | ||
| time.monotonic() - start_time, | ||
| tags={"processing_pool": self._processing_pool_name}, | ||
| ) | ||
| return True | ||
|
|
||
james-mcnulty marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| def _add_task(self) -> bool: | ||
| """ | ||
| Add a task to child tasks queue. Returns False if no new task was fetched. | ||
|
|
||


Uh oh!
There was an error while loading. Please reload this page.