Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,24 @@ replication-delay-bytes 16384
# Default: 16 updates
replication-delay-updates 16

# Sequence padding allows the master to insert dummy WriteBatch records into
# the replication stream when the replica's requested sequence number does not
# align with the start sequence of any WriteBatch in the master's WAL.
#
# For example, if a WriteBatch starts at sequence 10 and contains 3 records,
# valid starting sequences for partial sync are 10 or 13. A request starting
# at 11 or 12 would normally be considered invalid and trigger a full sync.
# When enabled, the master will send padding batches to advance the replica's
# sequence to the next valid position, allowing incremental replication to
# continue while skipping the missing records.
#
# This can avoid expensive full synchronization when only a small number of
# log entries are missing. The trade-off is that the skipped records will not
# be applied on the replica, potentially causing minor data inconsistency.
#
# Default: no
replication-enable-sequence-padding no

# TCP listen() backlog.
#
# In high requests-per-second environments you need a high backlog in order
Expand Down
27 changes: 26 additions & 1 deletion src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,12 @@
#include <openssl/ssl.h>
#endif

FeedSlaveThread::FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq)
// Constructs the thread that feeds WAL write batches to a single replica.
//
// srv: owning server (also supplies replication-delay config limits below).
// conn: the replica's connection; ownership is taken by this thread.
// next_repl_seq: first WAL sequence number to replicate to this replica.
// padded_seq_count: number of dummy sequence entries to emit before real WAL
//   data, used when the replica requested a sequence that falls inside a
//   WriteBatch (see replication-enable-sequence-padding in kvrocks.conf).
//   Defaults to 0, i.e. no padding.
FeedSlaveThread::FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq,
                                 uint32_t padded_seq_count /*= 0*/)
    : srv_(srv),
      conn_(conn),
      next_repl_seq_(next_repl_seq),
      padded_seq_count_(padded_seq_count),
      req_(srv),
      max_delay_bytes_(srv->GetConfig()->max_replication_delay_bytes),
      max_delay_updates_(srv->GetConfig()->max_replication_delay_updates) {}
Expand Down Expand Up @@ -174,6 +176,29 @@ bool FeedSlaveThread::shouldSendGetAck(rocksdb::SequenceNumber seq) {
}

void FeedSlaveThread::loop() {
// If there are padded sequences, send dummy no-op entries to keep replication sequence continuous
if (padded_seq_count_ > 0) {
static constexpr std::string_view kPaddingSequenceKey = "__kvrocks_internal_padding_seq__";
rocksdb::WriteBatch padding_write_batch;
for (uint32_t i = 0; i < padded_seq_count_; ++i) {
padding_write_batch.Delete(kPaddingSequenceKey);
}

// Encode the dummy WriteBatch as a Redis BulkString to send via replication protocol
std::string padding_seq_encoded = redis::BulkString(padding_write_batch.Data());
auto s = util::SockSend(conn_->GetFD(), padding_seq_encoded, conn_->GetBufferEvent());
if (!s.IsOK()) {
ERROR("Write error while sending padding sequence write batch to slave: {}. batches: 0x{}", s.Msg(),
util::StringToHex(padding_seq_encoded));
Stop();
return;
}

next_repl_seq_.fetch_add(rocksdb::SequenceNumber(padded_seq_count_), std::memory_order_relaxed);
WARN("Sent {} padding sequence write batch to slave {}, next replicate sequence {}", padded_seq_count_,
conn_->GetAddr(), next_repl_seq_.load());
}

// is_first_repl_batch was used to fix that replication may be stuck in a dead loop
// when some seqs might be lost in the middle of the WAL log, so forced to replicate
// first batch here to work around this issue instead of waiting for enough batch size.
Expand Down
4 changes: 3 additions & 1 deletion src/cluster/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ using FetchFileCallback = std::function<void(const std::string &, uint32_t)>;

class FeedSlaveThread {
public:
explicit FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq);
explicit FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq,
uint32_t padded_seq_count = 0);
~FeedSlaveThread() = default;

