-
Notifications
You must be signed in to change notification settings - Fork 4
Worker event stream #1665
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Worker event stream #1665
Conversation
5c00f7a to
6942be7
Compare
|
Questions:
|
cmwhited
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am going to keep reviewing. But I have concerns about the KafkaConfig
like I commented:
- it is currently duplicated between the kafka client crate and the config crate
- it doesn't expose all available connection params, including things like auth, which is concerning
| package amp.worker.events.v1; | ||
|
|
||
| // Event envelope - common wrapper for all events | ||
| message WorkerEvent { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So if my dataset has 3 tables, would each table emit this full WorkerEvent envelope? Seems like yes?
wonder if it would be less noise to make the payload an array for each tables status? so I could subscribe to a dataset stream and get the status of each table, to compute a total status.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes that's right.
So a 3 table dataset would emit 3x sync started events.
The partitioning is {namespace}/{name}/{hash}/{table_name} so consumers can subscribe to specific tables and get ordering guarantees per table.
Initially I had it so that I didn't have the lifecycle events following this process, but in the end I ended up deciding to include those as well.
21c4346 to
0d4e966
Compare
LNSD
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, check my comments 🙂
LNSD
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, check my comments 🙂
|
As a piece of advice, this is a considerable PR (many logical changes and many files changed). Although it is ok from the POV of having an overall vision of the feature, I would recommend that you work on small incremental PR instead, as big PRs are prone to reworks due to merge conflicts. Also, I recommend that you run the |
b808ecf to
18552bf
Compare
1ebcfb2 to
c0da2c5
Compare
LNSD
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, check my comments 🙂
| let notification_multiplexer = Arc::new(notification_multiplexer::spawn(metadata_db.clone())); | ||
|
|
||
| // Use injected event emitter or create one based on configuration | ||
| let event_emitter: Arc<dyn EventEmitter> = match event_emitter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make this more readable? More linear? 👀
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I will add a helper function.
| #[cfg(test)] | ||
| mod tests { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The E2E tests are the tests for the fixtures. This adds maintenance overhead with little value, as the fixture is not production code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, no worries.
Good point. I will remove it.
| [group: 'codegen'] | ||
| gen-worker-events-proto: | ||
| RUSTFLAGS="--cfg gen_worker_proto" cargo check -p worker |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should also add this to gen target
|
|
||
| /// Test that completed events are recorded correctly. | ||
| #[tokio::test] | ||
| async fn test_completed_events_recorded() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test_* in the function name is redundant. We already know that this is a test 😅
The test function names should be descriptive to facilitate the test case navigation and the test coverage assessment.
Run the /code-review skill and tell Claude/Codex to focus on the test coding patterns. You can ask it to suggest better fn names and improvements based on the guidelines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright :)
| async fn test_progress_reporter_forwards_to_emitter() { | ||
| logging::init(); | ||
|
|
||
| // Given |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tip/Nit: If you use //* Given instead of // Given, it helps to differentiate sections from comments
This is also stated in the coding guidelines.
Summary
Adds opt-in Kafka event streaming for worker sync progress, enabling real-time monitoring of dataset sync jobs.
sync.started,sync.progress,sync.completed,sync.failed) per-tableSee app-ampd-worker-events.md for full documentation.
raw_datasetcouples the lifecycle timing of all tables because they share a single block stream.job_idprovides job-level view.ProgressCallbacktrait avoids async overhead;WorkerProgressCallbackbridges to async viatokio::spawn.Test plan