diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index 8cf7b6223..1066305b2 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -26,8 +26,12 @@ #include #include +#include +#include "common/device_id.h" +#include "common/statistic.h" #include "common/tablet.h" +#include "common/tsfile_common.h" #include "reader/result_set.h" #include "reader/table_result_set.h" #include "reader/tsfile_reader.h" @@ -695,6 +699,378 @@ DeviceSchema* tsfile_reader_get_all_timeseries_schemas(TsFileReader reader, return device_schema; } +const DeviceID tsfile_c_metadata_empty_device_list_marker = {nullptr}; + +namespace { + +char* dup_common_string_to_cstr(const common::String& s) { + if (s.buf_ == nullptr || s.len_ == 0) { + return strdup(""); + } + char* p = static_cast(malloc(static_cast(s.len_) + 1U)); + if (p == nullptr) { + return nullptr; + } + memcpy(p, s.buf_, static_cast(s.len_)); + p[s.len_] = '\0'; + return p; +} + +void free_timeseries_statistic_heap(TimeseriesStatistic* s) { + if (s == nullptr) { + return; + } + free(s->str_min); + s->str_min = nullptr; + free(s->str_max); + s->str_max = nullptr; + free(s->str_first); + s->str_first = nullptr; + free(s->str_last); + s->str_last = nullptr; +} + +void clear_timeseries_statistic(TimeseriesStatistic* s) { + memset(s, 0, sizeof(*s)); +} + +/** + * Fills @p out from C++ Statistic. On allocation failure returns E_OOM and + * clears/frees any partial string fields in @p out. + */ +int fill_timeseries_statistic(storage::Statistic* st, + TimeseriesStatistic* out) { + clear_timeseries_statistic(out); + if (st == nullptr) { + return common::E_OK; + } + out->has_statistic = true; + out->row_count = st->get_count(); + out->start_time = st->start_time_; + out->end_time = st->get_end_time(); + out->sum_valid = false; + out->sum = 0.0; + const common::TSDataType t = st->get_type(); + switch (t) { + case common::BOOLEAN: { + auto* bs = static_cast(st); + out->sum_valid = true; + out->sum = static_cast(bs->sum_value_); + out->bool_ext_valid = true; + out->first_bool = bs->first_value_; + out->last_bool = bs->last_value_; + break; + } + case common::INT32: + case common::DATE: { + auto* is = static_cast(st); + out->sum_valid = true; + out->sum = static_cast(is->sum_value_); + if (out->row_count > 0) { + out->int_range_valid = true; + out->min_int64 = static_cast(is->min_value_); + out->max_int64 = static_cast(is->max_value_); + out->first_int64 = static_cast(is->first_value_); + out->last_int64 = static_cast(is->last_value_); + } + break; + } + case common::INT64: + case common::TIMESTAMP: { + auto* ls = static_cast(st); + out->sum_valid = true; + out->sum = ls->sum_value_; + if (out->row_count > 0) { + out->int_range_valid = true; + out->min_int64 = ls->min_value_; + out->max_int64 = ls->max_value_; + out->first_int64 = ls->first_value_; + out->last_int64 = ls->last_value_; + } + break; + } + case common::FLOAT: { + auto* fs = static_cast(st); + out->sum_valid = true; + out->sum = static_cast(fs->sum_value_); + if (out->row_count > 0) { + out->float_range_valid = true; + out->min_float64 = static_cast(fs->min_value_); + out->max_float64 = static_cast(fs->max_value_); + out->first_float64 = static_cast(fs->first_value_); + out->last_float64 = static_cast(fs->last_value_); + } + break; + } + case common::DOUBLE: { + auto* ds = static_cast(st); + out->sum_valid = true; + out->sum = ds->sum_value_; + if (out->row_count > 0) { + out->float_range_valid = true; + out->min_float64 = ds->min_value_; + out->max_float64 = ds->max_value_; + out->first_float64 = ds->first_value_; + out->last_float64 = ds->last_value_; + } + break; + } + case common::STRING: { + auto* ss = static_cast(st); + out->str_ext_valid = true; + out->str_min = dup_common_string_to_cstr(ss->min_value_); + if (out->str_min == nullptr) { + free_timeseries_statistic_heap(out); + clear_timeseries_statistic(out); + return common::E_OOM; + } + out->str_max = dup_common_string_to_cstr(ss->max_value_); + if (out->str_max == nullptr) { + free_timeseries_statistic_heap(out); + clear_timeseries_statistic(out); + return common::E_OOM; + } + out->str_first = dup_common_string_to_cstr(ss->first_value_); + if (out->str_first == nullptr) { + free_timeseries_statistic_heap(out); + clear_timeseries_statistic(out); + return common::E_OOM; + } + out->str_last = dup_common_string_to_cstr(ss->last_value_); + if (out->str_last == nullptr) { + free_timeseries_statistic_heap(out); + clear_timeseries_statistic(out); + return common::E_OOM; + } + break; + } + case common::TEXT: { + auto* ts = static_cast(st); + out->str_ext_valid = true; + out->str_min = strdup(""); + out->str_max = strdup(""); + if (out->str_min == nullptr || out->str_max == nullptr) { + free_timeseries_statistic_heap(out); + clear_timeseries_statistic(out); + return common::E_OOM; + } + out->str_first = dup_common_string_to_cstr(ts->first_value_); + if (out->str_first == nullptr) { + free_timeseries_statistic_heap(out); + clear_timeseries_statistic(out); + return common::E_OOM; + } + out->str_last = dup_common_string_to_cstr(ts->last_value_); + if (out->str_last == nullptr) { + free_timeseries_statistic_heap(out); + clear_timeseries_statistic(out); + return common::E_OOM; + } + break; + } + default: + break; + } + return common::E_OK; +} + +void free_device_timeseries_metadata_entries_partial( + DeviceTimeseriesMetadataEntry* entries, size_t filled_count) { + if (entries == nullptr) { + return; + } + for (size_t i = 0; i < filled_count; i++) { + free(entries[i].device.path); + entries[i].device.path = nullptr; + if (entries[i].timeseries != nullptr) { + for (uint32_t j = 0; j < entries[i].timeseries_count; j++) { + free_timeseries_statistic_heap( + &entries[i].timeseries[j].statistic); + free(entries[i].timeseries[j].measurement_name); + } + free(entries[i].timeseries); + entries[i].timeseries = nullptr; + } + } + free(entries); +} + +} // namespace + +ERRNO tsfile_reader_get_all_devices(TsFileReader reader, DeviceID** out_devices, + uint32_t* out_length) { + if (reader == nullptr || out_devices == nullptr || out_length == nullptr) { + return common::E_INVALID_ARG; + } + *out_devices = nullptr; + *out_length = 0; + auto* r = static_cast(reader); + const auto ids = r->get_all_devices(); + if (ids.empty()) { + return common::E_OK; + } + auto* arr = static_cast(malloc(sizeof(DeviceID) * ids.size())); + if (arr == nullptr) { + return common::E_OOM; + } + memset(arr, 0, sizeof(DeviceID) * ids.size()); + for (size_t i = 0; i < ids.size(); i++) { + const std::string name = + ids[i] ? ids[i]->get_device_name() : std::string(); + arr[i].path = strdup(name.c_str()); + if (arr[i].path == nullptr) { + tsfile_free_device_id_array(arr, static_cast(i)); + return common::E_OOM; + } + } + *out_devices = arr; + *out_length = static_cast(ids.size()); + return common::E_OK; +} + +void tsfile_free_device_id_array(DeviceID* devices, uint32_t length) { + if (devices == nullptr) { + return; + } + for (uint32_t i = 0; i < length; i++) { + free(devices[i].path); + devices[i].path = nullptr; + } + free(devices); +} + +ERRNO tsfile_reader_get_timeseries_metadata( + TsFileReader reader, const DeviceID* device_ids, uint32_t length, + DeviceTimeseriesMetadataMap* out_map) { + if (reader == nullptr || out_map == nullptr) { + return common::E_INVALID_ARG; + } + out_map->entries = nullptr; + out_map->device_count = 0; + auto* r = static_cast(reader); + storage::DeviceTimeseriesMetadataMap cpp_map; + if (device_ids == nullptr) { + cpp_map = r->get_timeseries_metadata(); + } else if (length == 0) { + return common::E_OK; + } else { + std::vector> query_ids; + query_ids.reserve(length); + for (uint32_t i = 0; i < length; i++) { + if (device_ids[i].path == nullptr) { + return common::E_INVALID_ARG; + } + query_ids.push_back(std::make_shared( + std::string(device_ids[i].path))); + } + cpp_map = r->get_timeseries_metadata(query_ids); + } + if (cpp_map.empty()) { + return common::E_OK; + } + const uint32_t dev_n = static_cast(cpp_map.size()); + auto* entries = static_cast( + malloc(sizeof(DeviceTimeseriesMetadataEntry) * dev_n)); + if (entries == nullptr) { + return common::E_OOM; + } + memset(entries, 0, sizeof(DeviceTimeseriesMetadataEntry) * dev_n); + size_t di = 0; + for (const auto& kv : cpp_map) { + DeviceTimeseriesMetadataEntry& e = entries[di]; + const std::string dname = + kv.first ? kv.first->get_device_name() : std::string(); + e.device.path = strdup(dname.c_str()); + if (e.device.path == nullptr) { + free_device_timeseries_metadata_entries_partial(entries, di); + return common::E_OOM; + } + const auto& vec = kv.second; + uint32_t n_ts = 0; + for (const auto& idx_nz : vec) { + if (idx_nz != nullptr) { + n_ts++; + } + } + e.timeseries_count = n_ts; + if (e.timeseries_count == 0) { + e.timeseries = nullptr; + di++; + continue; + } + e.timeseries = static_cast( + malloc(sizeof(TimeseriesMetadata) * e.timeseries_count)); + if (e.timeseries == nullptr) { + free(e.device.path); + e.device.path = nullptr; + free_device_timeseries_metadata_entries_partial(entries, di); + return common::E_OOM; + } + memset(e.timeseries, 0, + sizeof(TimeseriesMetadata) * e.timeseries_count); + uint32_t slot = 0; + for (const auto& idx : vec) { + if (idx == nullptr) { + continue; + } + TimeseriesMetadata& m = e.timeseries[slot]; + common::String mn = idx->get_measurement_name(); + m.measurement_name = strdup(mn.to_std_string().c_str()); + if (m.measurement_name == nullptr) { + for (uint32_t u = 0; u < slot; u++) { + free_timeseries_statistic_heap(&e.timeseries[u].statistic); + free(e.timeseries[u].measurement_name); + } + free(e.timeseries); + e.timeseries = nullptr; + free(e.device.path); + e.device.path = nullptr; + free_device_timeseries_metadata_entries_partial(entries, di); + return common::E_OOM; + } + m.data_type = static_cast(idx->get_data_type()); + storage::Statistic* st = idx->get_statistic(); + int32_t chunk_cnt = 0; + auto* cl = idx->get_chunk_meta_list(); + if (cl != nullptr) { + chunk_cnt = static_cast(cl->size()); + } + m.chunk_meta_count = chunk_cnt; + const int st_rc = fill_timeseries_statistic(st, &m.statistic); + if (st_rc != common::E_OK) { + for (uint32_t u = 0; u < slot; u++) { + free_timeseries_statistic_heap(&e.timeseries[u].statistic); + free(e.timeseries[u].measurement_name); + } + free_timeseries_statistic_heap(&m.statistic); + free(m.measurement_name); + free(e.timeseries); + e.timeseries = nullptr; + free(e.device.path); + e.device.path = nullptr; + free_device_timeseries_metadata_entries_partial(entries, di); + return st_rc; + } + slot++; + } + di++; + } + out_map->entries = entries; + out_map->device_count = dev_n; + return common::E_OK; +} + +void tsfile_free_device_timeseries_metadata_map( + DeviceTimeseriesMetadataMap* map) { + if (map == nullptr) { + return; + } + free_device_timeseries_metadata_entries_partial(map->entries, + map->device_count); + map->entries = nullptr; + map->device_count = 0; +} + // delete pointer void _free_tsfile_ts_record(TsRecord* record) { if (*record != nullptr) { diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index 4f4ce8d6e..4ab9c8611 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -104,6 +104,86 @@ typedef struct device_schema { int timeseries_num; } DeviceSchema; +/** + * @brief Device identifier for C API (canonical path string from IDeviceID). + */ +typedef struct DeviceID { + char* path; +} DeviceID; + +/** + * @brief Aggregated statistic for one timeseries (subset of C++ Statistic). + * + * String pointers str_* are allocated with malloc; freed by + * tsfile_free_device_timeseries_metadata_map (do not free individually). + */ +typedef struct TimeseriesStatistic { + bool has_statistic; + int32_t row_count; + int64_t start_time; + int64_t end_time; + /** True when @p sum is meaningful (numeric / boolean aggregate types). */ + bool sum_valid; + /** Sum when sum_valid; boolean uses sum of true as int-like aggregate. */ + double sum; + + /** INT32, DATE, INT64, TIMESTAMP: min/max/first/last in int64_t form. */ + bool int_range_valid; + int64_t min_int64; + int64_t max_int64; + int64_t first_int64; + int64_t last_int64; + + /** FLOAT, DOUBLE: min/max/first/last. */ + bool float_range_valid; + double min_float64; + double max_float64; + double first_float64; + double last_float64; + + /** BOOLEAN: first/last sample values. */ + bool bool_ext_valid; + bool first_bool; + bool last_bool; + + /** STRING: min/max lexicographic; TEXT: first/last only (min/max unused). + */ + bool str_ext_valid; + char* str_min; + char* str_max; + char* str_first; + char* str_last; +} TimeseriesStatistic; + +/** + * @brief One measurement's metadata as exposed to C. + */ +typedef struct TimeseriesMetadata { + char* measurement_name; + TSDataType data_type; + int32_t chunk_meta_count; + TimeseriesStatistic statistic; +} TimeseriesMetadata; + +typedef struct DeviceTimeseriesMetadataEntry { + DeviceID device; + TimeseriesMetadata* timeseries; + uint32_t timeseries_count; +} DeviceTimeseriesMetadataEntry; + +/** + * @brief Map device -> list of TimeseriesMetadata (C layout with explicit + * counts). + */ +typedef struct DeviceTimeseriesMetadataMap { + DeviceTimeseriesMetadataEntry* entries; + uint32_t device_count; +} DeviceTimeseriesMetadataMap; + +/** Sentinel: optional address for bindings when querying an empty device_id + * list (length 0). */ +extern const DeviceID tsfile_c_metadata_empty_device_list_marker; + typedef struct result_set_meta_data { char** column_names; TSDataType* data_types; @@ -316,6 +396,34 @@ ERRNO tsfile_writer_close(TsFileWriter writer); */ ERRNO tsfile_reader_close(TsFileReader reader); +/** + * @brief Lists all devices in the file. + * + * @param out_devices [out] Allocated array; caller frees with + * tsfile_free_device_id_array. + * @param out_length [out] Number of devices. + */ +ERRNO tsfile_reader_get_all_devices(TsFileReader reader, DeviceID** out_devices, + uint32_t* out_length); + +void tsfile_free_device_id_array(DeviceID* devices, uint32_t length); + +/** + * @brief Timeseries metadata for none, some, or all devices. + * + * @param device_ids NULL: all devices (length ignored). + * Non-NULL with length==0: empty result (E_OK), device_ids + * not read. Non-NULL with length>0: only these devices (existing only). + * @param out_map [out] Must point to zeroed struct; filled on success. + * Free with tsfile_free_device_timeseries_metadata_map. + */ +ERRNO tsfile_reader_get_timeseries_metadata( + TsFileReader reader, const DeviceID* device_ids, uint32_t length, + DeviceTimeseriesMetadataMap* out_map); + +void tsfile_free_device_timeseries_metadata_map( + DeviceTimeseriesMetadataMap* map); + /*--------------------------Tablet API------------------------ */ /** diff --git a/cpp/test/cwrapper/cwrapper_metadata_test.cc b/cpp/test/cwrapper/cwrapper_metadata_test.cc new file mode 100644 index 000000000..6faadb676 --- /dev/null +++ b/cpp/test/cwrapper/cwrapper_metadata_test.cc @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include + +#include +#include + +extern "C" { +#include "cwrapper/errno_define_c.h" +#include "cwrapper/tsfile_cwrapper.h" +} + +namespace cwrapper_metadata { + +class CWrapperMetadataTest : public testing::Test {}; + +TEST_F(CWrapperMetadataTest, GetAllDevicesAndMetadataWithStatistic) { + ERRNO code = RET_OK; + const char* filename = "cwrapper_metadata_stat.tsfile"; + remove(filename); + + const char* device = "root.sg.d1"; + char* m_int = strdup("s_int"); + timeseries_schema sch{}; + sch.timeseries_name = m_int; + sch.data_type = TS_DATATYPE_INT32; + sch.encoding = TS_ENCODING_PLAIN; + sch.compression = TS_COMPRESSION_UNCOMPRESSED; + + auto* writer = static_cast( + _tsfile_writer_new(filename, 128 * 1024 * 1024, &code)); + ASSERT_EQ(RET_OK, code); + ASSERT_EQ(RET_OK, _tsfile_writer_register_timeseries(writer, device, &sch)); + + for (int row = 0; row < 3; row++) { + auto* record = static_cast( + _ts_record_new(device, static_cast(row + 1), 1)); + const int32_t v = static_cast((row + 1) * 10); + ASSERT_EQ(RET_OK, _insert_data_into_ts_record_by_name_int32_t( + record, m_int, v)); + ASSERT_EQ(RET_OK, _tsfile_writer_write_ts_record(writer, record)); + _free_tsfile_ts_record(reinterpret_cast(&record)); + } + ASSERT_EQ(RET_OK, _tsfile_writer_close(writer)); + + TsFileReader reader = tsfile_reader_new(filename, &code); + ASSERT_EQ(RET_OK, code); + ASSERT_NE(nullptr, reader); + + DeviceID* devices = nullptr; + uint32_t n_dev = 0; + ASSERT_EQ(RET_OK, tsfile_reader_get_all_devices(reader, &devices, &n_dev)); + ASSERT_EQ(1u, n_dev); + ASSERT_NE(nullptr, devices); + ASSERT_STREQ(device, devices[0].path); + tsfile_free_device_id_array(devices, n_dev); + + DeviceTimeseriesMetadataMap map{}; + ASSERT_EQ(RET_OK, + tsfile_reader_get_timeseries_metadata(reader, nullptr, 0, &map)); + ASSERT_EQ(1u, map.device_count); + ASSERT_NE(nullptr, map.entries); + ASSERT_STREQ(device, map.entries[0].device.path); + ASSERT_EQ(1u, map.entries[0].timeseries_count); + ASSERT_NE(nullptr, map.entries[0].timeseries); + TimeseriesMetadata& tm = map.entries[0].timeseries[0]; + ASSERT_STREQ(m_int, tm.measurement_name); + ASSERT_EQ(TS_DATATYPE_INT32, tm.data_type); + ASSERT_TRUE(tm.statistic.has_statistic); + EXPECT_EQ(3, tm.statistic.row_count); + EXPECT_EQ(1, tm.statistic.start_time); + EXPECT_EQ(3, tm.statistic.end_time); + ASSERT_TRUE(tm.statistic.sum_valid); + EXPECT_DOUBLE_EQ(60.0, tm.statistic.sum); + ASSERT_TRUE(tm.statistic.int_range_valid); + EXPECT_EQ(10, tm.statistic.min_int64); + EXPECT_EQ(30, tm.statistic.max_int64); + EXPECT_EQ(10, tm.statistic.first_int64); + EXPECT_EQ(30, tm.statistic.last_int64); + + tsfile_free_device_timeseries_metadata_map(&map); + + DeviceTimeseriesMetadataMap empty{}; + ASSERT_EQ(RET_OK, tsfile_reader_get_timeseries_metadata( + reader, &tsfile_c_metadata_empty_device_list_marker, + 0, &empty)); + EXPECT_EQ(0u, empty.device_count); + EXPECT_EQ(nullptr, empty.entries); + + DeviceID q{}; + q.path = const_cast(device); + DeviceTimeseriesMetadataMap one{}; + ASSERT_EQ(RET_OK, + tsfile_reader_get_timeseries_metadata(reader, &q, 1, &one)); + ASSERT_EQ(1u, one.device_count); + tsfile_free_device_timeseries_metadata_map(&one); + + ASSERT_EQ(RET_OK, tsfile_reader_close(reader)); + free(m_int); + remove(filename); +} + +TEST_F(CWrapperMetadataTest, GetTimeseriesMetadataBooleanStatistic) { + ERRNO code = RET_OK; + const char* filename = "cwrapper_metadata_bool.tsfile"; + remove(filename); + + const char* device = "root.sg.bool"; + char* m_b = strdup("s_bool"); + timeseries_schema sch{}; + sch.timeseries_name = m_b; + sch.data_type = TS_DATATYPE_BOOLEAN; + sch.encoding = TS_ENCODING_PLAIN; + sch.compression = TS_COMPRESSION_UNCOMPRESSED; + + auto* writer = static_cast( + _tsfile_writer_new(filename, 128 * 1024 * 1024, &code)); + ASSERT_EQ(RET_OK, code); + ASSERT_EQ(RET_OK, _tsfile_writer_register_timeseries(writer, device, &sch)); + + const bool vals[] = {true, false, true}; + for (int row = 0; row < 3; row++) { + auto* record = static_cast( + _ts_record_new(device, static_cast(row + 1), 1)); + ASSERT_EQ(RET_OK, _insert_data_into_ts_record_by_name_bool(record, m_b, + vals[row])); + ASSERT_EQ(RET_OK, _tsfile_writer_write_ts_record(writer, record)); + _free_tsfile_ts_record(reinterpret_cast(&record)); + } + ASSERT_EQ(RET_OK, _tsfile_writer_close(writer)); + + TsFileReader reader = tsfile_reader_new(filename, &code); + ASSERT_EQ(RET_OK, code); + + DeviceTimeseriesMetadataMap map{}; + ASSERT_EQ(RET_OK, + tsfile_reader_get_timeseries_metadata(reader, nullptr, 0, &map)); + TimeseriesMetadata& tm = map.entries[0].timeseries[0]; + ASSERT_STREQ(m_b, tm.measurement_name); + ASSERT_EQ(TS_DATATYPE_BOOLEAN, tm.data_type); + ASSERT_TRUE(tm.statistic.has_statistic); + ASSERT_TRUE(tm.statistic.sum_valid); + EXPECT_DOUBLE_EQ(2.0, tm.statistic.sum); + ASSERT_TRUE(tm.statistic.bool_ext_valid); + EXPECT_TRUE(tm.statistic.first_bool); + EXPECT_TRUE(tm.statistic.last_bool); + + tsfile_free_device_timeseries_metadata_map(&map); + ASSERT_EQ(RET_OK, tsfile_reader_close(reader)); + free(m_b); + remove(filename); +} + +TEST_F(CWrapperMetadataTest, GetTimeseriesMetadataStringStatistic) { + ERRNO code = RET_OK; + const char* filename = "cwrapper_metadata_str.tsfile"; + remove(filename); + + const char* device = "root.sg.str"; + char* m_str = strdup("s_str"); + timeseries_schema sch{}; + sch.timeseries_name = m_str; + sch.data_type = TS_DATATYPE_STRING; + sch.encoding = TS_ENCODING_PLAIN; + sch.compression = TS_COMPRESSION_UNCOMPRESSED; + + auto* writer = static_cast( + _tsfile_writer_new(filename, 128 * 1024 * 1024, &code)); + ASSERT_EQ(RET_OK, code); + ASSERT_EQ(RET_OK, _tsfile_writer_register_timeseries(writer, device, &sch)); + + const char* vals[] = {"aa", "cc", "bb"}; + for (int row = 0; row < 3; row++) { + auto* record = static_cast( + _ts_record_new(device, static_cast(row + 1), 1)); + ASSERT_EQ(RET_OK, _insert_data_into_ts_record_by_name_string_with_len( + record, m_str, vals[row], + static_cast(std::strlen(vals[row])))); + ASSERT_EQ(RET_OK, _tsfile_writer_write_ts_record(writer, record)); + _free_tsfile_ts_record(reinterpret_cast(&record)); + } + ASSERT_EQ(RET_OK, _tsfile_writer_close(writer)); + + TsFileReader reader = tsfile_reader_new(filename, &code); + ASSERT_EQ(RET_OK, code); + + DeviceTimeseriesMetadataMap map{}; + ASSERT_EQ(RET_OK, + tsfile_reader_get_timeseries_metadata(reader, nullptr, 0, &map)); + ASSERT_EQ(1u, map.device_count); + TimeseriesMetadata& tm = map.entries[0].timeseries[0]; + ASSERT_STREQ(m_str, tm.measurement_name); + ASSERT_EQ(TS_DATATYPE_STRING, tm.data_type); + ASSERT_TRUE(tm.statistic.has_statistic); + ASSERT_TRUE(tm.statistic.str_ext_valid); + ASSERT_NE(nullptr, tm.statistic.str_min); + ASSERT_NE(nullptr, tm.statistic.str_max); + ASSERT_NE(nullptr, tm.statistic.str_first); + ASSERT_NE(nullptr, tm.statistic.str_last); + EXPECT_STREQ("aa", tm.statistic.str_min); + EXPECT_STREQ("cc", tm.statistic.str_max); + EXPECT_STREQ("aa", tm.statistic.str_first); + EXPECT_STREQ("bb", tm.statistic.str_last); + + tsfile_free_device_timeseries_metadata_map(&map); + ASSERT_EQ(RET_OK, tsfile_reader_close(reader)); + free(m_str); + remove(filename); +} + +TEST_F(CWrapperMetadataTest, GetTimeseriesMetadataNullDevicePath) { + ERRNO code = RET_OK; + const char* filename = "cwrapper_metadata_null_path.tsfile"; + remove(filename); + + auto* writer = static_cast( + _tsfile_writer_new(filename, 128 * 1024 * 1024, &code)); + ASSERT_EQ(RET_OK, code); + ASSERT_EQ(RET_OK, _tsfile_writer_close(writer)); + + TsFileReader reader = tsfile_reader_new(filename, &code); + ASSERT_EQ(RET_OK, code); + + DeviceID bad{}; + bad.path = nullptr; + DeviceTimeseriesMetadataMap map{}; + EXPECT_EQ(RET_INVALID_ARG, + tsfile_reader_get_timeseries_metadata(reader, &bad, 1, &map)); + + ASSERT_EQ(RET_OK, tsfile_reader_close(reader)); + remove(filename); +} + +TEST_F(CWrapperMetadataTest, GetTimeseriesMetadataInvalidArgs) { + ERRNO code = RET_OK; + const char* filename = "cwrapper_metadata_empty.tsfile"; + remove(filename); + + auto* writer = static_cast( + _tsfile_writer_new(filename, 128 * 1024 * 1024, &code)); + ASSERT_EQ(RET_OK, code); + ASSERT_EQ(RET_OK, _tsfile_writer_close(writer)); + + TsFileReader reader = tsfile_reader_new(filename, &code); + ASSERT_EQ(RET_OK, code); + + DeviceTimeseriesMetadataMap map{}; + EXPECT_NE(RET_OK, + tsfile_reader_get_timeseries_metadata(nullptr, nullptr, 0, &map)); + EXPECT_NE(RET_OK, tsfile_reader_get_timeseries_metadata(reader, nullptr, 0, + nullptr)); + + ASSERT_EQ(RET_OK, tsfile_reader_close(reader)); + remove(filename); +} + +} // namespace cwrapper_metadata diff --git a/python/tests/test_reader_metadata.py b/python/tests/test_reader_metadata.py new file mode 100644 index 000000000..fc10c40af --- /dev/null +++ b/python/tests/test_reader_metadata.py @@ -0,0 +1,170 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import os +import tempfile + +import pytest + +from tsfile import Field, RowRecord, TimeseriesSchema, TsFileReader, TsFileWriter +from tsfile import TSDataType +from tsfile.schema import DeviceID + + +def test_get_all_devices_and_timeseries_metadata_statistic(): + path = os.path.join(tempfile.gettempdir(), "py_reader_metadata_stat.tsfile") + try: + os.unlink(path) + except OSError: + pass + + device = "root.sg.py_meta" + writer = TsFileWriter(path) + writer.register_timeseries( + device, TimeseriesSchema("m_int", TSDataType.INT32)) + for row in range(3): + v = (row + 1) * 10 + writer.write_row_record( + RowRecord( + device, + row + 1, + [Field("m_int", v, TSDataType.INT32)], + ) + ) + writer.close() + + reader = TsFileReader(path) + try: + devices = reader.get_all_devices() + assert len(devices) == 1 + assert devices[0].path == device + + meta_all = reader.get_timeseries_metadata(None) + assert list(meta_all.keys()) == [device] + series = meta_all[device] + assert len(series) == 1 + m = series[0] + assert m.measurement_name == "m_int" + assert m.data_type == TSDataType.INT32 + st = m.statistic + assert st.has_statistic + assert st.row_count == 3 + assert st.start_time == 1 + assert st.end_time == 3 + assert st.sum_valid + assert st.sum == pytest.approx(60.0) + assert st.int_range_valid + assert st.min_int64 == 10 + assert st.max_int64 == 30 + assert st.first_int64 == 10 + assert st.last_int64 == 30 + + assert reader.get_timeseries_metadata([]) == {} + + sub = reader.get_timeseries_metadata([DeviceID(device)]) + assert device in sub + assert len(sub[device]) == 1 + + sub_str = reader.get_timeseries_metadata([device]) + assert device in sub_str + finally: + reader.close() + try: + os.unlink(path) + except OSError: + pass + + +def test_get_timeseries_metadata_boolean_statistic(): + path = os.path.join(tempfile.gettempdir(), "py_reader_metadata_bool.tsfile") + try: + os.unlink(path) + except OSError: + pass + + device = "root.sg.py_bool" + writer = TsFileWriter(path) + writer.register_timeseries( + device, TimeseriesSchema("m_b", TSDataType.BOOLEAN)) + for row, b in enumerate([True, False, True]): + writer.write_row_record( + RowRecord( + device, + row + 1, + [Field("m_b", b, TSDataType.BOOLEAN)], + ) + ) + writer.close() + + reader = TsFileReader(path) + try: + meta_all = reader.get_timeseries_metadata(None) + st = meta_all[device][0].statistic + assert st.has_statistic + assert st.sum_valid + assert st.sum == pytest.approx(2.0) + assert st.bool_ext_valid + assert st.first_bool is True + assert st.last_bool is True + finally: + reader.close() + try: + os.unlink(path) + except OSError: + pass + + +def test_get_timeseries_metadata_string_statistic(): + path = os.path.join(tempfile.gettempdir(), "py_reader_metadata_str.tsfile") + try: + os.unlink(path) + except OSError: + pass + + device = "root.sg.py_str" + writer = TsFileWriter(path) + writer.register_timeseries( + device, TimeseriesSchema("m_str", TSDataType.STRING)) + for row, s in enumerate(["aa", "cc", "bb"]): + writer.write_row_record( + RowRecord( + device, + row + 1, + [Field("m_str", s, TSDataType.STRING)], + ) + ) + writer.close() + + reader = TsFileReader(path) + try: + meta_all = reader.get_timeseries_metadata(None) + m = meta_all[device][0] + assert m.measurement_name == "m_str" + assert m.data_type == TSDataType.STRING + st = m.statistic + assert st.has_statistic + assert st.str_ext_valid + assert st.str_min == "aa" + assert st.str_max == "cc" + assert st.str_first == "aa" + assert st.str_last == "bb" + finally: + reader.close() + try: + os.unlink(path) + except OSError: + pass diff --git a/python/tsfile/schema.py b/python/tsfile/schema.py index c89649bf3..10b2412ad 100644 --- a/python/tsfile/schema.py +++ b/python/tsfile/schema.py @@ -15,12 +15,63 @@ # specific language governing permissions and limitations # under the License. # -from typing import List +from dataclasses import dataclass +from typing import List, Optional from .exceptions import TypeMismatchError from .constants import TSDataType, ColumnCategory, TSEncoding, Compressor +@dataclass(frozen=True) +class DeviceID: + """Device path string as returned by the native reader (tree/table file layout).""" + + path: str + + def __str__(self) -> str: + return self.path + + +@dataclass(frozen=True) +class TimeseriesStatistic: + """Subset of file chunk statistic exposed through the C API.""" + + has_statistic: bool + row_count: int + start_time: int + end_time: int + sum_valid: bool + sum: float + int_range_valid: bool = False + min_int64: int = 0 + max_int64: int = 0 + first_int64: int = 0 + last_int64: int = 0 + float_range_valid: bool = False + min_float64: float = 0.0 + max_float64: float = 0.0 + first_float64: float = 0.0 + last_float64: float = 0.0 + bool_ext_valid: bool = False + first_bool: bool = False + last_bool: bool = False + str_ext_valid: bool = False + str_min: Optional[str] = None + str_max: Optional[str] = None + str_first: Optional[str] = None + str_last: Optional[str] = None + + +@dataclass(frozen=True) +class TimeseriesMetadata: + """Per-measurement metadata from get_timeseries_metadata (includes statistic when present).""" + + measurement_name: str + data_type: TSDataType + chunk_meta_count: int + statistic: TimeseriesStatistic + + class TimeseriesSchema: """ Metadata schema for a time series (name, data type, encoding, compression). diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd index 29008148d..10ba05a6c 100644 --- a/python/tsfile/tsfile_cpp.pxd +++ b/python/tsfile/tsfile_cpp.pxd @@ -103,6 +103,52 @@ cdef extern from "cwrapper/tsfile_cwrapper.h": TimeseriesSchema * timeseries_schema int timeseries_num + ctypedef struct DeviceID: + char * path + + ctypedef struct TimeseriesStatistic: + bint has_statistic + int32_t row_count + int64_t start_time + int64_t end_time + bint sum_valid + double sum + bint int_range_valid + int64_t min_int64 + int64_t max_int64 + int64_t first_int64 + int64_t last_int64 + bint float_range_valid + double min_float64 + double max_float64 + double first_float64 + double last_float64 + bint bool_ext_valid + bint first_bool + bint last_bool + bint str_ext_valid + char* str_min + char* str_max + char* str_first + char* str_last + + ctypedef struct TimeseriesMetadata: + char * measurement_name + TSDataType data_type + int32_t chunk_meta_count + TimeseriesStatistic statistic + + ctypedef struct DeviceTimeseriesMetadataEntry: + DeviceID device + TimeseriesMetadata * timeseries + uint32_t timeseries_count + + ctypedef struct DeviceTimeseriesMetadataMap: + DeviceTimeseriesMetadataEntry * entries + uint32_t device_count + + const DeviceID tsfile_c_metadata_empty_device_list_marker + ctypedef struct ResultSetMetaData: char** column_names TSDataType * data_types @@ -218,6 +264,17 @@ cdef extern from "cwrapper/tsfile_cwrapper.h": DeviceSchema * tsfile_reader_get_all_timeseries_schemas(TsFileReader reader, uint32_t * size); + ErrorCode tsfile_reader_get_all_devices(TsFileReader reader, + DeviceID ** out_devices, + uint32_t * out_length); + void tsfile_free_device_id_array(DeviceID * devices, uint32_t length); + + ErrorCode tsfile_reader_get_timeseries_metadata( + TsFileReader reader, const DeviceID * device_ids, uint32_t length, + DeviceTimeseriesMetadataMap * out_map); + void tsfile_free_device_timeseries_metadata_map( + DeviceTimeseriesMetadataMap * map); + # resultSet : get data from resultSet bint tsfile_result_set_next(ResultSet result_set, ErrorCode * err_code); bint tsfile_result_set_is_null_by_index(ResultSet result_set, uint32_t column_index); diff --git a/python/tsfile/tsfile_py_cpp.pxd b/python/tsfile/tsfile_py_cpp.pxd index 197a4ec87..b6baee80d 100644 --- a/python/tsfile/tsfile_py_cpp.pxd +++ b/python/tsfile/tsfile_py_cpp.pxd @@ -67,5 +67,8 @@ cdef public api ResultSet tsfile_reader_query_table_by_row_c(TsFileReader reader cdef public api object get_table_schema(TsFileReader reader, object table_name) cdef public api object get_all_table_schema(TsFileReader reader) cdef public api object get_all_timeseries_schema(TsFileReader reader) +cdef public api object reader_get_all_devices_c(TsFileReader reader) +cdef public api object reader_get_timeseries_metadata_c(TsFileReader reader, + object device_ids) cpdef public api object get_tsfile_config() cpdef public api void set_tsfile_config(dict new_config) \ No newline at end of file diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx index 4febeb731..91910175a 100644 --- a/python/tsfile/tsfile_py_cpp.pyx +++ b/python/tsfile/tsfile_py_cpp.pyx @@ -26,6 +26,7 @@ import numpy as np from libc.stdlib cimport free from libc.stdlib cimport malloc from libc.string cimport strdup +from libc.string cimport memset from cpython.exc cimport PyErr_SetObject from cpython.unicode cimport PyUnicode_AsUTF8String, PyUnicode_AsUTF8, PyUnicode_AsUTF8AndSize from cpython.bytes cimport PyBytes_AsString, PyBytes_AsStringAndSize @@ -36,6 +37,9 @@ from tsfile.schema import TSDataType as TSDataTypePy, TSEncoding as TSEncodingPy from tsfile.schema import Compressor as CompressorPy, ColumnCategory as CategoryPy from tsfile.schema import TableSchema as TableSchemaPy, ColumnSchema as ColumnSchemaPy from tsfile.schema import DeviceSchema as DeviceSchemaPy, TimeseriesSchema as TimeseriesSchemaPy +from tsfile.schema import DeviceID as ReaderDeviceID +from tsfile.schema import TimeseriesStatistic as TimeseriesStatisticPy +from tsfile.schema import TimeseriesMetadata as TimeseriesMetadataPy # check exception and set py exception object cdef inline void check_error(int errcode, const char * context=NULL) except*: @@ -922,3 +926,128 @@ cdef object get_all_timeseries_schema(TsFileReader reader): device_schemas.update([(schema_py.get_device_name(), schema_py)]) free(schemas) return device_schemas + +cdef object _c_str_to_py_utf8_or_none(char* p): + if p == NULL: + return None + return p.decode('utf-8') + +cdef object timeseries_metadata_c_to_py(TimeseriesMetadata* m): + cdef str name_py + if m == NULL or m.measurement_name == NULL: + name_py = "" + else: + name_py = m.measurement_name.decode('utf-8') + cdef object stat = TimeseriesStatisticPy( + bool(m.statistic.has_statistic), + int(m.statistic.row_count), + int(m.statistic.start_time), + int(m.statistic.end_time), + bool(m.statistic.sum_valid), + float(m.statistic.sum), + bool(m.statistic.int_range_valid), + int(m.statistic.min_int64), + int(m.statistic.max_int64), + int(m.statistic.first_int64), + int(m.statistic.last_int64), + bool(m.statistic.float_range_valid), + float(m.statistic.min_float64), + float(m.statistic.max_float64), + float(m.statistic.first_float64), + float(m.statistic.last_float64), + bool(m.statistic.bool_ext_valid), + bool(m.statistic.first_bool), + bool(m.statistic.last_bool), + bool(m.statistic.str_ext_valid), + _c_str_to_py_utf8_or_none(m.statistic.str_min), + _c_str_to_py_utf8_or_none(m.statistic.str_max), + _c_str_to_py_utf8_or_none(m.statistic.str_first), + _c_str_to_py_utf8_or_none(m.statistic.str_last), + ) + return TimeseriesMetadataPy( + name_py, + TSDataTypePy(m.data_type), + int(m.chunk_meta_count), + stat, + ) + +cdef dict device_timeseries_metadata_map_to_py(DeviceTimeseriesMetadataMap* mmap): + cdef dict out = {} + cdef uint32_t di, ti + cdef char* p + cdef str key + cdef list series + for di in range(mmap.device_count): + p = mmap.entries[di].device.path + if p == NULL: + key = "" + else: + key = p.decode('utf-8') + series = [] + for ti in range(mmap.entries[di].timeseries_count): + series.append( + timeseries_metadata_c_to_py( + &mmap.entries[di].timeseries[ti])) + out[key] = series + return out + +cdef public api object reader_get_all_devices_c(TsFileReader reader): + cdef DeviceID* arr = NULL + cdef uint32_t n = 0 + cdef int err + cdef list out = [] + cdef uint32_t i + err = tsfile_reader_get_all_devices(reader, &arr, &n) + check_error(err) + try: + for i in range(n): + out.append(ReaderDeviceID(arr[i].path.decode('utf-8'))) + finally: + tsfile_free_device_id_array(arr, n) + return out + +cdef public api object reader_get_timeseries_metadata_c(TsFileReader reader, + object device_ids): + cdef DeviceTimeseriesMetadataMap mmap + cdef DeviceID* q = NULL + cdef uint32_t qlen = 0 + cdef uint32_t i + cdef int err + cdef bytes bpath + cdef const char* raw + memset(&mmap, 0, sizeof(DeviceTimeseriesMetadataMap)) + if device_ids is None: + err = tsfile_reader_get_timeseries_metadata(reader, NULL, 0, &mmap) + check_error(err) + elif len(device_ids) == 0: + err = tsfile_reader_get_timeseries_metadata( + reader, &tsfile_c_metadata_empty_device_list_marker, 0, &mmap) + check_error(err) + else: + qlen = len(device_ids) + q = malloc(sizeof(DeviceID) * qlen) + if q == NULL: + raise MemoryError() + memset(q, 0, sizeof(DeviceID) * qlen) + try: + for i in range(qlen): + dev = device_ids[i] + try: + path_s = dev.path + except AttributeError: + path_s = str(dev) + bpath = path_s.encode('utf-8') + raw = PyBytes_AsString(bpath) + q[i].path = strdup(raw) + if q[i].path == NULL: + raise MemoryError() + err = tsfile_reader_get_timeseries_metadata(reader, q, qlen, &mmap) + check_error(err) + finally: + for i in range(qlen): + free(q[i].path) + free(q) + try: + return device_timeseries_metadata_map_to_py(&mmap) + finally: + tsfile_free_device_timeseries_metadata_map(&mmap) diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx index 3a1a15d4d..52a9a94f7 100644 --- a/python/tsfile/tsfile_reader.pyx +++ b/python/tsfile/tsfile_reader.pyx @@ -19,7 +19,7 @@ #cython: language_level=3 import weakref -from typing import List +from typing import List, Optional, Dict import pandas as pd from libc.stdint cimport INT64_MIN, INT64_MAX @@ -427,6 +427,21 @@ cdef class TsFileReaderPy: """ return get_all_timeseries_schema(self.reader) + def get_all_devices(self): + """ + Return all device IDs in the file as :class:`tsfile.schema.DeviceID`. + """ + return reader_get_all_devices_c(self.reader) + + def get_timeseries_metadata(self, device_ids: Optional[List] = None) -> Dict[str, list]: + """ + Return map device path -> list of :class:`tsfile.schema.TimeseriesMetadata`. + + ``device_ids is None``: all devices. ``device_ids == []``: empty map. + Non-empty list restricts to those devices (only existing devices appear). + """ + return reader_get_timeseries_metadata_c(self.reader, device_ids) + def close(self): """ Close TsFile Reader, if reader has result sets, invalid them.