Skip to content
Open
141 changes: 141 additions & 0 deletions src/commands/cmd_cuckoo_filter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#include "command_parser.h"
#include "commander.h"
#include "error_constants.h"
#include "server/server.h"
#include "types/redis_cuckoo_chain.h"

namespace redis {

// CF.RESERVE key capacity [BUCKETSIZE bs] [MAXITERATIONS mi] [EXPANSION ex]
//
// Creates an empty cuckoo filter keyed by `key` with the requested capacity
// and optional tuning parameters. Replies +OK on success.
class CommandCFReserve : public Commander {
 public:
  Status Parse(const std::vector<std::string> &args) override {
    if (args.size() < 3) {
      return {Status::RedisParseErr, "wrong number of arguments"};
    }

    // Capacity is required and must be a positive integer.
    auto parse_capacity = ParseInt<uint64_t>(args[2], 10);
    if (!parse_capacity) {
      return {Status::RedisParseErr, "invalid capacity"};
    }
    capacity_ = *parse_capacity;
    // capacity_ is unsigned, so the previous `<= 0` comparison could only ever
    // match zero; test for zero explicitly.
    if (capacity_ == 0) {
      return {Status::RedisParseErr, "capacity must be larger than 0"};
    }

    // Optional keyword parameters may appear in any order after the capacity.
    CommandParser parser(args, 3);
    while (parser.Good()) {
      if (parser.EatEqICase("BUCKETSIZE")) {
        auto parse_bucket_size = parser.TakeInt<uint8_t>();
        if (!parse_bucket_size.IsOK()) {
          return {Status::RedisParseErr, "invalid bucket size"};
        }
        bucket_size_ = parse_bucket_size.GetValue();
        // The old `bucket_size_ > 255` test on a uint8_t was tautologically
        // false (values above 255 cannot be represented, and TakeInt<uint8_t>
        // presumably rejects them during parsing); only zero needs a check.
        if (bucket_size_ == 0) {
          return {Status::RedisParseErr, "bucket size must be between 1 and 255"};
        }
      } else if (parser.EatEqICase("MAXITERATIONS")) {
        auto parse_max_iterations = parser.TakeInt<uint16_t>();
        if (!parse_max_iterations.IsOK()) {
          return {Status::RedisParseErr, "invalid max iterations"};
        }
        max_iterations_ = parse_max_iterations.GetValue();
        if (max_iterations_ == 0) {
          return {Status::RedisParseErr, "max iterations must be larger than 0"};
        }
      } else if (parser.EatEqICase("EXPANSION")) {
        auto parse_expansion = parser.TakeInt<uint8_t>();
        if (!parse_expansion.IsOK()) {
          return {Status::RedisParseErr, "invalid expansion factor"};
        }
        // Zero is allowed: it denotes a non-scaling filter.
        expansion_ = parse_expansion.GetValue();
      } else {
        return {Status::RedisParseErr, errInvalidSyntax};
      }
    }

    return Commander::Parse(args);
  }

  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
    redis::CuckooChain cuckoo_db(srv->storage, conn->GetNamespace());
    auto s = cuckoo_db.Reserve(ctx, args_[1], capacity_, bucket_size_, max_iterations_, expansion_);

    if (!s.ok()) {
      if (s.IsInvalidArgument()) {
        // Surface the storage layer's message (e.g. "key already exists") to the client.
        return {Status::RedisExecErr, s.ToString()};
      }
      return {Status::RedisExecErr, "failed to create cuckoo filter"};
    }

    *output = redis::SimpleString("OK");
    return Status::OK();
  }

 private:
  // Defaults mirror the kCFDefault* constants from the cuckoo filter type header.
  uint64_t capacity_ = kCFDefaultCapacity;
  uint8_t bucket_size_ = kCFDefaultBucketSize;
  uint16_t max_iterations_ = kCFDefaultMaxIterations;
  uint8_t expansion_ = kCFDefaultExpansion;
};

// CF.ADD key item
//
// Inserts `item` into the cuckoo filter at `key`, replying with an integer
// (1 when the item was inserted, 0 otherwise).
class CommandCFAdd : public Commander {
 public:
  Status Parse(const std::vector<std::string> &args) override {
    // Exactly three arguments: command name, key, item.
    if (args.size() != 3) return {Status::RedisParseErr, "wrong number of arguments"};
    return Commander::Parse(args);
  }

  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
    redis::CuckooChain cuckoo_db(srv->storage, conn->GetNamespace());

    bool inserted = false;
    if (auto s = cuckoo_db.Add(ctx, args_[1], args_[2], &inserted); !s.ok()) {
      if (s.IsNotFound()) return {Status::RedisExecErr, "key not found"};
      // Aborted signals a full filter; pass the storage layer's message through.
      if (s.IsAborted()) return {Status::RedisExecErr, s.ToString()};
      return {Status::RedisExecErr, "failed to add item to cuckoo filter"};
    }