Status Start();
Expand All @@ -80,6 +81,7 @@ class FeedSlaveThread {
Server *srv_ = nullptr;
std::unique_ptr<redis::Connection> conn_ = nullptr;
std::atomic<rocksdb::SequenceNumber> next_repl_seq_ = 0;
uint32_t padded_seq_count_ = 0;
std::thread t_;
std::unique_ptr<rocksdb::TransactionLogIterator> iter_ = nullptr;
// used to parse the ack response from the slave
Expand Down
21 changes: 18 additions & 3 deletions src/commands/cmd_replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ class CommandPSync : public Commander {
}

// Check Log sequence
if (!need_full_sync && !checkWALBoundary(srv->storage, next_repl_seq_).IsOK()) {
uint32_t padded_seq_count = 0;
if (!need_full_sync && !checkWALBoundary(srv->storage, next_repl_seq_, padded_seq_count).IsOK()) {
*output = "sequence out of range, please use fullsync";
need_full_sync = true;
}
Expand All @@ -100,7 +101,10 @@ class CommandPSync : public Commander {
}

srv->stats.IncrPSyncOKCount();
s = srv->AddSlave(conn, next_repl_seq_);
if (padded_seq_count > 0) {
srv->stats.IncrPSyncPaddingCount();
}
s = srv->AddSlave(conn, next_repl_seq_, padded_seq_count);
if (!s.IsOK()) {
std::string err = redis::Error(s);
s = util::SockSend(conn->GetFD(), err, conn->GetBufferEvent());
Expand All @@ -121,7 +125,9 @@ class CommandPSync : public Commander {
std::string replica_replid_;

// Return OK if the seq is in the range of the current WAL
static Status checkWALBoundary(engine::Storage *storage, rocksdb::SequenceNumber seq) {
static Status checkWALBoundary(engine::Storage *storage, rocksdb::SequenceNumber seq, uint32_t &padded_seq_count) {
padded_seq_count = 0;

if (seq == storage->LatestSeqNumber() + 1) {
return Status::OK();
}
Expand All @@ -138,6 +144,15 @@ class CommandPSync : public Commander {
auto batch = iter->GetBatch();
if (seq != batch.sequence) {
if (seq > batch.sequence) {
if (storage->GetConfig()->replication_enable_sequence_padding) {
// RocksDB WriteBatch::Count() returns a uint32_t
// In practice, batch sizes are much smaller. Since GetWALIter() ensures this
// batch contains the requested sequence, (seq - batch.sequence) is always
// <= batch.Count(), so overflow checks are unnecessary.
padded_seq_count = batch.writeBatchPtr->GetWriteBatch()->Count() - uint32_t(seq - batch.sequence);
return Status::OK();
}

ERROR("checkWALBoundary with sequence: {}, but GetWALIter return older sequence: {}", seq, batch.sequence);
}
return {Status::NotOK};
Expand Down
1 change: 1 addition & 0 deletions src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ Config::Config() {
{"replication-no-slowdown", false, new YesNoField(&replication_no_slowdown, true)},
{"replication-delay-bytes", false, new IntField(&max_replication_delay_bytes, 16 * 1024, 1, INT_MAX)},
{"replication-delay-updates", false, new IntField(&max_replication_delay_updates, 16, 1, INT_MAX)},
{"replication-enable-sequence-padding", false, new YesNoField(&replication_enable_sequence_padding, false)},
{"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)},
{"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio, 0, 0, 100)},
{"profiling-sample-record-max-len", false, new IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)},
Expand Down
1 change: 1 addition & 0 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ struct Config {
bool use_rsid_psync = false;
bool replication_group_sync = false;
bool replication_no_slowdown = false;
bool replication_enable_sequence_padding = false;
std::vector<std::string> binds;
std::string dir;
std::string db_dir;
Expand Down
6 changes: 4 additions & 2 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,9 @@ Status Server::RemoveMaster() {
return Status::OK();
}

Status Server::AddSlave(redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq) {
auto t = std::make_unique<FeedSlaveThread>(this, conn, next_repl_seq);
Status Server::AddSlave(redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq,
uint32_t padded_seq_count /*= 0*/) {
auto t = std::make_unique<FeedSlaveThread>(this, conn, next_repl_seq, padded_seq_count);
auto s = t->Start();
if (!s.IsOK()) {
return s;
Expand Down Expand Up @@ -1323,6 +1324,7 @@ Server::InfoEntries Server::GetStatsInfo() {
entries.emplace_back("sync_full", stats.fullsync_count.load());
entries.emplace_back("sync_partial_ok", stats.psync_ok_count.load());
entries.emplace_back("sync_partial_err", stats.psync_err_count.load());
entries.emplace_back("sync_partial_padding", stats.psync_padding_count.load());

auto db_stats = storage->GetDBStats();
entries.emplace_back("keyspace_hits", db_stats->keyspace_hits.load());
Expand Down
2 changes: 1 addition & 1 deletion src/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class Server {

Status AddMaster(const std::string &host, uint32_t port, bool force_reconnect);
Status RemoveMaster();
Status AddSlave(redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq);
Status AddSlave(redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq, uint32_t padded_seq_count = 0);
void DisconnectSlaves();
void CleanupExitedSlaves();
bool IsSlave() const { return !master_host_.empty(); }
Expand Down
2 changes: 2 additions & 0 deletions src/stats/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class Stats {
std::atomic<uint64_t> fullsync_count = {0};
std::atomic<uint64_t> psync_err_count = {0};
std::atomic<uint64_t> psync_ok_count = {0};
std::atomic<uint64_t> psync_padding_count = {0};
std::map<std::string, CommandStat> commands_stats;

using BucketBoundaries = std::vector<double>;
Expand All @@ -91,6 +92,7 @@ class Stats {
void IncrFullSyncCount() { fullsync_count.fetch_add(1, std::memory_order_relaxed); }
void IncrPSyncErrCount() { psync_err_count.fetch_add(1, std::memory_order_relaxed); }
void IncrPSyncOKCount() { psync_ok_count.fetch_add(1, std::memory_order_relaxed); }
void IncrPSyncPaddingCount() { psync_padding_count.fetch_add(1, std::memory_order_relaxed); }
static int64_t GetMemoryRSS();
void TrackInstantaneousMetric(int metric, uint64_t current_reading);
uint64_t GetInstantaneousMetric(int metric) const;
Expand Down
70 changes: 70 additions & 0 deletions tests/gocase/integration/replication/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,3 +711,73 @@ func TestReplicationWatermark(t *testing.T) {
// The small command should be processed much faster than 1 second
require.Less(t, duration, 1*time.Second, "small command should be processed promptly")
}

// TestReplicationSequencePadding verifies the replication-enable-sequence-padding
// behavior end to end: with padding disabled, a PSYNC whose requested sequence
// falls inside a master WriteBatch is rejected ("sequence out of range"); with
// padding enabled, the master pads the sequence gap so the replica completes a
// partial sync instead of a full sync, and subsequent writes still replicate.
func TestReplicationSequencePadding(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// Start master with sequence padding disabled (it is enabled later via CONFIG SET).
	masterSrv := util.StartServer(t, map[string]string{
		"cluster-enabled":                     "yes",
		"replication-enable-sequence-padding": "no",
	})
	defer func() { masterSrv.Close() }()
	masterClient := masterSrv.NewClient()
	defer func() { require.NoError(t, masterClient.Close()) }()
	masterNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
	require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterNodeID).Err())

	replicaSrv := util.StartServer(t, map[string]string{
		"cluster-enabled": "yes",
	})
	defer func() { replicaSrv.Close() }()
	replicaClient := replicaSrv.NewClient()
	// allow to run the read-only command in the replica
	require.NoError(t, replicaClient.ReadOnly(ctx).Err())
	defer func() { require.NoError(t, replicaClient.Close()) }()
	replicaNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
	require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODEID", replicaNodeID).Err())

	// Configure master cluster topology
	masterClusterConfigStr := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383", masterNodeID, masterSrv.Port())
	require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", masterClusterConfigStr, "1").Err())

	// Write to master to increase sequence number (creates gap with replica).
	// HMSET with two fields produces a multi-record WriteBatch, so intermediate
	// sequence numbers exist that do not align with a batch start.
	require.NoError(t, masterClient.Do(ctx, "hmset", "testHashKey", "1", "1", "2", "2").Err())

	// Configure replica as standalone master and increase its sequence number
	// (creates a sequence that lands inside the master's WriteBatch above).
	replicaClusterConfigStr := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383", replicaNodeID, replicaSrv.Port())
	require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODES", replicaClusterConfigStr, "1").Err())
	require.NoError(t, replicaClient.Do(ctx, "hmset", "testHashKey", "1", "1").Err())

	t.Run("Psync reports sequence out of range when padding disabled", func(t *testing.T) {
		// Sequence 2 is mid-batch on the master, so partial sync must be refused.
		require.Contains(t, masterClient.Do(ctx, "psync", "2").Err().Error(), "sequence out of range")
		require.Equal(t, "1", util.FindInfoEntry(masterClient, "sync_partial_err"))
	})

	t.Run("Psync avoids full sync when padding enabled", func(t *testing.T) {
		// Enable sequence padding on master
		require.NoError(t, masterClient.Do(ctx, "config", "set", "replication-enable-sequence-padding", "yes").Err())

		// Configure replica as slave of master
		replicaClusterConfigStr = fmt.Sprintf("%s\n%s 127.0.0.1 %d slave %s", masterClusterConfigStr, replicaNodeID, replicaSrv.Port(), masterNodeID)
		require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODES", replicaClusterConfigStr, "2").Err())

		// Wait until replication catches up
		util.WaitForSync(t, replicaClient)

		// Verify roles and sync stats: no full sync happened, one partial sync
		// succeeded, and it used padding (sync_partial_padding counter).
		require.Equal(t, "slave", util.FindInfoEntry(replicaClient, "role"))
		require.Equal(t, "0", util.FindInfoEntry(replicaClient, "sync_full"))
		require.Equal(t, "1", util.FindInfoEntry(masterClient, "sync_partial_ok"))
		require.Equal(t, "1", util.FindInfoEntry(masterClient, "sync_partial_padding"))

		// Perform writes on master and verify they replicate after the padded sync.
		masterClient.Set(ctx, "k0", "v0", 0)
		masterClient.LPush(ctx, "k1", "e0", "e1", "e2")
		util.WaitForOffsetSync(t, masterClient, replicaClient, 5*time.Second)

		require.Equal(t, "v0", replicaClient.Get(ctx, "k0").Val())
		require.Equal(t, []string{"e2", "e1", "e0"}, replicaClient.LRange(ctx, "k1", 0, -1).Val())
	})
}