diff --git a/kvrocks.conf b/kvrocks.conf index dba7eedb68c..0512d591c6f 100644 --- a/kvrocks.conf +++ b/kvrocks.conf @@ -231,6 +231,24 @@ replication-delay-bytes 16384 # Default: 16 updates replication-delay-updates 16 +# Sequence padding allows the master to insert dummy WriteBatch records into +# the replication stream when the replica's requested sequence number does not +# align with the start sequence of any WriteBatch in the master's WAL. +# +# For example, if a WriteBatch starts at sequence 10 and contains 3 records, +# valid starting sequences for partial sync are 10 or 13. A request starting +# at 11 or 12 would normally be considered invalid and trigger a full sync. +# When enabled, the master will send padding batches to advance the replica's +# sequence to the next valid position, allowing incremental replication to +# continue while skipping the missing records. +# +# This can avoid expensive full synchronization when only a small number of +# log entries are missing. The trade-off is that the skipped records are never +# applied, so the replica can differ from the master for the keys they touched. +# +# Default: no +replication-enable-sequence-padding no + # TCP listen() backlog.
# # In high requests-per-second environments you need an high backlog in order diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc index bb211095c51..28452e75543 100644 --- a/src/cluster/replication.cc +++ b/src/cluster/replication.cc @@ -57,10 +57,12 @@ #include #endif -FeedSlaveThread::FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq) +FeedSlaveThread::FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq, + uint32_t padded_seq_count /*= 0*/) : srv_(srv), conn_(conn), next_repl_seq_(next_repl_seq), + padded_seq_count_(padded_seq_count), req_(srv), max_delay_bytes_(srv->GetConfig()->max_replication_delay_bytes), max_delay_updates_(srv->GetConfig()->max_replication_delay_updates) {} @@ -174,6 +176,29 @@ bool FeedSlaveThread::shouldSendGetAck(rocksdb::SequenceNumber seq) { } void FeedSlaveThread::loop() { + // If there are padded sequences, send dummy no-op entries to keep replication sequence continuous + if (padded_seq_count_ > 0) { + static constexpr std::string_view kPaddingSequenceKey = "__kvrocks_internal_padding_seq__"; + rocksdb::WriteBatch padding_write_batch; + for (uint32_t i = 0; i < padded_seq_count_; ++i) { + padding_write_batch.Delete(kPaddingSequenceKey); + } + + // Encode the dummy WriteBatch as a Redis BulkString to send via replication protocol + std::string padding_seq_encoded = redis::BulkString(padding_write_batch.Data()); + auto s = util::SockSend(conn_->GetFD(), padding_seq_encoded, conn_->GetBufferEvent()); + if (!s.IsOK()) { + ERROR("Write error while sending padding sequence write batch to slave: {}. 
batches: 0x{}", s.Msg(), + util::StringToHex(padding_seq_encoded)); + Stop(); + return; + } + + next_repl_seq_.fetch_add(rocksdb::SequenceNumber(padded_seq_count_), std::memory_order_relaxed); + WARN("Sent {} padding sequence write batch to slave {}, next replicate sequence {}", padded_seq_count_, + conn_->GetAddr(), next_repl_seq_.load()); + } + // is_first_repl_batch was used to fix that replication may be stuck in a dead loop // when some seqs might be lost in the middle of the WAL log, so forced to replicate // first batch here to work around this issue instead of waiting for enough batch size. diff --git a/src/cluster/replication.h b/src/cluster/replication.h index ab9ab7ae2d5..6df4ea3acbf 100644 --- a/src/cluster/replication.h +++ b/src/cluster/replication.h @@ -64,7 +64,8 @@ using FetchFileCallback = std::function; class FeedSlaveThread { public: - explicit FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq); + explicit FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq, + uint32_t padded_seq_count = 0); ~FeedSlaveThread() = default; Status Start(); @@ -80,6 +81,7 @@ class FeedSlaveThread { Server *srv_ = nullptr; std::unique_ptr conn_ = nullptr; std::atomic next_repl_seq_ = 0; + uint32_t padded_seq_count_ = 0; std::thread t_; std::unique_ptr iter_ = nullptr; // used to parse the ack response from the slave diff --git a/src/commands/cmd_replication.cc b/src/commands/cmd_replication.cc index 73fbca8306b..ab7f8e375c3 100644 --- a/src/commands/cmd_replication.cc +++ b/src/commands/cmd_replication.cc @@ -79,7 +79,8 @@ class CommandPSync : public Commander { } // Check Log sequence - if (!need_full_sync && !checkWALBoundary(srv->storage, next_repl_seq_).IsOK()) { + uint32_t padded_seq_count = 0; + if (!need_full_sync && !checkWALBoundary(srv->storage, next_repl_seq_, padded_seq_count).IsOK()) { *output = "sequence out of range, please use fullsync"; need_full_sync = true; } @@ 
-100,7 +101,10 @@ class CommandPSync : public Commander { } srv->stats.IncrPSyncOKCount(); - s = srv->AddSlave(conn, next_repl_seq_); + if (padded_seq_count > 0) { + srv->stats.IncrPSyncPaddingCount(); + } + s = srv->AddSlave(conn, next_repl_seq_, padded_seq_count); if (!s.IsOK()) { std::string err = redis::Error(s); s = util::SockSend(conn->GetFD(), err, conn->GetBufferEvent()); @@ -121,7 +125,11 @@ class CommandPSync : public Commander { std::string replica_replid_; // Return OK if the seq is in the range of the current WAL + // When sequence padding is enabled and seq falls inside a WriteBatch, padded_seq_count is + // set to the number of records from seq to the end of that batch; otherwise it is set to 0. - static Status checkWALBoundary(engine::Storage *storage, rocksdb::SequenceNumber seq) { + static Status checkWALBoundary(engine::Storage *storage, rocksdb::SequenceNumber seq, uint32_t &padded_seq_count) { + padded_seq_count = 0; + if (seq == storage->LatestSeqNumber() + 1) { return Status::OK(); } @@ -138,6 +146,21 @@ class CommandPSync : public Commander { auto batch = iter->GetBatch(); if (seq != batch.sequence) { if (seq > batch.sequence) { + if (storage->GetConfig()->replication_enable_sequence_padding) { + // The replica asked to resume inside this batch: pad the records from `seq` + // to the end of the batch so that it can continue at the next batch boundary. + const uint32_t batch_count = batch.writeBatchPtr->GetWriteBatch()->Count(); + const rocksdb::SequenceNumber seq_offset = seq - batch.sequence; + // GetWALIter() is expected to return the batch containing `seq`, which means + // seq_offset < batch_count. Check it anyway: if it ever does not hold, the + // unsigned subtraction below would wrap around to a huge padding count, so + // fall through to the error path (full sync) instead.
+ if (seq_offset < batch_count) { + padded_seq_count = batch_count - static_cast<uint32_t>(seq_offset); + return Status::OK(); + } + } + ERROR("checkWALBoundary with sequence: {}, but GetWALIter return older sequence: {}", seq, batch.sequence); } return {Status::NotOK}; diff --git a/src/config/config.cc b/src/config/config.cc index 863bdd43224..66e4e9e594b 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -206,6 +206,7 @@ Config::Config() { {"replication-no-slowdown", false, new YesNoField(&replication_no_slowdown, true)}, {"replication-delay-bytes", false, new IntField(&max_replication_delay_bytes, 16 * 1024, 1, INT_MAX)}, {"replication-delay-updates", false, new IntField(&max_replication_delay_updates, 16, 1, INT_MAX)}, + {"replication-enable-sequence-padding", false, new YesNoField(&replication_enable_sequence_padding, false)}, {"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)}, {"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio, 0, 0, 100)}, {"profiling-sample-record-max-len", false, new IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)}, diff --git a/src/config/config.h b/src/config/config.h index 4fbb80137a0..491d96ea543 100644 --- a/src/config/config.h +++ b/src/config/config.h @@ -137,6 +137,7 @@ struct Config { bool use_rsid_psync = false; bool replication_group_sync = false; bool replication_no_slowdown = false; + bool replication_enable_sequence_padding = false; std::vector binds; std::string dir; std::string db_dir; diff --git a/src/server/server.cc b/src/server/server.cc index e3a5ec33e48..60e910b5bdc 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -330,8 +330,9 @@ Status Server::RemoveMaster() { return Status::OK(); } -Status Server::AddSlave(redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq) { - auto t = std::make_unique(this, conn, next_repl_seq); +Status Server::AddSlave(redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq, + uint32_t padded_seq_count /*= 0*/) 
{ + auto t = std::make_unique(this, conn, next_repl_seq, padded_seq_count); auto s = t->Start(); if (!s.IsOK()) { return s; @@ -1323,6 +1324,7 @@ Server::InfoEntries Server::GetStatsInfo() { entries.emplace_back("sync_full", stats.fullsync_count.load()); entries.emplace_back("sync_partial_ok", stats.psync_ok_count.load()); entries.emplace_back("sync_partial_err", stats.psync_err_count.load()); + entries.emplace_back("sync_partial_padding", stats.psync_padding_count.load()); auto db_stats = storage->GetDBStats(); entries.emplace_back("keyspace_hits", db_stats->keyspace_hits.load()); diff --git a/src/server/server.h b/src/server/server.h index 84593015006..116fb301694 100644 --- a/src/server/server.h +++ b/src/server/server.h @@ -198,7 +198,7 @@ class Server { Status AddMaster(const std::string &host, uint32_t port, bool force_reconnect); Status RemoveMaster(); - Status AddSlave(redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq); + Status AddSlave(redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq, uint32_t padded_seq_count = 0); void DisconnectSlaves(); void CleanupExitedSlaves(); bool IsSlave() const { return !master_host_.empty(); } diff --git a/src/stats/stats.h b/src/stats/stats.h index 0bae042d640..4e1cf8bb83a 100644 --- a/src/stats/stats.h +++ b/src/stats/stats.h @@ -76,6 +76,7 @@ class Stats { std::atomic fullsync_count = {0}; std::atomic psync_err_count = {0}; std::atomic psync_ok_count = {0}; + std::atomic psync_padding_count = {0}; std::map commands_stats; using BucketBoundaries = std::vector; @@ -91,6 +92,7 @@ class Stats { void IncrFullSyncCount() { fullsync_count.fetch_add(1, std::memory_order_relaxed); } void IncrPSyncErrCount() { psync_err_count.fetch_add(1, std::memory_order_relaxed); } void IncrPSyncOKCount() { psync_ok_count.fetch_add(1, std::memory_order_relaxed); } + void IncrPSyncPaddingCount() { psync_padding_count.fetch_add(1, std::memory_order_relaxed); } static int64_t GetMemoryRSS(); void 
TrackInstantaneousMetric(int metric, uint64_t current_reading); uint64_t GetInstantaneousMetric(int metric) const; diff --git a/tests/gocase/integration/replication/replication_test.go b/tests/gocase/integration/replication/replication_test.go index 39ed535cafd..a6d4ab21299 100644 --- a/tests/gocase/integration/replication/replication_test.go +++ b/tests/gocase/integration/replication/replication_test.go @@ -711,3 +711,78 @@ func TestReplicationWatermark(t *testing.T) { // The small command should be processed much faster than 1 second require.Less(t, duration, 1*time.Second, "small command should be processed promptly") } + +// TestReplicationSequencePadding checks that PSYNC is rejected with "sequence out +// of range" while replication-enable-sequence-padding is off, and that a replica +// whose sequence does not line up with the master's is still served by a partial +// sync (counted in sync_partial_padding) once the option is turned on. +func TestReplicationSequencePadding(t *testing.T) { + t.Parallel() + ctx := context.Background() + + // Start master with sequence padding disabled + masterSrv := util.StartServer(t, map[string]string{ + "cluster-enabled": "yes", + "replication-enable-sequence-padding": "no", + }) + defer func() { masterSrv.Close() }() + masterClient := masterSrv.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterNodeID).Err()) + + replicaSrv := util.StartServer(t, map[string]string{ + "cluster-enabled": "yes", + }) + defer func() { replicaSrv.Close() }() + replicaClient := replicaSrv.NewClient() + // allow to run the read-only command in the replica + require.NoError(t, replicaClient.ReadOnly(ctx).Err()) + defer func() { require.NoError(t, replicaClient.Close()) }() + replicaNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODEID", replicaNodeID).Err()) + + // Configure master cluster topology + masterClusterConfigStr := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383", masterNodeID, masterSrv.Port()) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", masterClusterConfigStr, "1").Err()) + + // Write to master to increase sequence 
number (creates gap with replica) + require.NoError(t, masterClient.Do(ctx, "hmset", "testHashKey", "1", "1", "2", "2").Err()) + + // Configure replica as standalone master and increase its sequence number (creates mismatch) + replicaClusterConfigStr := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383", replicaNodeID, replicaSrv.Port()) + require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODES", replicaClusterConfigStr, "1").Err()) + require.NoError(t, replicaClient.Do(ctx, "hmset", "testHashKey", "1", "1").Err()) + + t.Run("Psync reports sequence out of range when padding disabled", func(t *testing.T) { + require.ErrorContains(t, masterClient.Do(ctx, "psync", "2").Err(), "sequence out of range") + require.Equal(t, "1", util.FindInfoEntry(masterClient, "sync_partial_err")) + }) + + t.Run("Psync avoids full sync when padding enabled", func(t *testing.T) { + // Enable sequence padding on master + require.NoError(t, masterClient.Do(ctx, "config", "set", "replication-enable-sequence-padding", "yes").Err()) + + // Configure replica as slave of master + replicaClusterConfigStr = fmt.Sprintf("%s\n%s 127.0.0.1 %d slave %s", masterClusterConfigStr, replicaNodeID, replicaSrv.Port(), masterNodeID) + require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODES", replicaClusterConfigStr, "2").Err()) + + // Wait until replication catches up + util.WaitForSync(t, replicaClient) + + // Verify roles and sync stats (no full sync reported by either side) + require.Equal(t, "slave", util.FindInfoEntry(replicaClient, "role")) + require.Equal(t, "0", util.FindInfoEntry(replicaClient, "sync_full")) + require.Equal(t, "0", util.FindInfoEntry(masterClient, "sync_full")) + require.Equal(t, "1", util.FindInfoEntry(masterClient, "sync_partial_ok")) + require.Equal(t, "1", util.FindInfoEntry(masterClient, "sync_partial_padding")) + + // Perform writes on master and verify replication + require.NoError(t, masterClient.Set(ctx, "k0", "v0", 0).Err()) + require.NoError(t, masterClient.LPush(ctx, "k1", "e0", "e1", "e2").Err()) + util.WaitForOffsetSync(t, masterClient, replicaClient, 5*time.Second) + + require.Equal(t, "v0", replicaClient.Get(ctx, 
"k0").Val()) + require.Equal(t, []string{"e2", "e1", "e0"}, replicaClient.LRange(ctx, "k1", 0, -1).Val()) + }) +}