diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 75684ce0c..ff1789d7c 100755 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -26,7 +26,6 @@ if(CMAKE_CXX_COMPILER_ID MATCHES "GNU") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized -D__STDC_FORMAT_MACROS") endif() -message("cmake using: USE_CPP11=${USE_CPP11}") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") if(DEFINED ENV{CXX}) @@ -104,7 +103,13 @@ add_subdirectory(third_party) add_subdirectory(src) add_subdirectory(test) add_subdirectory(examples) +add_subdirectory(bench_mark) +set(TESTS_ENABLED ON) if(TESTS_ENABLED) add_dependencies(TsFile_Test tsfile) endif() +set(BENCH_MARK_ENABLED ON) +if(BENCH_MARK_ENABLED) + add_dependencies(bench_mark tsfile) +endif() diff --git a/cpp/bench_mark/CMakeLists.txt b/cpp/bench_mark/CMakeLists.txt index 6db63999c..7d7e96c53 100644 --- a/cpp/bench_mark/CMakeLists.txt +++ b/cpp/bench_mark/CMakeLists.txt @@ -17,17 +17,42 @@ specific language governing permissions and limitations under the License. ]] message("Running in bench_mark directory") +cmake_minimum_required(VERSION 3.11) +project(tsfile_bench_mark_project) + if(DEFINED ENV{CXX}) set(CMAKE_CXX_COMPILER $ENV{CXX}) endif() -set(CMAKE_CXX_FLAGS "$ENV{CXX_FLAGS} -Wall -Werror") +include_directories( + ${LIBRARY_INCLUDE_DIR} + ${THIRD_PARTY_INCLUDE} + ${CMAKE_SOURCE_DIR}/third_party/lz4 + ${CMAKE_SOURCE_DIR}/third_party/lzokay + ${CMAKE_SOURCE_DIR}/third_party/zlib-1.2.13 + ${CMAKE_SOURCE_DIR}/third_party/google_snappy + ${CMAKE_SOURCE_DIR}/third_party/antlr4-cpp-runtime-4/runtime/src + ${CMAKE_SOURCE_DIR}/src + +) -if (${USE_CPP11}) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") - set(CMAKE_CXX_STANDARD 11) +link_directories(${LIBRARY_OUTPUT_PATH}) +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") +if (CMAKE_BUILD_TYPE STREQUAL "Debug") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -O0") else() - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++03") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O3") +endif() +message("CMAKE DEBUG: CMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS}") + +add_executable(bench_mark + src/bench_mark_cpp.cc + src/bench_mark_c.cc + src/bench_mark.cc + src/bench_mark_utils.cc) + +if (WIN32) + target_link_libraries(bench_mark Psapi) endif() -add_subdirectory(bench_mark_src) \ No newline at end of file +target_link_libraries(bench_mark tsfile) diff --git a/cpp/bench_mark/bench_mark_src/CMakeLists.txt b/cpp/bench_mark/bench_mark_src/CMakeLists.txt deleted file mode 100644 index dbad71f91..000000000 --- a/cpp/bench_mark/bench_mark_src/CMakeLists.txt +++ /dev/null @@ -1,57 +0,0 @@ -#[[ -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -]] -cmake_minimum_required(VERSION 3.1) -project(libtsfile_bench_mark_project) -message("Running in bench_mark/bench_mark_src directory") -if(DEFINED ENV{CXX}) - set(CMAKE_CXX_COMPILER $ENV{CXX}) -endif() - -set(SDK_BENCH_MARK_DIR ${PROJECT_SOURCE_DIR}/) -message("PROJECT DIR: ${SDK_BENCH_MARK_DIR}") -set(SDK_INCLUDE_DIR_DEBUG ${SKD_BENCHH_MARK_DIR}../../build/Debug/bin/libtsfile_sdk/include) -set(SDK_INCLUDE_DIR_RELEASE ${SKD_BENCHH_MARK_DIR}../../build/Release/bin/libtsfile_sdk/include) -set(SDK_LIB_DIR_DEBUG ${SKD_BENCHH_MARK_DIR}../../build/Debug/bin/libtsfile_sdk/lib) -set(SDK_LIB_DIR_RELEASE ${SKD_BENCHH_MARK_DIR}../../build/Release/bin/libtsfile_sdk/lib) - -if (USE_SDK_DEBUG) - SET(SKD_INCLUDE_DIR ${SDK_INCLUDE_DIR_DEBUG}) - SET(SDK_LIB_DIR ${SDK_LIB_DIR_DEBUG}) - SET(CMAKE_CXX_FLAGS "-g -O0") -else() - SET(SKD_INCLUDE_DIR ${SDK_INCLUDE_DIR_RELEASE}) - SET(SDK_LIB_DIR ${SDK_LIB_DIR_RELEASE}) - SET(CMAKE_CXX_FLAGS "-O3") -endif() - -include_directories(${SKD_INCLUDE_DIR}) -set(MAKE_INCLUDE ${CMAKE_CURRENT_SOURCE_DIR}/../../src) -include_directories(${MAKE_INCLUDE}) -message("MAKE_INCLUDE: ${MAKE_INCLUDE}") -message("SDK_INCLUDE_DIR: ${SKD_INCLUDE_DIR}") -message("SDK_LIB_DIR: ${SDK_LIB_DIR}") - -link_directories(${SDK_LIB_DIR}) -find_library(my_tsfile_lib NAMES tsfile PATHS ${SDK_LIB_DIR} NO_DEFAULT_PATH REQUIRED) -add_executable(bench_mark_src bench_mark.cc) -target_link_libraries(bench_mark_src ${my_tsfile_lib}) - - - - diff --git a/cpp/bench_mark/bench_mark_src/bench_mark.cc b/cpp/bench_mark/bench_mark_src/bench_mark.cc deleted file mode 100644 index 09c9eb98f..000000000 --- a/cpp/bench_mark/bench_mark_src/bench_mark.cc +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -#include -#include -#include -#include -#include - -#include "bench_conf.h" -#include "common/db_common.h" -#include "common/global.h" -#include "common/path.h" -#include "writer/tsfile_writer.h" - -std::vector register_timeseries(storage::TsFileWriter& writer, - int timeseries_num, - std::vector type_list) { - auto start = std::chrono::high_resolution_clock::now(); - int sum = std::accumulate(type_list.begin(), type_list.end(), 0); - std::vector ratio_list; - for (int i = 0; i < type_list.size(); i++) { - ratio_list.push_back((float)type_list[i] / sum); - } - std::vector type_num; - for (int i = 0; i < common::TSDataType::TEXT - 1; i++) { - type_num.push_back((int)std::ceil(timeseries_num * ratio_list[i])); - } - type_num.push_back(timeseries_num - - std::accumulate(type_num.begin(), type_num.end(), 0)); - writer.open("/tmp/tsfile_test.tsfile", O_CREAT | O_RDWR, 0644); - int ind = 0; - int ret = 0; - int type = 0; - for (auto num : type_num) { - for (int i = 0; i < num; i++) { - std::string device_name = "root.db001.dev" + std::to_string(ind); - std::string measurement_name = "m" + std::to_string(ind); - ret = writer.register_timeseries( - device_name, measurement_name, (common::TSDataType)type, - common::TSEncoding::PLAIN, - common::CompressionType::UNCOMPRESSED); - ind++; - } - std::cout << "register finished for TsDataType" - << common::s_data_type_names[type] - << " timeseries num: " << num << std::endl; - type++; - } - auto end = std::chrono::high_resolution_clock::now(); - std::chrono::duration elapsed = end - start; - std::cout << "register " << timeseries_num << "timeseries in file" - << "./test_data/tsfile_test.tsfile" << std::endl; - std::cout << "register timeseries cost time: " << elapsed.count() << "s" - << std::endl; - return type_num; -} - -void test_writer_benchmark(storage::TsFileWriter& writer, int loop_num, - std::vector type_num) { - std::cout << "start writing data" << std::endl; - auto start = std::chrono::high_resolution_clock::now(); - int type = 0; - for (int i = 0; i < loop_num; i++) { - int ind = 0; - for (auto num : type_num) { - for (int j = 0; j < num; j++) { - std::string device_name = - "root.db001.dev" + std::to_string(ind); - std::string measurement_name = "m" + std::to_string(ind); - long long currentTimeStamp = i; - storage::TsRecord record(currentTimeStamp, device_name, 1); - switch (type) { - case common::INT32: { - storage::DataPoint point(measurement_name, 10000 + i); - record.points_.push_back(point); - break; - } - case common::INT64: { - storage::DataPoint point(measurement_name, - int64_t(10000 + i)); - record.points_.push_back(point); - break; - } - case common::BOOLEAN: { - storage::DataPoint point(measurement_name, i / 2 == 0); - record.points_.push_back(point); - break; - } - case common::FLOAT: { - storage::DataPoint point(measurement_name, (float)i); - record.points_.push_back(point); - break; - } - case common::DOUBLE: { - storage::DataPoint point(measurement_name, (double)i); - record.points_.push_back(point); - break; - } - } - int ret = writer.write_record(record); - ASSERT(ret == 0); - ind++; - } - type++; - } - } - - auto end = std::chrono::high_resolution_clock::now(); - std::chrono::duration elapsed = end - start; - int timeseries_num = std::accumulate(type_num.begin(), type_num.end(), 0); - std::cout << "writer loop: " << loop_num - << " timeseries num: " << timeseries_num << " records in file" - << "./test_data/tsfile_test.tsfile" << std::endl; - std::cout << "total num of points: " << loop_num * timeseries_num - << std::endl; - std::cout << "writer data cost time: " << elapsed.count() << "s" - << std::endl; - std::cout << "writer data speed:" - << loop_num * timeseries_num / elapsed.count() << " points/s" - << std::endl; - writer.flush(); - writer.close(); - auto end_flush = std::chrono::high_resolution_clock::now(); - std::chrono::duration elapsed_flush = end_flush - end; - std::cout << "flush data cost time: " << elapsed_flush.count() << "s" - << std::endl; -} - -int main() { - std::cout << "LibTsFile benchmark" << std::endl; - std::cout << "LOOP_NUM:" << bench::LOOP_NUM << std::endl; - std::cout << "THREAD_NUM:" << bench::THREAD_NUM << std::endl; - std::cout << "TIMESERIES_NUM:" << bench::TIMESERIES_NUM << std::endl; - std::cout << "TYPE_LIST: " << bench::TYPE_LIST[0] << ":" - << bench::TYPE_LIST[1] << ":" << bench::TYPE_LIST[2] << ":" - << bench::TYPE_LIST[3] << ":" << bench::TYPE_LIST[4] << ":" - << bench::TYPE_LIST[5] << std::endl; - std::cout << "init tsfile config value" << std::endl; - common::init_config_value(); - storage::TsFileWriter writer; - auto type_num = - register_timeseries(writer, bench::TIMESERIES_NUM, bench::TYPE_LIST); - test_writer_benchmark(writer, bench::LOOP_NUM, type_num); - return 0; -} diff --git a/cpp/bench_mark/build.sh b/cpp/bench_mark/build.sh index e0e4b0cb6..2f40dee36 100644 --- a/cpp/bench_mark/build.sh +++ b/cpp/bench_mark/build.sh @@ -18,33 +18,10 @@ # build_type=Debug -env_for_cyber=0 use_cpp11=1 -if [[ ${env_for_cyber} -eq 1 ]] -then - export PATH=$PATH:~/dev/gcc-linaro-5.5.0-2017.10-x86_64_arm-linux-gnueabi/bin - export CROSS_COMPILE=arm-linux-gnueabi- - export ARCH=arm - export CC=${CROSS_COMPILE}gcc - export CXX=${CROSS_COMPILE}g++ - echo "set up gcc for cyber" -fi +mkdir -p build/Release +cd build/Release - -if [ ${build_type} = "Debug" ] -then - mkdir -p build/Debug - cd build/Debug - use_sdk_debug=1 -else - mkdir -p build/Release - cd build/Release - use_sdk_debug=0 -fi - -echo "use_sdk_debug=${use_sdk_debug}" -cmake ../../ \ - -DUSE_SDK_DEBUG=$use_sdk_debug \ - -DUSE_CPP11=$use_cpp11 +cmake ../../ make diff --git a/cpp/bench_mark/bench_mark_src/bench_conf.h b/cpp/bench_mark/src/bench_conf.h similarity index 74% rename from cpp/bench_mark/bench_mark_src/bench_conf.h rename to cpp/bench_mark/src/bench_conf.h index 486d0b14f..728c3fc1e 100644 --- a/cpp/bench_mark/bench_mark_src/bench_conf.h +++ b/cpp/bench_mark/src/bench_conf.h @@ -17,11 +17,17 @@ * under the License. */ +#ifndef TSFILE_BENCH_MARK_BENCH_CONF_H +#define TSFILE_BENCH_MARK_BENCH_CONF_H + #include namespace bench { -int LOOP_NUM = 100000; -int THREAD_NUM = 1; -int TIMESERIES_NUM = 50; -std::vector TYPE_LIST = {0, 0, 1, 0, 1}; +static int tablet_num = 1000; +static int tag1_num = 1; +static int tag2_num = 10; +static int timestamp_per_tag = 1000; +static std::vector field_type_vector = {1, 1, 1, 1, 1}; } // namespace bench + +#endif // TSFILE_BENCH_MARK_BENCH_CONF_H diff --git a/cpp/bench_mark/src/bench_mark.cc b/cpp/bench_mark/src/bench_mark.cc new file mode 100644 index 000000000..0d19d95b5 --- /dev/null +++ b/cpp/bench_mark/src/bench_mark.cc @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include + +#include "bench_mark_c_cpp.h" + +int main(int argc, char* argv[]) { + if (argc != 2) { + std::cerr << "Usage: " << argv[0] << " " << std::endl; + return 1; + } + + std::string mode(argv[1]); + + if (mode == "cpp") { + bench_mark_cpp_write(); + bench_mark_cpp_read(); + } else if (mode == "c") { + bench_mark_c_write(); + bench_mark_c_read(); + } else { + std::cerr << "Invalid mode. Use 'cpp' or 'c'." << std::endl; + return 1; + } + + return 0; +} diff --git a/cpp/bench_mark/src/bench_mark_c.cc b/cpp/bench_mark/src/bench_mark_c.cc new file mode 100644 index 000000000..f4ddca9b8 --- /dev/null +++ b/cpp/bench_mark/src/bench_mark_c.cc @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include +#include +#include +#include + +#include + +#include "bench_conf.h" +#include "bench_mark_c_cpp.h" +#include "bench_mark_utils.h" +#include "cwrapper/tsfile_cwrapper.h" +#define HANDLE_ERROR(err_no) \ + do { \ + if (err_no != 0) { \ + printf("get err no: %d", err_no); \ + return err_no; \ + } \ + } while (0) + +char** column_list; +TSDataType* data_types_c; +int column_num_c = 0; + +int bench_mark_c_write() { + ERRNO code = 0; + char* table_name = "TestTable"; + print_config(false); + TableSchema table_schema; + + + table_schema.table_name = strdup(table_name); + int column = 0; + for (auto data_type : bench::field_type_vector) { + column += data_type; + } + column_list = new char*[column + 2]; + data_types_c = new TSDataType[column + 2]; + column_num_c = column; + + std::ofstream csv_file("memory_usage_c.csv"); + if (!csv_file.is_open()) { + std::cout << "csv create failed!" << std::endl; + return 0; + } + csv_file << "iter_num,memory_usage(kb)\n"; + int iter_num = 0; + csv_file << iter_num++ <<","<< get_memory_usage() << "\n"; + + table_schema.column_schemas = + (ColumnSchema*)malloc(sizeof(ColumnSchema) * (2 + column)); + table_schema.column_num = column + 2; + table_schema.column_schemas[0] = + (ColumnSchema){.column_name = strdup("TAG1"), + .data_type = TS_DATATYPE_STRING, + .column_category = TAG}; + column_list[0] = strdup("TAG1"); + data_types_c[0] = TS_DATATYPE_STRING; + table_schema.column_schemas[1] = + (ColumnSchema){.column_name = strdup("TAG2"), + .data_type = TS_DATATYPE_STRING, + .column_category = TAG}; + column_list[1] = strdup("TAG2"); + data_types_c[1] = TS_DATATYPE_STRING; + + int col = 2; + for (int i = 0; i < bench::field_type_vector.size(); i++) { + int column_num = bench::field_type_vector[i]; + for (int j = 0; j < column_num; j++) { + column_list[col] = + strdup(std::string("FIELD" + std::to_string(i)).c_str()); + data_types_c[col] = static_cast(data_type[i]); + table_schema.column_schemas[col++] = (ColumnSchema){ + .column_name = + strdup(std::string("FIELD" + std::to_string(i)).c_str()), + .data_type = static_cast(data_type[i]), + .column_category = FIELD}; + } + } + + remove("bench_mark_c.tsfile"); + WriteFile file = write_file_new("bench_mark_c.tsfile", &code); + HANDLE_ERROR(code); + TsFileWriter writer = tsfile_writer_new(file, &table_schema, &code); + HANDLE_ERROR(code); + free_table_schema(table_schema); + int64_t prepare_time = 0; + int64_t writing_time = 0; + int64_t timestamp = 0; + int64_t row_num = + bench::tag1_num * bench::tag2_num * bench::timestamp_per_tag; + auto start = std::chrono::high_resolution_clock::now(); + for (int i = 0; i < bench::tablet_num; i++) { + csv_file << iter_num++ <<","<< get_memory_usage() << "\n"; + int cur_row = 0; + print_progress_bar(i, bench::tablet_num); + auto prepare_start = std::chrono::high_resolution_clock::now(); + auto tablet = + tablet_new(column_list, data_types_c, column + 2, row_num); + for (int tag1 = 0; tag1 < bench::tag1_num; tag1++) { + for (int tag2 = 0; tag2 < bench::tag2_num; tag2++) { + for (int row = 0; row < bench::timestamp_per_tag; row++) { + tablet_add_timestamp(tablet, cur_row, timestamp + row); + tablet_add_value_by_index_string( + tablet, cur_row, 0, + std::string("TAG1_" + std::to_string(tag1)).c_str()); + tablet_add_value_by_index_string( + tablet, cur_row, 1, + std::string("TAG2_" + std::to_string(tag2)).c_str()); + for (int col = 2; col < column + 2; col++) { + switch (data_types_c[col]) { + case TS_DATATYPE_INT32: + tablet_add_value_by_index_int32_t( + tablet, cur_row, col, + static_cast(timestamp)); + break; + case TS_DATATYPE_INT64: + tablet_add_value_by_index_int64_t( + tablet, cur_row, col, + static_cast(timestamp)); + break; + case TS_DATATYPE_FLOAT: + tablet_add_value_by_index_float( + tablet, cur_row, col, + static_cast(timestamp)); + break; + case TS_DATATYPE_DOUBLE: + tablet_add_value_by_index_double( + tablet, cur_row, col, + static_cast(timestamp)); + break; + case TS_DATATYPE_BOOLEAN: + tablet_add_value_by_index_bool( + tablet, cur_row, col, + static_cast(timestamp % 2)); + break; + default: + ; + } + } + cur_row++; + } + } + } + csv_file << iter_num++ <<","<< get_memory_usage() << "\n"; + auto prepare_end = std::chrono::high_resolution_clock::now(); + prepare_time += std::chrono::duration_cast( + prepare_end - prepare_start) + .count(); + + auto writing_start = std::chrono::high_resolution_clock::now(); + tsfile_writer_write(writer, tablet); + auto writing_end = std::chrono::high_resolution_clock::now(); + writing_time += std::chrono::duration_cast( + writing_end - writing_start) + .count(); + free_tablet(&tablet); + timestamp += bench::timestamp_per_tag; + csv_file << iter_num++ <<","<< get_memory_usage() << "\n"; + } + + csv_file << iter_num++ <<","<< get_memory_usage() << "\n"; + auto close_start = std::chrono::high_resolution_clock::now(); + tsfile_writer_close(writer); + auto close_end = std::chrono::high_resolution_clock::now(); + + writing_time += std::chrono::duration_cast( + close_end - close_start) + .count(); + free_write_file(&file); + auto end = std::chrono::high_resolution_clock::now(); + + csv_file << iter_num++ <<","<< get_memory_usage() << "\n"; + FILE* file_to_size = fopen("bench_mark_c.tsfile", "rb"); + if (!file_to_size) { + std::cout << "unable to open file" << std::endl; + return -1; + } + fseeko(file_to_size, 0, SEEK_END); + off_t size = ftello(file_to_size); + fclose(file_to_size); + + std::cout << "=======" << std::endl; + std::cout << "Finish writing for C" << std::endl; + std::cout << "Tsfile size is " << size << " bytes " << " ~ " << size / 1024 + << "KB" << std::endl; + + double pre_time = prepare_time / 1000.0 / 1000.0; + double write_time = writing_time / 1000.0 / 1000.0; + std::cout << "Preparing time is " << pre_time << " s" << std::endl; + std::cout << "Writing time is " << write_time << " s" << std::endl; + std::cout << "writing speed is " + << static_cast( + bench::tablet_num * bench::tag1_num * bench::tag2_num * + bench::timestamp_per_tag * (column_num_c + 2) / + (pre_time + write_time)) + << " points/s" << std::endl; + std::cout << "total time is " + << std::chrono::duration_cast(end - + start) + .count() / + 1000.0 / 1000.0 + << " s" << std::endl; + std::cout << "========" << std::endl; + return 0; +} + +int bench_mark_c_read() { + std::cout << "Bench mark c read" << std::endl; + int code = common::E_OK; + TsFileReader reader = tsfile_reader_new("bench_mark_c.tsfile", &code); + ResultSet result = + tsfile_query_table(reader, "TestTable", column_list, column_num_c, + INT64_MIN, INT64_MAX, &code); + int64_t row = 0; + auto read_start = std::chrono::high_resolution_clock::now(); + while (tsfile_result_set_next(result, &code) && code == common::E_OK) { + row++; + } + auto read_end = std::chrono::high_resolution_clock::now(); + int64_t total_points = row * column_num_c; + double reading_time; + reading_time = std::chrono::duration_cast( + read_end - read_start) + .count() / + 1000.0 / 1000.0; + + std::cout << "total points is " << total_points << std::endl; + std::cout << "reading time is " << reading_time << " s" << std::endl; + std::cout << "read speed:" + << static_cast(total_points / reading_time) + << " points/s" << std::endl; + std::cout << "====================" << std::endl; + free_tsfile_result_set(&result); + tsfile_reader_close(reader); + return 0; +} \ No newline at end of file diff --git a/cpp/bench_mark/bench_mark_src/bench_mark.h b/cpp/bench_mark/src/bench_mark_c_cpp.h similarity index 70% rename from cpp/bench_mark/bench_mark_src/bench_mark.h rename to cpp/bench_mark/src/bench_mark_c_cpp.h index e3bbd95d6..001de9c49 100644 --- a/cpp/bench_mark/bench_mark_src/bench_mark.h +++ b/cpp/bench_mark/src/bench_mark_c_cpp.h @@ -1,5 +1,5 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one +* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file @@ -16,3 +16,12 @@ * specific language governing permissions and limitations * under the License. */ +#ifndef TSFILE_BENCH_MARK_BENCH_MARK_C_CPP_H +#define TSFILE_BENCH_MARK_BENCH_MARK_C_CPP_H + +int bench_mark_c_write(); +int bench_mark_cpp_read(); +int bench_mark_cpp_write(); +int bench_mark_c_read(); + +#endif // TSFILE_BENCH_MARK_BENCH_MARK_C_CPP_H \ No newline at end of file diff --git a/cpp/bench_mark/src/bench_mark_cpp.cc b/cpp/bench_mark/src/bench_mark_cpp.cc new file mode 100644 index 000000000..2467c9f87 --- /dev/null +++ b/cpp/bench_mark/src/bench_mark_cpp.cc @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include +#include +#include +#include + +#include "bench_conf.h" +#include "bench_mark_c_cpp.h" +#include "bench_mark_utils.h" +#include "common/db_common.h" +#include "common/path.h" +#include "common/tablet.h" +#include "file/write_file.h" +#include "utils/db_utils.h" +#include "writer/tsfile_table_writer.h" + +using namespace storage; +using namespace common; + +std::vector columns_name; +std::vector data_types; + +TableSchema* gen_table_schema(const std::vector& field_type_vector) { + std::vector schemas; + // 2 TAG Columns default + for (int i = 0; i < 2; i++) { + std::string column_name = std::string("TAG" + std::to_string(i)); + schemas.emplace_back(column_name, common::TSDataType::STRING, + common::ColumnCategory::TAG); + columns_name.push_back(column_name); + data_types.push_back(TSDataType::STRING); + } + for (int i = 0; i < field_type_vector.size(); i++) { + int column_num = field_type_vector[i]; + for (int j = 0; j < column_num; j++) { + auto column_name = + std::string("FIELD" + std::to_string(i) + std::to_string(j)); + auto type = static_cast(data_type[i]); + data_types.push_back(type); + columns_name.push_back(column_name); + schemas.emplace_back(column_name, type, ColumnCategory::FIELD); + } + } + return new TableSchema("TestTable", schemas); +} + +int bench_mark_cpp_write() { + int code = common::E_OK; + print_config(true); + remove("bench_mark_cpp.tsfile"); + libtsfile_init(); + // benchmark for write + WriteFile file = WriteFile(); + + int flags = O_WRONLY | O_CREAT | O_TRUNC; +#ifdef _WIN32 + flags |= O_BINARY; +#endif + mode_t mode = 0666; + code = file.create("bench_mark_cpp.tsfile", flags, mode); + if (code != common::E_OK) { + return -1; + } + + std::ofstream csv_file("memory_usage_cpp.csv"); + if (!csv_file.is_open()) { + std::cout << "csv create failed!" << std::endl; + return 0; + } + csv_file << "iter_num,memory_usage(kb)\n"; + int iter_num = 0; + csv_file << iter_num++ <<","<< get_memory_usage() << "\n"; + + TableSchema* table_schema = gen_table_schema(bench::field_type_vector); + auto writer = new TsFileTableWriter(&file, table_schema); + delete (table_schema); + + int64_t timestamp = 0; + int64_t prepare_time = 0; + int64_t writing_time = 0; + + auto start = std::chrono::high_resolution_clock::now(); + + + for (int tablet_i = 0; tablet_i < bench::tablet_num; tablet_i++) { + csv_file << iter_num++ <<","<< get_memory_usage() << "\n"; + print_progress_bar(tablet_i, bench::tablet_num); + auto prepare_start = std::chrono::high_resolution_clock::now(); + auto tablet = Tablet( + columns_name, data_types, + bench::tag1_num * bench::tag2_num * bench::timestamp_per_tag); + int row_num = 0; + for (int tag1 = 0; tag1 < bench::tag1_num; tag1++) { + for (int tag2 = 0; tag2 < bench::tag2_num; tag2++) { + for (int row = 0; row < bench::timestamp_per_tag; row++) { + tablet.add_timestamp(row_num, timestamp + row); + tablet.add_value( + row_num, 0, + std::string("tag1_" + std::to_string(tag1)).c_str()); + tablet.add_value( + row_num, 1, + std::string("tag2_" + std::to_string(tag2)).c_str()); + for (int col = 0; col < data_types.size(); col++) { + switch (data_types[col]) { + case TSDataType::INT32: + tablet.add_value( + row_num, col, + static_cast(timestamp)); + break; + case TSDataType::INT64: + tablet.add_value( + row_num, col, + static_cast(timestamp)); + break; + case TSDataType::FLOAT: + tablet.add_value( + row_num, col, + static_cast(timestamp * 1.1)); + break; + case TSDataType::DOUBLE: + tablet.add_value( + row_num, col, + static_cast(timestamp * 1.1)); + break; + + case TSDataType::BOOLEAN: + tablet.add_value( + row_num, col, + static_cast(timestamp % 2)); + break; + default: + ; + } + } + row_num++; + } + } + } + csv_file << iter_num++ <<","<< get_memory_usage() << "\n"; + auto prepare_end = std::chrono::high_resolution_clock::now(); + + prepare_time += std::chrono::duration_cast( + prepare_end - prepare_start) + .count(); + + auto write_start = std::chrono::high_resolution_clock::now(); + writer->write_table(tablet); + auto write_end = std::chrono::high_resolution_clock::now(); + writing_time += std::chrono::duration_cast( + write_end - write_start) + .count(); + timestamp += bench::timestamp_per_tag; + csv_file << iter_num++ <<","<< get_memory_usage() << "\n"; + } + csv_file << iter_num++ <<","<< get_memory_usage() << "\n"; + auto close_start = std::chrono::high_resolution_clock::now(); + writer->flush(); + csv_file << iter_num++ <<","<< get_memory_usage() << "\n"; + writer->close(); + csv_file << iter_num++ <<","<< get_memory_usage() << "\n"; + auto close_end = std::chrono::high_resolution_clock::now(); + + writing_time += std::chrono::duration_cast( + close_end - close_start) + .count(); + delete writer; + auto end = std::chrono::high_resolution_clock::now(); + + FILE* file_to_size = fopen("bench_mark_cpp.tsfile", "rb"); + if (!file_to_size) { + std::cout << "unable to open file" << std::endl; + return -1; + } + csv_file << iter_num++ <<","<< get_memory_usage() << "\n"; + fseeko(file_to_size, 0, SEEK_END); + off_t size = ftello(file_to_size); + fclose(file_to_size); + + std::cout << "=======" << std::endl; + std::cout << "Finish writing for CPP" << std::endl; + std::cout << "Tsfile size is " << size << " bytes " << " ~ " << size / 1024 + << "KB" << std::endl; + + double pre_time = prepare_time / 1000.0 / 1000.0; + double write_time = writing_time / 1000.0 / 1000.0; + std::cout << "Preparing time is " << pre_time << " s" << std::endl; + std::cout << "Writing time is " << write_time << " s" << std::endl; + std::cout << "writing speed is " + << static_cast( + bench::tablet_num * bench::tag1_num * bench::tag2_num * + bench::timestamp_per_tag * data_types.size() / + (pre_time + write_time)) + << " points/s" << std::endl; + std::cout << "total time is " + << std::chrono::duration_cast(end - + start) + .count() / + 1000.0 / 1000.0 + << " s" << std::endl; + std::cout << "=====" << std::endl; + return 0; +} + +int bench_mark_cpp_read() { + std::cout << "bench mark cpp read" << std::endl; + libtsfile_init(); + int code = common::E_OK; + TsFileReader reader = TsFileReader(); + reader.open("bench_mark_cpp.tsfile"); + ResultSet* result_set = nullptr; + code = reader.query("TestTable", columns_name, INT64_MIN, INT64_MAX, + result_set); + bool has_next = false; + int row = 0; + auto read_start = std::chrono::high_resolution_clock::now(); + std::unordered_map columns_info; + while ((code = result_set->next(has_next)) == common::E_OK && has_next) { + row++; + } + result_set->close(); + reader.close(); + delete result_set; + auto read_end = std::chrono::high_resolution_clock::now(); + int64_t total_points = row * columns_name.size(); + double reading_time = std::chrono::duration_cast( + read_end - read_start) + .count() / + 1000.0 / 1000.0; + + std::cout << "total points is " << total_points << std::endl; + std::cout << "reading time is " << reading_time << " s" << std::endl; + std::cout << "read speed:" + << static_cast(total_points / reading_time) + << " points/s" << std::endl; + std::cout << "====================" << std::endl; + return 0; +} diff --git a/cpp/bench_mark/src/bench_mark_utils.cc b/cpp/bench_mark/src/bench_mark_utils.cc new file mode 100644 index 000000000..c4b4562f1 --- /dev/null +++ b/cpp/bench_mark/src/bench_mark_utils.cc @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "bench_mark_utils.h" +#ifndef _WIN32 +#include +#endif + +#include +#include +#include + +#ifdef _WIN32 +#include +#include +#endif + +#ifdef __APPLE__ +#include +#include +#include +#include +#endif + + +#include "bench_conf.h" +using namespace bench; + +void print_config(bool is_cpp) { + std::cout << "====================" << std::endl; + std::cout << "TsFile Benchmark "<< (is_cpp ? "CPP" : "C") <<" Begin " << std::endl; + std::cout << "Tag Column num: " << 2 << std::endl; + std::cout << "TAG1 num: " << tag1_num << " TAG2 num: " << tag2_num + << std::endl; + std::cout << "Filed column (type x num) : " << std::endl; + int column_num = 0; + for (int i = 0; i < 5; i++) { + std::cout << data_types_name[i] << "x" << field_type_vector[i] << " "; + column_num += field_type_vector[i]; + } + + std::cout << std::endl; + std::cout << "Tablet num:" << tablet_num << std::endl; + std::cout << "Tablet row num per tag:" << timestamp_per_tag << std::endl; + std::cout << "Total points is " + << tablet_num * tag1_num * tag2_num * timestamp_per_tag * + (column_num) + << std::endl; +} + +void print_progress_bar(int current, int total, int barWidth) { + float progress = static_cast(current) / total; + int pos = barWidth * progress; + + std::cout << "["; + for (int i = 0; i < barWidth; ++i) { + if (i < pos) + std::cout << "="; + else if (i == pos) + std::cout << ">"; + else + std::cout << " "; + } + std::cout << "] " << int(progress * 100.0) << " %\r"; + std::cout.flush(); +} + +int get_memory_usage() { +#ifdef _WIN32 + PROCESS_MEMORY_COUNTERS pmc; + if (GetProcessMemoryInfo(GetCurrentProcess(), &pmc, sizeof(pmc))) { + return pmc.WorkingSetSize / 1024 ; + } else { + return 0; + } +#elif defined(__linux__) + std::ifstream status_file("/proc/self/status"); + std::string line; + while (std::getline(status_file, line)) { + if (line.find("VmRSS") == 0) { + size_t num_pos = line.find_first_of("0123456789"); + if (num_pos != std::string::npos) { + size_t end_pos = line.find_first_not_of("1234567890", num_pos); + std::string num_str = line.substr(num_pos, end_pos - num_pos); + + unsigned long vm_rss_kb = strtoul(num_str.c_str(), nullptr, 10); + return vm_rss_kb; + } + } + } + return 0; +#elif defined(__APPLE__) + + task_basic_info_data_t info; + mach_msg_type_number_t count = TASK_BASIC_INFO_COUNT; + kern_return_t ret = task_info( + mach_task_self(), + TASK_BASIC_INFO, + (task_info_t)&info, + &count + ); + if (ret == KERN_SUCCESS) { + return info.resident_size / 1024 ; + } else { + return 0; + } +#endif +} diff --git a/cpp/bench_mark/src/bench_mark_utils.h b/cpp/bench_mark/src/bench_mark_utils.h new file mode 100644 index 000000000..4027ffc61 --- /dev/null +++ b/cpp/bench_mark/src/bench_mark_utils.h @@ -0,0 +1,33 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef TSFILE_BENCH_MARK_BENCH_MARK_UTILS_H +#define TSFILE_BENCH_MARK_BENCH_MARK_UTILS_H + + +static const char* data_types_name[5] = {"BOOLEAN", "INT32", "INT64", "FLOAT", + "DOUBLE"}; +static const int data_type[5] = {0, 1, 2, 3, 4}; + +void print_config(bool is_cpp); +void print_progress_bar(int current, int total, int barWidth = 50); +int get_memory_usage(); + + +#endif // TSFILE_BENCH_MARK_BENCH_MARK_UTILS_H \ No newline at end of file diff --git a/cpp/src/common/device_id.h b/cpp/src/common/device_id.h index 021cb6aaf..195207eb0 100644 --- a/cpp/src/common/device_id.h +++ b/cpp/src/common/device_id.h @@ -35,7 +35,7 @@ namespace storage { class IDeviceID { -public: + public: virtual ~IDeviceID() = default; virtual int serialize(common::ByteStream& write_stream) { return 0; } virtual int deserialize(common::ByteStream& read_stream) { return 0; } @@ -49,10 +49,10 @@ class IDeviceID { virtual bool operator==(const IDeviceID& other) { return false; } virtual bool operator!=(const IDeviceID& other) { return false; } -protected: + protected: IDeviceID() : empty_segments_() {} -private: + private: const std::vector empty_segments_; }; @@ -64,7 +64,7 @@ struct IDeviceIDComparator { }; class StringArrayDeviceID : public IDeviceID { -public: + public: explicit StringArrayDeviceID(const std::vector& segments) : segments_(formalize(segments)) {} @@ -73,14 +73,19 @@ class StringArrayDeviceID : public IDeviceID { explicit StringArrayDeviceID() : segments_() {} + StringArrayDeviceID(const std::vector& segments, bool internal) + : segments_(segments) {} + ~StringArrayDeviceID() override = default; std::string get_device_name() const override { - return segments_.empty() ? "" : std::accumulate(std::next(segments_.begin()), segments_.end(), - segments_.front(), - [](std::string a, const std::string& b) { - return std::move(a) + "." + b; - }); + return segments_.empty() + ? "" + : std::accumulate(std::next(segments_.begin()), + segments_.end(), segments_.front(), + [](std::string a, const std::string& b) { + return std::move(a) + "." + b; + }); }; int serialize(common::ByteStream& write_stream) override { @@ -88,12 +93,12 @@ class StringArrayDeviceID : public IDeviceID { if (RET_FAIL(common::SerializationUtil::write_var_uint(segment_num(), write_stream))) { return ret; - } + } for (const auto& segment : segments_) { - if (RET_FAIL(common::SerializationUtil::write_var_str(segment, - write_stream))) { + if (RET_FAIL(common::SerializationUtil::write_var_str( + segment, write_stream))) { return ret; - } + } } return ret; } @@ -101,13 +106,15 @@ class StringArrayDeviceID : public IDeviceID { int deserialize(common::ByteStream& read_stream) override { int ret = common::E_OK; uint32_t num_segments; - if (RET_FAIL(common::SerializationUtil::read_var_uint(num_segments, read_stream))) { + if (RET_FAIL(common::SerializationUtil::read_var_uint(num_segments, + read_stream))) { return ret; } segments_.clear(); for (uint32_t i = 0; i < num_segments; ++i) { std::string segment; - if (RET_FAIL(common::SerializationUtil::read_var_str(segment, read_stream))) { + if (RET_FAIL(common::SerializationUtil::read_var_str( + segment, read_stream))) { return ret; } segments_.push_back(segment); @@ -133,17 +140,26 @@ class StringArrayDeviceID : public IDeviceID { } bool operator==(const IDeviceID& other) override { - auto other_segments = other.get_segments(); - return (segments_.size() == other_segments.size()) && - std::equal(segments_.begin(), segments_.end(), - other_segments.begin()); + auto const& other_segments = other.get_segments(); + if (segments_.size() != other_segments.size()) { + return false; + } + + for (size_t i = 0; i < segments_.size(); ++i) { + const std::string& a = segments_[i]; + const std::string& b = other_segments[i]; + + if (a.size() != b.size()) return false; + if (a != b) return false; + } + return true; } bool operator!=(const IDeviceID& other) override { return !(*this == other); } -private: + private: std::vector segments_; std::vector formalize( @@ -173,8 +189,9 @@ class StringArrayDeviceID : public IDeviceID { if (segment_cnt == 1) { // "root" -> {"root"} final_segments.push_back(splits[0]); - } else if (segment_cnt < static_cast( - storage::DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME + 1)) { + } else if (segment_cnt < + static_cast( + storage::DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME + 1)) { // "root.a" -> {"root", "a"} // "root.a.b" -> {"root.a", "b"} std::string table_name = std::accumulate( @@ -184,26 +201,26 @@ class StringArrayDeviceID : public IDeviceID { }); final_segments.push_back(table_name); final_segments.push_back(splits.back()); - } else { - // "root.a.b.c" -> {"root.a.b", "c"} - // "root.a.b.c.d" -> {"root.a.b", "c", "d"} - std::string table_name = std::accumulate( - splits.begin(), - splits.begin() + storage::DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME, - std::string(), [](const std::string& a, const std::string& b) { - return a.empty() ? b : a + storage::PATH_SEPARATOR + b; - }); - - final_segments.emplace_back(std::move(table_name)); - final_segments.insert( - final_segments.end(), - splits.begin() + storage::DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME, - splits.end()); - } + } else { + // "root.a.b.c" -> {"root.a.b", "c"} + // "root.a.b.c.d" -> {"root.a.b", "c", "d"} + std::string table_name = std::accumulate( + splits.begin(), + splits.begin() + storage::DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME, + std::string(), [](const std::string& a, const std::string& b) { + return a.empty() ? b : a + storage::PATH_SEPARATOR + b; + }); + + final_segments.emplace_back(std::move(table_name)); + final_segments.insert( + final_segments.end(), + splits.begin() + storage::DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME, + splits.end()); + } return final_segments; } }; -} +} // namespace storage #endif \ No newline at end of file diff --git a/cpp/src/common/schema.h b/cpp/src/common/schema.h index 72fd028f5..67dbfb91a 100644 --- a/cpp/src/common/schema.h +++ b/cpp/src/common/schema.h @@ -63,7 +63,7 @@ struct MeasurementSchema { : measurement_name_(measurement_name), data_type_(data_type), encoding_(get_default_encoding_for_type(data_type)), - compression_type_(common::UNCOMPRESSED), + compression_type_(common::get_default_compression_for_type(data_type)), chunk_writer_(nullptr), value_chunk_writer_(nullptr) {} diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index ac4a2708b..a8c9e3eb2 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -325,6 +325,7 @@ void Tablet::set_column_categories( std::shared_ptr Tablet::get_device_id(int i) const { std::vector id_array; + id_array.reserve(id_column_indexes_.size() + 1); id_array.push_back(insert_target_name_); for (auto id_column_idx : id_column_indexes_) { common::TSDataType data_type = INVALID_DATATYPE; @@ -339,7 +340,7 @@ std::shared_ptr Tablet::get_device_id(int i) const { break; } } - return std::make_shared(id_array); + return std::make_shared(id_array, true); } } // end namespace storage \ No newline at end of file diff --git a/cpp/src/common/tsfile_common.cc b/cpp/src/common/tsfile_common.cc index 89a007ba0..17c3d479b 100644 --- a/cpp/src/common/tsfile_common.cc +++ b/cpp/src/common/tsfile_common.cc @@ -260,7 +260,7 @@ int TsFileMeta::deserialize_from(common::ByteStream &in) { /* ================ MetaIndexNode ================ */ int MetaIndexNode::binary_search_children(std::shared_ptr key, bool exact_search, - IMetaIndexEntry &ret_index_entry, + std::shared_ptr &ret_index_entry, int64_t &ret_end_offset) { #if DEBUG_SE std::cout << "MetaIndexNode::binary_search_children start, name=" << key @@ -311,7 +311,7 @@ int MetaIndexNode::binary_search_children(std::shared_ptr key, bool return E_NOT_EXIST; } } - ret_index_entry.clone(children_[l], pa_); + ret_index_entry = children_[l]->clone(pa_); if (l == (int)children_.size() - 1) { ret_end_offset = this->end_offset_; } else { diff --git a/cpp/src/common/tsfile_common.h b/cpp/src/common/tsfile_common.h index 9fca01689..878fc4d21 100644 --- a/cpp/src/common/tsfile_common.h +++ b/cpp/src/common/tsfile_common.h @@ -738,15 +738,14 @@ struct IMetaIndexEntry { common::PageArena *pa) { return common::E_NOT_SUPPORT; } - virtual int64_t get_offset() const { return 0; } + virtual int64_t get_offset() const = 0; virtual bool is_device_level() const { return false; } virtual std::shared_ptr get_compare_key() const { return std::shared_ptr(); } virtual common::String get_name() const { return {}; } virtual std::shared_ptr get_device_id() const { return nullptr; } - virtual void clone(std::shared_ptr entry, - common::PageArena *pa) {} + virtual std::shared_ptr clone(common::PageArena *pa) = 0; #ifndef NDEBUG virtual void print(std::ostream &os) const {} friend std::ostream &operator<<(std::ostream &os, @@ -801,10 +800,8 @@ struct DeviceMetaIndexEntry : IMetaIndexEntry { std::shared_ptr get_device_id() const override { return device_id_; } - void clone(std::shared_ptr entry, - common::PageArena *pa) override { - offset_ = entry->get_offset(); - device_id_ = entry->get_device_id(); + std::shared_ptr clone(common::PageArena *pa) override { + return std::make_shared(device_id_, offset_); } #ifndef NDEBUG void print(std::ostream &os) const override { @@ -862,10 +859,8 @@ struct MeasurementMetaIndexEntry : IMetaIndexEntry { std::shared_ptr get_device_id() const override { return nullptr; } - void clone(std::shared_ptr entry, - common::PageArena *pa) override { - offset_ = entry->get_offset(); - name_.dup_from(entry->get_name(), *pa); + std::shared_ptr clone(common::PageArena *pa) override { + return std::make_shared(name_, offset_, *pa); } #ifndef NDEBUG void print(std::ostream &os) const override { @@ -915,7 +910,7 @@ struct MetaIndexNode { int binary_search_children(std::shared_ptr key, bool exact_search, - IMetaIndexEntry &ret_index_entry, + std::shared_ptr &ret_index_entry, int64_t &ret_end_offset); int serialize_to(common::ByteStream &out) { diff --git a/cpp/src/encoding/gorilla_decoder.h b/cpp/src/encoding/gorilla_decoder.h index 5b241de4f..f374b32eb 100644 --- a/cpp/src/encoding/gorilla_decoder.h +++ b/cpp/src/encoding/gorilla_decoder.h @@ -44,7 +44,7 @@ class GorillaDecoder : public Decoder { stored_trailing_zeros_ = 0; bits_left_ = 0; first_value_was_read_ = false; - has_next_ = true; + has_next_ = false; buffer_ = 0; } diff --git a/cpp/src/file/tsfile_io_reader.cc b/cpp/src/file/tsfile_io_reader.cc index 11113fc0f..a25669594 100644 --- a/cpp/src/file/tsfile_io_reader.cc +++ b/cpp/src/file/tsfile_io_reader.cc @@ -93,7 +93,7 @@ int TsFileIOReader::get_device_timeseries_meta_without_chunk_meta( PageArena &pa) { int ret = E_OK; load_tsfile_meta_if_necessary(); - DeviceMetaIndexEntry meta_index_entry; + std::shared_ptr meta_index_entry; int64_t end_offset; std::vector, int64_t> > meta_index_entry_list; @@ -101,7 +101,7 @@ int TsFileIOReader::get_device_timeseries_meta_without_chunk_meta( load_device_index_entry(std::make_shared(device_id), meta_index_entry, end_offset))) { } else if (RET_FAIL(load_all_measurement_index_entry( - meta_index_entry.offset_, end_offset, pa, + meta_index_entry->get_offset(), end_offset, pa, meta_index_entry_list))) { } else if (RET_FAIL(do_load_all_timeseries_index(meta_index_entry_list, pa, timeseries_indexs))) { @@ -215,20 +215,20 @@ int TsFileIOReader::load_timeseries_index_for_ssi( std::shared_ptr device_id, const std::string &measurement_name, TsFileSeriesScanIterator *&ssi) { int ret = E_OK; - DeviceMetaIndexEntry device_index_entry; + std::shared_ptr device_index_entry; int64_t device_ie_end_offset = 0; - MeasurementMetaIndexEntry measurement_index_entry; + std::shared_ptr measurement_index_entry; int64_t measurement_ie_end_offset = 0; // bool is_aligned = false; if (RET_FAIL(load_device_index_entry( std::make_shared(device_id), device_index_entry, device_ie_end_offset))) { } else if (RET_FAIL(load_measurement_index_entry( - measurement_name, device_index_entry.offset_, + measurement_name, device_index_entry->get_offset(), device_ie_end_offset, measurement_index_entry, measurement_ie_end_offset))) { } else if (RET_FAIL(do_load_timeseries_index( - measurement_name, measurement_index_entry.offset_, + measurement_name, measurement_index_entry->get_offset(), measurement_ie_end_offset, ssi->timeseries_index_pa_, ssi->itimeseries_index_))) { } else { @@ -249,7 +249,7 @@ int TsFileIOReader::load_timeseries_index_for_ssi( int TsFileIOReader::load_device_index_entry( std::shared_ptr device_name, - IMetaIndexEntry &device_index_entry, int64_t &end_offset) { + std::shared_ptr &device_index_entry, int64_t &end_offset) { int ret = E_OK; std::shared_ptr device_id_comparable = std::dynamic_pointer_cast(device_name); @@ -281,7 +281,7 @@ int TsFileIOReader::load_device_index_entry( int TsFileIOReader::load_measurement_index_entry( const std::string &measurement_name_str, int64_t start_offset, - int64_t end_offset, IMetaIndexEntry &ret_measurement_index_entry, + int64_t end_offset, std::shared_ptr &ret_measurement_index_entry, int64_t &ret_end_offset) { #if DEBUG_SE std::cout << "load_measurement_index_entry: measurement_name_str=" @@ -380,7 +380,8 @@ int TsFileIOReader::load_all_measurement_index_entry( int TsFileIOReader::read_device_meta_index(int32_t start_offset, int32_t end_offset, common::PageArena &pa, - MetaIndexNode *&device_meta_index) { + MetaIndexNode *&device_meta_index, + bool leaf) { int ret = E_OK; ASSERT(start_offset < end_offset); const int32_t read_size = (int32_t)(end_offset - start_offset); @@ -393,9 +394,8 @@ int TsFileIOReader::read_device_meta_index(int32_t start_offset, device_meta_index = new (m_idx_node_buf) MetaIndexNode(&pa); if (RET_FAIL(read_file_->read(start_offset, data_buf, read_size, ret_read_len))) { - } else if (RET_FAIL( - device_meta_index->deserialize_from(data_buf, read_size))) { } + ret = device_meta_index->device_deserialize_from(data_buf, read_size); return ret; } @@ -404,9 +404,9 @@ int TsFileIOReader::get_timeseries_indexes(std::shared_ptr device_id, std::vector ×eries_indexs, common::PageArena &pa) { int ret = E_OK; - DeviceMetaIndexEntry device_index_entry; + std::shared_ptr device_index_entry; int64_t device_ie_end_offset = 0; - MeasurementMetaIndexEntry measurement_index_entry; + std::shared_ptr measurement_index_entry; int64_t measurement_ie_end_offset = 0; if (RET_FAIL(load_device_index_entry( std::make_shared(device_id), device_index_entry, @@ -416,11 +416,11 @@ int TsFileIOReader::get_timeseries_indexes(std::shared_ptr device_id, int64_t idx = 0; for (const auto &measurement_name : measurement_names) { if (RET_FAIL(load_measurement_index_entry( - measurement_name, device_index_entry.offset_, + measurement_name, device_index_entry->get_offset(), device_ie_end_offset, measurement_index_entry, measurement_ie_end_offset))) { } else if (RET_FAIL(do_load_timeseries_index( - measurement_name, measurement_index_entry.offset_, + measurement_name, measurement_index_entry->get_offset(), measurement_ie_end_offset, pa, timeseries_indexs[idx++]))) { } @@ -435,7 +435,7 @@ int TsFileIOReader::get_timeseries_indexes(std::shared_ptr device_id, int TsFileIOReader::search_from_leaf_node( std::shared_ptr target_name, std::shared_ptr index_node, - IMetaIndexEntry &ret_index_entry, int64_t &ret_end_offset) { + std::shared_ptr &ret_index_entry, int64_t &ret_end_offset) { int ret = E_OK; ret = index_node->binary_search_children(target_name, true, ret_index_entry, ret_end_offset); @@ -444,15 +444,14 @@ int TsFileIOReader::search_from_leaf_node( int TsFileIOReader::search_from_internal_node( std::shared_ptr target_name, - std::shared_ptr index_node, IMetaIndexEntry &ret_index_entry, + std::shared_ptr index_node, std::shared_ptr &ret_index_entry, int64_t &ret_end_offset) { int ret = E_OK; - IMetaIndexEntry index_entry; + std::shared_ptr index_entry; int64_t end_offset = 0; ASSERT(index_node->node_type_ == INTERNAL_MEASUREMENT || index_node->node_type_ == INTERNAL_DEVICE); - if (RET_FAIL(index_node->binary_search_children( target_name, /*exact=*/false, index_entry, end_offset))) { return ret; @@ -460,7 +459,7 @@ int TsFileIOReader::search_from_internal_node( while (IS_SUCC(ret)) { // reader next level index node - const int read_size = end_offset - index_entry.get_offset(); + const int read_size = end_offset - index_entry->get_offset(); #if DEBUG_SE std::cout << "search_from_internal_node, end_offset=" << end_offset << ", index_entry.offset_=" << index_entry.get_offset() @@ -476,11 +475,11 @@ int TsFileIOReader::search_from_internal_node( MetaIndexNode *cur_level_index_node = new(buf) MetaIndexNode(&cur_level_index_node_pa); int32_t ret_read_len = 0; - if (RET_FAIL(read_file_->read(index_entry.get_offset(), data_buf, read_size, + if (RET_FAIL(read_file_->read(index_entry->get_offset(), data_buf, read_size, ret_read_len))) { } else if (read_size != ret_read_len) { ret = E_TSFILE_CORRUPTED; - } else if (RET_FAIL(cur_level_index_node->deserialize_from( + } else if (RET_FAIL(cur_level_index_node->device_deserialize_from( data_buf, read_size))) { } else { if (cur_level_index_node->node_type_ == LEAF_DEVICE) { diff --git a/cpp/src/file/tsfile_io_reader.h b/cpp/src/file/tsfile_io_reader.h index d32f690fa..bd4a293a7 100644 --- a/cpp/src/file/tsfile_io_reader.h +++ b/cpp/src/file/tsfile_io_reader.h @@ -77,7 +77,8 @@ class TsFileIOReader { std::vector &chunk_meta_list); int read_device_meta_index(int32_t start_offset, int32_t end_offset, common::PageArena &pa, - MetaIndexNode *&device_meta_index); + MetaIndexNode *&device_meta_index, + bool leaf); int get_timeseries_indexes( std::shared_ptr device_id, const std::unordered_set &measurement_names, @@ -92,12 +93,12 @@ class TsFileIOReader { int load_tsfile_meta_if_necessary(); int load_device_index_entry(std::shared_ptr target_name, - IMetaIndexEntry &device_index_entry, + std::shared_ptr &device_index_entry, int64_t &end_offset); int load_measurement_index_entry( const std::string &measurement_name, int64_t start_offset, - int64_t end_offset, IMetaIndexEntry &ret_measurement_index_entry, + int64_t end_offset, std::shared_ptr &ret_measurement_index_entry, int64_t &ret_end_offset); int load_all_measurement_index_entry( @@ -122,12 +123,12 @@ class TsFileIOReader { int search_from_leaf_node(std::shared_ptr target_name, std::shared_ptr index_node, - IMetaIndexEntry &ret_index_entry, + std::shared_ptr &ret_index_entry, int64_t &ret_end_offset); int search_from_internal_node(std::shared_ptr target_name, std::shared_ptr index_node, - IMetaIndexEntry &ret_index_entry, + std::shared_ptr &ret_index_entry, int64_t &ret_end_offset); bool filter_stasify(ITimeseriesIndex *ts_index, Filter *time_filter); diff --git a/cpp/src/reader/device_meta_iterator.cc b/cpp/src/reader/device_meta_iterator.cc index 90bcdc10d..d4512263d 100644 --- a/cpp/src/reader/device_meta_iterator.cc +++ b/cpp/src/reader/device_meta_iterator.cc @@ -78,7 +78,7 @@ int DeviceMetaIterator::load_leaf_device(MetaIndexNode* meta_index_node) { : meta_index_node->end_offset_; MetaIndexNode* child_node = nullptr; if (RET_FAIL(io_reader_->read_device_meta_index( - start_offset, end_offset, pa_, child_node))) { + start_offset, end_offset, pa_, child_node, false))) { return ret; } else { result_cache_.push( @@ -101,7 +101,7 @@ int DeviceMetaIterator::load_internal_node(MetaIndexNode* meta_index_node) { MetaIndexNode* child_node = nullptr; if (RET_FAIL(io_reader_->read_device_meta_index( - start_offset, end_offset, pa_, child_node))) { + start_offset, end_offset, pa_, child_node, false))) { return ret; } else { meta_index_nodes_.push(child_node); diff --git a/cpp/src/writer/time_chunk_writer.cc b/cpp/src/writer/time_chunk_writer.cc index 892c0d1c1..35684f596 100644 --- a/cpp/src/writer/time_chunk_writer.cc +++ b/cpp/src/writer/time_chunk_writer.cc @@ -100,6 +100,9 @@ int TimeChunkWriter::seal_cur_page(bool end_chunk) { time_page_writer_.destroy_page_data(); time_page_writer_.reset(); } else { + if (first_page_statistic_ == nullptr) { + std::cout<<"error"< ret_entry = std::make_shared< - MeasurementMetaIndexEntry>(); - ret_entry->init(ret_entry_name, 0, arena_); + std::shared_ptr ret_entry = std::make_shared< + MeasurementMetaIndexEntry>(ret_entry_name, 0, arena_); int64_t ret_offset = 0; int result = node_.binary_search_children( std::make_shared("banana"), - true, *ret_entry, ret_offset); + true, ret_entry, ret_offset); ASSERT_EQ(result, 0); ASSERT_EQ(ret_offset, 30); } TEST_F(MetaIndexNodeSearchTest, ExactSearchNotFound) { const std::string ret_entry_name(""); - std::shared_ptr ret_entry = std::make_shared< - MeasurementMetaIndexEntry>(); - ret_entry->init(ret_entry_name, 0, arena_); + std::shared_ptr ret_entry = std::make_shared< + MeasurementMetaIndexEntry>(ret_entry_name, 0, arena_); int64_t ret_offset = 0; char search_name[] = "grape"; int result = node_.binary_search_children( std::make_shared(search_name), - true, *ret_entry, ret_offset); + true, ret_entry, ret_offset); ASSERT_EQ(result, common::E_NOT_EXIST); } TEST_F(MetaIndexNodeSearchTest, NonExactSearchFound) { const std::string ret_entry_name(""); - std::shared_ptr ret_entry = std::make_shared< - MeasurementMetaIndexEntry>(); - ret_entry->init(ret_entry_name, 0, arena_); + std::shared_ptr ret_entry = std::make_shared< + MeasurementMetaIndexEntry>(ret_entry_name, 0, arena_); int64_t ret_offset = 0; char search_name[] = "blueberry"; int result = node_.binary_search_children( std::make_shared(search_name), - false, *ret_entry, ret_offset); + false, ret_entry, ret_offset); ASSERT_EQ(result, 0); ASSERT_EQ(ret_offset, 30); } TEST_F(MetaIndexNodeSearchTest, NonExactSearchNotFound) { const std::string ret_entry_name(""); - std::shared_ptr ret_entry = std::make_shared< - MeasurementMetaIndexEntry>(); - ret_entry->init(ret_entry_name, 0, arena_); + std::shared_ptr ret_entry = std::make_shared< + MeasurementMetaIndexEntry>(ret_entry_name, 0, arena_); int64_t ret_offset = 0; char search_name[] = "aardvark"; int result = node_.binary_search_children( std::make_shared(search_name), - false, *ret_entry, ret_offset); + false, ret_entry, ret_offset); ASSERT_EQ(result, common::E_NOT_EXIST); } diff --git a/java/bench_mark/pom.xml b/java/bench_mark/pom.xml new file mode 100644 index 000000000..c193d4d78 --- /dev/null +++ b/java/bench_mark/pom.xml @@ -0,0 +1,80 @@ + + + + 4.0.0 + + org.apache.tsfile + tsfile-java + 2.1.0-SNAPSHOT + + bench_mark + TsFile: Java: BenchMark + + + ch.qos.logback + logback-classic + + + org.apache.tsfile + tsfile + 2.1.0-SNAPSHOT + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + org.apache.maven.plugins + maven-dependency-plugin + + + check-dependencies + + analyze-only + + verify + + true + + + + + + org.apache.maven.plugins + maven-enforcer-plugin + + + true + + + + + + diff --git a/java/bench_mark/src/main/java/org/apache/tsfile/BenchMark.java b/java/bench_mark/src/main/java/org/apache/tsfile/BenchMark.java new file mode 100644 index 000000000..5f8047f6e --- /dev/null +++ b/java/bench_mark/src/main/java/org/apache/tsfile/BenchMark.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.read.ReadProcessException; +import org.apache.tsfile.exception.write.NoMeasurementException; +import org.apache.tsfile.exception.write.NoTableException; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.ColumnSchema; +import org.apache.tsfile.file.metadata.ColumnSchemaBuilder; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.fileSystem.FSFactoryProducer; +import org.apache.tsfile.read.query.dataset.ResultSet; +import org.apache.tsfile.read.v4.ITsFileReader; +import org.apache.tsfile.read.v4.TsFileReaderBuilder; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.v4.ITsFileWriter; +import org.apache.tsfile.write.v4.TsFileWriterBuilder; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; + +public class BenchMark { + private static final Logger LOGGER = LoggerFactory.getLogger(BenchMark.class); + + public static void main(String[] args) + throws IOException, ReadProcessException, NoTableException, NoMeasurementException { + BenchMarkConf.printConfig(); + MemoryMonitor monitor = new MemoryMonitor(); + String path = "/tmp/tsfile_table_write_bench_mark.tsfile"; + File f = FSFactoryProducer.getFSFactory().getFile(path); + if (f.exists()) { + Files.delete(f.toPath()); + } + monitor.recordMemoryUsage(); + List column_names = new ArrayList<>(); + List column_types = new ArrayList<>(); + List columnSchemas = new ArrayList<>(); + columnSchemas.add( + new ColumnSchemaBuilder() + .name("TAG1") + .dataType(TSDataType.STRING) + .category(Tablet.ColumnCategory.TAG) + .build()); + + columnSchemas.add( + new ColumnSchemaBuilder() + .name("TAG2") + .dataType(TSDataType.STRING) + .category(Tablet.ColumnCategory.TAG) + .build()); + column_names.add("TAG1"); + column_names.add("TAG2"); + column_types.add(TSDataType.STRING); + column_types.add(TSDataType.STRING); + + int fieldIndex = 2; + for (int i = 0; i < BenchMarkConf.FIELD_TYPE_VECTOR.size(); i++) { + int count = BenchMarkConf.FIELD_TYPE_VECTOR.get(i); + TSDataType dataType = BenchMarkConf.getTsDataType(i); + for (int j = 0; j < count; j++) { + columnSchemas.add( + new ColumnSchemaBuilder() + .name("FIELD" + fieldIndex) + .dataType(dataType) + .category(Tablet.ColumnCategory.FIELD) + .build()); + column_names.add("FIELD" + fieldIndex); + column_types.add(dataType); + fieldIndex++; + } + } + monitor.recordMemoryUsage(); + long totalPrepareTimeNs = 0; + long totalWriteTimeNs = 0; + long start = System.nanoTime(); + TableSchema tableSchema = new TableSchema("TestTable", columnSchemas); + monitor.recordMemoryUsage(); + try (ITsFileWriter writer = + new TsFileWriterBuilder().file(f).tableSchema(tableSchema).build()) { + monitor.recordMemoryUsage(); + long timestamp = 0; + for (int table_ind = 0; table_ind < BenchMarkConf.TABLET_NUM; table_ind++) { + long prepareStartTime = System.nanoTime(); + Tablet tablet = + new Tablet( + column_names, + column_types, + BenchMarkConf.TAG1_NUM * BenchMarkConf.TAG2_NUM * BenchMarkConf.TIMESTAMP_PER_TAG); + int row_count = 0; + for (int tag1_ind = 0; tag1_ind < BenchMarkConf.TAG1_NUM; tag1_ind++) { + for (int tag2_ind = 0; tag2_ind < BenchMarkConf.TAG2_NUM; tag2_ind++) { + for (int row = 0; row < BenchMarkConf.TIMESTAMP_PER_TAG; row++) { + tablet.addTimestamp(row_count, timestamp + row); + tablet.addValue(row_count, 0, "tag1_" + tag1_ind); + tablet.addValue(row_count, 1, "tag2_" + tag2_ind); + + for (int i = 2; i < column_types.size(); i++) { + switch (column_types.get(i)) { + case INT32: + tablet.addValue(row_count, i, (int) timestamp); + break; + case INT64: + tablet.addValue(row_count, i, timestamp); + break; + case FLOAT: + tablet.addValue(row_count, i, (float) (timestamp * 1.1)); + break; + case DOUBLE: + tablet.addValue(row_count, i, (double) timestamp * 1.1); + break; + case BOOLEAN: + tablet.addValue(row_count, i, timestamp % 2 == 0); + default: + // + } + } + row_count++; + } + } + } + monitor.recordMemoryUsage(); + long prepareEndTime = System.nanoTime(); + + totalPrepareTimeNs += (prepareEndTime - prepareStartTime); + long writeStartTime = System.nanoTime(); + writer.write(tablet); + monitor.recordMemoryUsage(); + long writeEndTime = System.nanoTime(); + totalWriteTimeNs += (writeEndTime - writeStartTime); + timestamp += BenchMarkConf.TIMESTAMP_PER_TAG; + } + } catch (WriteProcessException e) { + LOGGER.error("meet error in TsFileWrite ", e); + } + monitor.recordMemoryUsage(); + long end = System.nanoTime(); + double totalPrepareTimeSec = totalPrepareTimeNs / 1_000_000_000.0; + double totalWriteTimeSec = totalWriteTimeNs / 1_000_000_000.0; + double totalTimeSec = (end - start) / 1_000_000_000.0; + + long size = f.length(); + + monitor.close(); + System.out.println("===================="); + System.out.println("finish bench mark for java"); + System.out.printf("tsfile size is %d bytes ~ %dKB%n", size, size / 1024); + + System.out.printf("prepare data time is %.6f s%n", totalPrepareTimeSec); + System.out.printf("writing data time is %.6f s%n", totalWriteTimeSec); + + long totalPoints = + (long) BenchMarkConf.TABLET_NUM + * BenchMarkConf.TAG1_NUM + * BenchMarkConf.TAG2_NUM + * BenchMarkConf.TIMESTAMP_PER_TAG + * column_names.size(); + double writingSpeed = totalPoints / totalTimeSec; + System.out.printf("writing speed is %d points/s%n", (long) writingSpeed); + + System.out.printf("total time is %.6f s%n", totalTimeSec); + System.out.println("===================="); + + Integer row = 0; + long read_start = System.nanoTime(); + try (ITsFileReader reader = new TsFileReaderBuilder().file(f).build()) { + ResultSet resultSet = reader.query("TestTable", column_names, Long.MIN_VALUE, Long.MAX_VALUE); + while (resultSet.next()) { + row++; + } + } + System.out.println("row is " + row); + long read_end = System.nanoTime(); + double totalReadTimeSec = (read_end - read_start) / 1_000_000_000.0; + System.out.printf("read time is %.6f s%n", totalReadTimeSec); + double readSpeed = row * column_names.size() / totalReadTimeSec; + System.out.printf("read speed is %.6f points/s %n", readSpeed); + } +} diff --git a/java/bench_mark/src/main/java/org/apache/tsfile/BenchMarkConf.java b/java/bench_mark/src/main/java/org/apache/tsfile/BenchMarkConf.java new file mode 100644 index 000000000..9ba26555a --- /dev/null +++ b/java/bench_mark/src/main/java/org/apache/tsfile/BenchMarkConf.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile; + +import org.apache.tsfile.enums.TSDataType; + +import java.util.Arrays; +import java.util.List; + +public class BenchMarkConf { + public static final int TABLET_NUM = 1000; + public static final int TAG1_NUM = 1; + public static final int TAG2_NUM = 10; + public static final int TIMESTAMP_PER_TAG = 1000; + public static final List FIELD_TYPE_VECTOR = Arrays.asList(1, 1, 1, 1, 1); + + public static TSDataType getTsDataType(int index) { + switch (index) { + case 0: + return TSDataType.INT32; + case 1: + return TSDataType.INT64; + case 2: + return TSDataType.FLOAT; + case 3: + return TSDataType.DOUBLE; + case 4: + return TSDataType.BOOLEAN; + } + return TSDataType.UNKNOWN; + } + + public static final List DATA_TYPES_NAME = + Arrays.asList("INT32", "INT64", "FLOAT", "DOUBLE", "BOOLEAN"); + + public static void printConfig() { + int columnNum = 0; + for (int count : FIELD_TYPE_VECTOR) { + columnNum += count; + } + + System.out.println("TsFile benchmark For Java"); + System.out.println("Schema Configuration:"); + System.out.println("Tag Column num: " + 2); + System.out.printf( + "TAG1 num: %d TAG2 num: %d%n%n", BenchMarkConf.TAG1_NUM, BenchMarkConf.TAG2_NUM); + + System.out.println("Field Column and types: "); + for (int i = 0; i < 5; i++) { + System.out.printf("%sx%d ", DATA_TYPES_NAME.get(i), BenchMarkConf.FIELD_TYPE_VECTOR.get(i)); + } + + System.out.printf("%nTablet num: %d%n", BenchMarkConf.TABLET_NUM); + System.out.printf("Tablet row num per tag: %d%n", BenchMarkConf.TIMESTAMP_PER_TAG); + + long totalPoints = + (long) BenchMarkConf.TABLET_NUM + * BenchMarkConf.TAG1_NUM + * BenchMarkConf.TAG2_NUM + * BenchMarkConf.TIMESTAMP_PER_TAG + * columnNum; + System.out.println("Total points is " + totalPoints); + System.out.println("======"); + } +} diff --git a/java/bench_mark/src/main/java/org/apache/tsfile/MemoryMonitor.java b/java/bench_mark/src/main/java/org/apache/tsfile/MemoryMonitor.java new file mode 100644 index 000000000..8829c1a5f --- /dev/null +++ b/java/bench_mark/src/main/java/org/apache/tsfile/MemoryMonitor.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile; + +import oshi.SystemInfo; +import oshi.hardware.GlobalMemory; +import oshi.software.os.OSProcess; +import oshi.software.os.OperatingSystem; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; + +public class MemoryMonitor { + private static final String CSV_HEADER = "iter,memory_usage(kb)\n"; + private static final String CSV_FILE = "/tmp/memory_usage_java.csv"; + private BufferedWriter writer; + private int iter = 0; + + public MemoryMonitor() throws IOException { + writer = + Files.newBufferedWriter( + Paths.get(CSV_FILE), StandardOpenOption.CREATE, StandardOpenOption.APPEND); + writer.write(CSV_HEADER); + } + + public void recordMemoryUsage() throws IOException { + long memory = get_memory_usage(); + String line = String.format("%d,%d\n", iter++, memory); + writer.write(line); + writer.flush(); + } + + public long get_memory_usage() { + SystemInfo si = new SystemInfo(); + OperatingSystem os = si.getOperatingSystem(); + GlobalMemory memory = si.getHardware().getMemory(); + OSProcess currentProcess = os.getProcess(os.getProcessId()); + long residentSetSize = currentProcess.getResidentSetSize(); + return residentSetSize / 1024; + } + + public void close() { + try { + if (writer != null) { + writer.close(); + } + } catch (IOException e) { + e.printStackTrace(); + } + } +} diff --git a/java/pom.xml b/java/pom.xml index 8c6be19be..87435a663 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -35,6 +35,7 @@ tsfile examples tools + bench_mark diff --git a/python/bench_mark/bench_mark.py b/python/bench_mark/bench_mark.py new file mode 100644 index 000000000..1fc02e951 --- /dev/null +++ b/python/bench_mark/bench_mark.py @@ -0,0 +1,185 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import os +from time import perf_counter + +from tqdm import tqdm +import psutil +import csv +from tsfile import TSDataType, ColumnCategory +from tsfile import TableSchema, ColumnSchema +from tsfile import Tablet +from tsfile import TsFileTableWriter, TsFileReader + + +bench_mark_conf = { + "tablet_num": 1000, + "tag1_num": 1, + "tag2_num": 10, + "timestamp_per_tag": 1000, + "field_type_vector": [1, 1, 1, 1, 1], +} + +type_list = [TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.BOOLEAN] + + +def print_config(): + data_types_name = ["INT64", "INT32", "FLOAT", "DOUBLE", "BOOLEAN"] + print("=====================") + print("TsFile benchmark For Python") + print("Schema Configuration:") + print(f"Tag Column num: {2}") + print(f"TAG1 num: {bench_mark_conf['tag1_num']} TAG2 num: {bench_mark_conf['tag2_num']}\n") + + print("Filed Column and types: ") + column_num = 0 + for i in range(5): + print(f"{data_types_name[i]}x{bench_mark_conf['field_type_vector'][i]} ", end="") + column_num += bench_mark_conf['field_type_vector'][i] + + print("\n") + print(f"Tablet num: {bench_mark_conf['tablet_num']}") + print(f"Tablet row num per tag: {bench_mark_conf['timestamp_per_tag']}") + + total_points = (bench_mark_conf['tablet_num'] * + bench_mark_conf['tag1_num'] * + bench_mark_conf['tag2_num'] * + bench_mark_conf['timestamp_per_tag'] * + column_num) + print(f"Total points is {total_points}") + print("======") + +column_name = [] + +def get_memory_usage_kb(): + process = psutil.Process(os.getpid()) + return process.memory_info().rss // 1024 + +def bench_mark_write(): + csv_file = "memory_usage_python.csv" + + csv_writer = csv.writer(open(csv_file, "w")) + csv_writer.writerow(["iter_num", "memory_usage(kb)"]) + iter_num = 0 + print_config() + column_schema_list = [] + column_datat_type = [] + column_schema_list.append(ColumnSchema("TAG1", TSDataType.STRING, ColumnCategory.TAG)) + column_name.append("TAG1") + column_datat_type.append(TSDataType.STRING) + column_schema_list.append(ColumnSchema("TAG2", TSDataType.STRING, ColumnCategory.TAG)) + column_name.append("TAG2") + column_datat_type.append(TSDataType.STRING) + + + i = 2 + for count, type in zip(bench_mark_conf["field_type_vector"], type_list): + for _ in range(count): + column_schema_list.append(ColumnSchema("FIELD" + str(i), type, ColumnCategory.FIELD)) + column_name.append("FIELD" + str(i)) + column_datat_type.append(type) + i = i + 1 + + timestamp = 0 + table_schema = TableSchema("TestTable", column_schema_list) + start = perf_counter() + prepare_time = 0 + writing_time = 0 + csv_writer.writerow([iter_num, get_memory_usage_kb()]) + iter_num += 1 + with TsFileTableWriter("tsfile_table_write_bench_mark.tsfile", table_schema) as writer: + csv_writer.writerow([iter_num, get_memory_usage_kb()]) + iter_num += 1 + for i in tqdm(range(bench_mark_conf["tablet_num"]), desc="Tablets"): + csv_writer.writerow([iter_num, get_memory_usage_kb()]) + iter_num += 1 + row_num = 0 + prepare_start = perf_counter() + tablet = Tablet(column_name, column_datat_type, + bench_mark_conf["timestamp_per_tag"] * bench_mark_conf["tag1_num"] * + bench_mark_conf["tag2_num"]) + + for j in range(bench_mark_conf["tag1_num"]): + for k in range(bench_mark_conf["tag2_num"]): + for row in range(bench_mark_conf["timestamp_per_tag"]): + tablet.add_timestamp(row_num, timestamp + row) + tablet.add_value_by_index(0, row_num, "tag1_" + str(j)) + tablet.add_value_by_index(1, row_num, "tag2_" + str(k)) + for col in range(2, len(column_name)): + if column_datat_type[col] == TSDataType.INT32: + tablet.add_value_by_index(col, row_num, timestamp) + elif column_datat_type[col] == TSDataType.INT64: + tablet.add_value_by_index(col, row_num, timestamp) + elif column_datat_type[col] == TSDataType.FLOAT: + tablet.add_value_by_index(col, row_num, timestamp * 1.1) + elif column_datat_type[col] == TSDataType.DOUBLE: + tablet.add_value_by_index(col, row_num, timestamp * 1.1) + elif column_datat_type[col] == TSDataType.BOOLEAN: + tablet.add_value_by_index(col, row_num, timestamp % 2 == 0) + row_num = row_num + 1 + + prepare_time += perf_counter() - prepare_start + write_start = perf_counter() + csv_writer.writerow([iter_num, get_memory_usage_kb()]) + iter_num += 1 + writer.write_table(tablet) + writing_time += perf_counter() - write_start + timestamp = timestamp + bench_mark_conf["timestamp_per_tag"] + csv_writer.writerow([iter_num, get_memory_usage_kb()]) + iter_num += 1 + + csv_writer.writerow([iter_num, get_memory_usage_kb()]) + iter_num += 1 + end = perf_counter() + total_time = end - start + size = os.path.getsize("tsfile_table_write_bench_mark.tsfile") + + total_points = bench_mark_conf["tablet_num"] * bench_mark_conf["tag1_num"] * bench_mark_conf["tag2_num"] * \ + bench_mark_conf["timestamp_per_tag"] * len(column_name) + + print("finish bench mark for python") + print(f"tsfile size is {size} bytes ~ {size // 1024}KB") + + print(f"prepare data time is {prepare_time:.6f} s") + print(f"writing data time is {writing_time:.6f} s") + print(f" total_time is {total_time} s") + writing_speed = int(total_points / (prepare_time + writing_time)) + print(f"writing speed is {writing_speed} points/s") + + total_time_seconds = (end - start) + print(f"total time is {total_time_seconds:.6f} s") + +def bench_mark_read(): + start = perf_counter() + row = 0 + with TsFileReader("tsfile_table_write_bench_mark.tsfile") as reader: + result = reader.query_table("TestTable", column_name) + first = True + while result.next(): + row = row + 1 + + end = perf_counter() + total_time = end - start + reading_speed = int(row * len(column_name) / total_time) + print("total row is ", row) + print(f"reading data time is {total_time} s") + print(f"reading data speed is {reading_speed} points/s") + + +bench_mark_write() +bench_mark_read() diff --git a/python/requirements.txt b/python/requirements.txt index 9e6c929ec..ea2241724 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -22,4 +22,5 @@ numpy==1.26.4 pandas==2.2.2 setuptools==70.0.0 wheel==0.45.1 - +tqdm +psutil diff --git a/python/setup.py b/python/setup.py index 6edeea0b9..4f4e0e4f8 100644 --- a/python/setup.py +++ b/python/setup.py @@ -16,17 +16,19 @@ # under the License. # -from setuptools import setup, Extension -from setuptools.command.build_ext import build_ext -from Cython.Build import cythonize -import numpy as np +import os import platform import shutil -import os -version = "2.1.0.dev0" +import numpy as np +from Cython.Build import cythonize +from setuptools import setup, Extension +from setuptools.command.build_ext import build_ext + +version = "2.1.0.dev" system = platform.system() + def copy_tsfile_lib(source_dir, target_dir, suffix): lib_file_name = f"libtsfile.{suffix}" source = os.path.join(source_dir, lib_file_name) @@ -51,6 +53,7 @@ def copy_tsfile_header(source, target): if os.path.exists(source): shutil.copyfile(source, target) + project_dir = os.path.dirname(os.path.abspath(__file__)) ## Copy C wrapper header. @@ -72,8 +75,7 @@ def copy_tsfile_header(source, target): else: copy_tsfile_lib(tsfile_shared_source_dir, tsfile_shared_dir, "dll") -tsfile_include_dir=os.path.join(project_dir, "tsfile") - +tsfile_include_dir = os.path.join(project_dir, "tsfile") ext_modules_tsfile = [ # utils: from python to c or c to python. @@ -141,7 +143,10 @@ def finalize_options(self): author='"Apache TsFile"', packages=["tsfile"], license="Apache 2.0", - ext_modules=cythonize(ext_modules_tsfile), + ext_modules=cythonize( + ext_modules_tsfile + ), + cmdclass={"build_ext": BuildExt}, include_dirs=[np.get_include()], package_dir={"tsfile": "./tsfile"}, diff --git a/python/tsfile/tablet.py b/python/tsfile/tablet.py index 2935db09d..52e7389c4 100644 --- a/python/tsfile/tablet.py +++ b/python/tsfile/tablet.py @@ -137,7 +137,8 @@ def add_value_by_index(self, col_index: int, row_index: int, value: Union[int, f if not isinstance(value, expected_type.to_py_type()): raise TypeError(f"Expected {expected_type.to_py_type()} got {type(value)}") - self._check_numeric_range(value, expected_type) + if expected_type in (TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE): + self._check_numeric_range(value, expected_type) self.data_list[col_index][row_index] = value