-
Notifications
You must be signed in to change notification settings - Fork 622
feat(cluster): Introduce lease mechanism for Kvrocks cluster to mitigate the brain-split issue #3397
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: unstable
Are you sure you want to change the base?
feat(cluster): Introduce lease mechanism for Kvrocks cluster to mitigate the brain-split issue #3397
Changes from all commits
01f51da
0022c9a
fab206c
9114bd8
222e277
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -93,6 +93,12 @@ const std::vector<ConfigEnum<BlockCacheType>> cache_types{[] { | |
| const std::vector<ConfigEnum<MigrationType>> migration_types{{"redis-command", MigrationType::kRedisCommand}, | ||
| {"raw-key-value", MigrationType::kRawKeyValue}}; | ||
|
|
||
| const std::vector<ConfigEnum<MasterLeaseMode>> master_lease_modes{ | ||
| {"disabled", MasterLeaseMode::kDisabled}, | ||
| {"log-only", MasterLeaseMode::kLogOnly}, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
| {"block-write", MasterLeaseMode::kBlockWrite}, | ||
| }; | ||
|
|
||
| std::string TrimRocksDbPrefix(std::string s) { | ||
| constexpr std::string_view prefix = "rocksdb."; | ||
| if (!util::StartsWithICase(s, prefix)) return s; | ||
|
|
@@ -243,6 +249,8 @@ Config::Config() { | |
| {"json-max-nesting-depth", false, new IntField(&json_max_nesting_depth, 1024, 0, INT_MAX)}, | ||
| {"json-storage-format", false, | ||
| new EnumField<JsonStorageFormat>(&json_storage_format, json_storage_formats, JsonStorageFormat::JSON)}, | ||
| {"master-lease-mode", false, | ||
| new EnumField<MasterLeaseMode>(&master_lease_mode, master_lease_modes, MasterLeaseMode::kDisabled)}, | ||
| {"txn-context-enabled", true, new YesNoField(&txn_context_enabled, false)}, | ||
| {"skip-block-cache-deallocation-on-close", false, new YesNoField(&skip_block_cache_deallocation_on_close, false)}, | ||
| {"histogram-bucket-boundaries", true, new StringField(&histogram_bucket_boundaries_str_, "")}, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -719,6 +719,23 @@ rocksdb::Status Storage::Write(engine::Context &ctx, const rocksdb::WriteOptions | |
|
|
||
| rocksdb::Status Storage::writeToDB(engine::Context &ctx, const rocksdb::WriteOptions &options, | ||
| rocksdb::WriteBatch *updates) { | ||
| // Master lease check: applied here so it covers both Storage::Write() and CommitTxn(). | ||
| // Only active when master_lease_mode != disabled. Read mode once to avoid TOCTOU. | ||
| auto lease_mode = config_->master_lease_mode; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The lease state should be maintained inside the server rather than in the storage layer. You can then refuse write operations in the same way as the slave's read-only mode. Also, the lease mode should only take effect when cluster mode is enabled. |
||
| if (lease_mode != MasterLeaseMode::kDisabled) { | ||
| uint64_t deadline = lease_deadline_ms_.load(std::memory_order_relaxed); | ||
| // deadline == 0 means cold start (never renewed): writes always allowed. | ||
| if (deadline > 0 && util::GetTimeStampMS() > deadline) { | ||
| if (lease_mode == MasterLeaseMode::kBlockWrite) { | ||
| return rocksdb::Status::Aborted( | ||
| "Write rejected: master lease expired (master_lease_mode=block-write)"); | ||
| } else { // kLogOnly | ||
| LOG(ERROR) << "Master lease expired but write allowed (master_lease_mode=log-only)"; | ||
| // TODO: increment stats counter lease_expired_writes | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // No point trying to commit an empty write batch: in fact this will fail on read-only DBs | ||
| // even if the write batch is empty. | ||
| if (updates->Count() == 0) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,124 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| * | ||
| */ | ||
|
|
||
| #include <gtest/gtest.h> | ||
|
|
||
| #include <atomic> | ||
|
|
||
| #include "common/time_util.h" | ||
|
|
||
| // Standalone tests for lease atomic logic, mirroring Storage's internal behavior. | ||
| // NOTE: Storage cannot be instantiated without RocksDB, so these tests verify the | ||
| // identical atomic logic directly. This is an intentional tradeoff: if Storage's | ||
| // method implementations diverge from this logic, these tests won't catch it. | ||
|
|
||
| TEST(Lease, UpdateLeaseStoresCorrectValues) { /* A deadline/version pair stored into the two atomics must read back unchanged. */ | ||
| std::atomic<uint64_t> deadline{0}; | ||
| std::atomic<uint64_t> version{0}; | ||
|
|
||
| uint64_t now = util::GetTimeStampMS(); | ||
| uint64_t lease_ms = 2000; | ||
| // Mirrors UpdateLease(5, now + lease_ms): the deadline is written first, then the version. | ||
| deadline.store(now + lease_ms, std::memory_order_relaxed); | ||
| version.store(5, std::memory_order_relaxed); | ||
|
|
||
| EXPECT_EQ(version.load(std::memory_order_relaxed), 5U); | ||
| EXPECT_GE(deadline.load(std::memory_order_relaxed), now + lease_ms - 10); /* `now` is the same local used for the store, so the +/-10 window is slack rather than clock tolerance. */ | ||
| EXPECT_LE(deadline.load(std::memory_order_relaxed), now + lease_ms + 10); | ||
| } | ||
|
|
||
| TEST(Lease, ResetLeaseZerosDeadlineAndVersion) { /* Storing zero into both atomics must leave both reading back as zero. */ | ||
| std::atomic<uint64_t> deadline{12345}; /* non-zero starting values make the reset observable */ | ||
| std::atomic<uint64_t> version{99}; | ||
|
|
||
| // Mirrors ResetLease(): the deadline is cleared first, then the version. | ||
| deadline.store(0, std::memory_order_relaxed); | ||
| version.store(0, std::memory_order_relaxed); | ||
|
|
||
| EXPECT_EQ(deadline.load(std::memory_order_relaxed), 0U); | ||
| EXPECT_EQ(version.load(std::memory_order_relaxed), 0U); | ||
| } | ||
|
|
||
| TEST(Lease, DeadlineZeroMeansNeverExpired) { | ||
| // deadline == 0 is the cold-start state (lease never renewed): writes are always allowed. | ||
| std::atomic<uint64_t> deadline{0}; | ||
| uint64_t now = util::GetTimeStampMS(); | ||
| // Same predicate as the lease check on the Storage write path (Storage::writeToDB()): deadline > 0 && now > deadline. | ||
| bool expired = (deadline.load(std::memory_order_relaxed) > 0 && now > deadline.load(std::memory_order_relaxed)); | ||
| EXPECT_FALSE(expired); | ||
| } | ||
|
|
||
| TEST(Lease, DeadlineInFutureNotExpired) { /* A non-zero deadline later than `now` must not be reported as expired. */ | ||
| std::atomic<uint64_t> deadline{0}; | ||
| uint64_t now = util::GetTimeStampMS(); | ||
| deadline.store(now + 5000, std::memory_order_relaxed); /* deadline is 5000 past the captured `now` */ | ||
| bool expired = (deadline.load(std::memory_order_relaxed) > 0 && now > deadline.load(std::memory_order_relaxed)); /* expiry predicate: deadline > 0 && now > deadline */ | ||
| EXPECT_FALSE(expired); | ||
| } | ||
|
|
||
| TEST(Lease, DeadlineInPastExpired) { /* A non-zero deadline earlier than `now` must be reported as expired. */ | ||
| std::atomic<uint64_t> deadline{0}; | ||
| uint64_t now = util::GetTimeStampMS(); | ||
| deadline.store(now - 1000, std::memory_order_relaxed); // 1 second before the captured `now` | ||
| bool expired = (deadline.load(std::memory_order_relaxed) > 0 && now > deadline.load(std::memory_order_relaxed)); /* expiry predicate: deadline > 0 && now > deadline */ | ||
| EXPECT_TRUE(expired); | ||
| } | ||
|
|
||
| TEST(Lease, ElectionVersionGuard) { | ||
| // Mirrors the version guard in CLUSTERX HEARTBEAT Execute(): | ||
| // if received_version >= local_version -> renew the deadline and adopt the version; else -> reject. | ||
| std::atomic<uint64_t> local_ver{10}; | ||
| std::atomic<uint64_t> deadline{0}; | ||
|
|
||
| // Case 1: received version equals local (the >= boundary) -> renew. A strictly newer version is not exercised here. | ||
| uint64_t received = 10; | ||
| if (received >= local_ver.load(std::memory_order_relaxed)) { | ||
| deadline.store(util::GetTimeStampMS() + 2000, std::memory_order_relaxed); | ||
| local_ver.store(received, std::memory_order_relaxed); | ||
| } | ||
| EXPECT_EQ(local_ver.load(std::memory_order_relaxed), 10U); | ||
| EXPECT_GT(deadline.load(std::memory_order_relaxed), 0U); | ||
|
|
||
| // Case 2: received version < local -> no renewal, local version untouched. | ||
| deadline.store(0, std::memory_order_relaxed); /* clear the deadline set by case 1 so any renewal below would be detectable */ | ||
| received = 9; | ||
| if (received >= local_ver.load(std::memory_order_relaxed)) { | ||
| deadline.store(util::GetTimeStampMS() + 2000, std::memory_order_relaxed); | ||
| local_ver.store(received, std::memory_order_relaxed); | ||
| } | ||
| EXPECT_EQ(deadline.load(std::memory_order_relaxed), 0U); // not renewed | ||
| EXPECT_EQ(local_ver.load(std::memory_order_relaxed), 10U); // unchanged | ||
| } | ||
|
|
||
| TEST(Lease, ResetOnRoleTransition) { | ||
| // Simulate: the node held a lease as master, then became a slave via SLAVEOF. | ||
| std::atomic<uint64_t> deadline{util::GetTimeStampMS() + 5000}; | ||
| std::atomic<uint64_t> version{42}; | ||
|
|
||
| // Simulate the SLAVEOF path: ResetLease() zeroes the deadline, then the version. | ||
| deadline.store(0, std::memory_order_relaxed); | ||
| version.store(0, std::memory_order_relaxed); | ||
|
|
||
| // After the reset, the write-path lease check (deadline > 0 && now > deadline) should see the cold-start state, so writes are allowed. | ||
| uint64_t now = util::GetTimeStampMS(); | ||
| bool expired = (deadline.load(std::memory_order_relaxed) > 0 && now > deadline.load(std::memory_order_relaxed)); | ||
| EXPECT_FALSE(expired); // deadline == 0 short-circuits the predicate: not expired | ||
| EXPECT_EQ(version.load(std::memory_order_relaxed), 0U); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the lease timeout have a proper valid range?