Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion clients/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ readme = "README.md"
dependencies = [
"sentry-arroyo>=2.33.1",
"sentry-sdk[http2]>=2.43.0",
"sentry-protos>=0.4.11",
"sentry-protos>=0.8.5",
"confluent_kafka>=2.3.0",
"cronsim>=2.6",
"grpcio>=1.67.0",
Expand Down
13 changes: 12 additions & 1 deletion clients/python/src/examples/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,16 @@ def scheduler() -> None:
help="The number of child processes to start.",
default=2,
)
def worker(rpc_host: str, concurrency: int) -> None:
@click.option(
"--push-mode", help="Whether to run in PUSH or PULL mode.", default=False, is_flag=True
)
@click.option(
"--grpc-port",
help="Port for the gRPC server to listen on.",
default=50052,
type=int,
)
def worker(rpc_host: str, concurrency: int, push_mode: bool, grpc_port: int) -> None:
from taskbroker_client.worker import TaskWorker

click.echo("Starting worker")
Expand All @@ -87,6 +96,8 @@ def worker(rpc_host: str, concurrency: int) -> None:
rebalance_after=32,
processing_pool_name="examples",
process_type="forkserver",
push_mode=push_mode,
grpc_port=grpc_port,
)
exitcode = worker.start()
raise SystemExit(exitcode)
Expand Down
28 changes: 28 additions & 0 deletions clients/python/src/taskbroker_client/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,22 @@ class MetricsBackend(Protocol):
An abstract class that defines the interface for metrics backends.
"""

    @abstractmethod
    def gauge(
        self,
        key: str,
        value: float,
        instance: str | None = None,
        tags: Tags | None = None,
        sample_rate: float = 1,
        unit: str | None = None,
        stacklevel: int = 0,
    ) -> None:
        """
        Records a gauge metric (a point-in-time value).

        :param key: Metric name.
        :param value: The current value of the gauge.
        :param instance: Optional instance identifier for the metric.
        :param tags: Optional mapping of tag names to values.
        :param sample_rate: Fraction of calls to record (1 = all).
        :param unit: Optional unit label for the value.
        :param stacklevel: Stack frame offset for backends that record callsites.
        """
        raise NotImplementedError

@abstractmethod
def incr(
self,
Expand Down Expand Up @@ -71,6 +87,18 @@ class NoOpMetricsBackend(MetricsBackend):
Default metrics backend that does not record anything.
"""

    def gauge(
        self,
        key: str,
        value: float,
        instance: str | None = None,
        tags: Tags | None = None,
        sample_rate: float = 1,
        unit: str | None = None,
        stacklevel: int = 0,
    ) -> None:
        """No-op implementation: the gauge value is discarded."""
        pass

