Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1644,6 +1644,28 @@ public class ConfigOptions {
+ "The RateLimiter is always enabled. The default value is Long.MAX_VALUE (effectively unlimited). "
+ "Set to a lower value (e.g., 100MB) to limit the rate.");

public static final ConfigOption<Boolean> KV_SHARED_BLOCK_CACHE_ENABLED =
key("kv.rocksdb.shared-block-cache.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to enable a shared block cache for all RocksDB instances "
+ "in the TabletServer. When enabled, all KV tablets share a single "
+ "block cache to improve memory utilization. When disabled, each "
+ "tablet creates its own independent block cache.");

public static final ConfigOption<MemorySize> KV_SHARED_BLOCK_CACHE_SIZE =
key("kv.rocksdb.shared-block-cache.size")
.memoryType()
.defaultValue(MemorySize.parse("256mb"))
.withDescription(
"The total size of the shared block cache for all RocksDB instances "
+ "in the TabletServer. All KV tablets share a single block cache "
+ "to improve memory utilization. Hot tablets can use more cache "
+ "while cold tablets use less. The default value is 256MB. "
+ "Only takes effect when kv.rocksdb.shared-block-cache.enabled "
+ "is set to true.");

// --------------------------------------------------------------------------
// Provided configurable ColumnFamilyOptions within Fluss
// --------------------------------------------------------------------------
Expand Down Expand Up @@ -1767,7 +1789,9 @@ public class ConfigOptions {
.defaultValue(MemorySize.parse("8mb"))
.withDescription(
"The amount of the cache for data blocks in RocksDB. "
+ "The default block-cache size is `8MB`.");
+ "The default block-cache size is `8MB`. "
+ "This setting is ignored when shared block cache is enabled "
+ "(kv.rocksdb.shared-block-cache.enabled=true).");

public static final ConfigOption<Boolean> KV_USE_BLOOM_FILTER =
key("kv.rocksdb.use-bloom-filter")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,15 @@
import org.apache.fluss.utils.MapUtils;
import org.apache.fluss.utils.types.Tuple2;

import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.RateLimiter;
import org.rocksdb.RateLimiterMode;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

import java.io.File;
Expand Down Expand Up @@ -135,6 +138,27 @@ public static RateLimiter getDefaultRateLimiter() {
*/
private final RateLimiter sharedRocksDBRateLimiter;

/** The shared block cache for all RocksDB instances, null if disabled. */
@Nullable private final Cache sharedBlockCache;

/**
* Returns the shared block cache usage in bytes, or 0 if shared cache is disabled.
*
* @return shared block cache usage in bytes
*/
public long getSharedBlockCacheUsage() {
return sharedBlockCache != null ? sharedBlockCache.getUsage() : 0L;
}

/**
* Returns the shared block cache pinned usage in bytes, or 0 if shared cache is disabled.
*
* @return shared block cache pinned usage in bytes
*/
public long getSharedBlockCachePinnedUsage() {
return sharedBlockCache != null ? sharedBlockCache.getPinnedUsage() : 0L;
}

/** Current shared rate limiter configuration in bytes per second. */
private volatile long currentSharedRateLimitBytesPerSec;

Expand All @@ -157,6 +181,10 @@ private KvManager(
this.remoteFileSystem = remoteKvDir.getFileSystem();
this.serverMetricGroup = tabletServerMetricGroup;
this.sharedRocksDBRateLimiter = createSharedRateLimiter(conf);
this.sharedBlockCache = createSharedBlockCache(conf);
if (sharedBlockCache != null) {
tabletServerMetricGroup.setSharedBlockCacheUsageSupplier(sharedBlockCache::getUsage);
}
this.currentSharedRateLimitBytesPerSec =
conf.get(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC).getBytes();
}
Expand All @@ -179,6 +207,15 @@ private static RateLimiter createSharedRateLimiter(Configuration conf) {
false);
}