    // 1 when the item was inserted, 0 when it was not.
    *output = redis::Integer(inserted ? 1 : 0);
    return Status::OK();
  }
};

// Register the CF.RESERVE and CF.ADD commands under the CuckooFilter category.
// cf.reserve uses arity -3 (at least 3 args, to allow the optional keyword
// pairs); cf.add requires exactly 3. Both are "write" commands whose only key
// is at position 1 (first=1, last=1, step=1).
REDIS_REGISTER_COMMANDS(CuckooFilter, MakeCmdAttr<CommandCFReserve>("cf.reserve", -3, "write", 1, 1, 1),
                        MakeCmdAttr<CommandCFAdd>("cf.add", 3, "write", 1, 1, 1))

} // namespace redis
1 change: 1 addition & 0 deletions src/commands/commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ enum class CommandCategory : uint8_t {
Bit,
BloomFilter,
Cluster,
CuckooFilter,
Function,
Geo,
Hash,
Expand Down
43 changes: 43 additions & 0 deletions src/storage/redis_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -569,3 +569,46 @@ rocksdb::Status TimeSeriesMetadata::Decode(Slice *input) {

return rocksdb::Status::OK();
}

// Serializes the chain metadata: the common Metadata header first, then a
// fixed-width payload of 2+2+8+1+2+8 = 23 bytes. The field order and widths
// here must stay in sync with CuckooChainMetadata::Decode.
void CuckooChainMetadata::Encode(std::string *dst) const {
  Metadata::Encode(dst);

  PutFixed16(dst, n_filters);        // number of sub-filters in the chain
  PutFixed16(dst, expansion);        // growth factor (0 = non-scaling)
  PutFixed64(dst, base_capacity);    // capacity of the first filter
  PutFixed8(dst, bucket_size);       // fingerprints per bucket
  PutFixed16(dst, max_iterations);   // max cuckoo kicks before "full"
  PutFixed64(dst, num_deleted_items);  // deletions tracked for maintenance
}

// Deserializes the chain metadata written by CuckooChainMetadata::Encode:
// the common Metadata header followed by a fixed-width 23-byte payload.
rocksdb::Status CuckooChainMetadata::Decode(Slice *input) {
  if (auto s = Metadata::Decode(input); !s.ok()) {
    return s;
  }

  // The payload is n_filters(2) + expansion(2) + base_capacity(8) +
  // bucket_size(1) + max_iterations(2) + num_deleted_items(8) = 23 bytes.
  // The previous check against 21 under-counted by two bytes, letting a
  // 21- or 22-byte input pass and the GetFixed* calls read past the slice.
  if (input->size() < 23) {
    return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
  }

  GetFixed16(input, &n_filters);
  GetFixed16(input, &expansion);
  GetFixed64(input, &base_capacity);
  GetFixed8(input, &bucket_size);
  GetFixed16(input, &max_iterations);
  GetFixed64(input, &num_deleted_items);

  return rocksdb::Status::OK();
}

uint64_t CuckooChainMetadata::GetTotalCapacity() const {
if (expansion == 0 || n_filters == 1) {
return base_capacity;
}

// Calculate total capacity across all filters
uint64_t total = 0;
for (uint16_t i = 0; i < n_filters; i++) {
total += base_capacity * std::pow(expansion, i);
}
return total;
}
43 changes: 41 additions & 2 deletions src/storage/redis_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ enum RedisType : uint8_t {
kRedisHyperLogLog = 11,
kRedisTDigest = 12,
kRedisTimeSeries = 13,
kRedisCuckooFilter = 14,
kRedisTypeMax
};

inline constexpr const std::array<std::string_view, kRedisTypeMax> RedisTypeNames = {
"none", "string", "hash", "list", "set", "zset", "bitmap",
"sortedint", "stream", "MBbloom--", "ReJSON-RL", "hyperloglog", "TDIS-TYPE", "timeseries"};
"none", "string", "hash", "list", "set", "zset", "bitmap", "sortedint",
"stream", "MBbloom--", "ReJSON-RL", "hyperloglog", "TDIS-TYPE", "timeseries", "cuckoofilter"};

