feat(taskbroker): Add Push Mode to Taskbroker #573
james-mcnulty wants to merge 17 commits into main from
Conversation
Cursor Bugbot has reviewed your changes and found 3 potential issues.
Autofix Details
Bugbot Autofix prepared fixes for all 3 issues found in the latest run.
- ✅ Fixed: gRPC server address ignores configurable `grpc_addr` field
  - Updated gRPC bind address construction to use `config.grpc_addr` together with `config.grpc_port` instead of hardcoding `0.0.0.0`.
- ✅ Fixed: Callback URL missing protocol scheme for worker callbacks
  - Changed push callback URL formatting to include the `http://` scheme so workers receive a valid URI.
- ✅ Fixed: `push_threads=0` causes deadlock unlike guarded `fetch_threads`
  - Aligned push worker spawning with fetch behavior by using `self.config.push_threads.max(1)` to guarantee at least one consumer.
Or push these changes by commenting:
@cursor push effe733488
Preview (effe733488)
```diff
diff --git a/src/main.rs b/src/main.rs
--- a/src/main.rs
+++ b/src/main.rs
@@ -196,7 +196,7 @@
     let config = config.clone();
     async move {
-        let addr = format!("0.0.0.0:{}", config.grpc_port)
+        let addr = format!("{}:{}", config.grpc_addr, config.grpc_port)
             .parse()
             .expect("Failed to parse address");
diff --git a/src/push.rs b/src/push.rs
--- a/src/push.rs
+++ b/src/push.rs
@@ -57,11 +57,11 @@
     pub async fn start(&self) -> Result<()> {
         let mut handles = vec![];
-        for _ in 0..self.config.push_threads {
+        for _ in 0..self.config.push_threads.max(1) {
             let endpoint = self.config.worker_endpoint.clone();
             let callback_url = format!(
-                "{}:{}",
+                "http://{}:{}",
                 self.config.callback_addr, self.config.callback_port
             );
```

This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.
evanh left a comment
There is now a potential deadlock scenario where the push channels get full and the fetch activations block trying to send. Is that tracked somewhere with a metric?
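One way to make that back-pressure observable is to probe the bounded channel with a non-blocking send and count rejections as a metric. Below is a minimal sketch using std's `sync_channel` as a stand-in for the flume MPMC channel; the metric name is a hypothetical example, not anything in the PR.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Counts how many sends are rejected because the bounded queue is full.
// In taskbroker this is where a `push_queue_full`-style counter could be
// incremented instead of silently blocking on `send`.
fn count_rejected(capacity: usize, sends: usize) -> u64 {
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut full = 0;
    for i in 0..sends {
        if let Err(TrySendError::Full(_)) = tx.try_send(i) {
            full += 1;
        }
    }
    full
}

fn main() {
    // Capacity 1, three sends: the last two are rejected.
    println!("rejected={}", count_rejected(1, 3));
}
```

The same `try_send`-then-record pattern exists on flume's bounded sender, so the fetch side could emit a metric before falling back to a blocking send.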
src/fetch.rs (Outdated)

```rust
debug!("Fetching next pending activation...");

match store.get_pending_activation(None, None).await {
```
This will need to use the namespace and application parameters passed into it in order to work correctly.
This is actually something I wanted to bring up. Right now, namespace and application come from the get_task request body. In push mode, there is no easy way to know what values to use here. Should they be provided in the configuration?
If a broker is handling multiple applications (like in local development, and in smaller environments) we'll need different worker pools to push to. Perhaps we need a mapping between application -> worker pools?
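The application-to-worker-pool mapping floated here could look roughly like the sketch below. All names are illustrative, and std's `sync_channel` stands in for the flume channels the PR actually uses.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{sync_channel, SyncSender};

// Hypothetical routing table from application name to that application's
// worker-pool channel; returns false when no pool is configured.
fn route(pools: &HashMap<String, SyncSender<String>>, app: &str, task: String) -> bool {
    match pools.get(app) {
        Some(tx) => tx.send(task).is_ok(),
        None => false,
    }
}

fn main() {
    let (tx, rx) = sync_channel(4);
    let mut pools = HashMap::new();
    pools.insert("sentry".to_string(), tx);
    assert!(route(&pools, "sentry", "task-1".into()));
    assert!(!route(&pools, "other-app", "task-2".into()));
    println!("routed: {}", rx.recv().unwrap());
}
```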
> Should they be provided in the configuration?

Yes, this is how it will have to work.
Done! Config now takes an optional application and an optional list of namespaces.
src/fetch.rs (Outdated)

```rust
Err(e) => {
    error!("Failed to fetch pending activation - {:?}", e);
    sleep(Duration::from_millis(100)).await;
```
There is no need for a sleep here.
I had a sleep there because if this fails, it's either because the store is having issues (e.g. due to scaling AlloyDB up) or because the push queue is full. In both cases it makes sense to wait a little, no?
Can we tell the difference between the two? I would handle those two scenarios differently. For the queue being full I would wait, but for an actual error we might want to take other actions (e.g. crash the entire producer).
Yes, definitely. Any idea what should happen if it's the queue being full versus a store error? For now, I can just distinguish between the two and simply log which one it was without doing anything else until we decide for sure.
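The split being discussed could be modeled with an explicit error type, so the retry loop can back off on a full queue but surface store errors to the caller. This is a hypothetical sketch, not the PR's types; the variants and durations are assumptions.

```rust
use std::time::Duration;

// Hypothetical split of the two failure modes: a full push queue is
// transient and worth a short backoff, while a store error is surfaced
// so the caller can decide to crash the producer, alert, etc.
#[derive(Debug, PartialEq)]
enum FetchError {
    QueueFull,
    Store(String),
}

fn backoff_for(err: &FetchError) -> Option<Duration> {
    match err {
        FetchError::QueueFull => Some(Duration::from_millis(100)),
        FetchError::Store(_) => None, // no automatic retry; escalate instead
    }
}

fn main() {
    assert_eq!(
        backoff_for(&FetchError::QueueFull),
        Some(Duration::from_millis(100))
    );
    assert_eq!(backoff_for(&FetchError::Store("db down".into())), None);
    println!("ok");
}
```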
```bash
# Run unit/integration tests
make test
make unit-test
```
It's `make unit-test`, not `make test`; the latter doesn't do anything right now.
Or rather, there is no test target in the Makefile for doing both unit and integration tests, which seemed to be the intention here.
```rust
fetch_threads: 1,
push_threads: 1,
push_queue_size: 1,
worker_endpoint: "http://127.0.0.1:50052".into(),
```
Is this the port that we'll be using for the worker in self-hosted and local dev? Ideally local development 'just works' and doesn't require additional configuration.
fpacifici left a comment
Will do a more in depth review later.
Though please consider refactoring the config first to separate the push attributes from the pull ones.
That would require its own PR.
src/config.rs (Outdated)

```rust
/// Run the taskbroker in push mode (as opposed to pull mode).
pub push_mode: bool,
```
Please change the `push_mode` boolean into a `delivery_mode`.
Now I think this configuration will be very hard to set. We have more than ten push specific config elements and more than 10 are pull specific. We need some ways to make it more intuitive.
One option would be to give it a structure, though I do not know whether we rely on these fields to be simple fields. @evanh may know better.
Otherwise, a common pattern for these scenarios is to prefix the poll-specific parameters with `poll_` and the push-specific ones with `push_`.
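The structured option suggested here could take roughly the shape below. This is illustrative only: the field names and the `DeliveryMode` variants are assumptions, not the PR's actual config types.

```rust
// One hypothetical way to group the flat config into push- and
// pull-specific sections behind a `delivery_mode` enum.
#[derive(Debug)]
struct PushConfig {
    threads: usize,
    queue_size: usize,
    worker_endpoint: String,
}

#[derive(Debug)]
struct PullConfig {
    threads: usize,
}

#[derive(Debug)]
enum DeliveryMode {
    Push(PushConfig),
    Pull(PullConfig),
}

fn main() {
    let mode = DeliveryMode::Push(PushConfig {
        threads: 1,
        queue_size: 1,
        worker_endpoint: "http://127.0.0.1:50052".into(),
    });
    // Mode-specific settings are only reachable in the matching mode.
    assert!(matches!(&mode, DeliveryMode::Push(_)));
    println!("{:?}", mode);
}
```

Whether this works depends on how the existing env-var loading maps onto nested fields, which is the open question raised above.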
Please avoid a helper module. It risks becoming a god object without a clear responsibility. If we need a function to spawn pools have a tokio module for this
Created a tokio module as suggested.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
```rust
.get_pending_activations_from_namespaces(
    config.application.as_deref(),
    config.namespaces.as_deref(),
    Some(1),
)
.await;
```
Bug: In push mode, fetch_activation bypasses validation, causing it to fetch tasks across all applications if only namespaces are configured without an application, breaking tenant isolation.
Severity: HIGH
Suggested Fix
Ensure that fetch_activation in src/fetch/mod.rs validates its configuration. Before calling store.get_pending_activations_from_namespaces, add a check to confirm that if config.namespaces is Some, then config.application must also be Some. If the validation fails, log a warning and return early, mirroring the logic in get_pending_activation.
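The suggested guard boils down to a single invariant check. A minimal sketch (the function name is hypothetical; the config fields are taken from the snippet above):

```rust
// Namespaces configured without an application must be rejected before
// querying the store, mirroring the pull-mode validation.
fn namespace_config_is_valid(application: Option<&str>, namespaces: Option<&[&str]>) -> bool {
    !(namespaces.is_some() && application.is_none())
}

fn main() {
    // Invalid: namespaces without an application would fetch across tenants.
    assert!(!namespace_config_is_valid(None, Some(&["ingest"])));
    // Valid combinations.
    assert!(namespace_config_is_valid(Some("sentry"), Some(&["ingest"])));
    assert!(namespace_config_is_valid(None, None));
    println!("ok");
}
```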
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not valid.
Location: src/fetch/mod.rs#L118-L123
Potential issue: In push mode, if `TASKBROKER_NAMESPACES` is configured without `TASKBROKER_APPLICATION`, the `fetch_activation` function bypasses a critical validation check. It calls `get_pending_activations_from_namespaces` directly, which proceeds to fetch tasks from the specified namespaces across all applications. This breaks the intended multi-tenant isolation model by allowing unintended cross-application task leakage. The pull mode is not affected as it correctly uses a wrapper function containing the necessary validation logic.
```rust
/// - `Ok(true)` if an activation was found
/// - `Ok(false)` if none pending
/// - `Err` if fetching failed.
pub async fn fetch_activation<T: TaskPusher>(
```
Why is this a standalone function instead of part of `TaskPusher`? That would cut down on the number of `clone` calls (which are memcpy operations).
```rust
// Instead of returning when `fetch_activation` fails, we just try again
match fetch_activation(store.clone(), pusher.clone(), config.clone()).await {
    Ok(false) | Err(_) => {
```
We should separate these cases. I would move any error handling (logging etc.) up to this function. That way we also handle any unexpected errors.


Linear
Completes STREAM-820
Description
Currently, taskworkers pull tasks from taskbrokers via RPC. This approach works, but has some drawbacks. Therefore, we want taskbrokers to push tasks to taskworkers instead. Read this page on Notion for more information.
This PR allows users to run the taskbroker in push mode that can be adjusted using several new configuration parameters.
| Parameter | Type | Default |
| --- | --- | --- |
| `TASKBROKER_PUSH_MODE` | bool | `false` |
| `TASKBROKER_FETCH_THREADS` | usize | `1` |
| `TASKBROKER_PUSH_THREADS` | usize | `1` |
| `TASKBROKER_PUSH_QUEUE_SIZE` | usize | `1` |
| `TASKBROKER_WORKER_ENDPOINT` | String | `http://127.0.0.1:50052` |
| `TASKBROKER_CALLBACK_ADDR` | String | `0.0.0.0` |
| `TASKBROKER_CALLBACK_PORT` | usize | `50051` |
| `TASKBROKER_FETCH_WAIT_MS` | u64 | `100` |
| `TASKBROKER_PUSH_TIMEOUT_MS` | u64 | `5000` |

Push Threads
On startup, the taskbroker now creates a "push pool," which is a pool of push threads. All of them wait to receive activations from the same MPMC channel provided by the `flume` crate. When a push thread receives an activation, it sends it to the worker service. Note that each push thread has its own connection to the worker service.

Push threads are grouped together by the `PushPool` data structure, which exposes a `start` method to actually spawn the threads and a `submit` method to receive activations.

Fetch Threads
On startup, the taskbroker also creates a "fetch pool," which is a pool of fetch threads. Each one retrieves a pending activation from the store, passes it to the push pool (waiting until it accepts), and repeats.
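The fetch-then-hand-off cycle described above can be sketched synchronously. This is not the PR's code: std's `sync_channel` stands in for the flume channel, and a `Vec` stands in for the store, but the blocking hand-off to the push pool is the same shape.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};

// Rough shape of a fetch thread: drain pending activations from a store
// stand-in and hand each to the push pool's bounded channel. `send`
// blocks while the pool is saturated, giving the back-pressure above.
fn fetch_loop(store: &mut Vec<String>, push_pool: &SyncSender<String>) {
    while let Some(activation) = store.pop() {
        push_pool.send(activation).expect("push pool hung up");
    }
}

fn main() {
    let (tx, rx) = sync_channel(8);
    let mut store = vec!["task-a".to_string(), "task-b".to_string()];
    fetch_loop(&mut store, &tx);
    drop(tx); // close the channel so the iterator below terminates
    let delivered: Vec<String> = rx.iter().collect();
    println!("{:?}", delivered);
}
```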
Notes on Naming
Fetch threads and push threads are actually asynchronous tasks provided by the Tokio crate. They are not real threads.
Details
Dependencies
- Added `flume` 0.12.0 as a dependency (I didn't want to add any dependencies, but Tokio does not provide an asynchronous MPMC queue - only MPSC)
- Bumped `sentry-protos` from 0.4.11 to 0.8.5 (to use the new worker service schema)
- Bumped `tonic`, `tonic-health`, `prost`, and `prost-types` to 0.14 (to match the version used by `sentry-protos`)
Additions
- `FetchPool` abstraction in `src/fetch.rs`
- `PushPool` abstraction in `src/push.rs`
- `src/main.rs`
Modifications
- `get_task` when operating in push mode
Future Changes