Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
376 changes: 376 additions & 0 deletions cpp/src/cwrapper/tsfile_cwrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@

#include <cstring>
#include <set>
#include <vector>

#include "common/device_id.h"
#include "common/statistic.h"
#include "common/tablet.h"
#include "common/tsfile_common.h"
#include "reader/result_set.h"
#include "reader/table_result_set.h"
#include "reader/tsfile_reader.h"
Expand Down Expand Up @@ -695,6 +699,378 @@ DeviceSchema* tsfile_reader_get_all_timeseries_schemas(TsFileReader reader,
return device_schema;
}

const DeviceID tsfile_c_metadata_empty_device_list_marker = {nullptr};

namespace {

char* dup_common_string_to_cstr(const common::String& s) {
if (s.buf_ == nullptr || s.len_ == 0) {
return strdup("");
}
char* p = static_cast<char*>(malloc(static_cast<size_t>(s.len_) + 1U));
if (p == nullptr) {
return nullptr;
}
memcpy(p, s.buf_, static_cast<size_t>(s.len_));
p[s.len_] = '\0';
return p;
}

void free_timeseries_statistic_heap(TimeseriesStatistic* s) {
if (s == nullptr) {
return;
}
free(s->str_min);
s->str_min = nullptr;
free(s->str_max);
s->str_max = nullptr;
free(s->str_first);
s->str_first = nullptr;
free(s->str_last);
s->str_last = nullptr;
}

void clear_timeseries_statistic(TimeseriesStatistic* s) {
memset(s, 0, sizeof(*s));
}

/**
* Fills @p out from C++ Statistic. On allocation failure returns E_OOM and
* clears/frees any partial string fields in @p out.
*/
int fill_timeseries_statistic(storage::Statistic* st,
TimeseriesStatistic* out) {
clear_timeseries_statistic(out);
if (st == nullptr) {
return common::E_OK;
}
out->has_statistic = true;
out->row_count = st->get_count();
out->start_time = st->start_time_;
out->end_time = st->get_end_time();
out->sum_valid = false;
out->sum = 0.0;
const common::TSDataType t = st->get_type();
switch (t) {
case common::BOOLEAN: {
auto* bs = static_cast<storage::BooleanStatistic*>(st);
out->sum_valid = true;
out->sum = static_cast<double>(bs->sum_value_);
out->bool_ext_valid = true;
out->first_bool = bs->first_value_;
out->last_bool = bs->last_value_;
break;
}
case common::INT32:
case common::DATE: {
auto* is = static_cast<storage::Int32Statistic*>(st);
out->sum_valid = true;
out->sum = static_cast<double>(is->sum_value_);
if (out->row_count > 0) {
out->int_range_valid = true;
out->min_int64 = static_cast<int64_t>(is->min_value_);
out->max_int64 = static_cast<int64_t>(is->max_value_);
out->first_int64 = static_cast<int64_t>(is->first_value_);
out->last_int64 = static_cast<int64_t>(is->last_value_);
}
break;
}
case common::INT64:
case common::TIMESTAMP: {
auto* ls = static_cast<storage::Int64Statistic*>(st);
out->sum_valid = true;
out->sum = ls->sum_value_;
if (out->row_count > 0) {
out->int_range_valid = true;
out->min_int64 = ls->min_value_;
out->max_int64 = ls->max_value_;
out->first_int64 = ls->first_value_;
out->last_int64 = ls->last_value_;
}
break;
}
case common::FLOAT: {
auto* fs = static_cast<storage::FloatStatistic*>(st);
out->sum_valid = true;
out->sum = static_cast<double>(fs->sum_value_);
if (out->row_count > 0) {
out->float_range_valid = true;
out->min_float64 = static_cast<double>(fs->min_value_);
out->max_float64 = static_cast<double>(fs->max_value_);
out->first_float64 = static_cast<double>(fs->first_value_);
out->last_float64 = static_cast<double>(fs->last_value_);
}
break;
}
case common::DOUBLE: {
auto* ds = static_cast<storage::DoubleStatistic*>(st);
out->sum_valid = true;
out->sum = ds->sum_value_;
if (out->row_count > 0) {
out->float_range_valid = true;
out->min_float64 = ds->min_value_;
out->max_float64 = ds->max_value_;
out->first_float64 = ds->first_value_;
out->last_float64 = ds->last_value_;
}
break;
}
case common::STRING: {
auto* ss = static_cast<storage::StringStatistic*>(st);
out->str_ext_valid = true;
out->str_min = dup_common_string_to_cstr(ss->min_value_);
if (out->str_min == nullptr) {
free_timeseries_statistic_heap(out);
clear_timeseries_statistic(out);
return common::E_OOM;
}
out->str_max = dup_common_string_to_cstr(ss->max_value_);
if (out->str_max == nullptr) {
free_timeseries_statistic_heap(out);
clear_timeseries_statistic(out);
return common::E_OOM;
}
out->str_first = dup_common_string_to_cstr(ss->first_value_);
if (out->str_first == nullptr) {
free_timeseries_statistic_heap(out);
clear_timeseries_statistic(out);
return common::E_OOM;
}
out->str_last = dup_common_string_to_cstr(ss->last_value_);
if (out->str_last == nullptr) {
free_timeseries_statistic_heap(out);
clear_timeseries_statistic(out);
return common::E_OOM;
}
break;
}
case common::TEXT: {
auto* ts = static_cast<storage::TextStatistic*>(st);
out->str_ext_valid = true;
out->str_min = strdup("");
out->str_max = strdup("");
if (out->str_min == nullptr || out->str_max == nullptr) {
free_timeseries_statistic_heap(out);
clear_timeseries_statistic(out);
return common::E_OOM;
}
out->str_first = dup_common_string_to_cstr(ts->first_value_);
if (out->str_first == nullptr) {
free_timeseries_statistic_heap(out);
clear_timeseries_statistic(out);
return common::E_OOM;
}
out->str_last = dup_common_string_to_cstr(ts->last_value_);
if (out->str_last == nullptr) {
free_timeseries_statistic_heap(out);
clear_timeseries_statistic(out);
return common::E_OOM;
}
break;
}
default:
break;
}
return common::E_OK;
}

void free_device_timeseries_metadata_entries_partial(
DeviceTimeseriesMetadataEntry* entries, size_t filled_count) {
if (entries == nullptr) {
return;
}
for (size_t i = 0; i < filled_count; i++) {
free(entries[i].device.path);
entries[i].device.path = nullptr;
if (entries[i].timeseries != nullptr) {
for (uint32_t j = 0; j < entries[i].timeseries_count; j++) {
free_timeseries_statistic_heap(
&entries[i].timeseries[j].statistic);
free(entries[i].timeseries[j].measurement_name);
}
free(entries[i].timeseries);
entries[i].timeseries = nullptr;
}
}
free(entries);
}

} // namespace

