From 27ca2d275c846f2353ba40be9b6e24151933908b Mon Sep 17 00:00:00 2001 From: hemant Date: Sat, 14 Feb 2026 00:21:17 +0530 Subject: [PATCH 1/2] feat(connectors): add ClickHouse sink connector #2539 Implements a ClickHouse sink connector for iggy supporting JSONEachRow and RowBinary insert formats with full auth, TLS/mTLS, batching, and retry. - Add ClickHouseSinkConfig with accessors and sane defaults (compression, batch size, retry, chunk size, metadata, verbose logging) - Add ClickHouseClient::init with custom hyper+rustls connector for mTLS and custom CA support; falls back to default transport for public CAs - Add ClickHouseInserter using buffered JSONEachRow writes - Add GenericInserter trait abstracting typed Inserter and ClickHouseInserter for polymorphic insert dispatch - Add Sink impl with batch_insert/try_insert split, exponential backoff retry, field mapping, nested JSON path extraction, and iggy_ prefixed metadata injection - Add integration tests via testcontainers (ClickHouse 25.3-alpine) covering JSON, JSON+metadata, JSON+field-mappings, RowBinary, and RowBinary+metadata - Add README with config reference, schema examples, auth/TLS setup, performance tuning tips, and RowBinary chunk size caveat --- Cargo.lock | 148 +++- Cargo.toml | 4 +- .../sinks/clickhouse_sink/Cargo.toml | 53 ++ .../sinks/clickhouse_sink/README.md | 442 ++++++++++ .../sinks/clickhouse_sink/config.toml | 40 + .../sinks/clickhouse_sink/config.toml.sample | 63 ++ .../field_mappings/config.toml | 45 + .../clickhouse_sink/src/clickhouse_client.rs | 203 +++++ .../src/clickhouse_inserter.rs | 139 ++++ .../clickhouse_sink/src/generic_inserter.rs | 205 +++++ .../sinks/clickhouse_sink/src/lib.rs | 460 +++++++++++ .../sinks/clickhouse_sink/src/sink.rs | 782 ++++++++++++++++++ core/integration/Cargo.toml | 1 + .../connectors/clickhouse/clickhouse_sink.rs | 361 ++++++++ .../tests/connectors/clickhouse/mod.rs | 22 + .../tests/connectors/clickhouse/sink.toml | 20 + 
.../clickhouse/sink_field_mappings.toml | 20 + .../fixtures/clickhouse/container.rs | 229 +++++ .../clickhouse/image_configs/config.xml | 35 + .../clickhouse/image_configs/default-user.xml | 11 + .../image_configs/docker_related_config.xml | 6 + .../clickhouse/image_configs/users.xml | 33 + .../connectors/fixtures/clickhouse/mod.rs | 28 + .../connectors/fixtures/clickhouse/sink.rs | 260 ++++++ .../tests/connectors/fixtures/mod.rs | 6 + core/integration/tests/connectors/mod.rs | 1 + 26 files changed, 3610 insertions(+), 7 deletions(-) create mode 100644 core/connectors/sinks/clickhouse_sink/Cargo.toml create mode 100644 core/connectors/sinks/clickhouse_sink/README.md create mode 100644 core/connectors/sinks/clickhouse_sink/config.toml create mode 100644 core/connectors/sinks/clickhouse_sink/config.toml.sample create mode 100644 core/connectors/sinks/clickhouse_sink/field_mappings/config.toml create mode 100644 core/connectors/sinks/clickhouse_sink/src/clickhouse_client.rs create mode 100644 core/connectors/sinks/clickhouse_sink/src/clickhouse_inserter.rs create mode 100644 core/connectors/sinks/clickhouse_sink/src/generic_inserter.rs create mode 100644 core/connectors/sinks/clickhouse_sink/src/lib.rs create mode 100644 core/connectors/sinks/clickhouse_sink/src/sink.rs create mode 100644 core/integration/tests/connectors/clickhouse/clickhouse_sink.rs create mode 100644 core/integration/tests/connectors/clickhouse/mod.rs create mode 100644 core/integration/tests/connectors/clickhouse/sink.toml create mode 100644 core/integration/tests/connectors/clickhouse/sink_field_mappings.toml create mode 100644 core/integration/tests/connectors/fixtures/clickhouse/container.rs create mode 100644 core/integration/tests/connectors/fixtures/clickhouse/image_configs/config.xml create mode 100644 core/integration/tests/connectors/fixtures/clickhouse/image_configs/default-user.xml create mode 100644 
core/integration/tests/connectors/fixtures/clickhouse/image_configs/docker_related_config.xml create mode 100644 core/integration/tests/connectors/fixtures/clickhouse/image_configs/users.xml create mode 100644 core/integration/tests/connectors/fixtures/clickhouse/mod.rs create mode 100644 core/integration/tests/connectors/fixtures/clickhouse/sink.rs diff --git a/Cargo.lock b/Cargo.lock index 9bd245b86e..8d3294d336 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1361,6 +1361,12 @@ dependencies = [ "objc2", ] +[[package]] +name = "bnum" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "119771309b95163ec7aaf79810da82f7cd0599c19722d48b9c03894dca833966" + [[package]] name = "bollard" version = "0.19.4" @@ -1593,6 +1599,9 @@ name = "bytes" version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +dependencies = [ + "serde", +] [[package]] name = "bytestring" @@ -1771,6 +1780,12 @@ dependencies = [ "inout", ] +[[package]] +name = "cityhash-rs" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93a719913643003b84bd13022b4b7e703c09342cd03b679c4641c7d2e50dc34d" + [[package]] name = "clang-sys" version = "1.8.1" @@ -1832,6 +1847,56 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831" +[[package]] +name = "clickhouse" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d975a05171c6f8a453f60ec6287c0018c90911d5a8a46d9b6abe386ea359fab3" +dependencies = [ + "bnum", + "bstr", + "bytes", + "cityhash-rs", + "clickhouse-macros", + "clickhouse-types", + "futures-channel", + "futures-util", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", + "lz4_flex 0.11.5", + "polonius-the-crab 0.5.0", + "quanta", + "rustls", + "serde", 
+ "thiserror 2.0.18", + "tokio", + "url", +] + +[[package]] +name = "clickhouse-macros" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6669899e23cb87b43daf7996f0ea3b9c07d0fb933d745bb7b815b052515ae3" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.115", +] + +[[package]] +name = "clickhouse-types" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "358fbfd439fb0bed02a3e2ecc5131f6a9d039ba5639aed650cf0e845f6ebfc16" +dependencies = [ + "bytes", + "thiserror 2.0.18", +] + [[package]] name = "clock" version = "0.1.0" @@ -4534,6 +4599,17 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "higher-kinded-types" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e690f8474c6c5d8ff99656fcbc195a215acc3949481a8b0b3351c838972dc776" +dependencies = [ + "macro_rules_attribute 0.2.2", + "never-say-never", + "paste", +] + [[package]] name = "hkdf" version = "0.12.4" @@ -5266,6 +5342,28 @@ dependencies = [ "uuid", ] +[[package]] +name = "iggy_connector_clickhouse_sink" +version = "0.1.0" +dependencies = [ + "async-trait", + "clickhouse", + "dashmap", + "futures", + "hyper-rustls", + "hyper-util", + "iggy_connector_sdk", + "once_cell", + "rustls", + "rustls-pki-types", + "serde", + "serde_json", + "simd-json", + "tokio", + "tracing", + "webpki-roots 1.0.6", +] + [[package]] name = "iggy_connector_elasticsearch_sink" version = "0.3.1-edge.1" @@ -5448,7 +5546,7 @@ dependencies = [ "clap", "futures-util", "iggy", - "lz4_flex", + "lz4_flex 0.12.0", "rand 0.10.0", "serde", "serde_json", @@ -5625,6 +5723,7 @@ dependencies = [ "async-trait", "bon", "bytes", + "clickhouse", "compio", "configs", "configs_derive", @@ -5974,10 +6073,10 @@ checksum = 
"bc07588c853b50689205fb5c00498aa681d89828e0ce8cbd965ebc7a5d8ae260" dependencies = [ "extension-traits", "lending-iterator-proc_macros", - "macro_rules_attribute", + "macro_rules_attribute 0.1.3", "never-say-never", "nougat", - "polonius-the-crab", + "polonius-the-crab 0.2.1", ] [[package]] @@ -6295,6 +6394,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" +[[package]] +name = "lz4_flex" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a" + [[package]] name = "lz4_flex" version = "0.12.0" @@ -6310,7 +6415,17 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf0c9b980bf4f3a37fd7b1c066941dd1b1d0152ce6ee6e8fe8c49b9f6810d862" dependencies = [ - "macro_rules_attribute-proc_macro", + "macro_rules_attribute-proc_macro 0.1.3", + "paste", +] + +[[package]] +name = "macro_rules_attribute" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65049d7923698040cd0b1ddcced9b0eb14dd22c5f86ae59c3740eab64a676520" +dependencies = [ + "macro_rules_attribute-proc_macro 0.2.2", "paste", ] @@ -6320,6 +6435,12 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "58093314a45e00c77d5c508f76e77c3396afbbc0d01506e7fae47b018bac2b1d" +[[package]] +name = "macro_rules_attribute-proc_macro" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "670fdfda89751bc4a84ac13eaa63e205cf0fd22b4c9a5fbfa085b63c1f1d3a30" + [[package]] name = "matchers" version = "0.2.0" @@ -6644,7 +6765,7 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97b57b9ced431322f054fc673f1d3c7fa52d80efd9df74ad2fc759f044742510" dependencies = [ - "macro_rules_attribute", + 
"macro_rules_attribute 0.1.3", "nougat-proc_macros", ] @@ -7236,7 +7357,7 @@ dependencies = [ "futures", "half", "hashbrown 0.16.1", - "lz4_flex", + "lz4_flex 0.12.0", "num-bigint", "num-integer", "num-traits", @@ -7568,6 +7689,16 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a69ee997a6282f8462abf1e0d8c38c965e968799e912b3bed8c9e8a28c2f9f" +[[package]] +name = "polonius-the-crab" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec242d7eccbb2fd8b3b5b6e3cf89f94a91a800f469005b44d154359609f8af72" +dependencies = [ + "higher-kinded-types", + "never-say-never", +] + [[package]] name = "polyval" version = "0.6.2" @@ -8353,9 +8484,11 @@ checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" dependencies = [ "base64 0.22.1", "bytes", + "encoding_rs", "futures-channel", "futures-core", "futures-util", + "h2 0.4.13", "http 1.4.0", "http-body", "http-body-util", @@ -8364,10 +8497,12 @@ dependencies = [ "hyper-util", "js-sys", "log", + "mime", "percent-encoding", "pin-project-lite", "quinn", "rustls", + "rustls-native-certs", "rustls-pki-types", "serde", "serde_json", @@ -10208,6 +10343,7 @@ dependencies = [ "memchr", "parse-display", "pin-project-lite", + "reqwest", "serde", "serde_json", "serde_with", diff --git a/Cargo.toml b/Cargo.toml index d2bbfe5670..10b00c8886 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ members = [ "core/configs_derive", "core/connectors/runtime", "core/connectors/sdk", + "core/connectors/sinks/clickhouse_sink", "core/connectors/sinks/elasticsearch_sink", "core/connectors/sinks/iceberg_sink", "core/connectors/sinks/postgres_sink", @@ -93,6 +94,7 @@ charming = "0.6.0" chrono = { version = "0.4.43", features = ["serde"] } clap = { version = "4.5.58", features = ["derive", "wrap_help"] } clap_complete = "4.5.66" +clickhouse = { version = "0.14.2", features = ["rustls-tls", "rustls-tls-native-roots", "inserter"] } clock 
= { path = "core/clock" } colored = "3.1.1" comfy-table = "7.2.2" @@ -259,7 +261,7 @@ syn = { version = "2", features = ["full", "extra-traits"] } sysinfo = "0.38.1" tempfile = "3.25.0" test-case = "3.3.1" -testcontainers-modules = { version = "0.14.0", features = ["postgres"] } +testcontainers-modules = { version = "0.14.0", features = ["postgres", "clickhouse"] } thiserror = "2.0.18" tokio = { version = "1.49.0", features = ["full"] } tokio-rustls = "0.26.4" diff --git a/core/connectors/sinks/clickhouse_sink/Cargo.toml b/core/connectors/sinks/clickhouse_sink/Cargo.toml new file mode 100644 index 0000000000..9d2a5073c8 --- /dev/null +++ b/core/connectors/sinks/clickhouse_sink/Cargo.toml @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "iggy_connector_clickhouse_sink" +version = "0.1.0" +description = "Iggy ClickHouse sink connector for storing stream messages into ClickHouse database" +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming", "clickhouse", "sink"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" + +[package.metadata.cargo-machete] +ignored = ["dashmap", "once_cell", "futures", "prost"] + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +clickhouse = { workspace = true } +dashmap = { workspace = true } +futures = { workspace = true } +hyper-rustls = { version = "0.27", features = ["webpki-roots"] } +hyper-util = { version = "0.1", features = ["client", "client-legacy", "http1", "tokio"] } +iggy_connector_sdk = { workspace = true } +once_cell = { workspace = true } +rustls = { version = "0.23", default-features = false, features = ["std"] } +rustls-pki-types = "1" +serde = { workspace = true } +serde_json = { workspace = true } +simd-json = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +webpki-roots = { workspace = true } diff --git a/core/connectors/sinks/clickhouse_sink/README.md b/core/connectors/sinks/clickhouse_sink/README.md new file mode 100644 index 0000000000..6c9bccb14c --- /dev/null +++ b/core/connectors/sinks/clickhouse_sink/README.md @@ -0,0 +1,442 @@ +# ClickHouse Sink Connector + +The ClickHouse sink connector consumes messages from Iggy topics and stores them in ClickHouse databases. Supports multiple insert formats and flexible payload storage options optimized for high-performance analytics. 
+ +## Features + +- **Multiple Insert Formats**: JSON (JSONEachRow) and RowBinary for different performance needs +- **Flexible Payload Storage**: Store structured JSON data or raw payloads as strings +- **Field Mappings**: Map specific JSON fields to ClickHouse columns +- **Metadata Storage**: Optionally store Iggy message metadata (offset, timestamp, topic, etc.) +- **Batch Processing**: Configurable batching for optimal throughput +- **Authentication Support**: None, username/password credentials, or JWT token +- **TLS/mTLS Support**: Secure connections with optional client certificates +- **Compression**: Optional LZ4 compression for network efficiency +- **Automatic Retries**: Exponential backoff for transient errors + +## Configuration + +```toml +[[streams]] +stream = "user_events" +topics = ["users", "orders"] +schema = "json" +batch_length = 100 +poll_interval = "5ms" +consumer_group = "clickhouse_sink" + +[plugin_config] +url = "http://localhost:8123" +database = "default" +table = "iggy_messages" +insert_type = "json" +auth_type = "credential" +username = "default" +password = "password" +compression_enabled = true +max_batch_size = 1000 +include_metadata = true +``` + +## Configuration Options + +| Option | Type | Default | Description | +| ------ | ---- | ------- | ----------- | +| `url` | string | required | ClickHouse HTTP endpoint (e.g., `http://localhost:8123`) | +| `database` | string | `default` | Target database name | +| `table` | string | required | Target table name | +| `insert_type` | string | `json` | Insert format: `json` or `rowbinary` | +| `auth_type` | string | `none` | Authentication type: `none`, `credential`, or `jwt` | +| `username` | string | - | Username for credential authentication | +| `password` | string | - | Password for credential authentication | +| `jwt_token` | string | - | JWT token for JWT authentication | +| `role` | array | - | Optional roles for ClickHouse Cloud | +| `compression_enabled` | bool | `true` | Enable 
LZ4 compression | +| `tls_enabled` | bool | `false` | Enable TLS/HTTPS | +| `tls_root_ca_cert` | string | - | Path to CA certificate for server verification | +| `tls_client_cert` | string | - | Path to client certificate for mTLS | +| `tls_client_key` | string | - | Path to client private key for mTLS | +| `max_batch_size` | u32 | `1000` | Maximum messages per batch | +| `chunk_size` | usize | `1048576` | Internal buffer size in bytes (1MB default, JSON format only) | +| `retry` | bool | `true` | Enable automatic retries | +| `max_retry` | u32 | `3` | Maximum retry attempts | +| `base_delay` | u64 | `500` | Base delay in milliseconds between retries | +| `include_metadata` | bool | `false` | Include Iggy metadata columns | +| `field_mappings` | map | - | Map JSON fields to ClickHouse columns | +| `payload_data_type` | string | - | ClickHouse type for payload column (JSON format only) | +| `verbose_logging` | bool | `false` | Log at info level instead of debug | + +## Insert Formats + +The `insert_type` option determines how data is inserted into ClickHouse: + +### JSON (JSONEachRow) + +Uses ClickHouse's JSONEachRow format. Flexible and supports field mappings. Best for structured JSON data. + +```toml +[plugin_config] +insert_type = "json" +``` + +**Advantages:** + +- Automatic field mapping from JSON to columns +- Supports partial column insertion via `field_mappings` +- Human-readable format for debugging +- Dynamic schema flexibility + +**Use when:** + +- Messages are JSON objects +- Need to map specific fields to columns +- Schema may evolve over time + +### RowBinary + +Uses ClickHouse's RowBinary format. More efficient but stores entire payload as a string. 
+ +```toml +[plugin_config] +insert_type = "rowbinary" +``` + +**Advantages:** + +- Higher throughput for large payloads +- Lower CPU overhead +- Compact binary format + +**Use when:** + +- Maximum performance is required +- Messages are binary, protobuf, or arbitrary bytes +- Don't need column-level access to payload fields + +## Table Schema + +### JSON Format without Metadata + +```sql +CREATE TABLE iggy_messages ( + id UInt64, + name String, + count UInt32, + amount Float64, + active Bool, + timestamp Int64 +) ENGINE = MergeTree() ORDER BY id; +``` + +### JSON Format with Metadata + +```sql +CREATE TABLE iggy_messages ( + id UInt64, + name String, + count UInt32, + amount Float64, + active Bool, + timestamp Int64, + iggy_stream String, + iggy_topic String, + iggy_partition_id UInt32, + iggy_id String, + iggy_offset UInt64, + iggy_checksum UInt64, + iggy_timestamp UInt64, + iggy_origin_timestamp UInt64 +) ENGINE = MergeTree() ORDER BY iggy_offset; +``` + +### RowBinary Format without Metadata + +```sql +CREATE TABLE iggy_messages ( + payload String +) ENGINE = MergeTree() ORDER BY tuple(); +``` + +### RowBinary Format with Metadata + +```sql +CREATE TABLE iggy_messages ( + iggy_stream String, + iggy_topic String, + iggy_partition_id UInt32, + iggy_id String, + iggy_offset UInt64, + iggy_checksum UInt64, + iggy_timestamp UInt64, + iggy_origin_timestamp UInt64, + payload String +) ENGINE = MergeTree() ORDER BY iggy_offset; +``` + +## Field Mappings + +When using `insert_type = "json"`, you can map specific JSON fields to ClickHouse columns: + +```toml +[plugin_config] +insert_type = "json" +table = "user_data" + +[plugin_config.field_mappings] +id = "user_id" +name = "user_name" +amount = "total_amount" +``` + +This maps: + +- JSON field `id` → ClickHouse column `user_id` +- JSON field `name` → ClickHouse column `user_name` +- JSON field `amount` → ClickHouse column `total_amount` + +Corresponding table schema: + +```sql +CREATE TABLE user_data ( + user_id UInt64, 
+ user_name String, + total_amount Float64 +) ENGINE = MergeTree() ORDER BY user_id; +``` + +## Authentication + +### No Authentication + +```toml +[plugin_config] +auth_type = "none" +``` + +### Username/Password Credentials + +```toml +[plugin_config] +auth_type = "credential" +username = "default" +password = "secure_password" +``` + +**Note:** Both `username` and `password` must be set when using credential authentication. + +### JWT Token + +```toml +[plugin_config] +auth_type = "jwt" +jwt_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." +``` + +**Caveat:** Do not mix credential and JWT authentication. Setting `jwt_token` while `username` or `password` is set will cause an error. + +### ClickHouse Cloud with Roles + +```toml +[plugin_config] +auth_type = "credential" +username = "cloud_user" +password = "cloud_password" +role = ["role1", "role2"] +``` + +## TLS/mTLS Configuration + +### TLS (Server Verification) + +```toml +[plugin_config] +url = "https://clickhouse.example.com:8443" +tls_enabled = true +tls_root_ca_cert = "/path/to/ca-cert.pem" +``` + +### mTLS (Client Certificates) + +```toml +[plugin_config] +url = "https://clickhouse.example.com:8443" +tls_enabled = true +tls_root_ca_cert = "/path/to/ca-cert.pem" +tls_client_cert = "/path/to/client-cert.pem" +tls_client_key = "/path/to/client-key.pem" +``` + +**Important:** The connector uses `rustls-tls` with webpki bundle. For ClickHouse Cloud or public CAs, no custom CA certificate is needed. For self-signed certificates, provide `tls_root_ca_cert`. 
+ +## Performance + +### Recommended Indexes + +```sql +-- For metadata-enabled tables +CREATE INDEX idx_stream ON iggy_messages (iggy_stream); +CREATE INDEX idx_topic ON iggy_messages (iggy_topic); +CREATE INDEX idx_offset ON iggy_messages (iggy_offset); + +-- For JSON columns (if using payload_data_type) +CREATE INDEX idx_payload ON iggy_messages (payload) TYPE bloom_filter GRANULARITY 1; +``` + +### Tuning Tips + +- **Increase `max_batch_size`** for higher throughput (larger batches = fewer HTTP requests) +- **Enable `compression_enabled`** to reduce network bandwidth (enabled by default) +- **Use `insert_type = "rowbinary"`** for maximum performance with large payloads +- **Set `chunk_size` ≥ `max_batch_size * avg_message_size`** for JSON format to avoid runtime buffer extensions that slow down writes +- **For RowBinary format, keep batches under ~254KB** due to hardcoded chunk size limitation (see caveat below) +- **Use `poll_interval`** to control how often the sink checks for new messages +- **Choose appropriate `ORDER BY`** keys for query patterns (offset for time-series, id for lookups) + +## Example Configs + +### JSON Messages with Field Mappings + +```toml +[[streams]] +stream = "events" +topics = ["user_events"] +schema = "json" +batch_length = 500 +poll_interval = "10ms" +consumer_group = "ch_sink" + +[plugin_config] +url = "http://localhost:8123" +table = "events" +insert_type = "json" +auth_type = "credential" +username = "default" +password = "password" +max_batch_size = 1000 +include_metadata = false + +[plugin_config.field_mappings] +user_id = "id" +event_name = "name" +event_time = "timestamp" +``` + +### RowBinary for High Throughput + +```toml +[[streams]] +stream = "logs" +topics = ["app_logs", "system_logs"] +schema = "raw" +batch_length = 1000 +poll_interval = "5ms" +consumer_group = "ch_sink" + +[plugin_config] +url = "http://localhost:8123" +database = "logs" +table = "raw_logs" +insert_type = "rowbinary" +auth_type = "none" 
+compression_enabled = true +max_batch_size = 5000 +include_metadata = true +``` + +### ClickHouse Cloud with TLS + +```toml +[[streams]] +stream = "analytics" +topics = ["events"] +schema = "json" +batch_length = 100 +poll_interval = "10ms" +consumer_group = "ch_cloud_sink" + +[plugin_config] +url = "https://your-instance.clickhouse.cloud:8443" +database = "production" +table = "events" +insert_type = "json" +auth_type = "credential" +username = "cloud_user" +password = "cloud_password" +tls_enabled = true +compression_enabled = true +max_batch_size = 1000 +include_metadata = true +``` + +## Reliability Features + +### Automatic Retries + +The connector automatically retries transient errors with exponential backoff. The retry delay is calculated as `base_delay * attempt_number` milliseconds. Configure with: + +- `retry` (default: `true`) - Enable/disable retries +- `max_retry` (default: `3`) - Maximum retry attempts +- `base_delay` (default: `500ms`) - Base delay between retries + +Non-transient errors fail immediately without retrying. + +### Compression + +LZ4 compression is enabled by default (`compression_enabled = true`), reducing network bandwidth by 60-90% for text data. Disable for local deployments if network is not a bottleneck. + +### Error Handling + +The sink tracks and logs: + +- Total messages processed +- Insert attempt failures (retryable errors) +- Insert batch failures (permanent errors) + +Check logs for detailed error messages and performance metrics on connector shutdown. + +## Caveats + +### Chunk Size and Batch Size Relationship + +The `chunk_size` parameter controls the internal buffer capacity for the ClickHouse client's insert operations. Understanding its interaction with `max_batch_size` is important for optimal performance and data consistency: + +#### JSON Format (JSONEachRow) + +For JSON insert format, `chunk_size` is configurable (default: 1MB) and sets the initial buffer capacity. 
+ +**Performance Impact:** If the accumulated batch data exceeds `chunk_size` before reaching `max_batch_size`, the internal buffer will be **extended at runtime**. This dynamic reallocation slows down writes due to memory copying overhead. + +**Best Practice:** Set `chunk_size ≥ max_batch_size * average_message_size` to avoid runtime buffer extensions and ensure optimal write performance. + +**Example:** + +```toml +[plugin_config] +max_batch_size = 1000 # 1000 messages per batch +chunk_size = 2097152 # 2MB buffer (for ~2KB avg message size) +``` + +**Calculation:** If average message size is 2KB, set `chunk_size ≥ 1000 * 2KB = 2MB` to prevent buffer extensions. + +#### RowBinary Format + +For RowBinary insert format, the chunk size is **hardcoded to 256KB** by the ClickHouse Rust client and cannot be configured. Data is auto-flushed at approximately **254KB**. + +**Limitation:** If `max_batch_size * average_message_size > 254KB`, the batch will be split across multiple inserts: + +- **Batch 1** (auto-flush at ~254KB): Succeeds ✓ +- **Batch 2** (remainder at end of `max_batch_size`): Fails ✗ +- **Retry**: Entire batch is retried, causing **duplicates** if retry succeeds + +**Workaround:** Keep batch sizes under 254KB until the ClickHouse Rust client resolves this limitation (see [clickhouse-rs#389](https://github.com/ClickHouse/clickhouse-rs/issues/389)). + +**Example for RowBinary:** + +```toml +[plugin_config] +insert_type = "rowbinary" +max_batch_size = 100 # Keep total batch < 254KB +# chunk_size is ignored for RowBinary (hardcoded to 256KB) +``` + +**Calculation:** If average message size is 2KB, set `max_batch_size ≤ 120` to stay under 254KB (120 * 2KB = 240KB). 
diff --git a/core/connectors/sinks/clickhouse_sink/config.toml b/core/connectors/sinks/clickhouse_sink/config.toml new file mode 100644 index 0000000000..2ebcdcb6f0 --- /dev/null +++ b/core/connectors/sinks/clickhouse_sink/config.toml @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "sink" +key = "clickhouse" +enabled = true +version = 0 +name = "ClickHouse sink" +path = "../../target/release/libiggy_connector_clickhouse_sink" +verbose = false + +[[streams]] +stream = "test_stream" +topics = ["test_topic"] +consumer_group = "clickhouse_sink" +schema = "json" +batch_length = 100 +poll_interval = "5ms" + +[plugin_config] +url = "http://localhost:8123" +compression_enabled = false +auth_type = "none" +table = "iggy_sink" +insert_type = "json" +include_metadata = false diff --git a/core/connectors/sinks/clickhouse_sink/config.toml.sample b/core/connectors/sinks/clickhouse_sink/config.toml.sample new file mode 100644 index 0000000000..af13a7aace --- /dev/null +++ b/core/connectors/sinks/clickhouse_sink/config.toml.sample @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "sink" +key = "clickhouse" +enabled = true +version = 0 +name = "ClickHouse sink" +path = "../../../target/debug/libiggy_connector_clickhouse_sink" +verbose = false + +[[streams]] +stream = "user_events" +topics = ["users", "orders"] +consumer_group = "clickhouse_sink" +schema = "json" +batch_length = 100 +poll_interval = "5ms" + +[plugin_config] +url = "https://clickhouse.example.com:8443" +compression_enabled = true # by default uses lz4 when enabled +auth_type = "none" # options - none(username/pwd=None), credential, jwt +tls_enabled = true # if true certs are configured in client init +tls_root_ca_cert = "/path/to/ca.pem" +tls_client_cert = "/path/to/client-cert.pem" +tls_client_key = "/path/to/client-key.pem" +username = "user" +password = "pwd" +jwt_token = "token" +roles = ["user_role1", "user_role2"] +database = "test" # database name if other than default +table = "iggy_data" +max_batch_size = 10 # max no of messages/records flushed to clickhouse server +chunk_size = 1024 # defaults to 1 MB for explicit flush control. currently applicable only for json insert_type.
Make sure this is larger than the total batch payload size; otherwise the connector flushes rows whenever the buffer reaches chunk_size, which delays error detection (if any) and, for retryable errors, can produce duplicate rows because the whole batch is retried rather than individual chunks. +retry = true #defaults to true +max_retry = 3 +base_delay = 100 # in millis +insert_type = "json" # rowbinary or json. json lets you choose fields using field_mappings, rowbinary supports json as data type or string(json) through payload_data_type +payload_data_type = "json" # json is for json data type, or string(json) +include_metadata = true # When true, includes all metadata fields (offset, timestamp, checksum, etc.) in the insert. Used by typed Inserter. + +# Extract nested fields. Only these fields are included in the insert. +[plugin_config.field_mappings] +"event.id" = "event_id" +"event.type" = "event_type" +"user.id" = "user_id" +"user.profile.name" = "user_name" +"metadata.timestamp" = "event_time" \ No newline at end of file diff --git a/core/connectors/sinks/clickhouse_sink/field_mappings/config.toml b/core/connectors/sinks/clickhouse_sink/field_mappings/config.toml new file mode 100644 index 0000000000..1677936edc --- /dev/null +++ b/core/connectors/sinks/clickhouse_sink/field_mappings/config.toml @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "sink" +key = "clickhouse" +enabled = true +version = 0 +name = "ClickHouse sink" +path = "../../../target/release/libiggy_connector_clickhouse_sink" +verbose = false + +[[streams]] +stream = "test_stream" +topics = ["test_topic"] +consumer_group = "clickhouse_sink" +schema = "json" +batch_length = 100 +poll_interval = "5ms" + +[plugin_config] +url = "http://localhost:8123" +compression_enabled = false +auth_type = "none" +table = "iggy_sink_mapped" +insert_type = "json" +include_metadata = false + +[plugin_config.field_mappings] +"id" = "msg_id" +"name" = "msg_name" +"amount" = "msg_amount" diff --git a/core/connectors/sinks/clickhouse_sink/src/clickhouse_client.rs b/core/connectors/sinks/clickhouse_sink/src/clickhouse_client.rs new file mode 100644 index 0000000000..258537a2a4 --- /dev/null +++ b/core/connectors/sinks/clickhouse_sink/src/clickhouse_client.rs @@ -0,0 +1,203 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::{ + AuthType, ClickHouseInsertFormat, ClickHouseSinkConfig, clickhouse_inserter::ClickHouseInserter, +}; +use clickhouse::{Client, Compression}; +use hyper_rustls::HttpsConnectorBuilder; +use hyper_util::{client::legacy::Client as HyperClient, rt::TokioExecutor}; +use rustls::{ClientConfig, RootCertStore}; +use rustls_pki_types::{CertificateDer, PrivateKeyDer, pem::PemObject}; +use std::fmt; + +/// read from config.toml and create client +/// client level configs +/// compression , cred - username/pwd or jwt, cert - client & server(cloud path), format +/// new json| column| blob(string)(goes into the `insert into` sql), role, default role +pub(crate) struct ClickHouseClient { + client: Client, + config: ClickHouseSinkConfig, +} + +impl fmt::Debug for ClickHouseClient { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ClickHouseClient") + .field("client", &"") + .finish() + } +} + +impl ClickHouseClient { + pub(crate) fn init( + config: ClickHouseSinkConfig, + ) -> Result> { + // ── TLS / mTLS ─────────────────────────────────────────────────────────── + // The `clickhouse` crate is compiled with `rustls-tls` (webpki bundle) and + // `rustls-tls-native-roots` (OS trust store), so `Client::default()` already + // handles the two most common cases without any custom connector: + // + // • ClickHouse Cloud / public CA → covered by `rustls-tls` (webpki) + // • Self-signed cert installed → covered by `rustls-tls-native-roots` + // in the host OS trust store (reads /etc/ssl/certs at runtime) + // + // A 
custom hyper+rustls connector is only needed when the config supplies: + // • `tls_root_ca_cert` – a cert file path NOT in the OS store, OR + // • `tls_client_cert` + `tls_client_key` – mTLS (no crate feature covers this) + let needs_custom_http_client = config.tls_enabled() + && (config.tls_root_ca_cert.is_some() + || (config.tls_client_cert.is_some() && config.tls_client_key.is_some())); + + let base: Client = if needs_custom_http_client { + // ── Root-CA store ──────────────────────────────────────────────────── + // Start with the webpki built-in bundle (same roots that `rustls-tls` + // uses in `Client::default()`), then optionally append any custom CA + // provided via `tls_root_ca_cert`. This means a custom-CA-only config + // still trusts publicly-signed certs, and an mTLS-only config (no + // custom CA) also trusts public CAs correctly. + let mut root_store = RootCertStore { + roots: webpki_roots::TLS_SERVER_ROOTS.to_vec(), + }; + + if let Some(ref ca_path) = config.tls_root_ca_cert { + // Append custom / self-signed CA on top of the public bundle. + for cert in CertificateDer::pem_file_iter(ca_path)? { + root_store.add(cert?)?; + } + } + + // ── rustls ClientConfig ────────────────────────────────────────────── + let tls_config = if let (Some(cert_path), Some(key_path)) = + (&config.tls_client_cert, &config.tls_client_key) + { + // mTLS: load the client certificate chain and private key. + let certs: Vec> = + CertificateDer::pem_file_iter(cert_path)?.collect::>()?; + let key = PrivateKeyDer::from_pem_file(key_path)?; + + ClientConfig::builder() + .with_root_certificates(root_store) + .with_client_auth_cert(certs, key)? + } else { + // Custom CA only, no client cert. 
+ ClientConfig::builder() + .with_root_certificates(root_store) + .with_no_client_auth() + }; + + // ── hyper connector ────────────────────────────────────────────────── + let connector = HttpsConnectorBuilder::new() + .with_tls_config(tls_config) + .https_or_http() + .enable_http1() + .build(); + + let hyper_client = HyperClient::builder(TokioExecutor::new()).build(connector); + Client::with_http_client(hyper_client) + } else { + // No cert file or mTLS needed – the default transport covers everything: + // `rustls-tls` → webpki bundle (ClickHouse Cloud, public CAs) + // `rustls-tls-native-roots` → OS trust store (self-signed certs on the host) + Client::default() + }; + + // ── Required: URL ──────────────────────────────────────────────────────── + let mut client = base.with_url(&config.url); + + // ── Compression ────────────────────────────────────────────────────────── + let compression = if config.compression_enabled() { + Compression::Lz4 + } else { + Compression::None + }; + client = client.with_compression(compression); + + // ── Authentication ─────────────────────────────────────────────────────── + // The clickhouse crate panics if credentials and JWT are mixed + match (&config.username, &config.password) { + (Some(_), None) | (None, Some(_)) => { + return Err("ClickHouseSink config: when using credential 'username' and 'password' both must be set".into()); + } + _ => {} + } + + let has_credentials = config.username.is_some(); + + client = match config.auth_type { + AuthType::None => client, + AuthType::Credential if has_credentials => { + client = client.with_user(config.username.as_deref().unwrap()); + client.with_password(config.password.as_deref().unwrap()) + } + AuthType::Credential => { + return Err("ClickHouseSink config: 'auth_type' is 'credential' but 'username' and 'password' are not set".into()); + } + AuthType::Jwt if !has_credentials => { + if let Some(ref token) = config.jwt_token { + client = client.with_access_token(token.clone()); + 
client + } else { + return Err( + "ClickHouseSink config: 'auth_type' is 'jwt' but 'jwt_token' is not set" + .into(), + ); + } + } + AuthType::Jwt => { + return Err("ClickHouseSink config: 'auth_type' is 'jwt' but 'username'/'password' are also set".into()); + } + }; + + if let Some(role) = &config.role { + client = client.with_roles(role); + } + + // Enable new JSON type usage + client = client.with_option("allow_experimental_json_type", "1"); + client = client.with_option("input_format_binary_read_json_as_string", "1"); + client = client.with_option("output_format_binary_write_json_as_string", "1"); + // Enable nested json objects in input json + client = client.with_option("input_format_import_nested_json", "1"); + + if let Some(database) = &config.database { + client = client.with_database(database) + } + + client = client.with_validation(false); //use rowbinary, disable Row type validation against schema, better for streaming + + Ok(Self { client, config }) + } + + /// Creates a formatted inserter for JSONEachRow format. + pub(crate) fn create_formatted_inserter(&self) -> ClickHouseInserter { + ClickHouseInserter::new( + &self.client, + &self.config.table, + ClickHouseInsertFormat::JsonEachRow, + self.config.chunk_size(), + ) + } + + /// Creates and returns a typed inserter for structured Row data. 
+ pub(crate) fn create_typed_inserter( + &self, + table: &str, + ) -> clickhouse::inserter::Inserter { + self.client.inserter(table) + } +} diff --git a/core/connectors/sinks/clickhouse_sink/src/clickhouse_inserter.rs b/core/connectors/sinks/clickhouse_sink/src/clickhouse_inserter.rs new file mode 100644 index 0000000000..79c17b90a8 --- /dev/null +++ b/core/connectors/sinks/clickhouse_sink/src/clickhouse_inserter.rs @@ -0,0 +1,139 @@ +use std::mem; + +use tokio::time::Duration; + +use clickhouse::{ + Client, + error::{Error, Result}, + insert_formatted::{self, BufInsertFormatted}, +}; + +use crate::ClickHouseInsertFormat; + +#[must_use] +pub struct ClickHouseInserter { + client: Client, + table: String, + format: ClickHouseInsertFormat, + chunk_capacity: usize, + send_timeout: Option, + end_timeout: Option, + insert: Option, + pending: Quantities, + in_transaction: bool, +} + +/// Statistics about pending or inserted data. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Quantities { + pub bytes: u64, + pub rows: u64, + pub transactions: u64, +} + +impl Quantities { + pub const ZERO: Quantities = Quantities { + bytes: 0, + rows: 0, + transactions: 0, + }; +} + +impl ClickHouseInserter { + pub(crate) fn new( + client: &Client, + table: &str, + format: ClickHouseInsertFormat, + chunk_capacity: usize, + ) -> Self { + Self { + client: client.clone(), + table: table.into(), + format, + chunk_capacity, + send_timeout: None, + end_timeout: None, + insert: None, + pending: Quantities::ZERO, + in_transaction: false, + } + } + + /// Serializes the provided row into an internal buffer. + /// # Panics + /// If called after the previous call that returned an error. 
+ #[inline] + pub async fn write(&mut self, row: Vec) -> Result<()> { + if self.insert.is_none() { + self.init_insert().await?; + } + + let insert = self.insert.as_mut().unwrap(); + + let newline = b"\n"; + let bytes_json = row.len(); + let bytes_newline = newline.len(); + + if bytes_json + .checked_add(bytes_newline) + .and_then(|sum| sum.checked_add(insert.buf_len())) + .is_none() + { + return Err(Error::Custom("Buffer overflow".to_owned())); + } + //JSON row + insert.write_buffered(&row); + + //newline separator + insert.write_buffered(newline); + + let total_bytes = bytes_json + bytes_newline; + self.pending.bytes += total_bytes as u64; + self.pending.rows += 1; + + if !self.in_transaction { + self.pending.transactions += 1; + self.in_transaction = true; + } + Ok(()) + } + + /// Ends the current `INSERT` unconditionally. + pub async fn force_commit(&mut self) -> Result { + let quantities = self.insert().await?; + Ok(quantities) + } + + async fn insert(&mut self) -> Result { + self.in_transaction = false; + let quantities = mem::replace(&mut self.pending, Quantities::ZERO); + + if let Some(mut insert) = self.insert.take() { + insert.end().await?; + } + + // TODO: Reinstate the callback + // if let Some(cb) = &mut self.on_commit + // && quantities.transactions > 0 + // { + // (cb)(&quantities); + // } + + Ok(quantities) + } + + #[cold] + #[inline(never)] + async fn init_insert(&mut self) -> Result<()> { + debug_assert!(self.insert.is_none()); + debug_assert_eq!(self.pending, Quantities::ZERO); + let sql = format!("INSERT INTO {} FORMAT {}", self.table, self.format.as_str()); + let new_insert: BufInsertFormatted = self + .client + .insert_formatted_with(sql) + .with_timeouts(self.send_timeout, self.end_timeout) + .buffered_with_capacity(self.chunk_capacity); + self.insert = Some(new_insert); + Ok(()) + } +} diff --git a/core/connectors/sinks/clickhouse_sink/src/generic_inserter.rs b/core/connectors/sinks/clickhouse_sink/src/generic_inserter.rs new file mode 100644 
index 0000000000..75f80f4650 --- /dev/null +++ b/core/connectors/sinks/clickhouse_sink/src/generic_inserter.rs @@ -0,0 +1,205 @@ +use crate::clickhouse_inserter::Quantities; +use clickhouse::Row; +use iggy_connector_sdk::ConsumedMessage; + +/// Common trait for all inserter types (Inserter, ClickHouseInserter, etc.) +#[allow(async_fn_in_trait)] +pub trait GenericInserter: Send { + /// The type of data this inserter accepts + type WriteData; + + /// Write a single item + async fn write(&mut self, data: Self::WriteData) -> clickhouse::error::Result<()>; + + /// Force commit regardless of limits + async fn force_commit(&mut self) -> clickhouse::error::Result; +} + +// Implement the trait for the standard Inserter +impl GenericInserter for clickhouse::inserter::Inserter +where + T: Row + clickhouse::RowWrite, +{ + type WriteData = T::Value<'static>; + + async fn write(&mut self, data: Self::WriteData) -> clickhouse::error::Result<()> { + self.write(&data).await + } + + async fn force_commit(&mut self) -> clickhouse::error::Result { + let q = self.force_commit().await?; + Ok(Quantities { + bytes: q.bytes, + rows: q.rows, + transactions: q.transactions, + }) + } +} + +// Implement the trait for JsonInserter +impl GenericInserter for crate::ClickHouseInserter { + type WriteData = Vec; + + async fn write(&mut self, data: Self::WriteData) -> clickhouse::error::Result<()> { + self.write(data).await + } + + async fn force_commit(&mut self) -> clickhouse::error::Result { + let q = self.force_commit().await?; + Ok(Quantities { + bytes: q.bytes, + rows: q.rows, + transactions: q.transactions, + }) + } +} + +/// Write and force-commit +/// +/// # Type Parameters +/// - `I`: The inserter type (must implement GenericInserter) +/// - `F`: Function to prepare WriteData from a ConsumedMessage +pub async fn run_inserter( + mut inserter: I, + messages: &[ConsumedMessage], + mut prepare_data: F, +) -> clickhouse::error::Result<()> +where + I: GenericInserter, + F: FnMut(&ConsumedMessage) 
-> clickhouse::error::Result, +{ + for message in messages { + // Prepare the data in the format expected by this inserter + let data = prepare_data(message)?; + inserter.write(data).await?; + } + + // force_commit to flush + inserter.force_commit().await?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use iggy_connector_sdk::Payload; + use std::sync::{Arc, Mutex}; + + // ── Mock inserter ──────────────────────────────────────────────────────── + + #[derive(Debug, Clone, PartialEq, Eq)] + enum MockCall { + Write(String), + ForceCommit, + } + + struct MockInserter { + calls: Arc>>, + write_error_on_call: Option, + write_count: usize, + } + + impl MockInserter { + fn new(calls: Arc>>) -> Self { + Self { + calls, + write_error_on_call: None, + write_count: 0, + } + } + + fn failing_on_write(calls: Arc>>, fail_on_call: usize) -> Self { + Self { + calls, + write_error_on_call: Some(fail_on_call), + write_count: 0, + } + } + } + + impl GenericInserter for MockInserter { + type WriteData = String; + + async fn write(&mut self, data: Self::WriteData) -> clickhouse::error::Result<()> { + self.write_count += 1; + if self.write_error_on_call == Some(self.write_count) { + return Err(clickhouse::error::Error::Custom("mock write error".into())); + } + self.calls.lock().unwrap().push(MockCall::Write(data)); + Ok(()) + } + + async fn force_commit(&mut self) -> clickhouse::error::Result { + self.calls.lock().unwrap().push(MockCall::ForceCommit); + Ok(Quantities { + bytes: 0, + rows: 0, + transactions: 0, + }) + } + } + + // ── Test fixtures ──────────────────────────────────────────────────────── + + fn test_message(id: u128) -> ConsumedMessage { + ConsumedMessage { + id, + offset: 0, + checksum: 0, + timestamp: 0, + origin_timestamp: 0, + headers: None, + payload: Payload::Json(simd_json::json!({"msg_id": id as u64})), + } + } + + // ── run_inserter tests ─────────────────────────────────────────────────── + + #[tokio::test] + async fn 
given_messages_should_write_each_and_force_commit() { + let calls = Arc::new(Mutex::new(Vec::new())); + let inserter = MockInserter::new(calls.clone()); + let messages = vec![test_message(1), test_message(2), test_message(3)]; + + let result = run_inserter(inserter, &messages, |msg| Ok(format!("msg-{}", msg.id))).await; + + assert!(result.is_ok()); + let recorded = calls.lock().unwrap(); + assert_eq!(recorded.len(), 4); // 3 writes + 1 force_commit + assert_eq!(recorded[0], MockCall::Write("msg-1".into())); + assert_eq!(recorded[1], MockCall::Write("msg-2".into())); + assert_eq!(recorded[2], MockCall::Write("msg-3".into())); + assert_eq!(recorded[3], MockCall::ForceCommit); + } + + #[tokio::test] + async fn given_prepare_data_error_should_propagate() { + let calls = Arc::new(Mutex::new(Vec::new())); + let inserter = MockInserter::new(calls.clone()); + let messages = vec![test_message(1)]; + + let result = run_inserter(inserter, &messages, |_msg| { + Err(clickhouse::error::Error::Custom("prepare failed".into())) + }) + .await; + + assert!(result.is_err()); + let recorded = calls.lock().unwrap(); + // No write, no force_commit — error propagated immediately + assert!(recorded.is_empty()); + } + + #[tokio::test] + async fn given_write_error_should_propagate() { + let calls = Arc::new(Mutex::new(Vec::new())); + let inserter = MockInserter::failing_on_write(calls.clone(), 1); + let messages = vec![test_message(1), test_message(2)]; + + let result = run_inserter(inserter, &messages, |msg| Ok(format!("msg-{}", msg.id))).await; + + assert!(result.is_err()); + let recorded = calls.lock().unwrap(); + // No successful write recorded, no force_commit + assert!(recorded.is_empty()); + } +} diff --git a/core/connectors/sinks/clickhouse_sink/src/lib.rs b/core/connectors/sinks/clickhouse_sink/src/lib.rs new file mode 100644 index 0000000000..f17a4631d2 --- /dev/null +++ b/core/connectors/sinks/clickhouse_sink/src/lib.rs @@ -0,0 +1,460 @@ +/* Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use std::collections::HashMap; + +use crate::clickhouse_client::ClickHouseClient; +use crate::clickhouse_inserter::ClickHouseInserter; +use clickhouse::Row; +use iggy_connector_sdk::sink_connector; +use serde::{Deserialize, Serialize}; +use tokio::sync::Mutex; + +mod clickhouse_client; +mod clickhouse_inserter; +mod generic_inserter; +mod sink; + +sink_connector!(ClickHouseSink); + +/// Default configuration values for ClickHouse sink +pub struct ClickHouseSinkDefaults; + +impl ClickHouseSinkDefaults { + pub const COMPRESSION_ENABLED: bool = true; + pub const TLS_ENABLED: bool = false; + pub const TOKEN_AUTH: bool = false; + pub const MAX_BATCH_SIZE: u32 = 1000; + pub const CHUNK_SIZE: usize = 1024 * 1024; //1MB + pub const RETRY: bool = true; + pub const MAX_RETRY: u32 = 3; + pub const RETRY_BASE_DELAY: u64 = 500; //500 milliseconds + pub const INCLUDE_METADATA: bool = false; + pub const VERBOSE_LOGGING: bool = false; +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct ClickHouseSinkConfig { + pub url: String, + + pub compression_enabled: Option, + + pub tls_enabled: Option, + + // Server verification + pub tls_root_ca_cert: Option, + + // Client authentication (mTLS) + pub 
tls_client_cert: Option, + pub tls_client_key: Option, + + // Authentication + pub auth_type: AuthType, + pub username: Option, + pub password: Option, + + pub jwt_token: Option, + + // Role + pub role: Option>, + + // Database and table + pub table: String, + pub database: Option, + + // Batch configuration + pub max_batch_size: Option, + + // Chunk size + pub chunk_size: Option, + + // Retry configuration + pub retry: Option, + pub max_retry: Option, + pub base_delay: Option, + + // Insert configuration + pub insert_type: InsertType, + pub payload_data_type: Option, + + /// When true, includes all metadata fields (offset, timestamp, checksum, etc.). + pub include_metadata: Option, + + pub field_mappings: Option>, + + pub verbose_logging: Option, +} + +impl ClickHouseSinkConfig { + pub fn compression_enabled(&self) -> bool { + self.compression_enabled + .unwrap_or(ClickHouseSinkDefaults::COMPRESSION_ENABLED) + } + + pub fn tls_enabled(&self) -> bool { + self.tls_enabled + .unwrap_or(ClickHouseSinkDefaults::TLS_ENABLED) + } + + pub fn max_batch_size(&self) -> u32 { + self.max_batch_size + .unwrap_or(ClickHouseSinkDefaults::MAX_BATCH_SIZE) + } + + pub fn chunk_size(&self) -> usize { + self.chunk_size + .unwrap_or(ClickHouseSinkDefaults::CHUNK_SIZE) + } + + pub fn retry(&self) -> bool { + self.retry.unwrap_or(ClickHouseSinkDefaults::RETRY) + } + + pub fn max_retry(&self) -> u32 { + self.max_retry.unwrap_or(ClickHouseSinkDefaults::MAX_RETRY) + } + + pub fn retry_base_delay(&self) -> u64 { + self.base_delay + .unwrap_or(ClickHouseSinkDefaults::RETRY_BASE_DELAY) + } + + pub fn include_metadata(&self) -> bool { + self.include_metadata + .unwrap_or(ClickHouseSinkDefaults::INCLUDE_METADATA) + } + + pub fn verbose_logging(&self) -> bool { + self.verbose_logging + .unwrap_or(ClickHouseSinkDefaults::VERBOSE_LOGGING) + } +} + +#[derive(Debug)] +struct ClickHouseSink { + //id + pub id: u32, + //client + client: Option, + //config + config: ClickHouseSinkConfig, + state: Mutex, 
+ verbose: bool, +} + +#[derive(Debug, Default)] +struct State { + messages_processed: u64, + insert_attempt_failed: u64, + insert_batch_failed: u64, +} + +/// Insert type for ClickHouse +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum InsertType { + /// JSONEachRow format - flexible, supports field mappings + #[default] + Json, + /// RowBinary - efficient but requires predefined structs + RowBinary, +} + +impl std::fmt::Display for InsertType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + InsertType::Json => write!(f, "json"), + InsertType::RowBinary => write!(f, "rowbinary"), + } + } +} + +impl std::str::FromStr for InsertType { + type Err = String; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "json" => Ok(InsertType::Json), + "rowbinary" | "row_binary" => Ok(InsertType::RowBinary), + _ => Err(format!( + "Invalid insert type: '{}'. Valid options: 'json', 'rowbinary'", + s + )), + } + } +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum AuthType { + #[default] + None, + Credential, + Jwt, +} + +impl std::fmt::Display for AuthType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + AuthType::None => write!(f, "none"), + AuthType::Credential => write!(f, "credential"), + AuthType::Jwt => write!(f, "jwt"), + } + } +} + +impl std::str::FromStr for AuthType { + type Err = String; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "none" => Ok(AuthType::None), + "credential" | "cred" | "creds" => Ok(AuthType::Credential), + "jwt" => Ok(AuthType::Jwt), + _ => Err(format!( + "Invalid auth type: '{}'. 
Valid options: 'none', 'credential','jwt'", + s + )), + } + } +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub enum ClickHouseInsertFormat { + #[default] + #[serde(rename = "JSONEachRow")] + JsonEachRow, +} + +impl ClickHouseInsertFormat { + pub const fn as_str(&self) -> &'static str { + match self { + ClickHouseInsertFormat::JsonEachRow => "JSONEachRow", + } + } +} + +impl std::fmt::Display for ClickHouseInsertFormat { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.as_str()) + } +} + +impl std::str::FromStr for ClickHouseInsertFormat { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "JSONEachRow" => Ok(ClickHouseInsertFormat::JsonEachRow), + _ => Err(format!( + "Invalid ClickHouse insert format: '{}'. Currently only 'JSONEachRow' is supported.", + s + )), + } + } +} + +/// Message row without metadata +#[derive(Debug, Clone, Row, Serialize, Deserialize)] +pub struct MessageRowWithoutMetadata { + pub payload: String, +} + +/// Message row with full metadata +/// All metadata fields use the `iggy_` prefix to distinguish them from payload data +#[derive(Debug, Clone, Row, Serialize, Deserialize)] +pub struct MessageRowWithMetadata { + pub iggy_stream: String, + pub iggy_topic: String, + pub iggy_partition_id: u32, + pub iggy_id: String, + pub iggy_offset: u64, + pub iggy_checksum: u64, + pub iggy_timestamp: u64, + pub iggy_origin_timestamp: u64, + pub payload: String, +} + +// #[derive(Error, Debug)] +// pub enum FormatError { +// #[error("Invalid header (expected {expected:?}, got {found:?})")] +// InvalidHeader { +// expected: String, +// found: String, +// }, +// #[error("Missing attribute: {0}")] +// MissingAttribute(String), +// } + +#[cfg(test)] +mod tests { + use super::*; + use std::str::FromStr; + + fn test_config_all_none() -> ClickHouseSinkConfig { + ClickHouseSinkConfig { + url: "http://localhost:8123".to_string(), + compression_enabled: None, + tls_enabled: None, + 
tls_root_ca_cert: None, + tls_client_cert: None, + tls_client_key: None, + auth_type: AuthType::None, + username: None, + password: None, + jwt_token: None, + role: None, + table: "test_table".to_string(), + database: None, + max_batch_size: None, + chunk_size: None, + retry: None, + max_retry: None, + base_delay: None, + insert_type: InsertType::Json, + payload_data_type: None, + include_metadata: None, + field_mappings: None, + verbose_logging: None, + } + } + + // ── Config default accessor tests ──────────────────────────────────────── + + #[test] + fn given_default_config_should_use_all_defaults() { + let config = test_config_all_none(); + + assert_eq!( + config.compression_enabled(), + ClickHouseSinkDefaults::COMPRESSION_ENABLED + ); + assert_eq!(config.tls_enabled(), ClickHouseSinkDefaults::TLS_ENABLED); + assert_eq!( + config.max_batch_size(), + ClickHouseSinkDefaults::MAX_BATCH_SIZE + ); + assert_eq!(config.chunk_size(), ClickHouseSinkDefaults::CHUNK_SIZE); + assert_eq!(config.retry(), ClickHouseSinkDefaults::RETRY); + assert_eq!(config.max_retry(), ClickHouseSinkDefaults::MAX_RETRY); + assert_eq!( + config.retry_base_delay(), + ClickHouseSinkDefaults::RETRY_BASE_DELAY + ); + assert_eq!( + config.include_metadata(), + ClickHouseSinkDefaults::INCLUDE_METADATA + ); + assert_eq!( + config.verbose_logging(), + ClickHouseSinkDefaults::VERBOSE_LOGGING + ); + } + + #[test] + fn given_custom_config_should_override_all_defaults() { + let mut config = test_config_all_none(); + config.compression_enabled = Some(false); + config.tls_enabled = Some(true); + config.max_batch_size = Some(500); + config.chunk_size = Some(2048); + config.retry = Some(false); + config.max_retry = Some(10); + config.base_delay = Some(1000); + config.include_metadata = Some(true); + config.verbose_logging = Some(true); + + assert!(!config.compression_enabled()); + assert!(config.tls_enabled()); + assert_eq!(config.max_batch_size(), 500); + assert_eq!(config.chunk_size(), 2048); + 
assert!(!config.retry()); + assert_eq!(config.max_retry(), 10); + assert_eq!(config.retry_base_delay(), 1000); + assert!(config.include_metadata()); + assert!(config.verbose_logging()); + } + + // ── InsertType parsing tests ───────────────────────────────────────────── + + #[test] + fn given_valid_insert_type_string_should_parse() { + assert!(matches!(InsertType::from_str("json"), Ok(InsertType::Json))); + assert!(matches!( + InsertType::from_str("rowbinary"), + Ok(InsertType::RowBinary) + )); + assert!(matches!( + InsertType::from_str("row_binary"), + Ok(InsertType::RowBinary) + )); + } + + #[test] + fn given_invalid_insert_type_string_should_fail() { + assert!(InsertType::from_str("xml").is_err()); + assert!(InsertType::from_str("").is_err()); + } + + // ── AuthType parsing tests ─────────────────────────────────────────────── + + #[test] + fn given_valid_auth_type_string_should_parse() { + assert!(matches!(AuthType::from_str("none"), Ok(AuthType::None))); + assert!(matches!( + AuthType::from_str("credential"), + Ok(AuthType::Credential) + )); + assert!(matches!( + AuthType::from_str("cred"), + Ok(AuthType::Credential) + )); + assert!(matches!( + AuthType::from_str("creds"), + Ok(AuthType::Credential) + )); + assert!(matches!(AuthType::from_str("jwt"), Ok(AuthType::Jwt))); + } + + #[test] + fn given_invalid_auth_type_string_should_fail() { + assert!(AuthType::from_str("oauth").is_err()); + assert!(AuthType::from_str("").is_err()); + } + + // ── ClickHouseInsertFormat parsing tests ───────────────────────────────── + + #[test] + fn given_valid_insert_format_string_should_parse() { + assert!(matches!( + ClickHouseInsertFormat::from_str("JSONEachRow"), + Ok(ClickHouseInsertFormat::JsonEachRow) + )); + } + + #[test] + fn given_invalid_insert_format_string_should_fail() { + assert!(ClickHouseInsertFormat::from_str("CSV").is_err()); + assert!(ClickHouseInsertFormat::from_str("jsoneachrow").is_err()); + } + + #[test] + fn 
given_json_each_row_format_as_str_should_return_correct_value() { + assert_eq!(ClickHouseInsertFormat::JsonEachRow.as_str(), "JSONEachRow"); + } +} diff --git a/core/connectors/sinks/clickhouse_sink/src/sink.rs b/core/connectors/sinks/clickhouse_sink/src/sink.rs new file mode 100644 index 0000000000..600688afcc --- /dev/null +++ b/core/connectors/sinks/clickhouse_sink/src/sink.rs @@ -0,0 +1,782 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+use std::time::Duration;
+
+use crate::clickhouse_client::ClickHouseClient;
+use crate::generic_inserter;
+use crate::{
+    ClickHouseSink, ClickHouseSinkConfig, InsertType, MessageRowWithMetadata,
+    MessageRowWithoutMetadata,
+};
+use async_trait::async_trait;
+use clickhouse::error::Error as ChError;
+use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata};
+use simd_json::{OwnedValue, base::ValueAsObject};
+use tracing::{debug, error, info, warn};
+
+#[async_trait]
+impl Sink for ClickHouseSink {
+    async fn open(&mut self) -> Result<(), Error> {
+        let clickhouse_client = ClickHouseClient::init(self.config.clone())
+            .map_err(|e| Error::InitError(e.to_string()))?;
+        self.client = Some(clickhouse_client);
+        Ok(())
+    }
+
+    async fn consume(
+        &self,
+        topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error> {
+        self.batch_insert(&messages, topic_metadata, &messages_metadata)
+            .await
+    }
+
+    async fn close(&mut self) -> Result<(), Error> {
+        info!("Closing ClickHouse sink connector with ID: {}", self.id);
+
+        let state = self.state.lock().await;
+        info!(
+            "ClickHouse sink ID: {} processed {} messages with {} batch attempt failures and with {} batch failures",
+            self.id, state.messages_processed, state.insert_attempt_failed, state.insert_batch_failed
+        );
+        Ok(())
+    }
+}
+
+impl ClickHouseSink {
+    pub fn new(id: u32, config: ClickHouseSinkConfig) -> Self {
+        let verbose = config.verbose_logging();
+        ClickHouseSink {
+            id,
+            client: None,
+            config,
+            state: tokio::sync::Mutex::new(Default::default()),
+            verbose,
+        }
+    }
+
+    async fn run_inserter(
+        &self,
+        messages: &[ConsumedMessage],
+        topic_metadata: &TopicMetadata,
+        messages_metadata: &MessagesMetadata,
+    ) -> clickhouse::error::Result<()> {
+        match self.config.insert_type {
+            InsertType::Json => {
+                let inserter = self
+                    .client
+                    .as_ref()
+                    .ok_or_else(|| {
+                        clickhouse::error::Error::Custom(
+                            "ClickHouse 
client not initialized".to_string(), + ) + })? + .create_formatted_inserter(); + + generic_inserter::run_inserter(inserter, messages, |msg| { + self.prepare_json_data(msg, topic_metadata, messages_metadata) + }) + .await?; + } + InsertType::RowBinary => { + if self.config.include_metadata() { + let inserter = self + .client + .as_ref() + .ok_or_else(|| { + clickhouse::error::Error::Custom( + "ClickHouse client not initialized".to_string(), + ) + })? + .create_typed_inserter::(&self.config.table); + + generic_inserter::run_inserter(inserter, messages, |msg| { + self.prepare_row_with_metadata(msg, topic_metadata, messages_metadata) + }) + .await?; + } else { + let inserter = self + .client + .as_ref() + .ok_or_else(|| { + clickhouse::error::Error::Custom( + "ClickHouse client not initialized".to_string(), + ) + })? + .create_typed_inserter::(&self.config.table); + + generic_inserter::run_inserter(inserter, messages, |msg| { + self.prepare_row_without_metadata(msg) + }) + .await?; + } + } + } + + Ok(()) + } + + // create the batch and invoke try_insert + async fn batch_insert( + &self, + messages: &[ConsumedMessage], + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + ) -> Result<(), Error> { + let batch_size = self.config.max_batch_size() as usize; + for chunk in messages.chunks(batch_size) { + self.try_insert(chunk, topic_metadata, messages_metadata) + .await?; + } + Ok(()) + } + + // try insert batch of data with retry + async fn try_insert( + &self, + messages: &[ConsumedMessage], + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + ) -> Result<(), Error> { + let retry = self.config.retry(); + let max_retries = self.config.max_retry(); + let mut retry_count = 0; + let base_delay = Duration::from_millis(self.config.retry_base_delay()); + + loop { + match self + .run_inserter(messages, topic_metadata, messages_metadata) + .await + { + Ok(_) => { + let msg_count = messages.len(); + let table = &self.config.table; + if 
self.verbose {
                        info!(
                            "ClickHouse sink ID: {} processed {} messages to table '{}' (attempt {})",
                            self.id,
                            msg_count,
                            table,
                            retry_count + 1
                        );
                    } else {
                        debug!(
                            "ClickHouse sink ID: {} processed {} messages to table '{}' (attempt {})",
                            self.id,
                            msg_count,
                            table,
                            retry_count + 1
                        );
                    }

                    let mut state = self.state.lock().await;
                    state.messages_processed += messages.len() as u64;
                    drop(state);

                    return Ok(());
                }
                Err(ch_err) if retry && retry_count < max_retries && is_retryable(&ch_err) => {
                    // Exponential backoff: base_delay * 2^attempt. `max_retries`
                    // bounds `retry_count`, keeping the shift from overflowing
                    // for sane configurations.
                    let delay = base_delay * 2u32.pow(retry_count);
                    warn!(
                        "Failed to write messages (attempt {}): {}. Retrying in {:?}...",
                        retry_count + 1,
                        ch_err,
                        delay
                    );
                    let mut state = self.state.lock().await;
                    state.insert_attempt_failed += 1;
                    drop(state);
                    retry_count += 1;
                    tokio::time::sleep(delay).await;
                }
                Err(ch_err) => {
                    if retry && retry_count >= max_retries {
                        error!(
                            "Failed to write {} messages after {} attempts: {}",
                            messages.len(),
                            max_retries + 1,
                            ch_err
                        );
                    } else {
                        error!("Non-retryable error, aborting insert: {}", ch_err);
                    }

                    let mut state = self.state.lock().await;
                    state.insert_batch_failed += 1;
                    drop(state);

                    return Err(Error::CannotStoreData(ch_err.to_string()));
                }
            }
        }
    }

    /// Builds one JSONEachRow line (as bytes) for the given message, applying
    /// field mappings and optionally appending `iggy_`-prefixed metadata.
    // NOTE(review): the `Vec<u8>` return generic was stripped during
    // extraction; restored from `get_bytes_from_json`'s return type.
    fn prepare_json_data(
        &self,
        message: &ConsumedMessage,
        topic_metadata: &TopicMetadata,
        messages_metadata: &MessagesMetadata,
    ) -> clickhouse::error::Result<Vec<u8>> {
        let mut json_obj = if let Some(field_mappings) = &self.config.field_mappings {
            let fields: Vec<String> = field_mappings.keys().cloned().collect();
            self.get_json_row(message, &fields)
                .map_err(|e| clickhouse::error::Error::Custom(e.to_string()))?
        } else {
            match &message.payload {
                Payload::Json(v) => {
                    if let Some(obj) = v.as_object() {
                        obj.clone()
                    } else {
                        return Err(clickhouse::error::Error::Custom(
                            "Payload is not a JSON object".to_string(),
                        ));
                    }
                }
                _ => {
                    return Err(clickhouse::error::Error::Custom(
                        "Expected JSON payload".to_string(),
                    ));
                }
            }
        };

        if self.config.include_metadata() {
            self.add_metadata_to_json_object(
                &mut json_obj,
                message,
                topic_metadata,
                messages_metadata,
            );
        }

        self.get_bytes_from_json(json_obj)
            .map_err(|e| clickhouse::error::Error::Custom(e.to_string()))
    }

    /// Converts a message into a RowBinary row that includes metadata columns.
    fn prepare_row_with_metadata(
        &self,
        message: &ConsumedMessage,
        topic_metadata: &TopicMetadata,
        messages_metadata: &MessagesMetadata,
    ) -> clickhouse::error::Result<MessageRowWithMetadata> {
        self.message_to_row_with_metadata(message, topic_metadata, messages_metadata)
            .map_err(|e| clickhouse::error::Error::Custom(e.to_string()))
    }

    /// Converts a message into a payload-only RowBinary row.
    fn prepare_row_without_metadata(
        &self,
        message: &ConsumedMessage,
    ) -> clickhouse::error::Result<MessageRowWithoutMetadata> {
        self.message_to_row_without_metadata(message)
            .map_err(|e| clickhouse::error::Error::Custom(e.to_string()))
    }

    /// Extract a value from a nested JSON path like "a.b.c"
    /// Returns None if the path doesn't exist or isn't valid
    fn extract_field<'a>(
        &self,
        value: &'a simd_json::OwnedValue,
        path: &str,
    ) -> Option<&'a simd_json::OwnedValue> {
        if path.is_empty() {
            return Some(value);
        }

        // Fast path: non-nested key lookup.
        if !path.contains('.') {
            return value.as_object()?.get(path);
        }

        let mut current = value;
        for segment in path.split('.') {
            current = current.as_object()?.get(segment)?;
        }
        Some(current)
    }

    /// Maps a JSON field path to its configured column name; falls back to
    /// the path itself when no mapping exists. Errors if mappings are absent.
    fn map_field_path_to_column(&self, field_path: &str) -> Result<String, Error> {
        self.config
            .field_mappings
            .as_ref()
            .ok_or(Error::InvalidConfig)?
            .get(field_path)
            .cloned()
            .map_or(Ok(field_path.to_string()), Ok)
    }

    /// Borrows the message's JSON payload, or errors for non-JSON payloads.
    fn get_json_payload<'a>(&self, message: &'a ConsumedMessage) -> Result<&'a OwnedValue, Error> {
        let json_value = match &message.payload {
            Payload::Json(v) => v,
            _ => return Err(Error::InvalidJsonPayload),
        };
        Ok(json_value)
    }

    /// Builds a JSON object containing only the requested (possibly nested)
    /// fields, keyed by their mapped column names. Missing paths are skipped.
    fn get_json_row(
        &self,
        message: &ConsumedMessage,
        fields: &[String],
    ) -> Result<simd_json::owned::Object, Error> {
        let mut row_obj = simd_json::owned::Object::new();
        let json_value = self.get_json_payload(message)?;
        for field_path in fields {
            if let Some(value) = self.extract_field(json_value, field_path) {
                let column_name = self.map_field_path_to_column(field_path)?;
                row_obj.insert(column_name, value.clone());
            }
        }
        Ok(row_obj)
    }

    /// Serializes a JSON object into bytes suitable for a JSONEachRow insert.
    fn get_bytes_from_json(&self, row_obj: simd_json::owned::Object) -> Result<Vec<u8>, Error> {
        let row_bytes = simd_json::to_vec(&OwnedValue::Object(Box::new(row_obj)))
            .map_err(|_e| Error::Serialization("bad json body".to_string()))?;
        Ok(row_bytes)
    }

    /// All metadata fields use the `iggy_` prefix
    fn add_metadata_to_json_object(
        &self,
        json_obj: &mut simd_json::owned::Object,
        message: &ConsumedMessage,
        topic_metadata: &TopicMetadata,
        messages_metadata: &MessagesMetadata,
    ) {
        json_obj.insert(
            "iggy_stream".to_string(),
            OwnedValue::from(topic_metadata.stream.clone()),
        );
        json_obj.insert(
            "iggy_topic".to_string(),
            OwnedValue::from(topic_metadata.topic.clone()),
        );
        json_obj.insert(
            "iggy_partition_id".to_string(),
            OwnedValue::from(messages_metadata.partition_id),
        );
        json_obj.insert(
            "iggy_id".to_string(),
            OwnedValue::from(message.id.to_string()),
        );
        json_obj.insert("iggy_offset".to_string(), OwnedValue::from(message.offset));
        json_obj.insert(
            "iggy_checksum".to_string(),
            OwnedValue::from(message.checksum),
        );
        json_obj.insert(
            "iggy_timestamp".to_string(),
            OwnedValue::from(message.timestamp),
        );
        json_obj.insert(
            "iggy_origin_timestamp".to_string(),
            OwnedValue::from(message.origin_timestamp),
        );
    }

    /// Serializes the JSON payload into a payload-only RowBinary row.
    fn message_to_row_without_metadata(
        &self,
        message: &ConsumedMessage,
    ) -> Result<MessageRowWithoutMetadata, Error> {
        let payload_string = match &message.payload {
            Payload::Json(json_val) => simd_json::to_string(json_val).map_err(|e| {
                Error::Serialization(format!("Failed to serialize JSON payload: {}", e))
            })?,
            _ => return Err(Error::InvalidPayloadType),
        };

        Ok(MessageRowWithoutMetadata {
            payload: payload_string,
        })
    }

    /// Serializes the JSON payload plus stream/topic/offset metadata into a
    /// RowBinary row with `iggy_`-prefixed columns.
    fn message_to_row_with_metadata(
        &self,
        message: &ConsumedMessage,
        topic_metadata: &TopicMetadata,
        messages_metadata: &MessagesMetadata,
    ) -> Result<MessageRowWithMetadata, Error> {
        let payload_string = match &message.payload {
            Payload::Json(json_val) => simd_json::to_string(json_val).map_err(|e| {
                Error::Serialization(format!("Failed to serialize JSON payload: {}", e))
            })?,
            _ => return Err(Error::InvalidPayloadType),
        };

        Ok(MessageRowWithMetadata {
            iggy_stream: topic_metadata.stream.clone(),
            iggy_topic: topic_metadata.topic.clone(),
            iggy_partition_id: messages_metadata.partition_id,
            iggy_id: message.id.to_string(),
            iggy_offset: message.offset,
            iggy_checksum: message.checksum,
            iggy_timestamp: message.timestamp,
            iggy_origin_timestamp: message.origin_timestamp,
            payload: payload_string,
        })
    }
}

