Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/types/redis_hyperloglog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ rocksdb::Status HyperLogLog::Merge(engine::Context &ctx, const Slice &dest_user_
std::vector<std::string> registers;
HyperLogLogMetadata metadata;

rocksdb::Status s = GetMetadata(ctx, dest_user_key, &metadata);
rocksdb::Status s = GetMetadata(ctx, dest_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
{
std::vector<Slice> all_user_keys;
Expand Down
53 changes: 53 additions & 0 deletions tests/gocase/unit/hyperloglog/hyperloglog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/apache/kvrocks/tests/gocase/util"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -213,4 +214,56 @@ func TestHyperLogLog(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, 10, card)
})

t.Run("PFMERGE into existing dest preserves dest data", func(t *testing.T) {
require.NoError(t, rdb.Do(ctx, "DEL", "dest", "src1", "src2").Err())

_, err := rdb.PFAdd(ctx, "dest", "a", "b", "c").Result()
require.NoError(t, err)
_, err = rdb.PFAdd(ctx, "src1", "d", "e").Result()
require.NoError(t, err)
_, err = rdb.PFAdd(ctx, "src2", "f").Result()
require.NoError(t, err)

// Merge src1 and src2 into existing dest
_, err = rdb.PFMerge(ctx, "dest", "src1", "src2").Result()
require.NoError(t, err)

// dest should have all 6 elements
card, err := rdb.PFCount(ctx, "dest").Result()
require.NoError(t, err)
require.EqualValues(t, 6, card)

// Merge again into dest to verify idempotency
_, err = rdb.PFMerge(ctx, "dest", "src1").Result()
require.NoError(t, err)
card, err = rdb.PFCount(ctx, "dest").Result()
require.NoError(t, err)
require.EqualValues(t, 6, card)
})

// PFMERGE must load dest metadata using the namespace-prefixed key (same as storage).
// If GetMetadata used the raw user key, it would miss the record, keep default Metadata (expire=0),
// and the following PFMERGE would rewrite metadata without preserving TTL (TTL becomes "no expiry").
t.Run("PFMERGE into existing dest preserves TTL", func(t *testing.T) {
require.NoError(t, rdb.Do(ctx, "DEL", "dest_ttl", "src_ttl").Err())

_, err := rdb.PFAdd(ctx, "dest_ttl", "a", "b").Result()
require.NoError(t, err)
ok, err := rdb.Expire(ctx, "dest_ttl", 3600*time.Second).Result()
require.NoError(t, err)
require.True(t, ok)

ttlBefore := rdb.TTL(ctx, "dest_ttl").Val()
require.Greater(t, ttlBefore, time.Duration(0))

_, err = rdb.PFAdd(ctx, "src_ttl", "c").Result()
require.NoError(t, err)
_, err = rdb.PFMerge(ctx, "dest_ttl", "src_ttl").Result()
require.NoError(t, err)

ttlAfter := rdb.TTL(ctx, "dest_ttl").Val()
require.Greater(t, ttlAfter, time.Duration(0),
"PFMERGE must keep dest TTL when dest already exists (metadata must be read with ns-prefixed key)")
})
}
Loading