Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 125 additions & 53 deletions src/pubsub/src/publisher/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use super::options::BatchingOptions;
use crate::generated::gapic_dataplane::client::Publisher as GapicPublisher;
use crate::publisher::batch::Batch;
use std::collections::{HashMap, VecDeque};
use tokio::sync::mpsc::error::SendError;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinSet;
use tokio_util::task::JoinMap;
Expand Down Expand Up @@ -74,38 +75,6 @@ impl Dispatcher {
}
}

/// Spawns the batch actor responsible for `key` and returns a handle to it.
///
/// An empty ordering key is served by a `ConcurrentBatchActor`; every other
/// key is served by a `SequentialBatchActor`. In both cases the actor is
/// spawned onto `tasks` under `key` and reads from the receiving half of a
/// fresh unbounded channel, whose sending half is stored in the returned handle.
pub(crate) fn spawn_actor(
    &mut self,
    key: String,
    tasks: &mut JoinMap<String, ()>,
) -> BatchActorHandle {
    let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();

    // Both actor kinds are built from the same dispatcher state.
    let topic = self.topic_name.clone();
    let client = self.client.clone();
    let options = self.batching_options.clone();

    if key.is_empty() {
        let actor = ConcurrentBatchActor::new(topic, client, options, receiver);
        tasks.spawn(key, actor.run());
    } else {
        let actor = SequentialBatchActor::new(topic, client, options, receiver);
        tasks.spawn(key, actor.run());
    }

    BatchActorHandle { sender }
}