struct RedisTypes {
RedisTypes(std::initializer_list<RedisType> list) {
Expand Down Expand Up @@ -306,6 +307,44 @@ class BloomChainMetadata : public Metadata {
bool IsScaling() const { return expansion != 0; };
};

/// Metadata for a chain of cuckoo filters stored under a single key.
/// Encoded after the common Metadata header as a fixed-width 23-byte payload
/// (see Encode/Decode in redis_metadata.cc); field order here matches the
/// encoded order.
class CuckooChainMetadata : public Metadata {
 public:
  /// The number of sub-filters in the chain (encoded as 2 bytes).
  uint16_t n_filters;

  /// Expansion factor for new filters; 0 means the filter does not scale.
  /// When a filter is full, a new one is created with capacity = base_capacity * expansion^n
  uint16_t expansion;

  /// The capacity of the first filter in the chain.
  uint64_t base_capacity;

  /// Number of fingerprints per bucket (encoded as a single byte).
  uint8_t bucket_size;

  /// Maximum number of cuckoo kicks before considering a filter full.
  uint16_t max_iterations;

  /// Track number of deleted items for maintenance.
  uint64_t num_deleted_items;

  /// All counters start at zero; fields are populated either by Decode() or
  /// by the reserve path before the first Encode().
  explicit CuckooChainMetadata(bool generate_version = true)
      : Metadata(kRedisCuckooFilter, generate_version),
        n_filters(0),
        expansion(0),
        base_capacity(0),
        bucket_size(0),
        max_iterations(0),
        num_deleted_items(0) {}

  void Encode(std::string *dst) const override;
  // Keep the base-class Decode overloads visible alongside the override below.
  using Metadata::Decode;
  rocksdb::Status Decode(Slice *input) override;

  /// Sum of the capacities of every sub-filter in the chain.
  uint64_t GetTotalCapacity() const;
  /// A chain scales (spawns new filters when full) iff expansion is non-zero.
  bool IsScaling() const { return expansion > 0; }
};

enum class JsonStorageFormat : uint8_t {
JSON = 0,
CBOR = 1,
Expand Down
84 changes: 84 additions & 0 deletions src/types/cuckoo_filter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#pragma once

#include <cstdint>
#include <string>
#include <utility>
#include <vector>

#include "vendor/murmurhash2.h"

namespace redis {

// Cuckoo filter implementation from the paper:
// "Cuckoo Filter: Practically Better Than Bloom" by Fan et al.
// This is a bucket-based storage implementation where each bucket is stored
// as an independent key-value pair in RocksDB
//
// Hash calculation follows RedisBloom's design:
// - fp = hash % 255 + 1 (fingerprint, non-zero, range: 1-255)
// - h1 = hash (primary hash)
// - h2 = h1 ^ (fp * 0x5bd1e995) (alternate hash via XOR)
// - bucket_index = hash % num_buckets (only apply modulo when indexing)
class CuckooFilter {
 public:
  // Calculate the number of buckets needed for `capacity` items at a target
  // load factor of 95.5%, rounded up to the next power of two for better hash
  // distribution. Never returns 0.
  static uint32_t OptimalNumBuckets(uint64_t capacity, uint8_t bucket_size) {
    // Compute in 64 bits: the old uint32_t cast could silently overflow for
    // very large capacities before the power-of-two rounding even began.
    uint64_t num_buckets = static_cast<uint64_t>(static_cast<double>(capacity / bucket_size) / 0.955);
    if (num_buckets == 0) num_buckets = 1;
    // Round up to the next power of two, clamped at 2^31: the old
    // `while (power < num_buckets) power <<= 1;` on uint32_t wrapped `power`
    // to 0 and looped forever once num_buckets exceeded 2^31.
    uint64_t power = 1;
    while (power < num_buckets && power < (uint64_t{1} << 31)) power <<= 1;
    return static_cast<uint32_t>(power);
  }

  // Generate fingerprint from hash (8-bit fingerprint, non-zero, range: 1-255)
  // Following RedisBloom: fp = hash % 255 + 1
  static uint8_t GenerateFingerprint(uint64_t hash) { return static_cast<uint8_t>(hash % 255 + 1); }

  // Calculate alternate hash using XOR (following RedisBloom)
  // h2 = h1 ^ (fp * 0x5bd1e995)
  // XOR is an involution, so GetAltHash(fp, GetAltHash(fp, h)) == h.
  static uint64_t GetAltHash(uint8_t fingerprint, uint64_t hash) {
    return hash ^ (static_cast<uint64_t>(fingerprint) * 0x5bd1e995);
  }

  // Legacy function for backward compatibility with tests.
  // Treats the bucket index itself as the hash, applies GetAltHash, and maps
  // the result back into [0, num_buckets). Note the modulo loses information,
  // so unlike GetAltHash this is not an involution in general.
  static uint32_t GetAltBucketIndex(uint32_t bucket_idx, uint8_t fingerprint, uint32_t num_buckets) {
    uint64_t hash = bucket_idx;
    uint64_t alt_hash = GetAltHash(fingerprint, hash);
    return static_cast<uint32_t>(alt_hash % num_buckets);
  }

  // Compute hash for a given item using MurmurHash2 (compatible with RedisBloom)
  // This is the entry point for hashing items before they are inserted/checked in the filter
  static uint64_t Hash(const char* data, size_t length) { return HllMurMurHash64A(data, static_cast<int>(length), 0); }

  // Convenience overload for std::string
  static uint64_t Hash(const std::string& item) { return Hash(item.data(), item.size()); }
};

} // namespace redis
Loading
Loading