
Conversation


@mitchhs12 mitchhs12 commented Jan 30, 2026

Summary

Adds opt-in Kafka event streaming for worker sync progress, enabling real-time monitoring of dataset sync jobs.

  • Emit lifecycle events (sync.started, sync.progress, sync.completed, sync.failed) per-table
  • Protobuf encoding via prost; Kafka delivery via rskafka (no librdkafka dependency)
  • Progress events throttled to 1% increments, avoiding noise on high-throughput chains
  • Configurable partition count (default: 16)
  • Best-effort delivery with retry backoff (1s, 5s, 60s); Kafka failures never fail jobs
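The 1% throttling rule can be sketched roughly as follows (the type and method names are illustrative, not the PR's actual code):

```rust
/// Illustrative sketch: only emit a progress event when the integer
/// percentage crosses a new 1% increment.
struct ProgressThrottle {
    last_pct: u64,
}

impl ProgressThrottle {
    fn new() -> Self {
        Self { last_pct: 0 }
    }

    /// Returns true when a progress event should be emitted.
    fn should_emit(&mut self, current: u64, total: u64) -> bool {
        if total == 0 {
            return false;
        }
        let pct = current * 100 / total;
        if pct > self.last_pct {
            self.last_pct = pct;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut t = ProgressThrottle::new();
    assert!(t.should_emit(1, 100));  // crossed 1%: emit
    assert!(!t.should_emit(1, 100)); // same increment: suppressed
    assert!(t.should_emit(50, 100)); // jumped to 50%: emit
    println!("ok");
}
```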

See app-ampd-worker-events.md for full documentation.

Design notes

  • Events are per-table (correct for Kafka partitioning), but raw_dataset couples the lifecycle timing of all tables because they share a single block stream.
  • Retry policy uses fixed delays (1s, 5s, 60s), max 66s wait. Silent failure after exhaustion is intentional: events are best-effort.
  • Per-table partitioning enables consumer parallelism; aggregation by job_id provides job-level view.
  • The sync ProgressCallback trait keeps the hot path free of async overhead; WorkerProgressCallback bridges to async delivery via tokio::spawn.
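The sync-to-async bridge described above can be sketched with std primitives — a plain thread and channel stand in for tokio::spawn, and all names and signatures are illustrative rather than the PR's actual code:

```rust
use std::sync::mpsc;
use std::thread;

/// Sync callback trait, per the design note (exact signature assumed).
trait ProgressCallback: Send {
    fn on_progress(&self, table: &str, rows: u64);
}

/// The bridge: the synchronous callback only hands the event to a
/// channel; a background consumer (tokio::spawn in the real code, a
/// plain thread here) performs the delivery.
struct WorkerProgressCallback {
    tx: mpsc::Sender<(String, u64)>,
}

impl ProgressCallback for WorkerProgressCallback {
    fn on_progress(&self, table: &str, rows: u64) {
        // Best-effort hand-off: a send error (consumer gone) is ignored,
        // so event delivery can never fail the sync job.
        let _ = self.tx.send((table.to_string(), rows));
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let cb = WorkerProgressCallback { tx };
    let consumer = thread::spawn(move || rx.iter().collect::<Vec<_>>());
    cb.on_progress("logs", 1_000);
    drop(cb); // closing the channel lets the consumer finish
    let events = consumer.join().unwrap();
    assert_eq!(events.len(), 1);
    println!("ok");
}
```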

Test plan

  • Unit tests for retry backoff and error handling
  • Integration tests for all event types
  • E2E test with Anvil verifying full lifecycle (3 tables × lifecycle events)

@mitchhs12 mitchhs12 self-assigned this Jan 30, 2026
@mitchhs12 mitchhs12 force-pushed the worker-event-stream branch from 5c00f7a to 6942be7 Compare January 30, 2026 18:21

mitchhs12 commented Jan 30, 2026

Questions:

  1. Should we add a sync.caught_up event for continuous datasets when they first reach chain head? I think this could be useful for consumers.
  2. When a chain reorg causes progress to go backwards, should we:
    • Emit a sync.reorg event?
    • Suppress backwards progress events?
    • Let consumers handle it (current behavior)?

@mitchhs12 mitchhs12 closed this Jan 30, 2026
@mitchhs12 mitchhs12 reopened this Jan 30, 2026
@mitchhs12 mitchhs12 requested review from LNSD and cmwhited January 30, 2026 18:32
Contributor

@cmwhited cmwhited left a comment

I am going to keep reviewing, but I have concerns about the KafkaConfig, as I commented:

  • it is currently duplicated between the kafka client crate and the config crate
  • it doesn't expose all available connection params, including things like auth, which is concerning

package amp.worker.events.v1;

// Event envelope - common wrapper for all events
message WorkerEvent {
Contributor

So if my dataset has 3 tables, would each table emit this full WorkerEvent envelope? Seems like yes?
I wonder if it would be less noisy to make the payload an array of each table's status, so I could subscribe to a dataset stream, get the status of every table, and compute a total status.

Contributor Author

Yes, that's right: a 3-table dataset emits 3× sync.started events.
The partitioning key is {namespace}/{name}/{hash}/{table_name}, so consumers can subscribe to specific tables and get per-table ordering guarantees.
Initially the lifecycle events didn't follow this partitioning scheme, but in the end I decided to include them as well.
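For illustration, the partitioning rule might look like this — the key layout follows the comment above, but the hasher and the modulo mapping are assumptions, not the PR's actual implementation:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Sketch of the per-table partitioning: build the
/// {namespace}/{name}/{hash}/{table_name} key, hash it, and map it
/// onto one of the configured partitions (default 16).
fn partition_for(namespace: &str, name: &str, hash: &str, table: &str, partitions: u64) -> u64 {
    let key = format!("{namespace}/{name}/{hash}/{table}");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % partitions
}

fn main() {
    // The same table always lands on the same partition, which is what
    // gives consumers per-table ordering guarantees.
    let p1 = partition_for("eth", "mainnet", "abc123", "logs", 16);
    let p2 = partition_for("eth", "mainnet", "abc123", "logs", 16);
    assert_eq!(p1, p2);
    assert!(p1 < 16);
    println!("partition {p1}");
}
```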

@mitchhs12 mitchhs12 force-pushed the worker-event-stream branch 2 times, most recently from 21c4346 to 0d4e966 Compare February 2, 2026 18:04
Contributor

@LNSD LNSD left a comment

Please, check my comments 🙂


LNSD commented Feb 3, 2026

As a piece of advice: this is a considerable PR (many logical changes and many files changed). Although it is OK from the POV of having an overall vision of the feature, I would recommend working in small incremental PRs instead, as big PRs are prone to rework due to merge conflicts.

Also, I recommend that you run the /code-review agent skill locally to catch code style issues in advance and accelerate the feedback loop.

@mitchhs12 mitchhs12 force-pushed the worker-event-stream branch 4 times, most recently from b808ecf to 18552bf Compare February 4, 2026 19:59
@mitchhs12 mitchhs12 force-pushed the worker-event-stream branch from 1ebcfb2 to c0da2c5 Compare February 4, 2026 20:27
@mitchhs12 mitchhs12 requested review from LNSD and cmwhited February 4, 2026 20:30
Contributor

@LNSD LNSD left a comment

Please, check my comments 🙂

let notification_multiplexer = Arc::new(notification_multiplexer::spawn(metadata_db.clone()));

// Use injected event emitter or create one based on configuration
let event_emitter: Arc<dyn EventEmitter> = match event_emitter {
Contributor

Can we make this more readable? More linear? 👀

https://github.com/zakirullin/cognitive-load#nested-ifs

Contributor Author

Ok, I will add a helper function.
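A hedged sketch of what such a helper could look like, with early returns replacing the nested match (all type names below are stand-ins for the PR's real types, not its actual code):

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the PR's real types (names assumed).
trait EventEmitter: Send + Sync {
    fn name(&self) -> &'static str;
}

struct NoopEmitter;
impl EventEmitter for NoopEmitter {
    fn name(&self) -> &'static str { "noop" }
}

struct KafkaEmitter {
    brokers: String,
}
impl EventEmitter for KafkaEmitter {
    fn name(&self) -> &'static str { "kafka" }
}

struct KafkaConfig {
    brokers: String,
}

/// Linear resolution instead of nested matches: an injected emitter
/// wins, then a Kafka config builds one, then a no-op fallback.
fn resolve_event_emitter(
    injected: Option<Arc<dyn EventEmitter>>,
    kafka: Option<&KafkaConfig>,
) -> Arc<dyn EventEmitter> {
    if let Some(emitter) = injected {
        return emitter;
    }
    if let Some(cfg) = kafka {
        return Arc::new(KafkaEmitter { brokers: cfg.brokers.clone() });
    }
    Arc::new(NoopEmitter)
}

fn main() {
    assert_eq!(resolve_event_emitter(None, None).name(), "noop");
    let cfg = KafkaConfig { brokers: "localhost:9092".into() };
    assert_eq!(resolve_event_emitter(None, Some(&cfg)).name(), "kafka");
    println!("ok");
}
```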

Comment on lines +191 to +192
#[cfg(test)]
mod tests {
Contributor

The E2E tests already cover the fixtures. These unit tests add maintenance overhead with little value, as the fixture is not production code.

Contributor Author

Ok, no worries.
Good point. I will remove it.

Comment on lines +311 to +313
[group: 'codegen']
gen-worker-events-proto:
RUSTFLAGS="--cfg gen_worker_proto" cargo check -p worker
Contributor

You should also add this to the gen target.


/// Test that completed events are recorded correctly.
#[tokio::test]
async fn test_completed_events_recorded() {
Contributor

The test_* in the function name is redundant. We already know that this is a test 😅

The test function names should be descriptive to facilitate the test case navigation and the test coverage assessment.

Run the /code-review skill and tell Claude/Codex to focus on the test coding patterns. You can ask it to suggest better fn names and improvements based on the guidelines.

Contributor Author

Alright :)

async fn test_progress_reporter_forwards_to_emitter() {
logging::init();

// Given
Contributor

Tip/Nit: If you use //* Given instead of // Given, it helps to differentiate sections from comments

This is also stated in the coding guidelines.
