Skip to content

feat(eventbus): support per-subscriber async dispatch with graceful drain#1455

Open
WyRainBow wants to merge 2 commits intoapache:developfrom
WyRainBow:feature/fix-1447
Open

feat(eventbus): support per-subscriber async dispatch with graceful drain#1455
WyRainBow wants to merge 2 commits intoapache:developfrom
WyRainBow:feature/fix-1447

Conversation

@WyRainBow
Copy link
Copy Markdown
Contributor

Background

EventBus previously dispatched events synchronously. Slow subscribers could block Send() and impact the whole event pipeline.

What Changed

  • Added optional async subscriber support via AsyncSubscriber.
  • Refactored EventBus subscriber directory to maintain per-subscriber runtime state.
  • Added per-subscriber async queue and drainer lifecycle control.
  • Added lifecycle guard and graceful integration changes:
    • compile-time graceful component contract check
    • safer config validation path in AdminConfig
    • default eventbus buffer size aligned to 1024

Core Files

  • pkg/core/events/async.go
  • pkg/core/events/component.go
  • pkg/config/app/admin.go
  • pkg/config/eventbus/config.go

Behavior Summary

  • Async subscribers: non-blocking enqueue (select + default), drop with warn when channel is full.
  • Sync subscribers: keep existing synchronous ProcessEvent behavior.
  • Unsubscribe and shutdown paths keep graceful drain semantics.

Notes

  • This PR intentionally includes only core feature/code path updates.
  • Test/documentation local changes are not included in this PR commit scope.

@sonarqubecloud
Copy link
Copy Markdown

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds opt-in asynchronous dispatching to the core EventBus to prevent slow subscribers from blocking the event pipeline, and wires graceful draining into runtime shutdown.

Changes:

  • Introduces AsyncSubscriber and per-subscriber runtime state with per-subscriber buffered queues + drainer goroutines.
  • Integrates EventBus graceful drain into runtime stop handling via runtime.GracefulComponent.
  • Adds eventBus configuration to AdminConfig and increases the default EventBus buffer size to 1024.

Reviewed changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
pkg/core/runtime/runtime.go On stop, waits for GracefulComponent implementations to finish (enables graceful drains).
pkg/core/events/component.go Refactors subscriber directory to track per-subscriber state; supports async enqueue + drainer lifecycle; adds graceful drain (WaitForDone).
pkg/core/events/async.go Adds AsyncSubscriber interface and subscriberState for async runtime bookkeeping.
pkg/core/engine/subscriber/runtime_instance.go Marks runtime instance subscriber as async-capable (AsyncEnabled).
pkg/core/discovery/subscriber/zk_metadata.go Marks ZK metadata subscriber as async-capable (AsyncEnabled).
pkg/core/discovery/subscriber/zk_config.go Marks ZK config subscriber as async-capable (AsyncEnabled).
pkg/core/discovery/subscriber/service_provider_metadata.go Marks service provider metadata subscriber as async-capable (AsyncEnabled).
pkg/core/discovery/subscriber/service_consumer_metadata.go Marks service consumer metadata subscriber as async-capable (AsyncEnabled).
pkg/core/discovery/subscriber/rpc_instance.go Marks RPC instance subscriber as async-capable (AsyncEnabled).
pkg/core/discovery/subscriber/nacos_service.go Marks Nacos service subscriber as async-capable (AsyncEnabled).
pkg/core/discovery/subscriber/instance.go Marks instance subscriber as async-capable (AsyncEnabled).
pkg/config/eventbus/config.go Raises default EventBus buffer size to 1024.
pkg/config/app/admin.go Adds EventBus config field, defaults, and validation wiring.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +139 to +142
for _, com := range components {
if gc, ok := com.(GracefulComponent); ok {
gc.WaitForDone()
}
Comment on lines +116 to +123
if isAsync {
state.ch = make(chan Event, b.bufferSize)
state.done = make(chan struct{})
}
b.subscriberDir[rk] = append(states, state)
if isAsync && b.started.Load() {
b.launchDrainer(state)
}
Comment on lines +222 to +223

b.wg.Add(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants