Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/commands/cmd_timeseries.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ constexpr const char *errBadRetention = "Couldn't parse RETENTION";
constexpr const char *errBadChunkSize = "invalid CHUNK_SIZE";
constexpr const char *errBadEncoding = "unknown ENCODING parameter";
constexpr const char *errDuplicatePolicy = "Unknown DUPLICATE_POLICY";
constexpr const char *errBadIgnore = "Couldn't parse IGNORE";
constexpr const char *errInvalidTimestamp = "invalid timestamp";
constexpr const char *errInvalidValue = "invalid value";
constexpr const char *errOldTimestamp = "Timestamp is older than retention";
Expand Down Expand Up @@ -252,6 +253,9 @@ class CommandTSCreateBase : public KeywordCommandBase {
registerHandler("DUPLICATE_POLICY", [this](TSOptionsParser &parser) {
return handleDuplicatePolicy(parser, create_option_.duplicate_policy);
});
registerHandler("IGNORE", [this](TSOptionsParser &parser) {
return handleIgnore(parser, create_option_.ignore_max_time_diff, create_option_.ignore_max_val_diff);
});
registerHandler("LABELS", [this](TSOptionsParser &parser) { return handleLabels(parser, create_option_.labels); });
}

Expand Down Expand Up @@ -315,6 +319,17 @@ class CommandTSCreateBase : public KeywordCommandBase {
return Status::OK();
}

static Status handleIgnore(TSOptionsParser &parser, uint64_t &ignore_max_time_diff, double &ignore_max_val_diff) {
auto parse_time_diff = parser.TakeInt<uint64_t>();
auto parse_val_diff = parser.TakeFloat<double>();
if (!parse_time_diff.IsOK() || !parse_val_diff.IsOK() || parse_val_diff.GetValue() < 0) {
return {Status::RedisParseErr, errBadIgnore};
}
ignore_max_time_diff = parse_time_diff.GetValue();
ignore_max_val_diff = parse_val_diff.GetValue();
return Status::OK();
}

TSCreateOption create_option_;
};

Expand Down
6 changes: 5 additions & 1 deletion src/storage/redis_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -623,13 +623,15 @@ void TimeSeriesMetadata::Encode(std::string *dst) const {
PutFixed8(dst, static_cast<uint8_t>(duplicate_policy));
PutSizedString(dst, source_key);
PutFixed64(dst, last_timestamp);
PutFixed64(dst, ignore_max_time_diff);
PutDouble(dst, ignore_max_val_diff);
}

rocksdb::Status TimeSeriesMetadata::Decode(Slice *input) {
if (auto s = Metadata::Decode(input); !s.ok()) {
return s;
}
if (input->size() < sizeof(uint64_t) * 2 + sizeof(uint8_t) * 2 + sizeof(uint32_t)) {
if (input->size() < sizeof(uint64_t) * 3 + sizeof(uint8_t) * 2 + sizeof(uint32_t) + sizeof(double)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}

Expand All @@ -641,6 +643,8 @@ rocksdb::Status TimeSeriesMetadata::Decode(Slice *input) {
GetSizedString(input, &source_key_slice);
source_key = source_key_slice.ToString();
GetFixed64(input, &last_timestamp);
GetFixed64(input, &ignore_max_time_diff);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will cause compatibility issues with old data, right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I believe it will cause compatibility issues with old data.

One other option we have is we can add backward compatibility like so:

ignore_max_time_diff = 0;
ignore_max_val_diff = 0.0;
if (input->size() >= sizeof(uint64_t) + sizeof(double)) {
  GetFixed64(input, &ignore_max_time_diff);
  GetDouble(input, &ignore_max_val_diff);
}

This will only Decode the values if they exist in the metadata.

How do you think we should proceed?

GetDouble(input, &ignore_max_val_diff);

return rocksdb::Status::OK();
}
10 changes: 8 additions & 2 deletions src/storage/redis_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ class TimeSeriesMetadata : public Metadata {
uint64_t chunk_size;
ChunkType chunk_type;
DuplicatePolicy duplicate_policy;
uint64_t ignore_max_time_diff;
double ignore_max_val_diff;
std::string source_key;
uint64_t last_timestamp = 0; // Approximate last timestamp, used for compaction filter

Expand All @@ -423,14 +425,18 @@ class TimeSeriesMetadata : public Metadata {
retention_time(0),
chunk_size(0),
chunk_type(ChunkType::UNCOMPRESSED),
duplicate_policy(DuplicatePolicy::BLOCK) {}
duplicate_policy(DuplicatePolicy::BLOCK),
ignore_max_time_diff(0),
ignore_max_val_diff(0.0) {}
TimeSeriesMetadata(uint64_t retention_time, uint64_t chunk_size, ChunkType chunk_type,
DuplicatePolicy duplicate_policy, bool generate_version = true)
: Metadata(kRedisTimeSeries, generate_version),
retention_time(retention_time),
chunk_size(chunk_size),
chunk_type(chunk_type),
duplicate_policy(duplicate_policy) {}
duplicate_policy(duplicate_policy),
ignore_max_time_diff(0),
ignore_max_val_diff(0.0) {}

void SetSourceKey(Slice key);

Expand Down
49 changes: 48 additions & 1 deletion src/types/redis_timeseries.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "redis_timeseries.h"

#include <cmath>
#include <queue>

#include "commands/error_constants.h"
Expand Down Expand Up @@ -515,7 +516,9 @@ TSCreateOption::TSCreateOption()
: retention_time(kDefaultRetentionTime),
chunk_size(kDefaultChunkSize),
chunk_type(kDefaultChunkType),
duplicate_policy(kDefaultDuplicatePolicy) {}
duplicate_policy(kDefaultDuplicatePolicy),
ignore_max_time_diff(0),
ignore_max_val_diff(0.0) {}

Status TSMQueryFilterParser::Parse(std::string_view expr) {
if (expr.empty()) return Status::OK();
Expand Down Expand Up @@ -678,6 +681,8 @@ TimeSeriesMetadata CreateMetadataFromOption(const TSCreateOption &option) {
metadata.chunk_size = option.chunk_size;
metadata.chunk_type = option.chunk_type;
metadata.duplicate_policy = option.duplicate_policy;
metadata.ignore_max_time_diff = option.ignore_max_time_diff;
metadata.ignore_max_val_diff = option.ignore_max_val_diff;
metadata.SetSourceKey(option.source_key);

return metadata;
Expand Down Expand Up @@ -851,6 +856,44 @@ rocksdb::Status TimeSeries::getOrCreateTimeSeries(engine::Context &ctx, const Sl
return createTimeSeries(ctx, ns_key, metadata_out, option);
}

// Marks incoming samples as skipped (AddResultType::kSkip) when they fall
// within the series' IGNORE thresholds relative to the most recently kept
// sample. Per the IGNORE contract this only applies to non-compacted series
// (no source_key) whose duplicate policy is LAST; for any other series the
// batch is left untouched.
//
// A sample is ignored when it is not older than the reference sample, its
// timestamp is within ignore_max_time_diff of it, and its value differs by
// at most ignore_max_val_diff. Ignored samples report the reference
// timestamp back to the caller; accepted samples become the new reference.
rocksdb::Status TimeSeries::filterSamplesByIgnorePolicy(engine::Context &ctx, const Slice &ns_key,
                                                        const TimeSeriesMetadata &metadata, SampleBatch *sample_batch) {
  const bool eligible = metadata.source_key.empty() && metadata.duplicate_policy == DuplicatePolicy::LAST;
  if (!eligible) {
    return rocksdb::Status::OK();
  }

  // Fetch the latest stored sample; with nothing stored yet there is no
  // reference point, so nothing can be ignored.
  std::vector<TSSample> stored_samples;
  auto status = getCommon(ctx, ns_key, metadata, true, &stored_samples);
  if (!status.ok() || stored_samples.empty()) {
    return status;
  }

  auto reference = stored_samples.back();
  auto batch_slice = sample_batch->AsSlice();
  auto sample_span = batch_slice.GetSampleSpan();
  auto result_span = batch_slice.GetAddResultSpan();

  for (size_t idx = 0; idx < sample_span.size(); idx++) {
    // Samples already resolved by an earlier stage are left as-is.
    if (result_span[idx].type != TSChunk::AddResultType::kNone) {
      continue;
    }

    const auto &candidate = sample_span[idx];
    const bool not_older = candidate.ts >= reference.ts;
    if (not_older && candidate.ts - reference.ts <= metadata.ignore_max_time_diff &&
        std::abs(candidate.v - reference.v) <= metadata.ignore_max_val_diff) {
      // Within both thresholds: drop it and surface the kept timestamp.
      result_span[idx].type = TSChunk::AddResultType::kSkip;
      result_span[idx].sample.ts = reference.ts;
    } else if (not_older) {
      // Accepted and newest so far: it becomes the reference for the
      // remaining samples in the batch.
      reference = candidate;
    }
  }

  return rocksdb::Status::OK();
}

rocksdb::Status TimeSeries::upsertCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
SampleBatch &sample_batch, DownstreamUpsertArgs *ds_args) {
auto batch = storage_->GetWriteBatchBase();
Expand Down Expand Up @@ -1930,6 +1973,8 @@ rocksdb::Status TimeSeries::Add(engine::Context &ctx, const Slice &user_key, TSS
rocksdb::Status s = getOrCreateTimeSeries(ctx, ns_key, &metadata, &option);
if (!s.ok()) return s;
auto sample_batch = SampleBatch({sample}, on_dup_policy ? *on_dup_policy : metadata.duplicate_policy);
s = filterSamplesByIgnorePolicy(ctx, ns_key, metadata, &sample_batch);
if (!s.ok()) return s;

DownstreamUpsertArgs ds_args;
s = upsertCommon(ctx, ns_key, metadata, sample_batch, &ds_args);
Expand All @@ -1950,6 +1995,8 @@ rocksdb::Status TimeSeries::MAdd(engine::Context &ctx, const Slice &user_key, st
return s;
}
auto sample_batch = SampleBatch(std::move(samples), metadata.duplicate_policy);
s = filterSamplesByIgnorePolicy(ctx, ns_key, metadata, &sample_batch);
if (!s.ok()) return s;
DownstreamUpsertArgs ds_args;
s = upsertCommon(ctx, ns_key, metadata, sample_batch, &ds_args);
if (!s.ok()) return s;
Expand Down
7 changes: 5 additions & 2 deletions src/types/redis_timeseries.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ struct TSCreateOption {
uint64_t chunk_size;
TimeSeriesMetadata::ChunkType chunk_type;
TimeSeriesMetadata::DuplicatePolicy duplicate_policy;
uint64_t ignore_max_time_diff;
double ignore_max_val_diff;
std::string source_key;
LabelKVList labels;

Expand Down Expand Up @@ -257,8 +259,7 @@ enum class TSAlterMode : uint8_t {
RETENTION = 1,
CHUNK_SIZE = 1 << 1,
DUPLICATE_POLICY = 1 << 2,
IGNORE = 1 << 3,
LABELS = 1 << 4,
LABELS = 1 << 3,
};

std::vector<TSSample> GroupSamplesAndReduce(const std::vector<std::vector<TSSample>> &all_samples,
Expand Down Expand Up @@ -317,6 +318,8 @@ class TimeSeries : public SubKeyScanner {
const TSCreateOption *options);
rocksdb::Status getOrCreateTimeSeries(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata *metadata_out,
const TSCreateOption *option = nullptr);
rocksdb::Status filterSamplesByIgnorePolicy(engine::Context &ctx, const Slice &ns_key,
const TimeSeriesMetadata &metadata, SampleBatch *sample_batch);
rocksdb::Status getLabelKVList(engine::Context &ctx, const Slice &ns_key, const TimeSeriesMetadata &metadata,
LabelKVList *labels);
rocksdb::Status upsertCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
Expand Down
4 changes: 3 additions & 1 deletion src/types/timeseries.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,9 @@ std::vector<TSChunk::AddResult> TSChunk::SampleBatch::GetFinalResults() const {
res.resize(add_results_.size());
for (size_t idx = 0; idx < add_results_.size(); idx++) {
res[indexes_[idx]] = add_results_[idx];
res[indexes_[idx]].sample.ts = samples_[idx].ts;
if (!(res[indexes_[idx]].type == AddResultType::kSkip && res[indexes_[idx]].sample.ts != 0)) {
res[indexes_[idx]].sample.ts = samples_[idx].ts;
}
}
return res;
}
Expand Down
33 changes: 33 additions & 0 deletions tests/gocase/unit/type/timeseries/timeseries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,24 @@ func testTimeSeries(t *testing.T, configs util.KvrocksServerConfigs) {
require.ErrorContains(t, rdb.Do(ctx, "ts.add", key, "1000", "13.4").Err(), "update is not supported when DUPLICATE_POLICY is set to BLOCK mode")
})

t.Run("TS.ADD Ignore Option", func(t *testing.T) {
ignoreKey := "test_add_ignore_key"
require.NoError(t, rdb.Del(ctx, ignoreKey).Err())
require.NoError(t, rdb.Do(ctx, "ts.create", ignoreKey, "duplicate_policy", "last", "ignore", "5", "2").Err())

require.Equal(t, int64(1000), rdb.Do(ctx, "ts.add", ignoreKey, "1000", "10").Val())
require.Equal(t, int64(1000), rdb.Do(ctx, "ts.add", ignoreKey, "1003", "11").Val())

res := rdb.Do(ctx, "ts.range", ignoreKey, "-", "+").Val().([]interface{})
require.Equal(t, 1, len(res))
assert.Equal(t, []interface{}{int64(1000), float64(10)}, res[0])

require.Equal(t, int64(1008), rdb.Do(ctx, "ts.add", ignoreKey, "1008", "20").Val())
res = rdb.Do(ctx, "ts.range", ignoreKey, "-", "+").Val().([]interface{})
require.Equal(t, 2, len(res))
assert.Equal(t, []interface{}{int64(1008), float64(20)}, res[1])
})

t.Run("TS.ADD With Retention", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "ts.create", key, "retention", "1000").Err())
Expand Down Expand Up @@ -231,6 +249,21 @@ func testTimeSeries(t *testing.T, configs util.KvrocksServerConfigs) {
assert.Contains(t, res[1], "update is not supported when DUPLICATE_POLICY is set to BLOCK mode")
})

t.Run("TS.MADD Ignore Option", func(t *testing.T) {
ignoreKey := "test_madd_ignore_key"
require.NoError(t, rdb.Del(ctx, ignoreKey).Err())
require.NoError(t, rdb.Do(ctx, "ts.create", ignoreKey, "duplicate_policy", "last", "ignore", "5", "2").Err())
require.Equal(t, int64(1000), rdb.Do(ctx, "ts.add", ignoreKey, "1000", "10").Val())

res := rdb.Do(ctx, "ts.madd", ignoreKey, "1003", "11", ignoreKey, "1004", "13", ignoreKey, "1007", "14").Val().([]interface{})
assert.Equal(t, []interface{}{int64(1000), int64(1004), int64(1004)}, res)

samples := rdb.Do(ctx, "ts.range", ignoreKey, "-", "+").Val().([]interface{})
require.Equal(t, 2, len(samples))
assert.Equal(t, []interface{}{int64(1000), float64(10)}, samples[0])
assert.Equal(t, []interface{}{int64(1004), float64(13)}, samples[1])
})

t.Run("TS.MADD Nonexistent Key", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "nonexistent").Err())
require.NoError(t, rdb.Del(ctx, "existent").Err())
Expand Down
Loading