feat(eventbus): support per-subscriber async dispatch with graceful drain#1455
Open
WyRainBow wants to merge 2 commits intoapache:developfrom
Open
feat(eventbus): support per-subscriber async dispatch with graceful drain#1455WyRainBow wants to merge 2 commits intoapache:developfrom
WyRainBow wants to merge 2 commits intoapache:developfrom
Conversation
|
Contributor
There was a problem hiding this comment.
Pull request overview
Adds opt-in asynchronous dispatching to the core EventBus to prevent slow subscribers from blocking the event pipeline, and wires graceful draining into runtime shutdown.
Changes:
- Introduces
AsyncSubscriberand per-subscriber runtime state with per-subscriber buffered queues + drainer goroutines. - Integrates EventBus graceful drain into runtime stop handling via
runtime.GracefulComponent. - Adds
eventBusconfiguration toAdminConfigand increases the default EventBus buffer size to1024.
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/core/runtime/runtime.go | On stop, waits for GracefulComponent implementations to finish (enables graceful drains). |
| pkg/core/events/component.go | Refactors subscriber directory to track per-subscriber state; supports async enqueue + drainer lifecycle; adds graceful drain (WaitForDone). |
| pkg/core/events/async.go | Adds AsyncSubscriber interface and subscriberState for async runtime bookkeeping. |
| pkg/core/engine/subscriber/runtime_instance.go | Marks runtime instance subscriber as async-capable (AsyncEnabled). |
| pkg/core/discovery/subscriber/zk_metadata.go | Marks ZK metadata subscriber as async-capable (AsyncEnabled). |
| pkg/core/discovery/subscriber/zk_config.go | Marks ZK config subscriber as async-capable (AsyncEnabled). |
| pkg/core/discovery/subscriber/service_provider_metadata.go | Marks service provider metadata subscriber as async-capable (AsyncEnabled). |
| pkg/core/discovery/subscriber/service_consumer_metadata.go | Marks service consumer metadata subscriber as async-capable (AsyncEnabled). |
| pkg/core/discovery/subscriber/rpc_instance.go | Marks RPC instance subscriber as async-capable (AsyncEnabled). |
| pkg/core/discovery/subscriber/nacos_service.go | Marks Nacos service subscriber as async-capable (AsyncEnabled). |
| pkg/core/discovery/subscriber/instance.go | Marks instance subscriber as async-capable (AsyncEnabled). |
| pkg/config/eventbus/config.go | Raises default EventBus buffer size to 1024. |
| pkg/config/app/admin.go | Adds EventBus config field, defaults, and validation wiring. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+139
to
+142
| for _, com := range components { | ||
| if gc, ok := com.(GracefulComponent); ok { | ||
| gc.WaitForDone() | ||
| } |
Comment on lines
+116
to
+123
| if isAsync { | ||
| state.ch = make(chan Event, b.bufferSize) | ||
| state.done = make(chan struct{}) | ||
| } | ||
| b.subscriberDir[rk] = append(states, state) | ||
| if isAsync && b.started.Load() { | ||
| b.launchDrainer(state) | ||
| } |
Comment on lines
+222
to
+223
|
|
||
| b.wg.Add(1) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.



Background
EventBus previously dispatched events synchronously. Slow subscribers could block
Send()and impact the whole event pipeline.What Changed
AsyncSubscriber.AdminConfig1024Core Files
pkg/core/events/async.gopkg/core/events/component.gopkg/config/app/admin.gopkg/config/eventbus/config.goBehavior Summary
select+default), drop with warn when channel is full.ProcessEventbehavior.Notes