private static @Nullable Cache createSharedBlockCache(Configuration conf) {
if (!conf.get(ConfigOptions.KV_SHARED_BLOCK_CACHE_ENABLED)) {
return null;
}
long sharedBlockCacheSize = conf.get(ConfigOptions.KV_SHARED_BLOCK_CACHE_SIZE).getBytes();
RocksDB.loadLibrary();
return new LRUCache(sharedBlockCacheSize);
}

public static KvManager create(
Configuration conf,
ZooKeeperClient zkClient,
Expand Down Expand Up @@ -216,6 +253,9 @@ public void shutdown() {
if (sharedRocksDBRateLimiter != null) {
sharedRocksDBRateLimiter.close();
}
if (sharedBlockCache != null) {
sharedBlockCache.close();
}
LOG.info("Shut down KvManager complete.");
}

Expand Down Expand Up @@ -273,6 +313,7 @@ public KvTablet getOrCreateKv(
schemaGetter,
tableConfig.getChangelogImage(),
sharedRocksDBRateLimiter,
sharedBlockCache,
autoIncrementManager);
currentKvs.put(tableBucket, tablet);

Expand Down Expand Up @@ -390,6 +431,7 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
schemaGetter,
tableConfig.getChangelogImage(),
sharedRocksDBRateLimiter,
sharedBlockCache,
autoIncrementManager);
if (this.currentKvs.containsKey(tableBucket)) {
throw new IllegalStateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.fluss.utils.BytesUtils;
import org.apache.fluss.utils.FileUtils;

import org.rocksdb.Cache;
import org.rocksdb.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -195,9 +196,10 @@ public static KvTablet create(
SchemaGetter schemaGetter,
ChangelogImage changelogImage,
RateLimiter sharedRateLimiter,
@Nullable Cache sharedBlockCache,
AutoIncrementManager autoIncrementManager)
throws IOException {
RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir, sharedRateLimiter);
RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir, sharedRateLimiter, sharedBlockCache);

// Create RocksDB statistics accessor (will be registered to TableMetricGroup by Replica)
// Pass ResourceGuard to ensure thread-safe access during concurrent close operations
Expand All @@ -209,7 +211,8 @@ public static KvTablet create(
kv.getStatistics(),
kv.getResourceGuard(),
kv.getDefaultColumnFamilyHandle(),
kv.getBlockCache());
kv.getBlockCache(),
sharedBlockCache != null);