/// Network failures, timeouts, and transport-level errors are worth retrying;
/// everything else (bad rows, schema mismatches) fails the batch immediately.
fn is_retryable(error: &clickhouse::error::Error) -> bool {
    matches!(
        error,
        ChError::Network(_) | ChError::TimedOut | ChError::Other(_)
    )
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{AuthType, ClickHouseSink, ClickHouseSinkConfig, InsertType};
    use iggy_connector_sdk::Schema;
    use std::collections::HashMap;

    // ── Test fixtures ────────────────────────────────────────────────────────

    fn test_config() -> ClickHouseSinkConfig {
        ClickHouseSinkConfig {
            url: "http://localhost:8123".to_string(),
            compression_enabled: None,
            tls_enabled: None,
            tls_root_ca_cert: None,
            tls_client_cert: None,
            tls_client_key: None,
            auth_type: AuthType::None,
            username: 
None, + password: None, + jwt_token: None, + role: None, + table: "test_table".to_string(), + database: None, + max_batch_size: None, + chunk_size: None, + retry: None, + max_retry: None, + base_delay: None, + insert_type: InsertType::Json, + payload_data_type: None, + include_metadata: None, + field_mappings: None, + verbose_logging: None, + } + } + + fn test_config_with_mappings(mappings: HashMap) -> ClickHouseSinkConfig { + let mut config = test_config(); + config.field_mappings = Some(mappings); + config + } + + fn test_sink(config: ClickHouseSinkConfig) -> ClickHouseSink { + ClickHouseSink::new(1, config) + } + + fn test_json_message(json: simd_json::OwnedValue) -> ConsumedMessage { + ConsumedMessage { + id: 42, + offset: 100, + checksum: 999, + timestamp: 1_700_000_000_000_000, + origin_timestamp: 1_699_999_999_000_000, + headers: None, + payload: Payload::Json(json), + } + } + + fn test_topic_metadata() -> TopicMetadata { + TopicMetadata { + stream: "test_stream".to_string(), + topic: "test_topic".to_string(), + } + } + + fn test_messages_metadata() -> MessagesMetadata { + MessagesMetadata { + partition_id: 1, + current_offset: 100, + schema: Schema::Json, + } + } + + fn sample_json() -> simd_json::OwnedValue { + simd_json::json!({ + "name": "Alice", + "age": 30, + "address": { + "city": "Wonderland", + "zip": "12345" + } + }) + } + + // ── extract_field tests ────────────────────────────────────────────────── + + #[test] + fn given_nested_json_path_should_extract_value() { + let sink = test_sink(test_config()); + let json = sample_json(); + + let result = sink.extract_field(&json, "address.city"); + assert_eq!(result, Some(&OwnedValue::from("Wonderland"))); + } + + #[test] + fn given_single_level_path_should_extract_value() { + let sink = test_sink(test_config()); + let json = sample_json(); + + let result = sink.extract_field(&json, "name"); + assert_eq!(result, Some(&OwnedValue::from("Alice"))); + } + + #[test] + fn given_empty_path_should_return_root() { 
+ let sink = test_sink(test_config()); + let json = sample_json(); + + let result = sink.extract_field(&json, ""); + assert_eq!(result, Some(&json)); + } + + #[test] + fn given_missing_path_should_return_none() { + let sink = test_sink(test_config()); + let json = sample_json(); + + assert_eq!(sink.extract_field(&json, "nonexistent"), None); + assert_eq!(sink.extract_field(&json, "address.country"), None); + assert_eq!(sink.extract_field(&json, "a.b.c"), None); + } + + // ── map_field_path_to_column tests ─────────────────────────────────────── + + #[test] + fn given_field_mappings_should_map_path_to_column() { + let mut mappings = HashMap::new(); + mappings.insert("user.name".to_string(), "username".to_string()); + let sink = test_sink(test_config_with_mappings(mappings)); + + let result = sink.map_field_path_to_column("user.name"); + assert_eq!(result.unwrap(), "username"); + } + + #[test] + fn given_unmapped_path_should_fall_back_to_path() { + let mut mappings = HashMap::new(); + mappings.insert("user.name".to_string(), "username".to_string()); + let sink = test_sink(test_config_with_mappings(mappings)); + + let result = sink.map_field_path_to_column("user.email"); + assert_eq!(result.unwrap(), "user.email"); + } + + #[test] + fn given_no_field_mappings_should_return_error() { + let sink = test_sink(test_config()); + + let result = sink.map_field_path_to_column("anything"); + assert!(matches!(result, Err(Error::InvalidConfig))); + } + + // ── get_json_payload tests ─────────────────────────────────────────────── + + #[test] + fn given_json_payload_should_return_json_value() { + let sink = test_sink(test_config()); + let json = sample_json(); + let message = test_json_message(json.clone()); + + let result = sink.get_json_payload(&message); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), &json); + } + + #[test] + fn given_non_json_payload_should_return_error() { + let sink = test_sink(test_config()); + let message = ConsumedMessage { + id: 1, + offset: 0, 
+ checksum: 0, + timestamp: 0, + origin_timestamp: 0, + headers: None, + payload: Payload::Raw(vec![1, 2, 3]), + }; + + let result = sink.get_json_payload(&message); + assert!(matches!(result, Err(Error::InvalidJsonPayload))); + } + + // ── get_json_row tests ─────────────────────────────────────────────────── + + #[test] + fn given_fields_and_mappings_should_build_json_row() { + let mut mappings = HashMap::new(); + mappings.insert("name".to_string(), "user_name".to_string()); + mappings.insert("address.city".to_string(), "city".to_string()); + let sink = test_sink(test_config_with_mappings(mappings)); + + let message = test_json_message(sample_json()); + let fields = vec!["name".to_string(), "address.city".to_string()]; + + let row = sink.get_json_row(&message, &fields).unwrap(); + assert_eq!(row.get("user_name"), Some(&OwnedValue::from("Alice"))); + assert_eq!(row.get("city"), Some(&OwnedValue::from("Wonderland"))); + assert_eq!(row.len(), 2); + } + + #[test] + fn given_missing_fields_should_skip_in_json_row() { + let mut mappings = HashMap::new(); + mappings.insert("name".to_string(), "user_name".to_string()); + mappings.insert("nonexistent".to_string(), "nothing".to_string()); + let sink = test_sink(test_config_with_mappings(mappings)); + + let message = test_json_message(sample_json()); + let fields = vec!["name".to_string(), "nonexistent".to_string()]; + + let row = sink.get_json_row(&message, &fields).unwrap(); + assert_eq!(row.get("user_name"), Some(&OwnedValue::from("Alice"))); + assert_eq!(row.get("nothing"), None); + assert_eq!(row.len(), 1); + } + + // ── get_bytes_from_json tests ──────────────────────────────────────────── + + #[test] + fn given_json_object_should_serialize_to_bytes() { + let sink = test_sink(test_config()); + let mut obj = simd_json::owned::Object::new(); + obj.insert("key".to_string(), OwnedValue::from("value")); + + let bytes = sink.get_bytes_from_json(obj).unwrap(); + let parsed: serde_json::Value = 
serde_json::from_slice(&bytes).unwrap(); + assert_eq!(parsed["key"], "value"); + } + + // ── add_metadata_to_json_object tests ──────────────────────────────────── + + #[test] + fn given_message_should_add_all_metadata_fields() { + let sink = test_sink(test_config()); + let message = test_json_message(sample_json()); + let topic_meta = test_topic_metadata(); + let msg_meta = test_messages_metadata(); + let mut obj = simd_json::owned::Object::new(); + + sink.add_metadata_to_json_object(&mut obj, &message, &topic_meta, &msg_meta); + + assert_eq!( + obj.get("iggy_stream"), + Some(&OwnedValue::from("test_stream")) + ); + assert_eq!(obj.get("iggy_topic"), Some(&OwnedValue::from("test_topic"))); + assert_eq!(obj.get("iggy_partition_id"), Some(&OwnedValue::from(1u32))); + assert_eq!(obj.get("iggy_id"), Some(&OwnedValue::from("42"))); + assert_eq!(obj.get("iggy_offset"), Some(&OwnedValue::from(100u64))); + assert_eq!(obj.get("iggy_checksum"), Some(&OwnedValue::from(999u64))); + assert_eq!( + obj.get("iggy_timestamp"), + Some(&OwnedValue::from(1_700_000_000_000_000u64)) + ); + assert_eq!( + obj.get("iggy_origin_timestamp"), + Some(&OwnedValue::from(1_699_999_999_000_000u64)) + ); + assert_eq!(obj.len(), 8); + } + + // ── message_to_row_with_metadata tests ─────────────────────────────────── + + #[test] + fn given_json_message_should_build_row_with_metadata() { + let sink = test_sink(test_config()); + let message = test_json_message(simd_json::json!({"key": "value"})); + let topic_meta = test_topic_metadata(); + let msg_meta = test_messages_metadata(); + + let row = sink + .message_to_row_with_metadata(&message, &topic_meta, &msg_meta) + .unwrap(); + + assert_eq!(row.iggy_stream, "test_stream"); + assert_eq!(row.iggy_topic, "test_topic"); + assert_eq!(row.iggy_partition_id, 1); + assert_eq!(row.iggy_id, "42"); + assert_eq!(row.iggy_offset, 100); + assert_eq!(row.iggy_checksum, 999); + assert_eq!(row.iggy_timestamp, 1_700_000_000_000_000); + 
assert_eq!(row.iggy_origin_timestamp, 1_699_999_999_000_000); + + let parsed: serde_json::Value = serde_json::from_str(&row.payload).unwrap(); + assert_eq!(parsed["key"], "value"); + } + + #[test] + fn given_non_json_message_should_fail_row_with_metadata() { + let sink = test_sink(test_config()); + let message = ConsumedMessage { + id: 1, + offset: 0, + checksum: 0, + timestamp: 0, + origin_timestamp: 0, + headers: None, + payload: Payload::Raw(vec![1, 2, 3]), + }; + let topic_meta = test_topic_metadata(); + let msg_meta = test_messages_metadata(); + + let result = sink.message_to_row_with_metadata(&message, &topic_meta, &msg_meta); + assert!(matches!(result, Err(Error::InvalidPayloadType))); + } + + // ── message_to_row_without_metadata tests ──────────────────────────────── + + #[test] + fn given_json_message_should_build_row_without_metadata() { + let sink = test_sink(test_config()); + let message = test_json_message(simd_json::json!({"hello": "world"})); + + let row = sink.message_to_row_without_metadata(&message).unwrap(); + + let parsed: serde_json::Value = serde_json::from_str(&row.payload).unwrap(); + assert_eq!(parsed["hello"], "world"); + } + + #[test] + fn given_non_json_message_should_fail_row_without_metadata() { + let sink = test_sink(test_config()); + let message = ConsumedMessage { + id: 1, + offset: 0, + checksum: 0, + timestamp: 0, + origin_timestamp: 0, + headers: None, + payload: Payload::Raw(vec![1, 2, 3]), + }; + + let result = sink.message_to_row_without_metadata(&message); + assert!(matches!(result, Err(Error::InvalidPayloadType))); + } +} diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml index 13afb04dcf..36cb6c17b8 100644 --- a/core/integration/Cargo.toml +++ b/core/integration/Cargo.toml @@ -32,6 +32,7 @@ assert_cmd = { workspace = true } async-trait = { workspace = true } bon = { workspace = true } bytes = { workspace = true } +clickhouse = { workspace = true } compio = { workspace = true } configs = { workspace = 
true } configs_derive = { workspace = true } diff --git a/core/integration/tests/connectors/clickhouse/clickhouse_sink.rs b/core/integration/tests/connectors/clickhouse/clickhouse_sink.rs new file mode 100644 index 0000000000..7ace447f3e --- /dev/null +++ b/core/integration/tests/connectors/clickhouse/clickhouse_sink.rs @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use super::TEST_MESSAGE_COUNT; +use crate::connectors::fixtures::{ + ClickHouseOps, ClickHouseSinkJsonFieldMappingsFixture, ClickHouseSinkJsonFixture, + ClickHouseSinkJsonWithMetadataFixture, ClickHouseSinkRowBinaryFixture, + ClickHouseSinkRowBinaryWithMetadataFixture, +}; +use crate::connectors::{TestMessage, create_test_messages}; +use bytes::Bytes; +use clickhouse::Row; +use iggy::prelude::{IggyMessage, Partitioning}; +use iggy_binary_protocol::MessageClient; +use iggy_common::Identifier; +use integration::harness::seeds; +use integration::iggy_harness; +use serde::Deserialize; + +// ── Row types for querying ClickHouse ──────────────────────────────────────── + +#[derive(Debug, Row, Deserialize)] +struct PayloadOnlyRow { + id: u64, + name: String, + count: u32, + amount: f64, + active: bool, + timestamp: i64, +} + +#[allow(dead_code)] +#[derive(Debug, Row, Deserialize)] +struct MetadataRow { + id: u64, + name: String, + count: u32, + amount: f64, + active: bool, + timestamp: i64, + iggy_stream: String, + iggy_topic: String, + iggy_partition_id: u32, + iggy_id: String, + iggy_offset: u64, + iggy_checksum: u64, + iggy_timestamp: u64, + iggy_origin_timestamp: u64, +} + +#[derive(Debug, Row, Deserialize)] +struct FieldMappedRow { + msg_id: u64, + msg_name: String, + msg_amount: f64, +} + +#[derive(Debug, Row, Deserialize)] +struct RowBinaryPayloadRow { + payload: String, +} + +#[allow(dead_code)] +#[derive(Debug, Row, Deserialize)] +struct RowBinaryMetadataRow { + iggy_stream: String, + iggy_topic: String, + iggy_partition_id: u32, + iggy_id: String, + iggy_offset: u64, + iggy_checksum: u64, + iggy_timestamp: u64, + iggy_origin_timestamp: u64, + payload: String, +} + +// ── Helper: build and send test messages ───────────────────────────────────── + +async fn send_test_messages( + client: &impl MessageClient, + stream_id: &Identifier, + topic_id: &Identifier, + messages_data: &[TestMessage], +) { + let mut messages: Vec = messages_data + .iter() + 
.enumerate() + .map(|(i, msg)| { + let payload = serde_json::to_vec(msg).expect("Failed to serialize message"); + IggyMessage::builder() + .id((i + 1) as u128) + .payload(Bytes::from(payload)) + .build() + .expect("Failed to build message") + }) + .collect(); + + client + .send_messages( + stream_id, + topic_id, + &Partitioning::partition_id(0), + &mut messages, + ) + .await + .expect("Failed to send messages"); +} + +// ── Test 1: JSON insert, no metadata ───────────────────────────────────────── + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/clickhouse/sink.toml")), + seed = seeds::connector_stream +)] +async fn json_insert_stores_test_messages( + harness: &TestHarness, + fixture: ClickHouseSinkJsonFixture, +) { + let client = harness.root_client().await.unwrap(); + let ch_client = fixture.create_client(); + + fixture.wait_for_table(&ch_client, "iggy_sink").await; + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + + let messages_data = create_test_messages(TEST_MESSAGE_COUNT); + send_test_messages(&client, &stream_id, &topic_id, &messages_data).await; + + let query = "SELECT id, name, count, amount, active, timestamp FROM iggy_sink ORDER BY id"; + let rows: Vec = fixture + .fetch_rows(&ch_client, query, TEST_MESSAGE_COUNT) + .await + .expect("Failed to fetch rows"); + + assert_eq!(rows.len(), TEST_MESSAGE_COUNT); + + for (i, row) in rows.iter().enumerate() { + assert_eq!(row.id, messages_data[i].id, "id mismatch at row {i}"); + assert_eq!(row.name, messages_data[i].name, "name mismatch at row {i}"); + assert_eq!( + row.count, messages_data[i].count, + "count mismatch at row {i}" + ); + assert!( + (row.amount - messages_data[i].amount).abs() < f64::EPSILON, + "amount mismatch at row {i}" + ); + assert_eq!( + row.active, messages_data[i].active, + "active mismatch at row {i}" + ); + assert_eq!( + row.timestamp, messages_data[i].timestamp, + 
"timestamp mismatch at row {i}" + ); + } +} + +// ── Test 2: JSON insert, with metadata ─────────────────────────────────────── + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/clickhouse/sink.toml")), + seed = seeds::connector_stream +)] +async fn json_insert_with_metadata_stores_test_messages( + harness: &TestHarness, + fixture: ClickHouseSinkJsonWithMetadataFixture, +) { + let client = harness.root_client().await.unwrap(); + let ch_client = fixture.create_client(); + + fixture.wait_for_table(&ch_client, "iggy_sink_meta").await; + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + + let messages_data = create_test_messages(TEST_MESSAGE_COUNT); + send_test_messages(&client, &stream_id, &topic_id, &messages_data).await; + + let query = "SELECT id, name, count, amount, active, timestamp, \ + iggy_stream, iggy_topic, iggy_partition_id, iggy_id, \ + iggy_offset, iggy_checksum, iggy_timestamp, iggy_origin_timestamp \ + FROM iggy_sink_meta ORDER BY iggy_offset"; + let rows: Vec = fixture + .fetch_rows(&ch_client, query, TEST_MESSAGE_COUNT) + .await + .expect("Failed to fetch rows"); + + assert_eq!(rows.len(), TEST_MESSAGE_COUNT); + + for (i, row) in rows.iter().enumerate() { + assert_eq!(row.id, messages_data[i].id, "id mismatch at row {i}"); + assert_eq!(row.name, messages_data[i].name, "name mismatch at row {i}"); + assert_eq!( + row.iggy_stream, + seeds::names::STREAM, + "stream mismatch at row {i}" + ); + assert_eq!( + row.iggy_topic, + seeds::names::TOPIC, + "topic mismatch at row {i}" + ); + assert_eq!(row.iggy_offset, i as u64, "offset mismatch at row {i}"); + } +} + +// ── Test 3: JSON insert, field mappings ────────────────────────────────────── + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/clickhouse/sink_field_mappings.toml")), + seed = seeds::connector_stream +)] +async fn 
json_insert_with_field_mappings_stores_mapped_fields( + harness: &TestHarness, + fixture: ClickHouseSinkJsonFieldMappingsFixture, +) { + let client = harness.root_client().await.unwrap(); + let ch_client = fixture.create_client(); + + fixture.wait_for_table(&ch_client, "iggy_sink_mapped").await; + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + + let messages_data = create_test_messages(TEST_MESSAGE_COUNT); + send_test_messages(&client, &stream_id, &topic_id, &messages_data).await; + + let query = "SELECT msg_id, msg_name, msg_amount FROM iggy_sink_mapped ORDER BY msg_id"; + let rows: Vec = fixture + .fetch_rows(&ch_client, query, TEST_MESSAGE_COUNT) + .await + .expect("Failed to fetch rows"); + + assert_eq!(rows.len(), TEST_MESSAGE_COUNT); + + for (i, row) in rows.iter().enumerate() { + assert_eq!( + row.msg_id, messages_data[i].id, + "msg_id mismatch at row {i}" + ); + assert_eq!( + row.msg_name, messages_data[i].name, + "msg_name mismatch at row {i}" + ); + assert!( + (row.msg_amount - messages_data[i].amount).abs() < f64::EPSILON, + "msg_amount mismatch at row {i}" + ); + } +} + +// ── Test 4: RowBinary insert, no metadata ──────────────────────────────────── + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/clickhouse/sink.toml")), + seed = seeds::connector_stream +)] +async fn rowbinary_insert_stores_payload_as_string( + harness: &TestHarness, + fixture: ClickHouseSinkRowBinaryFixture, +) { + let client = harness.root_client().await.unwrap(); + let ch_client = fixture.create_client(); + + fixture.wait_for_table(&ch_client, "iggy_sink_rb").await; + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + + let messages_data = create_test_messages(TEST_MESSAGE_COUNT); + send_test_messages(&client, &stream_id, &topic_id, &messages_data).await; + + let 
query = "SELECT payload FROM iggy_sink_rb"; + let rows: Vec = fixture + .fetch_rows(&ch_client, query, TEST_MESSAGE_COUNT) + .await + .expect("Failed to fetch rows"); + + assert_eq!(rows.len(), TEST_MESSAGE_COUNT); + + let mut stored: Vec = rows + .iter() + .map(|r| serde_json::from_str(&r.payload).expect("Failed to deserialize payload")) + .collect(); + stored.sort_by_key(|m| m.id); + + for (i, msg) in stored.iter().enumerate() { + assert_eq!(msg, &messages_data[i], "message data mismatch at row {i}"); + } +} + +// ── Test 5: RowBinary insert, with metadata ────────────────────────────────── + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/clickhouse/sink.toml")), + seed = seeds::connector_stream +)] +async fn rowbinary_insert_with_metadata_stores_test_messages( + harness: &TestHarness, + fixture: ClickHouseSinkRowBinaryWithMetadataFixture, +) { + let client = harness.root_client().await.unwrap(); + let ch_client = fixture.create_client(); + + fixture + .wait_for_table(&ch_client, "iggy_sink_rb_meta") + .await; + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + + let messages_data = create_test_messages(TEST_MESSAGE_COUNT); + send_test_messages(&client, &stream_id, &topic_id, &messages_data).await; + + let query = "SELECT iggy_stream, iggy_topic, iggy_partition_id, iggy_id, \ + iggy_offset, iggy_checksum, iggy_timestamp, iggy_origin_timestamp, \ + payload \ + FROM iggy_sink_rb_meta ORDER BY iggy_offset"; + let rows: Vec = fixture + .fetch_rows(&ch_client, query, TEST_MESSAGE_COUNT) + .await + .expect("Failed to fetch rows"); + + assert_eq!(rows.len(), TEST_MESSAGE_COUNT); + + for (i, row) in rows.iter().enumerate() { + assert_eq!( + row.iggy_stream, + seeds::names::STREAM, + "stream mismatch at row {i}" + ); + assert_eq!( + row.iggy_topic, + seeds::names::TOPIC, + "topic mismatch at row {i}" + ); + assert_eq!(row.iggy_offset, i as u64, 
"offset mismatch at row {i}"); + + let stored: TestMessage = + serde_json::from_str(&row.payload).expect("Failed to deserialize payload"); + assert_eq!(stored, messages_data[i], "message data mismatch at row {i}"); + } +} diff --git a/core/integration/tests/connectors/clickhouse/mod.rs b/core/integration/tests/connectors/clickhouse/mod.rs new file mode 100644 index 0000000000..d68c8a6ce9 --- /dev/null +++ b/core/integration/tests/connectors/clickhouse/mod.rs @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +mod clickhouse_sink; + +const TEST_MESSAGE_COUNT: usize = 3; diff --git a/core/integration/tests/connectors/clickhouse/sink.toml b/core/integration/tests/connectors/clickhouse/sink.toml new file mode 100644 index 0000000000..df52653bf3 --- /dev/null +++ b/core/integration/tests/connectors/clickhouse/sink.toml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[connectors] +config_type = "local" +config_dir = "../connectors/sinks/clickhouse_sink" diff --git a/core/integration/tests/connectors/clickhouse/sink_field_mappings.toml b/core/integration/tests/connectors/clickhouse/sink_field_mappings.toml new file mode 100644 index 0000000000..3737bdef54 --- /dev/null +++ b/core/integration/tests/connectors/clickhouse/sink_field_mappings.toml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[connectors] +config_type = "local" +config_dir = "../connectors/sinks/clickhouse_sink/field_mappings" diff --git a/core/integration/tests/connectors/fixtures/clickhouse/container.rs b/core/integration/tests/connectors/fixtures/clickhouse/container.rs new file mode 100644 index 0000000000..370270a7d7 --- /dev/null +++ b/core/integration/tests/connectors/fixtures/clickhouse/container.rs @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use clickhouse::{Row, RowOwned, RowRead}; +use integration::harness::TestBinaryError; +use std::time::Duration; +use testcontainers_modules::testcontainers::core::wait::HttpWaitStrategy; +use testcontainers_modules::testcontainers::core::{IntoContainerPort, Mount, WaitFor}; +use testcontainers_modules::testcontainers::runners::AsyncRunner; +use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, ImageExt}; +use tokio::time::sleep; + +pub(super) const CLICKHOUSE_HTTP_PORT: u16 = 8123; +pub(super) const CLICKHOUSE_IMAGE_TAG: &str = "25.3-alpine"; // pinned for reproducible CI runs (see PR description) +pub(super) const CLICKHOUSE_DEFAULT_USER: &str = "default"; +pub(super) const CLICKHOUSE_DEFAULT_PASSWORD: &str = ""; +pub(super) const DEFAULT_POLL_ATTEMPTS: usize = 100; +pub(super) const DEFAULT_POLL_INTERVAL_MS: u64 = 50; + +pub(super) const DEFAULT_TEST_STREAM: &str = "test_stream"; +pub(super) const DEFAULT_TEST_TOPIC: &str = "test_topic"; + +// Env var constants — prefix: IGGY_CONNECTORS_SINK_CLICKHOUSE_ +pub(super) const ENV_SINK_URL: &str = "IGGY_CONNECTORS_SINK_CLICKHOUSE_PLUGIN_CONFIG_URL"; +pub(super) const ENV_SINK_TABLE: &str = "IGGY_CONNECTORS_SINK_CLICKHOUSE_PLUGIN_CONFIG_TABLE"; +pub(super) const ENV_SINK_INSERT_TYPE: &str = + "IGGY_CONNECTORS_SINK_CLICKHOUSE_PLUGIN_CONFIG_INSERT_TYPE"; +pub(super) const ENV_SINK_INCLUDE_METADATA: &str = + "IGGY_CONNECTORS_SINK_CLICKHOUSE_PLUGIN_CONFIG_INCLUDE_METADATA"; +pub(super) const ENV_SINK_COMPRESSION: &str = + "IGGY_CONNECTORS_SINK_CLICKHOUSE_PLUGIN_CONFIG_COMPRESSION_ENABLED"; +pub(super) const ENV_SINK_STREAMS_0_STREAM: &str = + "IGGY_CONNECTORS_SINK_CLICKHOUSE_STREAMS_0_STREAM"; +pub(super) const ENV_SINK_STREAMS_0_TOPICS: &str = + "IGGY_CONNECTORS_SINK_CLICKHOUSE_STREAMS_0_TOPICS"; +pub(super) const ENV_SINK_STREAMS_0_SCHEMA: &str = + "IGGY_CONNECTORS_SINK_CLICKHOUSE_STREAMS_0_SCHEMA"; +pub(super) const ENV_SINK_STREAMS_0_CONSUMER_GROUP: &str = + "IGGY_CONNECTORS_SINK_CLICKHOUSE_STREAMS_0_CONSUMER_GROUP";
+pub(super) const ENV_SINK_AUTH_TYPE: &str = + "IGGY_CONNECTORS_SINK_CLICKHOUSE_PLUGIN_CONFIG_AUTH_TYPE"; +pub(super) const ENV_SINK_USERNAME: &str = "IGGY_CONNECTORS_SINK_CLICKHOUSE_PLUGIN_CONFIG_USERNAME"; +pub(super) const ENV_SINK_PASSWORD: &str = "IGGY_CONNECTORS_SINK_CLICKHOUSE_PLUGIN_CONFIG_PASSWORD"; +pub(super) const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_CLICKHOUSE_PATH"; + +/// Trait for ClickHouse fixtures with common container operations. +pub trait ClickHouseOps: Sync { + fn container(&self) -> &ClickHouseContainer; + + fn create_client(&self) -> clickhouse::Client { + self.container().create_client() + } + + fn wait_for_table( + &self, + client: &clickhouse::Client, + table: &str, + ) -> impl std::future::Future + Send { + async move { + let query = format!("SELECT 1 FROM {table} LIMIT 1"); + for _ in 0..DEFAULT_POLL_ATTEMPTS { + if client.query(&query).fetch_optional::().await.is_ok() { + return; + } + sleep(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS)).await; + } + panic!("Table {table} was not created in time"); + } + } + + fn fetch_rows( + &self, + client: &clickhouse::Client, + query: &str, + expected_count: usize, + ) -> impl std::future::Future, TestBinaryError>> + Send + where + T: Row + RowOwned + RowRead + Send, + { + async move { + let mut rows = Vec::new(); + for _ in 0..DEFAULT_POLL_ATTEMPTS { + if let Ok(fetched) = client.query(query).fetch_all::().await { + rows = fetched; + if rows.len() >= expected_count { + return Ok(rows); + } + } + sleep(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS)).await; + } + Err(TestBinaryError::InvalidState { + message: format!( + "Expected {} rows but got {} after {} poll attempts", + expected_count, + rows.len(), + DEFAULT_POLL_ATTEMPTS + ), + }) + } + } +} + +/// Base container management for ClickHouse fixtures. 
+pub struct ClickHouseContainer { + #[allow(dead_code)] + container: ContainerAsync, + pub(super) url: String, +} + +const CONTAINER_START_RETRIES: usize = 3; + +impl ClickHouseContainer { + pub(super) async fn start() -> Result { + let mut last_err = None; + + for attempt in 1..=CONTAINER_START_RETRIES { + match Self::try_start().await { + Ok(container) => return Ok(container), + Err(e) => { + eprintln!( + "ClickHouse container start attempt {attempt}/{CONTAINER_START_RETRIES} failed: {e}" + ); + last_err = Some(e); + } + } + } + + Err(last_err.unwrap()) + } + + async fn try_start() -> Result { + let current_dir = std::env::current_dir().map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "ClickHouseContainer".to_string(), + message: format!("Failed to get current dir: {e}"), + })?; + + let fixtures_dir = current_dir.join("tests/connectors/fixtures/clickhouse/image_configs"); + + let container = GenericImage::new("clickhouse/clickhouse-server", CLICKHOUSE_IMAGE_TAG) + .with_exposed_port(CLICKHOUSE_HTTP_PORT.tcp()) + .with_wait_for(WaitFor::http( + HttpWaitStrategy::new("/") + .with_port(CLICKHOUSE_HTTP_PORT.tcp()) + .with_expected_status_code(200_u16), + )) + .with_mount(Mount::bind_mount( + fixtures_dir + .join("config.xml") + .to_string_lossy() + .to_string(), + "/etc/clickhouse-server/config.xml", + )) + .with_mount(Mount::bind_mount( + fixtures_dir.join("users.xml").to_string_lossy().to_string(), + "/etc/clickhouse-server/users.xml", + )) + .with_mount(Mount::bind_mount( + fixtures_dir + .join("default-user.xml") + .to_string_lossy() + .to_string(), + "/etc/clickhouse-server/users.d/default-user.xml", + )) + .with_mount(Mount::bind_mount( + fixtures_dir + .join("docker_related_config.xml") + .to_string_lossy() + .to_string(), + "/etc/clickhouse-server/config.d/docker_related_config.xml", + )) + .start() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "ClickHouseContainer".to_string(), + message: format!("Failed to start 
container: {e}"), + })?; + + let host_port = container + .get_host_port_ipv4(CLICKHOUSE_HTTP_PORT) + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "ClickHouseContainer".to_string(), + message: format!("Failed to get port: {e}"), + })?; + + let url = format!("http://localhost:{host_port}"); + + Ok(Self { container, url }) + } + + pub fn create_client(&self) -> clickhouse::Client { + clickhouse::Client::default() + .with_url(&self.url) + .with_user(CLICKHOUSE_DEFAULT_USER) + .with_password(CLICKHOUSE_DEFAULT_PASSWORD) + .with_compression(clickhouse::Compression::None) + } + + pub async fn execute_ddl( + &self, + client: &clickhouse::Client, + ddl: &str, + ) -> Result<(), TestBinaryError> { + client + .query(ddl) + .execute() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "ClickHouseContainer".to_string(), + message: format!("Failed to execute DDL: {e}"), + }) + } +} diff --git a/core/integration/tests/connectors/fixtures/clickhouse/image_configs/config.xml b/core/integration/tests/connectors/fixtures/clickhouse/image_configs/config.xml new file mode 100644 index 0000000000..38e13d83c6 --- /dev/null +++ b/core/integration/tests/connectors/fixtures/clickhouse/image_configs/config.xml @@ -0,0 +1,35 @@ + + + 0.0.0.0 + 8123 + + + users.xml + default + default + + /var/lib/clickhouse/ + /var/lib/clickhouse/tmp/ + /var/lib/clickhouse/user_files/ + /var/lib/clickhouse/access/ + + + debug + /var/log/clickhouse-server/clickhouse-server.log + /var/log/clickhouse-server/clickhouse-server.err.log + 1000M + 10 + 1 + + + + /var/lib/clickhouse/format_schemas/ + + + users.xml + + + + + 1000000000000000000 + diff --git a/core/integration/tests/connectors/fixtures/clickhouse/image_configs/default-user.xml b/core/integration/tests/connectors/fixtures/clickhouse/image_configs/default-user.xml new file mode 100644 index 0000000000..8afb528da4 --- /dev/null +++ 
b/core/integration/tests/connectors/fixtures/clickhouse/image_configs/default-user.xml @@ -0,0 +1,11 @@ + + + + + + + ::/0 + + + + diff --git a/core/integration/tests/connectors/fixtures/clickhouse/image_configs/docker_related_config.xml b/core/integration/tests/connectors/fixtures/clickhouse/image_configs/docker_related_config.xml new file mode 100644 index 0000000000..d2167a8a28 --- /dev/null +++ b/core/integration/tests/connectors/fixtures/clickhouse/image_configs/docker_related_config.xml @@ -0,0 +1,6 @@ + + + + 0.0.0.0 + 1 + diff --git a/core/integration/tests/connectors/fixtures/clickhouse/image_configs/users.xml b/core/integration/tests/connectors/fixtures/clickhouse/image_configs/users.xml new file mode 100644 index 0000000000..3b865dff6e --- /dev/null +++ b/core/integration/tests/connectors/fixtures/clickhouse/image_configs/users.xml @@ -0,0 +1,33 @@ + + + + + random + + + + + + + + ::/0 + + default + default + 1 + + + + + + + 3600 + 0 + 0 + 0 + 0 + 0 + + + + diff --git a/core/integration/tests/connectors/fixtures/clickhouse/mod.rs b/core/integration/tests/connectors/fixtures/clickhouse/mod.rs new file mode 100644 index 0000000000..3efcf7314e --- /dev/null +++ b/core/integration/tests/connectors/fixtures/clickhouse/mod.rs @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +mod container; +mod sink; + +pub use container::ClickHouseOps; +pub use sink::{ + ClickHouseSinkJsonFieldMappingsFixture, ClickHouseSinkJsonFixture, + ClickHouseSinkJsonWithMetadataFixture, ClickHouseSinkRowBinaryFixture, + ClickHouseSinkRowBinaryWithMetadataFixture, +}; diff --git a/core/integration/tests/connectors/fixtures/clickhouse/sink.rs b/core/integration/tests/connectors/fixtures/clickhouse/sink.rs new file mode 100644 index 0000000000..f0fd0be7fb --- /dev/null +++ b/core/integration/tests/connectors/fixtures/clickhouse/sink.rs @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use super::container::{ + CLICKHOUSE_DEFAULT_PASSWORD, CLICKHOUSE_DEFAULT_USER, ClickHouseContainer, ClickHouseOps, + DEFAULT_TEST_STREAM, DEFAULT_TEST_TOPIC, ENV_SINK_AUTH_TYPE, ENV_SINK_COMPRESSION, + ENV_SINK_INCLUDE_METADATA, ENV_SINK_INSERT_TYPE, ENV_SINK_PASSWORD, ENV_SINK_PATH, + ENV_SINK_STREAMS_0_CONSUMER_GROUP, ENV_SINK_STREAMS_0_SCHEMA, ENV_SINK_STREAMS_0_STREAM, + ENV_SINK_STREAMS_0_TOPICS, ENV_SINK_TABLE, ENV_SINK_URL, ENV_SINK_USERNAME, +}; +use async_trait::async_trait; +use integration::harness::{TestBinaryError, TestFixture}; +use std::collections::HashMap; + +/// Builds the common env vars shared by all ClickHouse sink fixtures. +fn base_envs( + url: &str, + table: &str, + insert_type: &str, + include_metadata: bool, +) -> HashMap { + let mut envs = HashMap::new(); + envs.insert(ENV_SINK_URL.to_string(), url.to_string()); + envs.insert(ENV_SINK_TABLE.to_string(), table.to_string()); + envs.insert(ENV_SINK_INSERT_TYPE.to_string(), insert_type.to_string()); + envs.insert( + ENV_SINK_INCLUDE_METADATA.to_string(), + include_metadata.to_string(), + ); + envs.insert(ENV_SINK_COMPRESSION.to_string(), "false".to_string()); + envs.insert(ENV_SINK_AUTH_TYPE.to_string(), "credential".to_string()); + envs.insert( + ENV_SINK_USERNAME.to_string(), + CLICKHOUSE_DEFAULT_USER.to_string(), + ); + envs.insert( + ENV_SINK_PASSWORD.to_string(), + CLICKHOUSE_DEFAULT_PASSWORD.to_string(), + ); + envs.insert( + ENV_SINK_STREAMS_0_STREAM.to_string(), + DEFAULT_TEST_STREAM.to_string(), + ); + envs.insert( + ENV_SINK_STREAMS_0_TOPICS.to_string(), + format!("[{}]", DEFAULT_TEST_TOPIC), + ); + envs.insert(ENV_SINK_STREAMS_0_SCHEMA.to_string(), "json".to_string()); + envs.insert( + ENV_SINK_STREAMS_0_CONSUMER_GROUP.to_string(), + "test".to_string(), + ); + envs.insert( + ENV_SINK_PATH.to_string(), + "../../target/debug/libiggy_connector_clickhouse_sink".to_string(), + ); + envs +} + +// ── Table DDLs ─────────────────────────────────────────────────────────────── + 
+const DDL_JSON: &str = "\ + CREATE TABLE IF NOT EXISTS iggy_sink (\ + id UInt64,\ + name String,\ + count UInt32,\ + amount Float64,\ + active Bool,\ + timestamp Int64\ + ) ENGINE = MergeTree() ORDER BY id"; + +const DDL_JSON_META: &str = "\ + CREATE TABLE IF NOT EXISTS iggy_sink_meta (\ + id UInt64,\ + name String,\ + count UInt32,\ + amount Float64,\ + active Bool,\ + timestamp Int64,\ + iggy_stream String,\ + iggy_topic String,\ + iggy_partition_id UInt32,\ + iggy_id String,\ + iggy_offset UInt64,\ + iggy_checksum UInt64,\ + iggy_timestamp UInt64,\ + iggy_origin_timestamp UInt64\ + ) ENGINE = MergeTree() ORDER BY iggy_offset"; + +const DDL_FIELD_MAPPINGS: &str = "\ + CREATE TABLE IF NOT EXISTS iggy_sink_mapped (\ + msg_id UInt64,\ + msg_name String,\ + msg_amount Float64\ + ) ENGINE = MergeTree() ORDER BY msg_id"; + +const DDL_ROWBINARY: &str = "\ + CREATE TABLE IF NOT EXISTS iggy_sink_rb (\ + payload String\ + ) ENGINE = MergeTree() ORDER BY tuple()"; + +const DDL_ROWBINARY_META: &str = "\ + CREATE TABLE IF NOT EXISTS iggy_sink_rb_meta (\ + iggy_stream String,\ + iggy_topic String,\ + iggy_partition_id UInt32,\ + iggy_id String,\ + iggy_offset UInt64,\ + iggy_checksum UInt64,\ + iggy_timestamp UInt64,\ + iggy_origin_timestamp UInt64,\ + payload String\ + ) ENGINE = MergeTree() ORDER BY iggy_offset"; + +// ── Fixture A: JSON insert, no metadata ────────────────────────────────────── + +pub struct ClickHouseSinkJsonFixture { + container: ClickHouseContainer, +} + +impl ClickHouseOps for ClickHouseSinkJsonFixture { + fn container(&self) -> &ClickHouseContainer { + &self.container + } +} + +#[async_trait] +impl TestFixture for ClickHouseSinkJsonFixture { + async fn setup() -> Result { + let container = ClickHouseContainer::start().await?; + let client = container.create_client(); + container.execute_ddl(&client, DDL_JSON).await?; + Ok(Self { container }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + base_envs(&self.container.url, "iggy_sink", "json", 
false) + } +} + +// ── Fixture B: JSON insert, with metadata ──────────────────────────────────── + +pub struct ClickHouseSinkJsonWithMetadataFixture { + container: ClickHouseContainer, +} + +impl ClickHouseOps for ClickHouseSinkJsonWithMetadataFixture { + fn container(&self) -> &ClickHouseContainer { + &self.container + } +} + +#[async_trait] +impl TestFixture for ClickHouseSinkJsonWithMetadataFixture { + async fn setup() -> Result { + let container = ClickHouseContainer::start().await?; + let client = container.create_client(); + container.execute_ddl(&client, DDL_JSON_META).await?; + Ok(Self { container }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + base_envs(&self.container.url, "iggy_sink_meta", "json", true) + } +} + +// ── Fixture C: JSON insert, field mappings ─────────────────────────────────── + +pub struct ClickHouseSinkJsonFieldMappingsFixture { + container: ClickHouseContainer, +} + +impl ClickHouseOps for ClickHouseSinkJsonFieldMappingsFixture { + fn container(&self) -> &ClickHouseContainer { + &self.container + } +} + +#[async_trait] +impl TestFixture for ClickHouseSinkJsonFieldMappingsFixture { + async fn setup() -> Result { + let container = ClickHouseContainer::start().await?; + let client = container.create_client(); + container.execute_ddl(&client, DDL_FIELD_MAPPINGS).await?; + Ok(Self { container }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + // field_mappings are baked into the TOML config — only override url/table/path/streams + base_envs(&self.container.url, "iggy_sink_mapped", "json", false) + } +} + +// ── Fixture D: RowBinary insert, no metadata ───────────────────────────────── + +pub struct ClickHouseSinkRowBinaryFixture { + container: ClickHouseContainer, +} + +impl ClickHouseOps for ClickHouseSinkRowBinaryFixture { + fn container(&self) -> &ClickHouseContainer { + &self.container + } +} + +#[async_trait] +impl TestFixture for ClickHouseSinkRowBinaryFixture { + async fn setup() -> Result { + let container = 
ClickHouseContainer::start().await?; + let client = container.create_client(); + container.execute_ddl(&client, DDL_ROWBINARY).await?; + Ok(Self { container }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + base_envs(&self.container.url, "iggy_sink_rb", "rowbinary", false) + } +} + +// ── Fixture E: RowBinary insert, with metadata ─────────────────────────────── + +pub struct ClickHouseSinkRowBinaryWithMetadataFixture { + container: ClickHouseContainer, +} + +impl ClickHouseOps for ClickHouseSinkRowBinaryWithMetadataFixture { + fn container(&self) -> &ClickHouseContainer { + &self.container + } +} + +#[async_trait] +impl TestFixture for ClickHouseSinkRowBinaryWithMetadataFixture { + async fn setup() -> Result { + let container = ClickHouseContainer::start().await?; + let client = container.create_client(); + container.execute_ddl(&client, DDL_ROWBINARY_META).await?; + Ok(Self { container }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + base_envs(&self.container.url, "iggy_sink_rb_meta", "rowbinary", true) + } +} diff --git a/core/integration/tests/connectors/fixtures/mod.rs b/core/integration/tests/connectors/fixtures/mod.rs index 3c0ac0d93b..bc57e1429e 100644 --- a/core/integration/tests/connectors/fixtures/mod.rs +++ b/core/integration/tests/connectors/fixtures/mod.rs @@ -17,12 +17,18 @@ * under the License. 
*/ +mod clickhouse; mod elasticsearch; mod iceberg; mod postgres; mod quickwit; mod wiremock; +pub use clickhouse::{ + ClickHouseOps, ClickHouseSinkJsonFieldMappingsFixture, ClickHouseSinkJsonFixture, + ClickHouseSinkJsonWithMetadataFixture, ClickHouseSinkRowBinaryFixture, + ClickHouseSinkRowBinaryWithMetadataFixture, +}; pub use elasticsearch::{ElasticsearchSinkFixture, ElasticsearchSourcePreCreatedFixture}; pub use iceberg::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture}; pub use postgres::{ diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs index a85453d52c..8e134f172d 100644 --- a/core/integration/tests/connectors/mod.rs +++ b/core/integration/tests/connectors/mod.rs @@ -18,6 +18,7 @@ */ mod api; +mod clickhouse; mod elasticsearch; mod fixtures; mod http_config_provider; From 18d6ee4de1bfbfce0ae8930c2adf241fae8184f9 Mon Sep 17 00:00:00 2001 From: hemant Date: Mon, 9 Mar 2026 23:17:13 +0530 Subject: [PATCH 2/2] fix(connectors): fixed review comments #2891 - retry configuration with limits and delays, README updated for the same. 
- fix the log params in close() - pre-commit/ CI hooks fix --- DEPENDENCIES.md | 11 +++++++++++ .../sinks/clickhouse_sink/README.md | 6 +++--- .../src/clickhouse_inserter.rs | 18 ++++++++++++++++++ .../clickhouse_sink/src/generic_inserter.rs | 18 ++++++++++++++++++ .../sinks/clickhouse_sink/src/lib.rs | 2 ++ .../sinks/clickhouse_sink/src/sink.rs | 16 ++++++++++++---- .../clickhouse/image_configs/config.xml | 19 +++++++++++++++++++ .../clickhouse/image_configs/default-user.xml | 19 +++++++++++++++++++ .../image_configs/docker_related_config.xml | 19 +++++++++++++++++++ .../clickhouse/image_configs/users.xml | 19 +++++++++++++++++++ 10 files changed, 140 insertions(+), 7 deletions(-) diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index e3f450a7e9..354dec2a43 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -113,6 +113,7 @@ blake2: 0.10.6, "Apache-2.0 OR MIT", blake3: 1.8.3, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR CC0-1.0", block-buffer: 0.10.4, "Apache-2.0 OR MIT", block2: 0.6.2, "MIT", +bnum: 0.13.0, "Apache-2.0 OR MIT", bollard: 0.19.4, "Apache-2.0", bollard-buildkit-proto: 0.7.0, "Apache-2.0", bollard-stubs: 1.49.1-rc.28.4.0, "Apache-2.0", @@ -150,12 +151,16 @@ charming: 0.6.0, "Apache-2.0 OR MIT", charming_macros: 0.1.0, "Apache-2.0 OR MIT", chrono: 0.4.43, "Apache-2.0 OR MIT", cipher: 0.4.4, "Apache-2.0 OR MIT", +cityhash-rs: 1.0.1, "Apache-2.0 OR MIT", clang-sys: 1.8.1, "Apache-2.0", clap: 4.5.58, "Apache-2.0 OR MIT", clap_builder: 4.5.58, "Apache-2.0 OR MIT", clap_complete: 4.5.66, "Apache-2.0 OR MIT", clap_derive: 4.5.55, "Apache-2.0 OR MIT", clap_lex: 1.0.0, "Apache-2.0 OR MIT", +clickhouse: 0.14.2, "Apache-2.0 OR MIT", +clickhouse-macros: 0.3.0, "Apache-2.0 OR MIT", +clickhouse-types: 0.1.1, "Apache-2.0 OR MIT", clock: 0.1.0, "N/A", cmake: 0.1.57, "Apache-2.0 OR MIT", cobs: 0.3.0, "Apache-2.0 OR MIT", @@ -402,6 +407,7 @@ heapless: 0.7.17, "Apache-2.0 OR MIT", heck: 0.5.0, "Apache-2.0 OR MIT", hermit-abi: 0.5.2, "Apache-2.0 OR MIT", hex: 0.4.3, 
"Apache-2.0 OR MIT", +higher-kinded-types: 0.2.1, "Apache-2.0 OR MIT OR Zlib", hkdf: 0.12.4, "Apache-2.0 OR MIT", hmac: 0.12.1, "Apache-2.0 OR MIT", home: 0.5.12, "Apache-2.0 OR MIT", @@ -447,6 +453,7 @@ iggy-connectors: 0.3.1-edge.1, "Apache-2.0", iggy-mcp: 0.3.1-edge.1, "Apache-2.0", iggy_binary_protocol: 0.9.1-edge.1, "Apache-2.0", iggy_common: 0.9.1-edge.1, "Apache-2.0", +iggy_connector_clickhouse_sink: 0.1.0, "Apache-2.0", iggy_connector_elasticsearch_sink: 0.3.1-edge.1, "Apache-2.0", iggy_connector_elasticsearch_source: 0.3.1-edge.1, "Apache-2.0", iggy_connector_iceberg_sink: 0.3.1-edge.1, "Apache-2.0", @@ -541,9 +548,12 @@ logos-derive: 0.15.1, "Apache-2.0 OR MIT", loom: 0.7.2, "MIT", loop9: 0.1.5, "MIT", lru-slab: 0.1.2, "Apache-2.0 OR MIT OR Zlib", +lz4_flex: 0.11.5, "MIT", lz4_flex: 0.12.0, "MIT", macro_rules_attribute: 0.1.3, "MIT", +macro_rules_attribute: 0.2.2, "Apache-2.0 OR MIT OR Zlib", macro_rules_attribute-proc_macro: 0.1.3, "MIT", +macro_rules_attribute-proc_macro: 0.2.2, "Apache-2.0 OR MIT OR Zlib", matchers: 0.2.0, "MIT", matchit: 0.8.4, "BSD-3-Clause AND MIT", maybe-rayon: 0.1.1, "MIT", @@ -663,6 +673,7 @@ png: 0.17.16, "Apache-2.0 OR MIT", png: 0.18.0, "Apache-2.0 OR MIT", polling: 3.11.0, "Apache-2.0 OR MIT", polonius-the-crab: 0.2.1, "Apache-2.0 OR MIT OR Zlib", +polonius-the-crab: 0.5.0, "Apache-2.0 OR MIT OR Zlib", polyval: 0.6.2, "Apache-2.0 OR MIT", portable-atomic: 1.13.1, "Apache-2.0 OR MIT", portable-atomic-util: 0.2.5, "Apache-2.0 OR MIT", diff --git a/core/connectors/sinks/clickhouse_sink/README.md b/core/connectors/sinks/clickhouse_sink/README.md index 6c9bccb14c..8fdd6484ae 100644 --- a/core/connectors/sinks/clickhouse_sink/README.md +++ b/core/connectors/sinks/clickhouse_sink/README.md @@ -372,11 +372,11 @@ include_metadata = true ### Automatic Retries -The connector automatically retries transient errors with exponential backoff. The retry delay is calculated as `base_delay * attempt_number` milliseconds. 
Configure with: +The connector automatically retries transient errors with exponential backoff. The retry delay is calculated as `base_delay * 2^attempt` milliseconds. Configure with: - `retry` (default: `true`) - Enable/disable retries -- `max_retry` (default: `3`) - Maximum retry attempts -- `base_delay` (default: `500ms`) - Base delay between retries +- `max_retry` (default: `3`) - Maximum retry attempts; the connector will fail to open at startup if this exceeds `10` +- `base_delay` (default: `500ms`) - Base delay between retries; the computed delay is capped at 15 minutes to prevent unbounded waits Non-transient errors fail immediately without retrying. diff --git a/core/connectors/sinks/clickhouse_sink/src/clickhouse_inserter.rs b/core/connectors/sinks/clickhouse_sink/src/clickhouse_inserter.rs index 79c17b90a8..be46911e54 100644 --- a/core/connectors/sinks/clickhouse_sink/src/clickhouse_inserter.rs +++ b/core/connectors/sinks/clickhouse_sink/src/clickhouse_inserter.rs @@ -1,3 +1,21 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + use std::mem; use tokio::time::Duration; diff --git a/core/connectors/sinks/clickhouse_sink/src/generic_inserter.rs b/core/connectors/sinks/clickhouse_sink/src/generic_inserter.rs index 75f80f4650..b0d8b9d46c 100644 --- a/core/connectors/sinks/clickhouse_sink/src/generic_inserter.rs +++ b/core/connectors/sinks/clickhouse_sink/src/generic_inserter.rs @@ -1,3 +1,21 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + use crate::clickhouse_inserter::Quantities; use clickhouse::Row; use iggy_connector_sdk::ConsumedMessage; diff --git a/core/connectors/sinks/clickhouse_sink/src/lib.rs b/core/connectors/sinks/clickhouse_sink/src/lib.rs index f17a4631d2..c7538b9bf9 100644 --- a/core/connectors/sinks/clickhouse_sink/src/lib.rs +++ b/core/connectors/sinks/clickhouse_sink/src/lib.rs @@ -43,7 +43,9 @@ impl ClickHouseSinkDefaults { pub const CHUNK_SIZE: usize = 1024 * 1024; //1MB pub const RETRY: bool = true; pub const MAX_RETRY: u32 = 3; + pub const MAX_RETRY_LIMIT: u32 = 10; pub const RETRY_BASE_DELAY: u64 = 500; //500 milliseconds + pub const MAX_RETRY_DELAY_MS: u64 = 900_000; // 15 minutes in milliseconds pub const INCLUDE_METADATA: bool = false; pub const VERBOSE_LOGGING: bool = false; } diff --git a/core/connectors/sinks/clickhouse_sink/src/sink.rs b/core/connectors/sinks/clickhouse_sink/src/sink.rs index 600688afcc..06be680554 100644 --- a/core/connectors/sinks/clickhouse_sink/src/sink.rs +++ b/core/connectors/sinks/clickhouse_sink/src/sink.rs @@ -21,8 +21,8 @@ use std::time::Duration; use crate::clickhouse_client::ClickHouseClient; use crate::generic_inserter; use crate::{ - ClickHouseSink, ClickHouseSinkConfig, InsertType, MessageRowWithMetadata, - MessageRowWithoutMetadata, + ClickHouseSink, ClickHouseSinkConfig, ClickHouseSinkDefaults, InsertType, + MessageRowWithMetadata, MessageRowWithoutMetadata, }; use async_trait::async_trait; use clickhouse::error::Error as ChError; @@ -33,6 +33,9 @@ use tracing::{debug, error, info, warn}; #[async_trait] impl Sink for ClickHouseSink { async fn open(&mut self) -> Result<(), Error> { + if self.config.max_retry() > ClickHouseSinkDefaults::MAX_RETRY_LIMIT { + return Err(Error::InvalidConfig); + } let clickhouse_client = ClickHouseClient::init(self.config.clone()) .map_err(|e| Error::InitError(e.to_string()))?; self.client = Some(clickhouse_client); @@ -55,7 +58,10 @@ impl Sink for ClickHouseSink { let state = self.state.lock().await; 
info!( "ClickHouse sink ID: {} processed {} messages with {} batch attempt failures and with {} batch failures", - self.id, state.messages_processed, state.insert_batch_failed, state.insert_batch_failed + self.id, + state.messages_processed, + state.insert_attempt_failed, + state.insert_batch_failed ); Ok(()) } @@ -195,7 +201,9 @@ impl ClickHouseSink { } //TODO: is_retryable check -- done Err(ch_err) if retry && retry_count < max_retries && is_retryable(&ch_err) => { - let delay = base_delay * 2u32.pow(retry_count); + let max_delay = + Duration::from_millis(ClickHouseSinkDefaults::MAX_RETRY_DELAY_MS); + let delay = (base_delay * 2u32.pow(retry_count)).min(max_delay); warn!( "Failed to write messages (attempt {}): {}. Retrying in {:?}...", retry_count + 1, diff --git a/core/integration/tests/connectors/fixtures/clickhouse/image_configs/config.xml b/core/integration/tests/connectors/fixtures/clickhouse/image_configs/config.xml index 38e13d83c6..1c43bc0d60 100644 --- a/core/integration/tests/connectors/fixtures/clickhouse/image_configs/config.xml +++ b/core/integration/tests/connectors/fixtures/clickhouse/image_configs/config.xml @@ -1,3 +1,22 @@ + + 0.0.0.0 diff --git a/core/integration/tests/connectors/fixtures/clickhouse/image_configs/default-user.xml b/core/integration/tests/connectors/fixtures/clickhouse/image_configs/default-user.xml index 8afb528da4..f6c9c6fa23 100644 --- a/core/integration/tests/connectors/fixtures/clickhouse/image_configs/default-user.xml +++ b/core/integration/tests/connectors/fixtures/clickhouse/image_configs/default-user.xml @@ -1,3 +1,22 @@ + + diff --git a/core/integration/tests/connectors/fixtures/clickhouse/image_configs/docker_related_config.xml b/core/integration/tests/connectors/fixtures/clickhouse/image_configs/docker_related_config.xml index d2167a8a28..21377faa16 100644 --- a/core/integration/tests/connectors/fixtures/clickhouse/image_configs/docker_related_config.xml +++ 
b/core/integration/tests/connectors/fixtures/clickhouse/image_configs/docker_related_config.xml @@ -1,3 +1,22 @@ + + diff --git a/core/integration/tests/connectors/fixtures/clickhouse/image_configs/users.xml b/core/integration/tests/connectors/fixtures/clickhouse/image_configs/users.xml index 3b865dff6e..d035368e0e 100644 --- a/core/integration/tests/connectors/fixtures/clickhouse/image_configs/users.xml +++ b/core/integration/tests/connectors/fixtures/clickhouse/image_configs/users.xml @@ -1,3 +1,22 @@ + +