ERRNO tsfile_reader_get_all_devices(TsFileReader reader, DeviceID** out_devices,
uint32_t* out_length) {
if (reader == nullptr || out_devices == nullptr || out_length == nullptr) {
return common::E_INVALID_ARG;
}
*out_devices = nullptr;
*out_length = 0;
auto* r = static_cast<storage::TsFileReader*>(reader);
const auto ids = r->get_all_devices();
if (ids.empty()) {
return common::E_OK;
}
auto* arr = static_cast<DeviceID*>(malloc(sizeof(DeviceID) * ids.size()));
if (arr == nullptr) {
return common::E_OOM;
}
memset(arr, 0, sizeof(DeviceID) * ids.size());
for (size_t i = 0; i < ids.size(); i++) {
const std::string name =
ids[i] ? ids[i]->get_device_name() : std::string();
arr[i].path = strdup(name.c_str());
if (arr[i].path == nullptr) {
tsfile_free_device_id_array(arr, static_cast<uint32_t>(i));
return common::E_OOM;
}
}
*out_devices = arr;
*out_length = static_cast<uint32_t>(ids.size());
return common::E_OK;
}

void tsfile_free_device_id_array(DeviceID* devices, uint32_t length) {
if (devices == nullptr) {
return;
}
for (uint32_t i = 0; i < length; i++) {
free(devices[i].path);
devices[i].path = nullptr;
}
free(devices);
}