return new KvTablet(
tablePath,
Expand All @@ -232,11 +235,15 @@ public static KvTablet create(
}

private static RocksDBKv buildRocksDBKv(
Configuration configuration, File kvDir, RateLimiter sharedRateLimiter)
Configuration configuration,
File kvDir,
RateLimiter sharedRateLimiter,
@Nullable Cache sharedBlockCache)
throws IOException {
// Enable statistics to support RocksDB statistics collection
RocksDBResourceContainer rocksDBResourceContainer =
new RocksDBResourceContainer(configuration, kvDir, true, sharedRateLimiter);
new RocksDBResourceContainer(
configuration, kvDir, true, sharedRateLimiter, sharedBlockCache);
RocksDBKvBuilder rocksDBKvBuilder =
new RocksDBKvBuilder(
kvDir,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ public class RocksDBResourceContainer implements AutoCloseable {
/** The shared rate limiter for all RocksDB instances. */
private final RateLimiter sharedRateLimiter;

/** The shared block cache from KvManager, null if not using shared cache. */
@Nullable private final Cache sharedBlockCache;

/** The statistics object for RocksDB, null if statistics is disabled. */
@Nullable private Statistics statistics;

Expand All @@ -93,25 +96,39 @@ public class RocksDBResourceContainer implements AutoCloseable {

@VisibleForTesting
RocksDBResourceContainer() {
this(new Configuration(), null, false, KvManager.getDefaultRateLimiter());
this(new Configuration(), null, false, KvManager.getDefaultRateLimiter(), null);
}

public RocksDBResourceContainer(ReadableConfig configuration, @Nullable File instanceBasePath) {
this(configuration, instanceBasePath, false, KvManager.getDefaultRateLimiter());
this(configuration, instanceBasePath, false, KvManager.getDefaultRateLimiter(), null);
}

public RocksDBResourceContainer(
ReadableConfig configuration,
@Nullable File instanceBasePath,
boolean enableStatistics) {
this(configuration, instanceBasePath, enableStatistics, KvManager.getDefaultRateLimiter());
this(
configuration,
instanceBasePath,
enableStatistics,
KvManager.getDefaultRateLimiter(),
null);
}

public RocksDBResourceContainer(
ReadableConfig configuration,
@Nullable File instanceBasePath,
boolean enableStatistics,
RateLimiter sharedRateLimiter) {
this(configuration, instanceBasePath, enableStatistics, sharedRateLimiter, null);
}

public RocksDBResourceContainer(
ReadableConfig configuration,
@Nullable File instanceBasePath,
boolean enableStatistics,
RateLimiter sharedRateLimiter,
@Nullable Cache sharedBlockCache) {
this.configuration = configuration;

this.instanceRocksDBPath =
Expand All @@ -121,6 +138,7 @@ public RocksDBResourceContainer(
this.enableStatistics = enableStatistics;
this.sharedRateLimiter =
checkNotNull(sharedRateLimiter, "sharedRateLimiter must not be null");
this.sharedBlockCache = sharedBlockCache;

this.handlesToClose = new ArrayList<>();
}
Expand Down Expand Up @@ -303,10 +321,16 @@ private ColumnFamilyOptions setColumnFamilyOptionsFromConfigurableOptions(
internalGetOption(ConfigOptions.KV_METADATA_BLOCK_SIZE).getBytes());

// Create explicit LRUCache for accurate memory tracking
long blockCacheSize = internalGetOption(ConfigOptions.KV_BLOCK_CACHE_SIZE).getBytes();
blockCache = new LRUCache(blockCacheSize);
handlesToClose.add(blockCache);
blockBasedTableConfig.setBlockCache(blockCache);
if (sharedBlockCache != null) {
// Use shared block cache, do NOT add to handlesToClose (managed by KvManager)
blockCache = sharedBlockCache;
blockBasedTableConfig.setBlockCache(sharedBlockCache);
} else {
long blockCacheSize = internalGetOption(ConfigOptions.KV_BLOCK_CACHE_SIZE).getBytes();
blockCache = new LRUCache(blockCacheSize);
handlesToClose.add(blockCache);
blockBasedTableConfig.setBlockCache(blockCache);
}

// Configure index and filter blocks caching
blockBasedTableConfig.setCacheIndexAndFilterBlocks(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,30 @@ public class RocksDBStatistics implements AutoCloseable {
private final ResourceGuard resourceGuard;
private final ColumnFamilyHandle defaultColumnFamilyHandle;
@Nullable private final Cache blockCache;
private final boolean isSharedBlockCache;

public RocksDBStatistics(
RocksDB db,
@Nullable Statistics statistics,
ResourceGuard resourceGuard,
ColumnFamilyHandle defaultColumnFamilyHandle,
@Nullable Cache blockCache) {
this(db, statistics, resourceGuard, defaultColumnFamilyHandle, blockCache, false);
}

public RocksDBStatistics(
RocksDB db,
@Nullable Statistics statistics,
ResourceGuard resourceGuard,
ColumnFamilyHandle defaultColumnFamilyHandle,
@Nullable Cache blockCache,
boolean isSharedBlockCache) {
this.db = db;
this.statistics = statistics;
this.resourceGuard = resourceGuard;
this.defaultColumnFamilyHandle = defaultColumnFamilyHandle;
this.blockCache = blockCache;
this.isSharedBlockCache = isSharedBlockCache;
}

// ==================== Ticker-based Metrics ====================
Expand Down Expand Up @@ -226,8 +238,10 @@ public long getTotalMemoryUsage() {
return 0L;
}

// When using shared block cache, exclude it from per-tablet stats
// to avoid double-counting during aggregation
Set<Cache> caches = null;
if (blockCache != null) {
if (blockCache != null && !isSharedBlockCache) {
caches = new HashSet<>();
caches.add(blockCache);
}
Expand Down Expand Up @@ -274,18 +288,29 @@ public long getTableReadersMemoryUsage() {
/**
* Get memory usage for block cache via MemoryUtil API.
*
* @return block cache memory usage in bytes, or 0 if not available
* <p>When using shared block cache, returns 0 to avoid double-counting during aggregation. The
* shared block cache usage should be reported separately at the server level.
*
* @return block cache memory usage in bytes, or 0 if not available or using shared cache
*/
public long getBlockCacheMemoryUsage() {
if (isSharedBlockCache) {
return 0L;
}
return getMemoryUsageByType(MemoryUsageType.kCacheTotal);
}

/**
* Get pinned memory usage in block cache.
*
* @return pinned memory usage in bytes, or 0 if not available
* <p>When using shared block cache, returns 0 to avoid double-counting during aggregation.
*
* @return pinned memory usage in bytes, or 0 if not available or using shared cache
*/
public long getBlockCachePinnedUsage() {
if (isSharedBlockCache) {
return 0L;
}
try (ResourceGuard.Lease lease = resourceGuard.acquireResource()) {
if (blockCache != null) {
return blockCache.getPinnedUsage();
Expand All @@ -302,8 +327,9 @@ private long getMemoryUsageByType(MemoryUsageType type) {
return 0L;
}

// When using shared block cache, exclude it from per-tablet stats
Set<Cache> caches = null;
if (blockCache != null) {
if (blockCache != null && !isSharedBlockCache) {
caches = new HashSet<>();
caches.add(blockCache);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.fluss.utils.MapUtils;

import java.util.Map;
import java.util.function.LongSupplier;

/** The metric group for tablet server. */
public class TabletServerMetricGroup extends AbstractMetricGroup {
Expand Down Expand Up @@ -76,6 +77,9 @@ public class TabletServerMetricGroup extends AbstractMetricGroup {
private final Counter isrExpands;
private final Counter failedIsrUpdates;

/** Supplier for shared block cache usage, set by KvManager when shared cache is enabled. */
private volatile LongSupplier sharedBlockCacheUsageSupplier = () -> 0L;

public TabletServerMetricGroup(
MetricRegistry registry, String clusterId, String rack, String hostname, int serverId) {
super(registry, new String[] {clusterId, hostname, NAME}, null);
Expand Down Expand Up @@ -145,13 +149,24 @@ public TabletServerMetricGroup(
*/
private void registerServerRocksDBMetrics() {
// Total memory usage across all RocksDB instances in this server.
// When shared block cache is enabled, per-tablet stats exclude block cache,
// so we add the shared block cache usage once separately.
gauge(
MetricNames.ROCKSDB_MEMORY_USAGE_TOTAL,
() ->
metricGroupByTable.values().stream()
.flatMap(TableMetricGroup::allRocksDBStatistics)
.mapToLong(RocksDBStatistics::getTotalMemoryUsage)
.sum());
.flatMap(TableMetricGroup::allRocksDBStatistics)
.mapToLong(RocksDBStatistics::getTotalMemoryUsage)
.sum()
+ sharedBlockCacheUsageSupplier.getAsLong());
}

/**
* Sets the supplier for shared block cache usage. Called by KvManager when shared block cache
* is enabled.
*/
public void setSharedBlockCacheUsageSupplier(LongSupplier supplier) {
this.sharedBlockCacheUsageSupplier = supplier;
}

@Override
Expand Down
Loading
Loading