/// The main loop of the Dispatcher.
///
/// This method continuously handles the following events:
Expand All @@ -121,9 +90,17 @@ impl Dispatcher {
/// The loop terminates when the `rx` channel is closed, which happens when all
/// `Publisher` clones have been dropped.
pub(crate) async fn run(mut self) {
// A dictionary of ordering key to outstanding publish operations.
// We batch publish operations on the same ordering key together.
// A dictionary of ordering keys to batch actors.
// Publish operations on the same ordering key are processed and batched together.
// Publishes without an ordering key are treated as having the key "".
// If there is no batch actor for an ordering key, we spawn a new batch actor
// in actor_tasks.
// A batch actor shuts down and closes its channel when it has no
// additional messages to process.
// When this occurs, we join the actor's task and remove its entry.
// Since sequential actors require a batch to complete before the next
// batch can be sent, we keep a buffer of messages that arrive after the channel
// is closed. This buffer is passed on to the next batch actor if needed.
let mut batch_actors: HashMap<String, BatchActorHandle> = HashMap::new();
let mut actor_tasks: JoinMap<String, ()> = JoinMap::new();
let delay = self.batching_options.delay_threshold;
Expand All @@ -132,9 +109,24 @@ impl Dispatcher {
tokio::pin!(timer);
loop {
tokio::select! {
_ = actor_tasks.join_next(), if !actor_tasks.is_empty() => {
// TODO(#4012): Remove batch actors when there are no outstanding operations
// on the ordering key.
Some((key, _)) = actor_tasks.join_next(), if !actor_tasks.is_empty() => {
if let Some(batch_actor) = batch_actors.get_mut(&key) {
if !batch_actor.buffer.1.is_empty() {
batch_actors.entry(key.clone()).and_modify(|handle| {
handle.respawn_actor(
self.topic_name.clone(),
self.client.clone(),
self.batching_options.clone(),
key,
&mut actor_tasks,
)
});
} else {
// The batch actor has shutdown and there are no buffered messages
// to process. It is safe to delete the entry.
batch_actors.remove(&key);
}
}
continue;
}
// Currently, the Dispatcher periodically flushes all batches on a shared timer.
Expand All @@ -154,20 +146,22 @@ impl Dispatcher {
match msg {
Some(ToDispatcher::Publish(msg)) => {
let ordering_key = msg.msg.ordering_key.clone();
let batch_actor = batch_actors
.entry(ordering_key.clone())
.or_insert_with(|| self.spawn_actor(ordering_key.clone(), &mut actor_tasks));
if batch_actor.sender.send(ToBatchActor::Publish(msg)).is_err() {
return; // Stop the dispatcher if a batch actor is dropped.
}
let batch_actor = batch_actors.entry(ordering_key.clone()).or_insert_with(|| {
BatchActorHandle::new(
self.topic_name.clone(),
self.client.clone(),
self.batching_options.clone(),
ordering_key,
&mut actor_tasks,
)
});
batch_actor.send(ToBatchActor::Publish(msg));
},
Some(ToDispatcher::Flush(tx)) => {
let mut flush_set = JoinSet::new();
for (_, batch_actor) in batch_actors.iter() {
for (_, batch_actor) in batch_actors.iter_mut() {
let (tx, rx) = oneshot::channel();
if batch_actor.sender.send(ToBatchActor::Flush(tx)).is_err() {
return; // Stop the dispatcher if a batch actor is dropped.
}
batch_actor.send(ToBatchActor::Flush(tx));
flush_set.spawn(rx);
}
tokio::spawn(async move {
Expand All @@ -180,9 +174,7 @@ impl Dispatcher {
if let Some(batch_actor) = batch_actors.get_mut(&ordering_key) {
// Send down the same tx for the BatchActors to directly signal completion
// instead of spawning a new task.
if batch_actor.sender.send(ToBatchActor::ResumePublish()).is_err() {
return; // Stop the dispatcher if a batch actor is dropped.
}
batch_actor.send(ToBatchActor::ResumePublish());
}
}
None => {
Expand All @@ -205,9 +197,83 @@ impl Dispatcher {
}
}

/// Spawns a batch actor for `key` onto `tasks`, reading messages from `receiver`.
///
/// An empty `key` selects a `ConcurrentBatchActor`; any other key selects a
/// `SequentialBatchActor`. The task is registered in `tasks` under `key`.
fn spawn_actor(
    topic: String,
    client: GapicPublisher,
    batching_options: BatchingOptions,
    key: String,
    tasks: &mut JoinMap<String, ()>,
    receiver: mpsc::UnboundedReceiver<ToBatchActor>,
) {
    // Messages without an ordering key use the concurrent actor.
    if key.is_empty() {
        let actor = ConcurrentBatchActor::new(topic, client, batching_options, receiver);
        tasks.spawn(key, actor.run());
        return;
    }

    let actor = SequentialBatchActor::new(topic, client, batching_options, receiver);
    tasks.spawn(key, actor.run());
}

/// Handle used to send `ToBatchActor` messages to a batch actor.
///
/// Besides the channel to the running actor, the handle owns both halves of a
/// spare "buffer" channel. `send` falls back to the buffer when the actor's
/// channel rejects a message, and `respawn_actor` hands the buffer's receiver
/// to the replacement actor.
#[derive(Debug)]
pub(crate) struct BatchActorHandle {
// Sending half of the channel read by the currently spawned batch actor.
sender: mpsc::UnboundedSender<ToBatchActor>,
// Use buffer to store new messages when the batch actor is in shutdown
// (sender is closed) but has not yet joined the Dispatcher loop.
// Stored as (sender, receiver): `.0` is written to by `send`, and `.1` is
// given to the next actor by `respawn_actor`.
buffer: (
mpsc::UnboundedSender<ToBatchActor>,
mpsc::UnboundedReceiver<ToBatchActor>,
),
}

impl BatchActorHandle {
    /// Creates a new `BatchActorHandle` and spawns a batch actor to process messages.
    ///
    /// The actor is spawned onto `tasks` under `key` and reads from the
    /// receiving half of a fresh channel; the handle keeps the sending half.
    /// A second, initially empty channel is created as the buffer that
    /// [`send`](Self::send) falls back to.
    pub fn new(
        topic: String,
        client: GapicPublisher,
        batching_options: BatchingOptions,
        key: String,
        tasks: &mut JoinMap<String, ()>,
    ) -> Self {
        // `unbounded_channel` returns `(sender, receiver)`, in that order.
        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<ToBatchActor>();
        spawn_actor(topic, client, batching_options, key, tasks, receiver);
        BatchActorHandle {
            sender,
            buffer: tokio::sync::mpsc::unbounded_channel::<ToBatchActor>(),
        }
    }

    /// Respawns a batch actor to process new messages.
    ///
    /// This should be used only when the previous batch actor has shut down.
    /// The current buffer channel is promoted to be the live channel: its
    /// receiver (including anything already buffered) is handed to the new
    /// actor and its sender replaces `self.sender`. A fresh channel takes its
    /// place as the buffer.
    pub fn respawn_actor(
        &mut self,
        topic: String,
        client: GapicPublisher,
        batching_options: BatchingOptions,
        key: String,
        tasks: &mut JoinMap<String, ()>,
    ) {
        let fresh_buffer = tokio::sync::mpsc::unbounded_channel::<ToBatchActor>();
        let (sender, receiver) = std::mem::replace(&mut self.buffer, fresh_buffer);
        // Assigning drops the sender of the previous actor's channel.
        self.sender = sender;
        spawn_actor(topic, client, batching_options, key, tasks, receiver);
    }

    /// Sends `msg` to the batch actor, or to the buffer if the actor's channel is closed.
    ///
    /// # Panics
    ///
    /// Panics if the buffer channel is closed. The handle owns the buffer's
    /// receiver, so this is not expected unless that receiver was closed
    /// explicitly.
    pub(crate) fn send(&mut self, msg: ToBatchActor) {
        // A failed send returns the message, so it can be redirected to the buffer.
        if let Err(SendError(msg)) = self.sender.send(msg) {
            self.buffer
                .0
                .send(msg)
                .expect("buffer should not be closed");
        }
    }
}

#[derive(Debug)]
Expand Down Expand Up @@ -315,6 +381,9 @@ impl ConcurrentBatchActor {
self.context.topic.clone(),
inflight,
);
// The batch is now empty, we can close the channel and begin shutdown.
// Any remaining messages in the channel will be handled by the run loop.
self.context.rx.close();
}
}

Expand Down Expand Up @@ -461,6 +530,12 @@ impl SequentialBatchActor {
);
}
self.handle_inflight_join(inflight.join_next().await);
// The batch is now empty and there are no pending messages.
// If we did not pause (from the inflight join), then we can begin shutdown.
// Any remaining messages in the channel will be handled by the run loop.
if !self.paused {
self.context.rx.close();
}
}

// Move message to the pending batch respecting batch thresholds
Expand Down Expand Up @@ -869,9 +944,6 @@ mod tests {
.run(),
);

// Flush on empty.
assert_flush!(actor_tx);

// Publish 10 messages then Flush.
let start = tokio::time::Instant::now();
let mut publish_rxs = VecDeque::new();
Expand Down
1 change: 0 additions & 1 deletion src/pubsub/src/publisher/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,6 @@ mod tests {
let mut mock = MockGapicPublisher::new();
mock.expect_publish()
.withf(|r, _| r.messages.len() == 1)
.times(2)
.returning(publish_ok);

let client = GapicPublisher::from_stub(mock);
Expand Down
Loading