def incr(
self,
name: str,
Expand Down
14 changes: 13 additions & 1 deletion clients/python/src/taskbroker_client/worker/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,16 @@ def __init__(
health_check_settings: HealthCheckSettings | None = None,
rpc_secret: str | None = None,
grpc_config: str | None = None,
push_mode: bool = False,
grpc_port: int = 50052,
) -> None:
assert len(hosts) > 0, "You must provide at least one RPC host to connect to"
self._application = application
self._hosts = hosts
self._rpc_secret = rpc_secret
self._metrics = metrics
self._push_mode = push_mode
self._grpc_port = grpc_port

self._grpc_options: list[tuple[str, Any]] = [
("grpc.max_receive_message_length", MAX_ACTIVATION_SIZE)
Expand All @@ -157,6 +161,7 @@ def __init__(
)

self._cur_host = random.choice(self._hosts)
self._host_to_stubs_lock = threading.Lock()
self._host_to_stubs: dict[str, ConsumerServiceStub] = {
self._cur_host: self._connect_to_host(self._cur_host)
}
Expand Down Expand Up @@ -200,6 +205,12 @@ def _connect_to_host(self, host: str) -> ConsumerServiceStub:
channel = grpc.intercept_channel(channel, RequestSignatureInterceptor(secrets))
return ConsumerServiceStub(channel)

def _get_stub(self, host: str) -> ConsumerServiceStub:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once we switch the taskbroker to be a deployment instead of a statefulset all of this logic will go away, the workers don't need to keep track of individual hosts.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the workers use the callback_url on the task requests instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually don't think we will even need that, we can just have one service that gets configured that all the workers talk to.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One good thing about this is that you can still use push mode with SQLite if you'd like. If we drop callback_url completely, you will have no choice but to use a shared store. It's more flexibility for minimal cost, no?

In our case, callback_url will point to the single broker service that all workers will talk to. But if someone else wants to continue using SQLite, it would point to the specific taskbroker it came from.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if someone else wants to continue using SQLite, it would point to the specific taskbroker it came from.

If we're not going to be running sqlite long term, then we should remove it (after providing a way to transition to postgres/in memory)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's true, we want to leave it as we do the migration. Once everything is migrated we can point self-hosted to the existing postgres DB.

with self._host_to_stubs_lock:
if host not in self._host_to_stubs:
self._host_to_stubs[host] = self._connect_to_host(host)
return self._host_to_stubs[host]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent lock usage on _host_to_stubs dict

Low Severity

The new _host_to_stubs_lock protects _host_to_stubs in _get_stub, but _get_cur_stub reads and writes the same dict (lines 260–264) without acquiring the lock. In pull mode, _get_cur_stub runs on the main thread while _get_stub runs on the result thread, creating a race on the shared dict with inconsistent locking.

Additional Locations (1)
Fix in Cursor Fix in Web

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a valid bug.


def _check_consecutive_unavailable_errors(self) -> None:
if self._num_consecutive_unavailable_errors >= self._max_consecutive_unavailable_errors:
self._temporary_unavailable_hosts[self._cur_host] = (
Expand Down Expand Up @@ -324,10 +335,11 @@ def update_task(
f"Host: {processing_result.host} is temporarily unavailable"
)

stub = self._get_stub(processing_result.host)
with self._metrics.timer(
"taskworker.update_task.rpc", tags={"host": processing_result.host}
):
response = self._host_to_stubs[processing_result.host].SetTaskStatus(request)
response = stub.SetTaskStatus(request)
except grpc.RpcError as err:
self._metrics.incr(
"taskworker.client.rpc_error",
Expand Down
120 changes: 109 additions & 11 deletions clients/python/src/taskbroker_client/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@
from typing import Any

import grpc
from sentry_protos.taskbroker.v1.taskbroker_pb2 import FetchNextTask
from sentry_protos.taskbroker.v1 import taskbroker_pb2_grpc
from sentry_protos.taskbroker.v1.taskbroker_pb2 import (
FetchNextTask,
PushTaskRequest,
PushTaskResponse,
)

from taskbroker_client.app import import_app
from taskbroker_client.constants import (
Expand All @@ -33,6 +38,34 @@
logger = logging.getLogger(__name__)


class WorkerServicer(taskbroker_pb2_grpc.WorkerServiceServicer):
    """gRPC servicer that receives task activations pushed from the broker."""

    def __init__(self, worker: TaskWorker) -> None:
        # The worker whose queue pushed activations are delivered to.
        self.worker = worker

    def PushTask(
        self,
        request: PushTaskRequest,
        context: grpc.ServicerContext,
    ) -> PushTaskResponse:
        """Handle an incoming task activation pushed by the broker."""
        # Wrap the pushed task so it looks like an activation fetched in pull
        # mode; callback_url identifies the broker to report status back to.
        accepted = self.worker.push_task(
            InflightTaskActivation(
                activation=request.task,
                host=request.callback_url,
                receive_timestamp=time.monotonic(),
            ),
            timeout=5,
        )
        if not accepted:
            # abort() raises, signalling the broker to retry elsewhere.
            context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, "worker busy")
        return PushTaskResponse()


class TaskWorker:
"""
A TaskWorker fetches tasks from a taskworker RPC host and handles executing task activations.
Expand Down Expand Up @@ -60,6 +93,8 @@ def __init__(
process_type: str = "spawn",
health_check_file_path: str | None = None,
health_check_sec_per_touch: float = DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH,
push_mode: bool = False,
grpc_port: int = 50052,
**kwargs: dict[str, Any],
) -> None:
self.options = kwargs
Expand All @@ -69,6 +104,11 @@ def __init__(
self._concurrency = concurrency
app = import_app(app_module)

if push_mode:
logger.info("Running in PUSH mode")
else:
logger.info("Running in PULL mode")

self.client = TaskbrokerClient(
hosts=broker_hosts,
application=app.name,
Expand All @@ -81,6 +121,8 @@ def __init__(
),
rpc_secret=app.config["rpc_secret"],
grpc_config=app.config["grpc_config"],
push_mode=push_mode,
grpc_port=grpc_port,
)
self._metrics = app.metrics

Expand Down Expand Up @@ -110,12 +152,13 @@ def __init__(

self._processing_pool_name: str = processing_pool_name or "unknown"

self._push_mode = push_mode
self._grpc_port = grpc_port

def start(self) -> int:
"""
Run the worker main loop

Once started a Worker will loop until it is killed, or
completes its max_task_count when it shuts down.
When in PULL mode, this starts a loop that runs until the worker completes its `max_task_count` or it is killed.
When in PUSH mode, this starts the worker gRPC server.
"""
self.start_result_thread()
self.start_spawn_children_thread()
Expand All @@ -128,12 +171,37 @@ def signal_handler(*args: Any) -> None:
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

try:
while True:
self.run_once()
except KeyboardInterrupt:
self.shutdown()
raise
if self._push_mode:
server = None

try:
# Start gRPC server
server = grpc.server(ThreadPoolExecutor(max_workers=self._concurrency))
taskbroker_pb2_grpc.add_WorkerServiceServicer_to_server(
WorkerServicer(self), server
)
server.add_insecure_port(f"[::]:{self._grpc_port}")
server.start()
logger.info("taskworker.grpc_server.started", extra={"port": self._grpc_port})

# Wait for shutdown signal
server.wait_for_termination()

except KeyboardInterrupt:
# This may be triggered before the server is initialized
if server:
server.stop(grace=5)

self.shutdown()
else:
Comment on lines +193 to +196
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: In push mode, self.shutdown() is not called if the gRPC server terminates without a KeyboardInterrupt, causing child processes and threads to leak.
Severity: MEDIUM

Suggested Fix

Wrap the server.wait_for_termination() call in a try...finally block. The self.shutdown() method should be moved into the finally block to guarantee its execution, ensuring that resources are always cleaned up regardless of how the server terminates.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: clients/python/src/taskbroker_client/worker/worker.py#L193-L196

Potential issue: In push mode, the gRPC server's `start` method has a potential resource
leak. If `server.wait_for_termination()` returns for any reason other than a
`KeyboardInterrupt` (e.g., if `server.stop()` is called from another thread), the
`self.shutdown()` method is never executed. This prevents proper cleanup, leaving child
worker processes and background threads running after the server has stopped. The
current implementation only calls `self.shutdown()` within the `except
KeyboardInterrupt` block, failing to account for other graceful shutdown scenarios.

try:
while True:
self.run_once()
except KeyboardInterrupt:
self.shutdown()
raise

return 0

def run_once(self) -> None:
"""Access point for tests to run a single worker loop"""
Expand Down Expand Up @@ -172,6 +240,36 @@ def shutdown(self) -> None:

logger.info("taskworker.worker.shutdown.complete")

def push_task(self, inflight: InflightTaskActivation, timeout: float | None = None) -> bool:
"""
Push a task to child tasks queue.

When timeout is `None`, blocks until the queue has space. When timeout is
set (e.g. 5.0), waits at most that many seconds and returns `False` if the
queue is still full (worker busy).
"""
try:
self._metrics.gauge("taskworker.child_tasks.size", self._child_tasks.qsize())
except Exception as e:
# 'qsize' does not work on macOS
logger.warning(f"Could not report size of child tasks queue - {e}")

start_time = time.monotonic()
try:
self._child_tasks.put(inflight, timeout=timeout)
except queue.Full:
self._metrics.incr(
"taskworker.worker.push_task.busy",
tags={"processing_pool": self._processing_pool_name},
)
return False
self._metrics.distribution(
"taskworker.worker.child_task.put.duration",
time.monotonic() - start_time,
tags={"processing_pool": self._processing_pool_name},
)
return True

def _add_task(self) -> bool:
"""
Add a task to child tasks queue. Returns False if no new task was fetched.
Expand Down
Loading
Loading