ERRNO tsfile_reader_get_timeseries_metadata(
TsFileReader reader, const DeviceID* device_ids, uint32_t length,
DeviceTimeseriesMetadataMap* out_map) {
if (reader == nullptr || out_map == nullptr) {
return common::E_INVALID_ARG;
}
out_map->entries = nullptr;
out_map->device_count = 0;
auto* r = static_cast<storage::TsFileReader*>(reader);
storage::DeviceTimeseriesMetadataMap cpp_map;
if (device_ids == nullptr) {
cpp_map = r->get_timeseries_metadata();
} else if (length == 0) {
return common::E_OK;
} else {
std::vector<std::shared_ptr<storage::IDeviceID>> query_ids;
query_ids.reserve(length);
for (uint32_t i = 0; i < length; i++) {
if (device_ids[i].path == nullptr) {
return common::E_INVALID_ARG;
}
query_ids.push_back(std::make_shared<storage::StringArrayDeviceID>(
std::string(device_ids[i].path)));
}
cpp_map = r->get_timeseries_metadata(query_ids);
}
if (cpp_map.empty()) {
return common::E_OK;
}
const uint32_t dev_n = static_cast<uint32_t>(cpp_map.size());
auto* entries = static_cast<DeviceTimeseriesMetadataEntry*>(
malloc(sizeof(DeviceTimeseriesMetadataEntry) * dev_n));
if (entries == nullptr) {
return common::E_OOM;
}
memset(entries, 0, sizeof(DeviceTimeseriesMetadataEntry) * dev_n);
size_t di = 0;
for (const auto& kv : cpp_map) {
DeviceTimeseriesMetadataEntry& e = entries[di];
const std::string dname =
kv.first ? kv.first->get_device_name() : std::string();
e.device.path = strdup(dname.c_str());
if (e.device.path == nullptr) {
free_device_timeseries_metadata_entries_partial(entries, di);
return common::E_OOM;
}
const auto& vec = kv.second;
uint32_t n_ts = 0;
for (const auto& idx_nz : vec) {
if (idx_nz != nullptr) {
n_ts++;
}
}
e.timeseries_count = n_ts;
if (e.timeseries_count == 0) {
e.timeseries = nullptr;
di++;
continue;
}
e.timeseries = static_cast<TimeseriesMetadata*>(
malloc(sizeof(TimeseriesMetadata) * e.timeseries_count));
if (e.timeseries == nullptr) {
free(e.device.path);
e.device.path = nullptr;
free_device_timeseries_metadata_entries_partial(entries, di);
return common::E_OOM;
}
memset(e.timeseries, 0,
sizeof(TimeseriesMetadata) * e.timeseries_count);
uint32_t slot = 0;
for (const auto& idx : vec) {
if (idx == nullptr) {
continue;
}
TimeseriesMetadata& m = e.timeseries[slot];
common::String mn = idx->get_measurement_name();
m.measurement_name = strdup(mn.to_std_string().c_str());
if (m.measurement_name == nullptr) {
for (uint32_t u = 0; u < slot; u++) {
free_timeseries_statistic_heap(&e.timeseries[u].statistic);
free(e.timeseries[u].measurement_name);
}
free(e.timeseries);
e.timeseries = nullptr;
free(e.device.path);
e.device.path = nullptr;
free_device_timeseries_metadata_entries_partial(entries, di);
return common::E_OOM;
}
m.data_type = static_cast<TSDataType>(idx->get_data_type());
storage::Statistic* st = idx->get_statistic();
int32_t chunk_cnt = 0;
auto* cl = idx->get_chunk_meta_list();
if (cl != nullptr) {
chunk_cnt = static_cast<int32_t>(cl->size());
}
m.chunk_meta_count = chunk_cnt;
const int st_rc = fill_timeseries_statistic(st, &m.statistic);
if (st_rc != common::E_OK) {
for (uint32_t u = 0; u < slot; u++) {
free_timeseries_statistic_heap(&e.timeseries[u].statistic);
free(e.timeseries[u].measurement_name);
}
free_timeseries_statistic_heap(&m.statistic);
free(m.measurement_name);
free(e.timeseries);
e.timeseries = nullptr;
free(e.device.path);
e.device.path = nullptr;
free_device_timeseries_metadata_entries_partial(entries, di);
return st_rc;
}
slot++;
}
di++;
}
out_map->entries = entries;
out_map->device_count = dev_n;
return common::E_OK;
}

void tsfile_free_device_timeseries_metadata_map(
DeviceTimeseriesMetadataMap* map) {
if (map == nullptr) {
return;
}
free_device_timeseries_metadata_entries_partial(map->entries,
map->device_count);
map->entries = nullptr;
map->device_count = 0;
}

// delete pointer
void _free_tsfile_ts_record(TsRecord* record) {
if (*record != nullptr) {
Expand Down
Loading
Loading