From 4c2ae737ebfead62b7ea5d52fb7d1698862f840e Mon Sep 17 00:00:00 2001 From: kriti-sc Date: Sat, 7 Mar 2026 01:07:12 +0530 Subject: [PATCH 1/3] add clickhouse sink --- Cargo.lock | 17 + Cargo.toml | 1 + .../connectors/clickhouse_sink.toml | 56 + .../sinks/clickhouse_sink/Cargo.toml | 48 + .../sinks/clickhouse_sink/README.md | 164 +++ .../sinks/clickhouse_sink/config.toml | 56 + .../sinks/clickhouse_sink/src/binary.rs | 1095 +++++++++++++++++ .../sinks/clickhouse_sink/src/body.rs | 286 +++++ .../sinks/clickhouse_sink/src/client.rs | 322 +++++ .../sinks/clickhouse_sink/src/lib.rs | 286 +++++ .../sinks/clickhouse_sink/src/schema.rs | 688 +++++++++++ .../sinks/clickhouse_sink/src/sink.rs | 153 +++ .../connectors/clickhouse/clickhouse_sink.rs | 315 +++++ .../tests/connectors/clickhouse/mod.rs | 22 + .../tests/connectors/clickhouse/sink.toml | 20 + .../fixtures/clickhouse/container.rs | 297 +++++ .../connectors/fixtures/clickhouse/mod.rs | 25 + .../connectors/fixtures/clickhouse/sink.rs | 247 ++++ .../tests/connectors/fixtures/mod.rs | 2 + core/integration/tests/connectors/mod.rs | 1 + 20 files changed, 4101 insertions(+) create mode 100644 core/connectors/runtime/example_config/connectors/clickhouse_sink.toml create mode 100644 core/connectors/sinks/clickhouse_sink/Cargo.toml create mode 100644 core/connectors/sinks/clickhouse_sink/README.md create mode 100644 core/connectors/sinks/clickhouse_sink/config.toml create mode 100644 core/connectors/sinks/clickhouse_sink/src/binary.rs create mode 100644 core/connectors/sinks/clickhouse_sink/src/body.rs create mode 100644 core/connectors/sinks/clickhouse_sink/src/client.rs create mode 100644 core/connectors/sinks/clickhouse_sink/src/lib.rs create mode 100644 core/connectors/sinks/clickhouse_sink/src/schema.rs create mode 100644 core/connectors/sinks/clickhouse_sink/src/sink.rs create mode 100644 core/integration/tests/connectors/clickhouse/clickhouse_sink.rs create mode 100644 
core/integration/tests/connectors/clickhouse/mod.rs create mode 100644 core/integration/tests/connectors/clickhouse/sink.toml create mode 100644 core/integration/tests/connectors/fixtures/clickhouse/container.rs create mode 100644 core/integration/tests/connectors/fixtures/clickhouse/mod.rs create mode 100644 core/integration/tests/connectors/fixtures/clickhouse/sink.rs diff --git a/Cargo.lock b/Cargo.lock index 9bd245b86e..26c30bb645 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5266,6 +5266,23 @@ dependencies = [ "uuid", ] +[[package]] +name = "iggy_connector_clickhouse_sink" +version = "0.1.0" +dependencies = [ + "async-trait", + "dashmap", + "humantime", + "iggy_connector_sdk", + "once_cell", + "reqwest", + "serde", + "serde_json", + "simd-json", + "tokio", + "tracing", +] + [[package]] name = "iggy_connector_elasticsearch_sink" version = "0.3.1-edge.1" diff --git a/Cargo.toml b/Cargo.toml index d2bbfe5670..63502ff7da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ members = [ "core/configs_derive", "core/connectors/runtime", "core/connectors/sdk", + "core/connectors/sinks/clickhouse_sink", "core/connectors/sinks/elasticsearch_sink", "core/connectors/sinks/iceberg_sink", "core/connectors/sinks/postgres_sink", diff --git a/core/connectors/runtime/example_config/connectors/clickhouse_sink.toml b/core/connectors/runtime/example_config/connectors/clickhouse_sink.toml new file mode 100644 index 0000000000..9290b2f3d5 --- /dev/null +++ b/core/connectors/runtime/example_config/connectors/clickhouse_sink.toml @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "sink" +key = "clickhouse" +enabled = true +version = 0 +name = "ClickHouse sink" +path = "target/release/libiggy_connector_clickhouse_sink" +verbose = false + +[[streams]] +stream = "qw" +topics = ["records"] +schema = "json" +batch_length = 1000 +poll_interval = "5ms" +consumer_group = "clickhouse_sink_connector" + +[plugin_config] +url = "http://localhost:8123" +database = "default" +username = "default" +password = "" +table = "users" + +# Insert format: "json_each_row" (default), "row_binary", or "string" +# json_each_row — accepts Payload::Json; ClickHouse handles type coercion +# row_binary — accepts Payload::Json; connector validates + serialises +# to RowBinaryWithDefaults (table must exist, schema is +# fetched at startup; fails if table has unsupported types) +# string — accepts Payload::Text; raw passthrough +insert_format = "json_each_row" + +# Only relevant when insert_format = "string": +# "json_each_row" (default), "csv", or "tsv" +# string_format = "csv" + +timeout_seconds = 30 +max_retries = 3 +retry_delay = "1s" +verbose_logging = false \ No newline at end of file diff --git a/core/connectors/sinks/clickhouse_sink/Cargo.toml b/core/connectors/sinks/clickhouse_sink/Cargo.toml new file mode 100644 index 0000000000..28b013c348 --- /dev/null +++ b/core/connectors/sinks/clickhouse_sink/Cargo.toml @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iggy_connector_clickhouse_sink" +version = "0.1.0" +description = "Iggy ClickHouse sink connector for streaming messages into ClickHouse" +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming", "clickhouse", "sink"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" + +[package.metadata.cargo-machete] +ignored = ["dashmap", "once_cell"] + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +dashmap = { workspace = true } +humantime = { workspace = true } +iggy_connector_sdk = { workspace = true } +once_cell = { workspace = true } +reqwest = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +simd-json = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } \ No newline at end of file diff --git a/core/connectors/sinks/clickhouse_sink/README.md b/core/connectors/sinks/clickhouse_sink/README.md new file mode 100644 index 0000000000..13c51f6ba8 --- /dev/null +++ b/core/connectors/sinks/clickhouse_sink/README.md @@ -0,0 +1,164 @@ +# ClickHouse Sink Connector + +The ClickHouse sink connector consumes messages from Iggy topics and inserts them into ClickHouse 
tables. Supports three insert formats: `json_each_row` (default), `row_binary`, and `string` passthrough. + +## Features + +- **Multiple Insert Formats**: Insert as `JSONEachRow`, `RowBinaryWithDefaults`, or raw string passthrough (CSV/TSV/JSON) +- **Schema Validation**: In `row_binary` mode, the table schema is fetched and validated at startup +- **Automatic Retries**: Configurable retry count and delay for transient errors +- **Batch Processing**: Insert messages in configurable batches via the stream configuration + +## Configuration + +```toml +type = "sink" +key = "clickhouse" +enabled = true +version = 0 +name = "ClickHouse sink" +path = "target/release/libiggy_connector_clickhouse_sink" + +[[streams]] +stream = "example_stream" +topics = ["example_topic"] +schema = "json" +batch_length = 1000 +poll_interval = "5ms" +consumer_group = "clickhouse_sink_connector" + +[plugin_config] +url = "http://localhost:8123" +database = "default" +username = "default" +password = "" +table = "events" +insert_format = "json_each_row" +timeout_seconds = 30 +max_retries = 3 +retry_delay = "1s" +verbose_logging = false +``` + +## Configuration Options + +| Option | Type | Default | Description | +| ------ | ---- | ------- | ----------- | +| `url` | string | required | ClickHouse HTTP endpoint | +| `table` | string | required | Target table name | +| `database` | string | `"default"` | ClickHouse database | +| `username` | string | `"default"` | ClickHouse username | +| `password` | string | `""` | ClickHouse password | +| `insert_format` | string | `"json_each_row"` | Insert format: `json_each_row`, `row_binary`, or `string` | +| `string_format` | string | `"json_each_row"` | ClickHouse format for `string` mode: `json_each_row`, `csv`, or `tsv` | +| `timeout_seconds` | u64 | `30` | HTTP request timeout | +| `max_retries` | u32 | `3` | Max retry attempts on transient errors | +| `retry_delay` | string | `"1s"` | Delay between retries (e.g. 
`500ms`, `2s`) | +| `verbose_logging` | bool | `false` | Log inserts at info level instead of debug | + +## Insert Formats + +### `json_each_row` (Default) + +Accepts messages with a `Payload::Json` payload. Each message is sent as a JSON object on its own line using ClickHouse's `JSONEachRow` format. ClickHouse handles type coercion from the JSON values to the column types, so the table can have any schema. + +```toml +[plugin_config] +url = "http://localhost:8123" +table = "events" +insert_format = "json_each_row" +``` + +### `row_binary` + +Accepts messages with a `Payload::Json` payload. At startup the connector fetches the table schema from `system.columns` and validates that all column types are supported. Messages are then serialised to ClickHouse's `RowBinaryWithDefaults` binary format, which is more efficient than JSON for large volumes. + +The table must already exist. Columns with a `DEFAULT` or `MATERIALIZED` expression can be omitted from the message — the connector will emit a `0x01` prefix byte to signal that the default should be used. + +**Supported types:** all integer and float primitives, `String`, `FixedString(n)`, `Bool`/`Boolean`, `UUID`, `Date`, `Date32`, `DateTime`, `DateTime64(p)`, `Decimal`, `IPv4`, `IPv6`, `Enum8`, `Enum16`, and the composites `Nullable(T)`, `Array(T)`, `Map(K, V)`, `Tuple(...)`. + +**Unsupported types** (cause startup to fail): `LowCardinality`, `Variant`, `JSON` (native column type), and geo types. + +```toml +[plugin_config] +url = "http://localhost:8123" +table = "events" +insert_format = "row_binary" +``` + +### `string` + +Accepts messages with a `Payload::Text` payload and passes them through to ClickHouse without modification. Use `string_format` to tell ClickHouse which format to expect. 
+ +```toml +[plugin_config] +url = "http://localhost:8123" +table = "events" +insert_format = "string" +string_format = "csv" # or "tsv" or "json_each_row" +``` + +## Example Configs + +### JSON Events + +```toml +[[streams]] +stream = "events" +topics = ["user_events"] +schema = "json" +batch_length = 500 +poll_interval = "10ms" +consumer_group = "clickhouse_sink" + +[plugin_config] +url = "http://localhost:8123" +database = "analytics" +table = "user_events" +insert_format = "json_each_row" +``` + +### High-Throughput with RowBinary + +```toml +[[streams]] +stream = "metrics" +topics = ["app_metrics"] +schema = "json" +batch_length = 5000 +poll_interval = "5ms" +consumer_group = "clickhouse_sink" + +[plugin_config] +url = "http://localhost:8123" +database = "telemetry" +table = "metrics" +insert_format = "row_binary" +max_retries = 5 +retry_delay = "500ms" +verbose_logging = true +``` + +### CSV Passthrough + +```toml +[[streams]] +stream = "exports" +topics = ["csv_data"] +schema = "text" +batch_length = 1000 +poll_interval = "50ms" +consumer_group = "clickhouse_sink" + +[plugin_config] +url = "http://localhost:8123" +table = "raw_imports" +insert_format = "string" +string_format = "csv" +``` + +## Reliability + +The connector retries failed inserts up to `max_retries` times with a fixed delay of `retry_delay` between attempts. The delay is applied as-is on each attempt (not exponential backoff). Non-retryable errors fail immediately. + +On shutdown the connector logs the total number of messages processed and errors encountered. diff --git a/core/connectors/sinks/clickhouse_sink/config.toml b/core/connectors/sinks/clickhouse_sink/config.toml new file mode 100644 index 0000000000..2a9a3aafcd --- /dev/null +++ b/core/connectors/sinks/clickhouse_sink/config.toml @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "sink" +key = "clickhouse" +enabled = true +version = 0 +name = "ClickHouse sink" +path = "target/release/libiggy_connector_clickhouse_sink" +verbose = false + +[[streams]] +stream = "example_stream" +topics = ["example_topic"] +schema = "json" +batch_length = 1000 +poll_interval = "5ms" +consumer_group = "clickhouse_sink_connector" + +[plugin_config] +url = "http://localhost:8123" +database = "default" +username = "default" +password = "" +table = "events" + +# Insert format: "json_each_row" (default), "row_binary", or "string" +# json_each_row — accepts Payload::Json; ClickHouse handles type coercion +# row_binary — accepts Payload::Json; connector validates + serialises +# to RowBinaryWithDefaults (table must exist, schema is +# fetched at startup; fails if table has unsupported types) +# string — accepts Payload::Text; raw passthrough +insert_format = "json_each_row" + +# Only relevant when insert_format = "string": +# "json_each_row" (default), "csv", or "tsv" +# string_format = "csv" + +timeout_seconds = 30 +max_retries = 3 +retry_delay = "1s" +verbose_logging = false \ No newline at end of file diff --git a/core/connectors/sinks/clickhouse_sink/src/binary.rs b/core/connectors/sinks/clickhouse_sink/src/binary.rs new file mode 100644 index 0000000000..82ef9801c4 --- 
/dev/null +++ b/core/connectors/sinks/clickhouse_sink/src/binary.rs @@ -0,0 +1,1095 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//! RowBinary / RowBinaryWithDefaults byte serialization. +//! +//! Follows the ClickHouse binary format specification: +//! +//! +//! Key layout rules: +//! - All integers are **little-endian**. +//! - Strings are prefixed with an **unsigned LEB128 varint** length. +//! - `Nullable(T)`: 1-byte null marker (`0x01` = null, `0x00` = not null) +//! followed by T bytes when not null. +//! - `RowBinaryWithDefaults`: each top-level column is preceded by a 1-byte +//! flag (`0x01` = use server DEFAULT, `0x00` = value follows). + +use crate::schema::{ChType, Column}; +use iggy_connector_sdk::Error; +use simd_json::OwnedValue; +use simd_json::prelude::{TypedScalarValue, ValueAsArray, ValueAsObject}; +use tracing::error; + +// ─── Public API ────────────────────────────────────────────────────────────── + +/// Serialise one message (a JSON object) as a RowBinaryWithDefaults row. +/// +/// Columns are written in schema order. When a column is absent from the JSON +/// object and `has_default` is true the DEFAULT prefix byte (`0x01`) is +/// written and the column value is skipped. 
When a column is absent but has no +/// default and is not Nullable this is an error. +pub fn serialize_row( + value: &OwnedValue, + columns: &[Column], + buf: &mut Vec, +) -> Result<(), Error> { + let obj = value.as_object().ok_or_else(|| { + error!("RowBinary: message payload is not a JSON object"); + Error::InvalidRecord + })?; + + for col in columns { + let field_value = obj.get(col.name.as_str()); + + // RowBinaryWithDefaults prefix byte + let is_null_or_absent = field_value.map(|v| v.is_null()).unwrap_or(true); + if is_null_or_absent && col.has_default { + buf.push(0x01); // use DEFAULT + continue; + } + buf.push(0x00); // value follows + + match field_value { + Some(v) if !v.is_null() => serialize_value(v, &col.ch_type, buf)?, + _ => { + // Field is absent or null — write zero value if Nullable, + // otherwise error. + write_zero_or_null(&col.ch_type, buf, &col.name)?; + } + } + } + Ok(()) +} + +// ─── Core recursive serializer ──────────────────────────────────────────────── + +pub(crate) fn serialize_value( + value: &OwnedValue, + ch_type: &ChType, + buf: &mut Vec, +) -> Result<(), Error> { + match ch_type { + // ── Nullable ───────────────────────────────────────────────────────── + ChType::Nullable(inner) => { + if value.is_null() { + buf.push(0x01); // null + } else { + buf.push(0x00); // not null + serialize_value(value, inner, buf)?; + } + } + + // ── String ─────────────────────────────────────────────────────────── + ChType::String => { + let s = coerce_to_string(value)?; + write_string(s.as_bytes(), buf); + } + ChType::FixedString(n) => { + let s = coerce_to_string(value)?; + let bytes = s.as_bytes(); + // Pad or truncate to exactly n bytes + let mut fixed = vec![0u8; *n]; + let copy_len = bytes.len().min(*n); + fixed[..copy_len].copy_from_slice(&bytes[..copy_len]); + buf.extend_from_slice(&fixed); + } + + // ── Integers ───────────────────────────────────────────────────────── + ChType::Int8 => buf.push(coerce_i64(value)? 
as i8 as u8), + ChType::Int16 => buf.extend_from_slice(&(coerce_i64(value)? as i16).to_le_bytes()), + ChType::Int32 => buf.extend_from_slice(&(coerce_i64(value)? as i32).to_le_bytes()), + ChType::Int64 => buf.extend_from_slice(&coerce_i64(value)?.to_le_bytes()), + ChType::UInt8 => buf.push(coerce_u64(value)? as u8), + ChType::UInt16 => buf.extend_from_slice(&(coerce_u64(value)? as u16).to_le_bytes()), + ChType::UInt32 => buf.extend_from_slice(&(coerce_u64(value)? as u32).to_le_bytes()), + ChType::UInt64 => buf.extend_from_slice(&coerce_u64(value)?.to_le_bytes()), + + // ── Floats ─────────────────────────────────────────────────────────── + ChType::Float32 => { + let f = coerce_f64(value)? as f32; + buf.extend_from_slice(&f.to_le_bytes()); + } + ChType::Float64 => { + let f = coerce_f64(value)?; + buf.extend_from_slice(&f.to_le_bytes()); + } + + // ── Boolean ────────────────────────────────────────────────────────── + ChType::Boolean => { + let b = match value { + OwnedValue::Static(simd_json::StaticNode::Bool(b)) => *b, + OwnedValue::Static(simd_json::StaticNode::I64(n)) => *n != 0, + OwnedValue::Static(simd_json::StaticNode::U64(n)) => *n != 0, + _ => { + error!("Cannot convert to Boolean: {value:?}"); + return Err(Error::InvalidRecord); + } + }; + buf.push(b as u8); + } + + // ── UUID ───────────────────────────────────────────────────────────── + // ClickHouse stores UUID as two little-endian 64-bit words. 
+ // Input: standard hyphenated UUID string "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" + ChType::Uuid => { + let s = coerce_to_string(value)?; + let hex: String = s.chars().filter(|c| c.is_ascii_hexdigit()).collect(); + if hex.len() != 32 { + error!("Invalid UUID string: {s}"); + return Err(Error::InvalidRecord); + } + let bytes = hex::decode(&hex).map_err(|_| { + error!("Cannot decode UUID hex: {hex}"); + Error::InvalidRecord + })?; + // ClickHouse UUID layout: first 8 bytes reversed, second 8 bytes reversed + let mut uuid_buf = [0u8; 16]; + uuid_buf[..8].copy_from_slice(&bytes[..8]); + uuid_buf[8..].copy_from_slice(&bytes[8..]); + uuid_buf[..8].reverse(); + uuid_buf[8..].reverse(); + buf.extend_from_slice(&uuid_buf); + } + + // ── Date types ─────────────────────────────────────────────────────── + ChType::Date => { + // Days since 1970-01-01 as UInt16. Accept integer or "YYYY-MM-DD". + let days = coerce_to_days(value)? as u16; + buf.extend_from_slice(&days.to_le_bytes()); + } + ChType::Date32 => { + let days = coerce_to_days(value)? as i32; + buf.extend_from_slice(&days.to_le_bytes()); + } + ChType::DateTime => { + // Unix seconds as UInt32. Accept integer or RFC 3339 string. + let secs = coerce_to_unix_seconds(value)? as u32; + buf.extend_from_slice(&secs.to_le_bytes()); + } + ChType::DateTime64(precision) => { + // Unix time scaled by 10^precision as Int64. 
+ let secs_f64 = coerce_to_unix_seconds_f64(value)?; + let scale = 10i64.pow(*precision as u32) as f64; + let scaled = (secs_f64 * scale).round() as i64; + buf.extend_from_slice(&scaled.to_le_bytes()); + } + + // ── Decimal ────────────────────────────────────────────────────────── + ChType::Decimal(precision, scale) => { + let f = coerce_f64(value)?; + let scale_factor = 10f64.powi(*scale as i32); + let int_val = (f * scale_factor).round() as i128; + if *precision <= 9 { + buf.extend_from_slice(&(int_val as i32).to_le_bytes()); + } else if *precision <= 18 { + buf.extend_from_slice(&(int_val as i64).to_le_bytes()); + } else { + // Int128: two little-endian 64-bit words, low word first + let lo = int_val as i64; + let hi = (int_val >> 64) as i64; + buf.extend_from_slice(&lo.to_le_bytes()); + buf.extend_from_slice(&hi.to_le_bytes()); + } + } + + // ── IP addresses ───────────────────────────────────────────────────── + ChType::IPv4 => { + let s = coerce_to_string(value)?; + let addr: std::net::Ipv4Addr = s.parse().map_err(|_| { + error!("Invalid IPv4 address: {s}"); + Error::InvalidRecord + })?; + buf.extend_from_slice(&addr.octets()); // big-endian + } + ChType::IPv6 => { + let s = coerce_to_string(value)?; + let addr: std::net::Ipv6Addr = s.parse().map_err(|_| { + error!("Invalid IPv6 address: {s}"); + Error::InvalidRecord + })?; + buf.extend_from_slice(&addr.octets()); // big-endian + } + + // ── Enums ──────────────────────────────────────────────────────────── + ChType::Enum8(map) => { + let s = coerce_to_string(value)?; + let v = map.get(s.as_str()).ok_or_else(|| { + error!("Unknown Enum8 value: {s}"); + Error::InvalidRecord + })?; + buf.push(*v as u8); + } + ChType::Enum16(map) => { + let s = coerce_to_string(value)?; + let v = map.get(s.as_str()).ok_or_else(|| { + error!("Unknown Enum16 value: {s}"); + Error::InvalidRecord + })?; + buf.extend_from_slice(&(*v as i16).to_le_bytes()); + } + + // ── Composites 
─────────────────────────────────────────────────────── + ChType::Array(elem_type) => { + let arr = value.as_array().ok_or_else(|| { + error!("Expected JSON array for Array type, got: {value:?}"); + Error::InvalidRecord + })?; + write_varint(arr.len() as u64, buf); + for elem in arr { + serialize_value(elem, elem_type, buf)?; + } + } + ChType::Map(key_type, val_type) => { + let obj = value.as_object().ok_or_else(|| { + error!("Expected JSON object for Map type, got: {value:?}"); + Error::InvalidRecord + })?; + write_varint(obj.len() as u64, buf); + for (k, v) in obj { + // Map keys must be serialisable as the key type. JSON object + // keys are always strings, so we wrap them in OwnedValue::String. + let key_val = OwnedValue::String(k.clone()); + serialize_value(&key_val, key_type, buf)?; + serialize_value(v, val_type, buf)?; + } + } + ChType::Tuple(field_types) => { + // Tuples may arrive as JSON arrays (unnamed) or objects (named). + match value { + OwnedValue::Array(arr) => { + if arr.len() != field_types.len() { + error!( + "Tuple length mismatch: expected {}, got {}", + field_types.len(), + arr.len() + ); + return Err(Error::InvalidRecord); + } + for (elem, ft) in arr.iter().zip(field_types.iter()) { + serialize_value(elem, ft, buf)?; + } + } + OwnedValue::Object(obj) => { + // Named tuple: fields are matched by position (insertion order). 
+ let values: Vec<&OwnedValue> = obj.values().collect(); + if values.len() != field_types.len() { + error!( + "Tuple length mismatch: expected {}, got {}", + field_types.len(), + values.len() + ); + return Err(Error::InvalidRecord); + } + for (v, ft) in values.iter().zip(field_types.iter()) { + serialize_value(v, ft, buf)?; + } + } + other => { + error!("Expected JSON array or object for Tuple type, got: {other:?}"); + return Err(Error::InvalidRecord); + } + } + } + } + Ok(()) +} + +// ─── Low-level helpers ──────────────────────────────────────────────────────── + +/// Write a ClickHouse-style unsigned LEB128 varint (7 bits per byte, MSB = continuation). +pub fn write_varint(mut n: u64, buf: &mut Vec) { + loop { + let byte = (n & 0x7F) as u8; + n >>= 7; + if n == 0 { + buf.push(byte); + break; + } else { + buf.push(byte | 0x80); + } + } +} + +/// Write a string: varint length prefix + UTF-8 bytes. +fn write_string(bytes: &[u8], buf: &mut Vec) { + write_varint(bytes.len() as u64, buf); + buf.extend_from_slice(bytes); +} + +/// Write a zero / null value for a column that is absent and has no default. +/// Nullable columns get the null marker; non-nullable columns are an error. 
+fn write_zero_or_null(ch_type: &ChType, buf: &mut Vec, col_name: &str) -> Result<(), Error> { + match ch_type { + ChType::Nullable(_) => { + buf.push(0x01); // null + Ok(()) + } + _ => { + error!( + "Column '{col_name}' is non-nullable with no default, but is absent from the message" + ); + Err(Error::InvalidRecord) + } + } +} + +// ─── Value coercion helpers ─────────────────────────────────────────────────── + +fn coerce_i64(value: &OwnedValue) -> Result { + match value { + OwnedValue::Static(simd_json::StaticNode::I64(n)) => Ok(*n), + OwnedValue::Static(simd_json::StaticNode::U64(n)) => Ok(*n as i64), + OwnedValue::Static(simd_json::StaticNode::F64(f)) => Ok(*f as i64), + OwnedValue::String(s) => s.parse::().map_err(|_| { + error!("Cannot parse '{s}' as integer"); + Error::InvalidRecord + }), + other => { + error!("Cannot coerce {other:?} to integer"); + Err(Error::InvalidRecord) + } + } +} + +fn coerce_u64(value: &OwnedValue) -> Result { + match value { + OwnedValue::Static(simd_json::StaticNode::U64(n)) => Ok(*n), + OwnedValue::Static(simd_json::StaticNode::I64(n)) => Ok(*n as u64), + OwnedValue::Static(simd_json::StaticNode::F64(f)) => Ok(*f as u64), + OwnedValue::String(s) => s.parse::().map_err(|_| { + error!("Cannot parse '{s}' as unsigned integer"); + Error::InvalidRecord + }), + other => { + error!("Cannot coerce {other:?} to unsigned integer"); + Err(Error::InvalidRecord) + } + } +} + +fn coerce_f64(value: &OwnedValue) -> Result { + match value { + OwnedValue::Static(simd_json::StaticNode::F64(f)) => Ok(*f), + OwnedValue::Static(simd_json::StaticNode::I64(n)) => Ok(*n as f64), + OwnedValue::Static(simd_json::StaticNode::U64(n)) => Ok(*n as f64), + OwnedValue::String(s) => s.parse::().map_err(|_| { + error!("Cannot parse '{s}' as float"); + Error::InvalidRecord + }), + other => { + error!("Cannot coerce {other:?} to float"); + Err(Error::InvalidRecord) + } + } +} + +fn coerce_to_string(value: &OwnedValue) -> Result { + match value { + 
OwnedValue::String(s) => Ok(s.to_string()), + OwnedValue::Static(simd_json::StaticNode::I64(n)) => Ok(n.to_string()), + OwnedValue::Static(simd_json::StaticNode::U64(n)) => Ok(n.to_string()), + OwnedValue::Static(simd_json::StaticNode::F64(f)) => Ok(f.to_string()), + OwnedValue::Static(simd_json::StaticNode::Bool(b)) => Ok(b.to_string()), + other => { + error!("Cannot coerce {other:?} to string"); + Err(Error::InvalidRecord) + } + } +} + +/// Returns days since 1970-01-01. Accepts integer (days) or "YYYY-MM-DD" string. +fn coerce_to_days(value: &OwnedValue) -> Result { + match value { + OwnedValue::Static(simd_json::StaticNode::I64(n)) => Ok(*n), + OwnedValue::Static(simd_json::StaticNode::U64(n)) => Ok(*n as i64), + OwnedValue::String(s) => { + // Parse "YYYY-MM-DD" manually + let parts: Vec<&str> = s.splitn(3, '-').collect(); + if parts.len() != 3 { + error!("Invalid date string: {s}"); + return Err(Error::InvalidRecord); + } + let (y, m, d) = parse_ymd(parts[0], parts[1], parts[2]).map_err(|_| { + error!("Cannot parse date: {s}"); + Error::InvalidRecord + })?; + Ok(ymd_to_days(y, m, d)) + } + other => { + error!("Cannot coerce {other:?} to Date"); + Err(Error::InvalidRecord) + } + } +} + +/// Returns Unix seconds as f64 (fractional seconds for sub-second precision). +/// Accepts integer, float, or RFC 3339 / ISO 8601 string. +fn coerce_to_unix_seconds_f64(value: &OwnedValue) -> Result { + match value { + OwnedValue::Static(simd_json::StaticNode::I64(n)) => Ok(*n as f64), + OwnedValue::Static(simd_json::StaticNode::U64(n)) => Ok(*n as f64), + OwnedValue::Static(simd_json::StaticNode::F64(f)) => Ok(*f), + OwnedValue::String(s) => parse_datetime_string(s), + other => { + error!("Cannot coerce {other:?} to DateTime"); + Err(Error::InvalidRecord) + } + } +} + +/// Returns Unix seconds as i64 (truncates fractional seconds). +fn coerce_to_unix_seconds(value: &OwnedValue) -> Result { + Ok(coerce_to_unix_seconds_f64(value)? 
as i64) +} + +/// Parse "YYYY-MM-DDThh:mm:ss[.frac][Z|±hh:mm]" into Unix seconds (f64). +/// This is a minimal parser sufficient for common ISO 8601 / RFC 3339 formats. +fn parse_datetime_string(s: &str) -> Result { + // Split on 'T' or ' ' for date-time separator + let (date_part, time_part) = if let Some(idx) = s.find('T').or_else(|| s.find(' ')) { + (&s[..idx], &s[idx + 1..]) + } else { + // Date-only string — treat as midnight UTC + (s, "00:00:00") + }; + + let date_parts: Vec<&str> = date_part.splitn(3, '-').collect(); + if date_parts.len() != 3 { + error!("Cannot parse datetime string: {s}"); + return Err(Error::InvalidRecord); + } + let (y, m, d) = parse_ymd(date_parts[0], date_parts[1], date_parts[2]).map_err(|_| { + error!("Cannot parse date component of: {s}"); + Error::InvalidRecord + })?; + + // Strip timezone suffix and optional fractional seconds + let (time_no_tz, tz_offset_secs) = strip_timezone(time_part); + let frac_secs = parse_time(time_no_tz).map_err(|_| { + error!("Cannot parse time component of: {s}"); + Error::InvalidRecord + })?; + + let days = ymd_to_days(y, m, d) as f64; + Ok(days * 86400.0 + frac_secs - tz_offset_secs as f64) +} + +fn parse_ymd(y: &str, m: &str, d: &str) -> Result<(i32, u32, u32), ()> { + let year: i32 = y.trim().parse().map_err(|_| ())?; + let month: u32 = m.trim().parse().map_err(|_| ())?; + let day: u32 = d.trim().parse().map_err(|_| ())?; + Ok((year, month, day)) +} + +/// Days since Unix epoch for a proleptic Gregorian date (Gregorian calendar). 
fn ymd_to_days(y: i32, m: u32, d: u32) -> i64 {
    // Algorithm from https://www.tondering.dk/claus/cal/julperiod.php
    // (civil-from-days inverse): March-based year so leap day falls at year end.
    let y = if m <= 2 { y as i64 - 1 } else { y as i64 };
    let m = m as i64;
    let d = d as i64;
    let era = if y >= 0 { y } else { y - 399 } / 400;
    let yoe = y - era * 400;
    let doy = (153 * (if m > 2 { m - 3 } else { m + 9 }) + 2) / 5 + d - 1;
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
    // 719468 = days from 0000-03-01 to 1970-01-01.
    era * 146097 + doe - 719468
}

/// Parse "hh:mm:ss[.frac]" into total seconds (f64).
fn parse_time(s: &str) -> Result<f64, ()> {
    let parts: Vec<&str> = s.splitn(3, ':').collect();
    if parts.len() < 2 {
        return Err(());
    }
    let h: f64 = parts[0].parse().map_err(|_| ())?;
    let min: f64 = parts[1].parse().map_err(|_| ())?;
    // Seconds component is optional ("hh:mm" is accepted as zero seconds).
    let sec: f64 = parts.get(2).unwrap_or(&"0").parse().map_err(|_| ())?;
    Ok(h * 3600.0 + min * 60.0 + sec)
}

/// Strip a timezone suffix from a time string. Returns (time_without_tz, offset_seconds).
fn strip_timezone(s: &str) -> (&str, i64) {
    if let Some(stripped) = s.strip_suffix('Z') {
        return (stripped, 0);
    }
    // Look for a ±hh:mm or ±hhmm suffix; pos > 0 so a leading sign is never taken.
    #[allow(clippy::collapsible_if)]
    for (sign, ch) in [(1i64, '+'), (-1i64, '-')] {
        if let Some(pos) = s.rfind(ch) {
            if pos > 0 {
                let tz = &s[pos + 1..];
                let secs = if tz.contains(':') {
                    let parts: Vec<&str> = tz.splitn(2, ':').collect();
                    if parts.len() == 2 {
                        if let (Ok(h), Ok(m)) = (parts[0].parse::<i64>(), parts[1].parse::<i64>()) {
                            Some(sign * (h * 3600 + m * 60))
                        } else {
                            None
                        }
                    } else {
                        None
                    }
                } else if tz.len() == 4 {
                    if let Ok(hhmm) = tz.parse::<i64>() {
                        Some(sign * ((hhmm / 100) * 3600 + (hhmm % 100) * 60))
                    } else {
                        None
                    }
                } else {
                    None
                };
                if let Some(offset) = secs {
                    return (&s[..pos], offset);
                }
            }
        }
    }
    // No recognisable timezone suffix — treat as UTC.
    (s, 0)
}

// ─── Tests ───────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::{ChType, Column};
    use simd_json::{OwnedValue, StaticNode};

    fn
json_str(s: &str) -> OwnedValue { + OwnedValue::String(s.into()) + } + fn json_i64(n: i64) -> OwnedValue { + OwnedValue::Static(StaticNode::I64(n)) + } + fn json_u64(n: u64) -> OwnedValue { + OwnedValue::Static(StaticNode::U64(n)) + } + fn json_f64(f: f64) -> OwnedValue { + OwnedValue::Static(StaticNode::F64(f)) + } + fn json_bool(b: bool) -> OwnedValue { + OwnedValue::Static(StaticNode::Bool(b)) + } + fn json_null() -> OwnedValue { + OwnedValue::Static(StaticNode::Null) + } + + fn col(name: &str, ch_type: ChType, has_default: bool) -> Column { + Column { + name: name.into(), + ch_type, + has_default, + } + } + + // ── varint ─────────────────────────────────────────────────────────────── + #[test] + fn varint_single_byte() { + let mut buf = vec![]; + write_varint(0, &mut buf); + assert_eq!(buf, [0x00]); + + buf.clear(); + write_varint(127, &mut buf); + assert_eq!(buf, [0x7F]); + } + + #[test] + fn varint_multi_byte() { + let mut buf = vec![]; + write_varint(128, &mut buf); + assert_eq!(buf, [0x80, 0x01]); + + buf.clear(); + write_varint(300, &mut buf); + assert_eq!(buf, [0xAC, 0x02]); + } + + // ── primitives ─────────────────────────────────────────────────────────── + #[test] + fn serialize_int32_little_endian() { + let mut buf = vec![]; + serialize_value(&json_i64(1000), &ChType::Int32, &mut buf).unwrap(); + assert_eq!(buf, 1000i32.to_le_bytes()); + } + + #[test] + fn serialize_uint64_little_endian() { + let mut buf = vec![]; + serialize_value(&json_u64(u64::MAX), &ChType::UInt64, &mut buf).unwrap(); + assert_eq!(buf, u64::MAX.to_le_bytes()); + } + + #[test] + fn serialize_float32() { + let mut buf = vec![]; + serialize_value(&json_f64(3.15), &ChType::Float32, &mut buf).unwrap(); + assert_eq!(buf, (3.15f64 as f32).to_le_bytes()); + } + + #[test] + fn serialize_float64() { + let mut buf = vec![]; + serialize_value(&json_f64(2.318281828), &ChType::Float64, &mut buf).unwrap(); + assert_eq!(buf, 2.318281828f64.to_le_bytes()); + } + + #[test] + fn 
serialize_boolean_true() { + let mut buf = vec![]; + serialize_value(&json_bool(true), &ChType::Boolean, &mut buf).unwrap(); + assert_eq!(buf, [0x01]); + } + + #[test] + fn serialize_boolean_false() { + let mut buf = vec![]; + serialize_value(&json_bool(false), &ChType::Boolean, &mut buf).unwrap(); + assert_eq!(buf, [0x00]); + } + + #[test] + fn serialize_boolean_from_nonzero_i64_is_true() { + let mut buf = vec![]; + serialize_value(&json_i64(1), &ChType::Boolean, &mut buf).unwrap(); + assert_eq!(buf, [0x01]); + } + + #[test] + fn serialize_boolean_from_zero_u64_is_false() { + let mut buf = vec![]; + serialize_value(&json_u64(0), &ChType::Boolean, &mut buf).unwrap(); + assert_eq!(buf, [0x00]); + } + + #[test] + fn serialize_string_with_varint_prefix() { + let mut buf = vec![]; + serialize_value(&json_str("hi"), &ChType::String, &mut buf).unwrap(); + // length=2 as varint, then "hi" + assert_eq!(buf, [0x02, b'h', b'i']); + } + + #[test] + fn serialize_fixed_string_pads_to_length() { + let mut buf = vec![]; + serialize_value(&json_str("ab"), &ChType::FixedString(4), &mut buf).unwrap(); + assert_eq!(buf, [b'a', b'b', 0x00, 0x00]); + } + + #[test] + fn serialize_fixed_string_truncates_to_length() { + let mut buf = vec![]; + serialize_value(&json_str("abcdef"), &ChType::FixedString(3), &mut buf).unwrap(); + assert_eq!(buf, [b'a', b'b', b'c']); + } + + // ── nullable ───────────────────────────────────────────────────────────── + #[test] + fn serialize_nullable_null_writes_marker() { + let mut buf = vec![]; + serialize_value( + &json_null(), + &ChType::Nullable(Box::new(ChType::Int32)), + &mut buf, + ) + .unwrap(); + assert_eq!(buf, [0x01]); + } + + #[test] + fn serialize_nullable_non_null_writes_zero_then_value() { + let mut buf = vec![]; + serialize_value( + &json_i64(42), + &ChType::Nullable(Box::new(ChType::Int32)), + &mut buf, + ) + .unwrap(); + let mut expected = vec![0x00u8]; + expected.extend_from_slice(&42i32.to_le_bytes()); + assert_eq!(buf, expected); + } + + 
// ── uuid ───────────────────────────────────────────────────────────────── + + #[test] + fn serialize_uuid_writes_split_reversed_halves() { + let mut buf = vec![]; + // Raw bytes: [55 0e 84 00 e2 9b 41 d4] [a7 16 44 66 55 44 00 00] + // First half reversed: [d4 41 9b e2 00 84 0e 55] + // Second half reversed: [00 00 44 55 66 44 16 a7] + serialize_value( + &json_str("550e8400-e29b-41d4-a716-446655440000"), + &ChType::Uuid, + &mut buf, + ) + .unwrap(); + assert_eq!( + buf, + [ + 0xd4, 0x41, 0x9b, 0xe2, 0x00, 0x84, 0x0e, 0x55, 0x00, 0x00, 0x44, 0x55, 0x66, 0x44, + 0x16, 0xa7, + ] + ); + } + + #[test] + fn serialize_uuid_invalid_string_is_error() { + let mut buf = vec![]; + let result = serialize_value(&json_str("not-a-uuid"), &ChType::Uuid, &mut buf); + assert!(result.is_err()); + } + + // ── enum ───────────────────────────────────────────────────────────────── + + #[test] + fn serialize_enum8_known_value() { + let mut map = std::collections::HashMap::new(); + map.insert("active".to_string(), 1i8); + map.insert("inactive".to_string(), 2i8); + let mut buf = vec![]; + serialize_value(&json_str("active"), &ChType::Enum8(map), &mut buf).unwrap(); + assert_eq!(buf, [0x01]); + } + + #[test] + fn serialize_enum8_unknown_value_is_error() { + let mut map = std::collections::HashMap::new(); + map.insert("active".to_string(), 1i8); + let mut buf = vec![]; + let result = serialize_value(&json_str("deleted"), &ChType::Enum8(map), &mut buf); + assert!(result.is_err()); + } + + #[test] + fn serialize_enum16_known_value_little_endian() { + let mut map = std::collections::HashMap::new(); + map.insert("low".to_string(), 300i16); + let mut buf = vec![]; + serialize_value(&json_str("low"), &ChType::Enum16(map), &mut buf).unwrap(); + assert_eq!(buf, 300i16.to_le_bytes()); + } + + // ── RowBinary row ──────────────────────────────────────────────────────── + #[test] + fn serialize_row_with_default_column_absent() { + use simd_json::OwnedValue; + let mut obj = 
simd_json::owned::Object::new(); + obj.insert("name".into(), json_str("alice")); + let value = OwnedValue::Object(Box::new(obj)); + + let columns = vec![ + col("name", ChType::String, false), + col("age", ChType::Int32, true), // has_default=true, absent → 0x01 + ]; + let mut buf = vec![]; + serialize_row(&value, &columns, &mut buf).unwrap(); + + // name: 0x00 prefix + varint(5) + "alice" + // age: 0x01 (use DEFAULT) + assert_eq!(buf[0], 0x00); // name: value follows + assert_eq!(buf[1], 5); // varint length of "alice" + assert_eq!(&buf[2..7], b"alice"); + assert_eq!(buf[7], 0x01); // age: use DEFAULT + } + + #[test] + fn serialize_row_non_nullable_absent_no_default_is_error() { + use simd_json::OwnedValue; + let value = OwnedValue::Object(Box::new(simd_json::owned::Object::new())); + let columns = vec![col("id", ChType::Int32, false)]; + let mut buf = vec![]; + // 0x00 prefix written, then error on missing non-nullable non-default column + let result = serialize_row(&value, &columns, &mut buf); + assert!(result.is_err()); + } + + #[test] + fn serialize_row_nullable_absent_writes_null_marker() { + // Absent field + Nullable column + no default → 0x00 prefix + 0x01 null marker. + let value = OwnedValue::Object(Box::new(simd_json::owned::Object::new())); + let columns = vec![col("x", ChType::Nullable(Box::new(ChType::Int32)), false)]; + let mut buf = vec![]; + serialize_row(&value, &columns, &mut buf).unwrap(); + assert_eq!(buf, [0x00, 0x01]); + } + + #[test] + fn serialize_row_non_nullable_explicit_null_is_error() { + // Field present in JSON but set to null, column is non-nullable → error. 
+ let mut obj = simd_json::owned::Object::new(); + obj.insert("id".into(), json_null()); + let value = OwnedValue::Object(Box::new(obj)); + let columns = vec![col("id", ChType::Int32, false)]; + let mut buf = vec![]; + let result = serialize_row(&value, &columns, &mut buf); + assert!(result.is_err()); + } + + #[test] + fn serialize_row_non_object_payload_is_error() { + let value = OwnedValue::Array(Box::default()); + let columns = vec![col("x", ChType::Int32, false)]; + let mut buf = vec![]; + let result = serialize_row(&value, &columns, &mut buf); + assert!(result.is_err()); + } + + // ── date / datetime ────────────────────────────────────────────────────── + #[test] + fn serialize_date_from_integer() { + let mut buf = vec![]; + serialize_value(&json_u64(19000), &ChType::Date, &mut buf).unwrap(); + assert_eq!(buf, 19000u16.to_le_bytes()); + } + + #[test] + fn serialize_date_from_string() { + let mut buf = vec![]; + // 1970-01-02 = day 1 + serialize_value(&json_str("1970-01-02"), &ChType::Date, &mut buf).unwrap(); + assert_eq!(buf, 1u16.to_le_bytes()); + } + + #[test] + fn serialize_datetime_from_integer() { + let mut buf = vec![]; + serialize_value(&json_u64(1_700_000_000), &ChType::DateTime, &mut buf).unwrap(); + assert_eq!(buf, 1_700_000_000u32.to_le_bytes()); + } + + #[test] + fn serialize_datetime64_millis() { + let mut buf = vec![]; + // 1000 seconds → 1_000_000 milliseconds at precision=3 + serialize_value(&json_u64(1000), &ChType::DateTime64(3), &mut buf).unwrap(); + assert_eq!(buf, 1_000_000i64.to_le_bytes()); + } + + #[test] + fn serialize_date32_from_string() { + let mut buf = vec![]; + // 1970-01-02 = day 1 as i32 (Date32 uses signed Int32, not UInt16) + serialize_value(&json_str("1970-01-02"), &ChType::Date32, &mut buf).unwrap(); + assert_eq!(buf, 1i32.to_le_bytes()); + } + + #[test] + fn serialize_datetime_from_iso8601_utc_string() { + let mut buf = vec![]; + // "1970-01-02T00:00:00Z" = 86400 seconds + serialize_value( + 
&json_str("1970-01-02T00:00:00Z"), + &ChType::DateTime, + &mut buf, + ) + .unwrap(); + assert_eq!(buf, 86400u32.to_le_bytes()); + } + + #[test] + fn serialize_datetime_from_iso8601_positive_offset() { + let mut buf = vec![]; + // "1970-01-01T01:00:00+01:00" = midnight UTC = 0 seconds + serialize_value( + &json_str("1970-01-01T01:00:00+01:00"), + &ChType::DateTime, + &mut buf, + ) + .unwrap(); + assert_eq!(buf, 0u32.to_le_bytes()); + } + + #[test] + fn serialize_datetime64_from_string_with_fractional_seconds() { + let mut buf = vec![]; + // 1.5 seconds at precision=3 → 1500 milliseconds + serialize_value( + &json_str("1970-01-01T00:00:01.500Z"), + &ChType::DateTime64(3), + &mut buf, + ) + .unwrap(); + assert_eq!(buf, 1500i64.to_le_bytes()); + } + + // ── decimal ────────────────────────────────────────────────────────────── + #[test] + fn serialize_decimal32_scale2() { + let mut buf = vec![]; + // 3.15 * 10^2 = 314 → Int32 + serialize_value(&json_f64(3.15), &ChType::Decimal(9, 2), &mut buf).unwrap(); + assert_eq!(buf, 315i32.to_le_bytes()); + } + + #[test] + fn serialize_decimal64_scale4() { + let mut buf = vec![]; + serialize_value(&json_f64(1.2345), &ChType::Decimal(18, 4), &mut buf).unwrap(); + assert_eq!(buf, 12345i64.to_le_bytes()); + } + + #[test] + fn serialize_decimal128_two_word_layout() { + let mut buf = vec![]; + // Decimal(38, 2): 1.0 → int_val = 100 → fits in i128 + // Written as two little-endian i64 words: lo=100, hi=0 + serialize_value(&json_f64(1.0), &ChType::Decimal(38, 2), &mut buf).unwrap(); + let mut expected = 100i64.to_le_bytes().to_vec(); + expected.extend_from_slice(&0i64.to_le_bytes()); + assert_eq!(buf, expected); + } + + // ── array ──────────────────────────────────────────────────────────────── + #[test] + fn serialize_array_of_int32() { + let arr = OwnedValue::Array(Box::new(vec![json_i64(1), json_i64(2), json_i64(3)])); + let mut buf = vec![]; + serialize_value(&arr, &ChType::Array(Box::new(ChType::Int32)), &mut buf).unwrap(); + // 
varint(3) + 3×Int32 + assert_eq!(buf[0], 3); // varint + assert_eq!(&buf[1..5], 1i32.to_le_bytes()); + assert_eq!(&buf[5..9], 2i32.to_le_bytes()); + assert_eq!(&buf[9..13], 3i32.to_le_bytes()); + } + + // ── map ────────────────────────────────────────────────────────────────── + + #[test] + fn serialize_map_string_to_int32() { + // Map(String, Int32): {"k": 1} + // → varint(1) + string("k") + Int32(1) + let mut obj = simd_json::owned::Object::new(); + obj.insert("k".into(), json_i64(1)); + let value = OwnedValue::Object(Box::new(obj)); + let mut buf = vec![]; + serialize_value( + &value, + &ChType::Map(Box::new(ChType::String), Box::new(ChType::Int32)), + &mut buf, + ) + .unwrap(); + assert_eq!(buf[0], 1); // varint: 1 entry + assert_eq!(buf[1], 1); // varint: key length 1 + assert_eq!(buf[2], b'k'); + assert_eq!(&buf[3..7], 1i32.to_le_bytes()); + } + + #[test] + fn serialize_map_non_object_is_error() { + let value = OwnedValue::Array(Box::default()); + let mut buf = vec![]; + let result = serialize_value( + &value, + &ChType::Map(Box::new(ChType::String), Box::new(ChType::Int32)), + &mut buf, + ); + assert!(result.is_err()); + } + + // ── tuple ───────────────────────────────────────────────────────────────── + + #[test] + fn serialize_tuple_from_json_array() { + // Tuple(String, Int32): ["hi", 7] + let arr = OwnedValue::Array(Box::new(vec![json_str("hi"), json_i64(7)])); + let mut buf = vec![]; + serialize_value( + &arr, + &ChType::Tuple(vec![ChType::String, ChType::Int32]), + &mut buf, + ) + .unwrap(); + assert_eq!(&buf[..3], &[0x02, b'h', b'i']); // string "hi" + assert_eq!(&buf[3..], 7i32.to_le_bytes()); // Int32 7 + } + + #[test] + fn serialize_tuple_from_json_object() { + // Tuple(String, Int32) as named object: {"a": "hi", "b": 7} + let mut obj = simd_json::owned::Object::new(); + obj.insert("a".into(), json_str("hi")); + obj.insert("b".into(), json_i64(7)); + let value = OwnedValue::Object(Box::new(obj)); + let mut buf = vec![]; + serialize_value( + 
&value, + &ChType::Tuple(vec![ChType::String, ChType::Int32]), + &mut buf, + ) + .unwrap(); + assert_eq!(&buf[..3], &[0x02, b'h', b'i']); + assert_eq!(&buf[3..], 7i32.to_le_bytes()); + } + + #[test] + fn serialize_tuple_length_mismatch_is_error() { + // Schema expects 2 fields, array has 1 → error + let arr = OwnedValue::Array(Box::new(vec![json_i64(1)])); + let mut buf = vec![]; + let result = serialize_value( + &arr, + &ChType::Tuple(vec![ChType::Int32, ChType::String]), + &mut buf, + ); + assert!(result.is_err()); + } + + // ── ipv4 / ipv6 ────────────────────────────────────────────────────────── + #[test] + fn serialize_ipv4() { + let mut buf = vec![]; + serialize_value(&json_str("127.0.0.1"), &ChType::IPv4, &mut buf).unwrap(); + assert_eq!(buf, [127, 0, 0, 1]); + } + + #[test] + fn serialize_ipv6_loopback() { + let mut buf = vec![]; + serialize_value(&json_str("::1"), &ChType::IPv6, &mut buf).unwrap(); + assert_eq!(buf.len(), 16); + assert_eq!(buf[15], 1); + assert!(buf[..15].iter().all(|&b| b == 0)); + } +} + +// Hex decoding helper (no extra dep — manual implementation for UUID) +mod hex { + pub fn decode(s: &str) -> Result, ()> { + if !s.len().is_multiple_of(2) { + return Err(()); + } + s.as_bytes() + .chunks(2) + .map(|chunk| { + let hi = from_hex_digit(chunk[0])?; + let lo = from_hex_digit(chunk[1])?; + Ok((hi << 4) | lo) + }) + .collect() + } + + fn from_hex_digit(b: u8) -> Result { + match b { + b'0'..=b'9' => Ok(b - b'0'), + b'a'..=b'f' => Ok(b - b'a' + 10), + b'A'..=b'F' => Ok(b - b'A' + 10), + _ => Err(()), + } + } +} \ No newline at end of file diff --git a/core/connectors/sinks/clickhouse_sink/src/body.rs b/core/connectors/sinks/clickhouse_sink/src/body.rs new file mode 100644 index 0000000000..2fb58fccc7 --- /dev/null +++ b/core/connectors/sinks/clickhouse_sink/src/body.rs @@ -0,0 +1,286 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

//! HTTP request body builders for each INSERT format.
//!
//! Each function accepts a slice of [`ConsumedMessage`]s and returns the raw
//! bytes that will be sent to ClickHouse. Payloads of the wrong type for the
//! chosen format are skipped with a warning rather than erroring, so a mixed
//! batch never causes a complete failure.

use crate::StringFormat;
use crate::schema::Column;
use iggy_connector_sdk::{ConsumedMessage, Error, Payload};
use tracing::warn;

// ─── Body builders ───────────────────────────────────────────────────────────

/// Build a newline-delimited JSON body for `FORMAT JSONEachRow`.
/// Each `Payload::Json` message becomes one line. Other payload types are skipped.
pub(crate) fn build_json_body(messages: &[ConsumedMessage]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(messages.len() * 64);
    // Serialise into a reusable scratch buffer first so a mid-write failure
    // cannot leave partial bytes (a corrupt line) in the final body.
    let mut scratch = Vec::new();
    for msg in messages {
        match &msg.payload {
            Payload::Json(value) => {
                scratch.clear();
                if simd_json::to_writer(&mut scratch, value).is_ok() {
                    buf.extend_from_slice(&scratch);
                    buf.push(b'\n');
                } else {
                    warn!("Failed to serialise JSON payload at offset {}", msg.offset);
                }
            }
            other => {
                warn!(
                    "JSONEachRow mode: skipping unsupported payload type {:?} at offset {}",
                    payload_type_name(other),
                    msg.offset
                );
            }
        }
    }
    buf
}

/// Build a RowBinaryWithDefaults body.
+/// Each `Payload::Json` message is serialised to binary using the table schema. +pub(crate) fn build_row_binary_body( + messages: &[ConsumedMessage], + schema: &[Column], +) -> Result, Error> { + let mut buf = Vec::with_capacity(messages.len() * 128); + for msg in messages { + match &msg.payload { + Payload::Json(value) => { + crate::binary::serialize_row(value, schema, &mut buf)?; + } + other => { + warn!( + "RowBinary mode: skipping unsupported payload type {:?} at offset {}", + payload_type_name(other), + msg.offset + ); + } + } + } + Ok(buf) +} + +/// Build a raw string body for CSV / TSV / JSONEachRow string passthrough. +/// Each `Payload::Text` message is written as-is with a trailing newline +/// appended for CSV/TSV if not already present. +pub(crate) fn build_string_body( + messages: &[ConsumedMessage], + string_format: StringFormat, +) -> Vec { + let mut buf = Vec::with_capacity(messages.len() * 64); + for msg in messages { + match &msg.payload { + Payload::Text(s) => { + buf.extend_from_slice(s.as_bytes()); + if string_format.requires_newline() && !s.ends_with('\n') { + buf.push(b'\n'); + } + } + other => { + warn!( + "String passthrough mode: skipping unsupported payload type {:?} at offset {}", + payload_type_name(other), + msg.offset + ); + } + } + } + buf +} + +fn payload_type_name(p: &Payload) -> &'static str { + match p { + Payload::Json(_) => "Json", + Payload::Raw(_) => "Raw", + Payload::Text(_) => "Text", + Payload::Proto(_) => "Proto", + Payload::FlatBuffer(_) => "FlatBuffer", + } +} + +// ─── Tests ─────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use crate::StringFormat; + use crate::schema::{ChType, Column}; + use iggy_connector_sdk::{ConsumedMessage, Payload}; + use simd_json::{OwnedValue, StaticNode}; + + fn msg(payload: Payload) -> ConsumedMessage { + ConsumedMessage { + id: 0, + offset: 0, + checksum: 0, + timestamp: 0, + origin_timestamp: 0, + headers: None, + payload, 
+ } + } + + fn col(name: &str, ch_type: ChType) -> Column { + Column { + name: name.into(), + ch_type, + has_default: false, + } + } + + fn json_null() -> OwnedValue { + OwnedValue::Static(StaticNode::Null) + } + + // ── build_json_body ────────────────────────────────────────────────────── + + /// Expects each line to be a valid JSON object + #[test] + fn json_body_empty_input_returns_empty_buf() { + assert!(build_json_body(&[]).is_empty()); + } + + #[test] + fn json_body_null_payload_produces_null_line() { + let messages = vec![msg(Payload::Json(json_null()))]; + assert_eq!(build_json_body(&messages), b"null\n"); + } + + #[test] + fn json_body_appends_one_line_per_message() { + let messages = vec![ + msg(Payload::Json(json_null())), + msg(Payload::Json(json_null())), + ]; + assert_eq!(build_json_body(&messages), b"null\nnull\n"); + } + + #[test] + fn json_body_non_json_payload_is_skipped() { + let messages = vec![msg(Payload::Text("hello".into()))]; + assert!(build_json_body(&messages).is_empty()); + } + + #[test] + fn json_body_mixed_payloads_only_json_included() { + let messages = vec![ + msg(Payload::Json(json_null())), + msg(Payload::Text("skipped".into())), + msg(Payload::Raw(vec![1, 2, 3])), + msg(Payload::Json(json_null())), + ]; + assert_eq!(build_json_body(&messages), b"null\nnull\n"); + } + + // ── build_string_body ──────────────────────────────────────────────────── + + #[test] + fn string_body_empty_input_returns_empty_buf() { + assert!(build_string_body(&[], StringFormat::Csv).is_empty()); + } + + #[test] + fn string_body_csv_appends_newline_when_missing() { + let messages = vec![msg(Payload::Text("a,b,c".into()))]; + assert_eq!(build_string_body(&messages, StringFormat::Csv), b"a,b,c\n"); + } + + #[test] + fn string_body_csv_does_not_double_newline() { + let messages = vec![msg(Payload::Text("a,b,c\n".into()))]; + assert_eq!(build_string_body(&messages, StringFormat::Csv), b"a,b,c\n"); + } + + #[test] + fn 
string_body_tsv_appends_newline_when_missing() { + let messages = vec![msg(Payload::Text("a\tb\tc".into()))]; + assert_eq!( + build_string_body(&messages, StringFormat::Tsv), + b"a\tb\tc\n" + ); + } + + #[test] + fn string_body_json_each_row_does_not_append_newline() { + // JSONEachRow strings are already self-delimiting; no newline should be added. + let messages = vec![msg(Payload::Text("{\"k\":1}".into()))]; + assert_eq!( + build_string_body(&messages, StringFormat::JsonEachRow), + b"{\"k\":1}" + ); + } + + #[test] + fn string_body_non_text_payload_is_skipped() { + let messages = vec![ + msg(Payload::Raw(vec![1, 2, 3])), + msg(Payload::Json(json_null())), + ]; + assert!(build_string_body(&messages, StringFormat::Csv).is_empty()); + } + + // ── build_row_binary_body ──────────────────────────────────────────────── + + #[test] + fn row_binary_body_empty_input_returns_empty_buf() { + assert!(build_row_binary_body(&[], &[]).unwrap().is_empty()); + } + + #[test] + fn row_binary_body_non_json_payload_is_skipped() { + let messages = vec![ + msg(Payload::Text("hello".into())), + msg(Payload::Raw(vec![0xFF])), + ]; + let body = build_row_binary_body(&messages, &[col("x", ChType::String)]).unwrap(); + assert!(body.is_empty()); + } + + #[test] + fn row_binary_body_json_payload_writes_bytes() { + // Schema: one non-nullable String column named "name". 
+ // JSON: {"name": "alice"} + let mut obj = simd_json::owned::Object::with_capacity(1); + obj.insert("name".to_string(), OwnedValue::String("alice".into())); + let messages = vec![msg(Payload::Json(OwnedValue::Object(Box::new(obj))))]; + let schema = vec![col("name", ChType::String)]; + let body = build_row_binary_body(&messages, &schema).unwrap(); + // RowBinaryWithDefaults: 0x00 (value follows) + LEB128 length (5) + UTF-8 bytes + assert_eq!(body, b"\x00\x05alice"); + } + + #[test] + fn row_binary_body_multiple_rows_concatenated() { + let mut obj1 = simd_json::owned::Object::with_capacity(1); + obj1.insert("n".to_string(), OwnedValue::String("x".into())); + let mut obj2 = simd_json::owned::Object::with_capacity(1); + obj2.insert("n".to_string(), OwnedValue::String("y".into())); + let messages = vec![ + msg(Payload::Json(OwnedValue::Object(Box::new(obj1)))), + msg(Payload::Json(OwnedValue::Object(Box::new(obj2)))), + ]; + let schema = vec![col("n", ChType::String)]; + let body = build_row_binary_body(&messages, &schema).unwrap(); + // Two rows: 0x00 (value follows) + \x01x and 0x00 + \x01y + assert_eq!(body, b"\x00\x01x\x00\x01y"); + } +} \ No newline at end of file diff --git a/core/connectors/sinks/clickhouse_sink/src/client.rs b/core/connectors/sinks/clickhouse_sink/src/client.rs new file mode 100644 index 0000000000..47588a67b8 --- /dev/null +++ b/core/connectors/sinks/clickhouse_sink/src/client.rs @@ -0,0 +1,322 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

//! Thin `reqwest`-based HTTP client for the ClickHouse HTTP interface.
//!
//! ClickHouse exposes its HTTP API at `http://host:port/`. Queries are sent
//! either as a URL query parameter (`?query=...`) or in the request body.
//! Authentication uses the `X-ClickHouse-User` and `X-ClickHouse-Key` headers.
//!
//! Insert format:
//! POST /?database={db}&query=INSERT+INTO+{table}+FORMAT+{fmt}
//! Body: row data in the chosen format

use crate::schema::{Column, parse_type};
use iggy_connector_sdk::Error;
use reqwest::StatusCode;
use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue};
use serde::Deserialize;
use std::time::Duration;
use tracing::{debug, error, info, warn};

// Header names used by the ClickHouse HTTP interface for authentication.
const USER_HEADER: &str = "X-ClickHouse-User";
const KEY_HEADER: &str = "X-ClickHouse-Key";

/// Thin wrapper around `reqwest::Client` pre-configured for a ClickHouse
/// endpoint.
#[derive(Debug)]
pub struct ClickHouseClient {
    // Underlying HTTP client, built once with the configured request timeout.
    inner: reqwest::Client,
    // ClickHouse HTTP endpoint; request URLs are built as "{base_url}/...",
    // so no trailing slash is expected.
    base_url: String,
    // Target database, passed as the `database` query parameter on requests.
    database: String,
    // Pre-built X-ClickHouse-User / X-ClickHouse-Key headers, cloned per request.
    auth_headers: HeaderMap,
}

impl ClickHouseClient {
    /// Build a new client.
+ pub fn new( + base_url: String, + database: String, + username: &str, + password: &str, + timeout: Duration, + ) -> Result { + let inner = reqwest::Client::builder() + .timeout(timeout) + .build() + .map_err(|e| Error::InitError(format!("Failed to build HTTP client: {e}")))?; + + let mut auth_headers = HeaderMap::new(); + auth_headers.insert( + USER_HEADER, + HeaderValue::from_str(username) + .map_err(|e| Error::InitError(format!("Invalid username header value: {e}")))?, + ); + auth_headers.insert( + KEY_HEADER, + HeaderValue::from_str(password) + .map_err(|e| Error::InitError(format!("Invalid password header value: {e}")))?, + ); + + Ok(ClickHouseClient { + inner, + base_url, + database, + auth_headers, + }) + } + + /// Send `SELECT 1` to verify the server is reachable. + pub async fn ping(&self) -> Result<(), Error> { + let url = format!("{}/ping", self.base_url); + let response = self + .inner + .get(&url) + .headers(self.auth_headers.clone()) + .send() + .await + .map_err(|e| Error::InitError(format!("Ping failed: {e}")))?; + + if response.status().is_success() { + Ok(()) + } else { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + error!("ClickHouse ping returned HTTP {status}: {body}"); + Err(Error::InitError(format!( + "ClickHouse ping returned HTTP {status}: {body}" + ))) + } + } + + /// Fetch the column definitions for `table` in the configured database. + /// Returns columns ordered by their position in the table definition. 
+ pub async fn fetch_schema(&self, table: &str) -> Result, Error> { + let query = format!( + "SELECT name, type, default_kind FROM system.columns \ + WHERE database = '{}' AND table = '{}' \ + ORDER BY position \ + FORMAT JSONEachRow", + escape_single_quote(&self.database), + escape_single_quote(table), + ); + + let body = self.run_query(&query).await?; + let mut columns = Vec::new(); + + for line in body.lines() { + let line = line.trim(); + if line.is_empty() { + continue; + } + let row: SchemaRow = serde_json::from_str(line).map_err(|e| { + error!("Failed to parse schema row '{line}': {e}"); + Error::InitError(format!("Schema parse error: {e}")) + })?; + + let ch_type = parse_type(&row.r#type)?; + let has_default = matches!( + row.default_kind.as_deref(), + Some("DEFAULT") | Some("MATERIALIZED") | Some("ALIAS") + ); + columns.push(Column { + name: row.name, + ch_type, + has_default, + }); + } + + if columns.is_empty() { + error!( + "Table '{table}' not found or has no columns in database '{}'", + self.database + ); + return Err(Error::InitError(format!( + "Table '{table}' not found in database '{}'", + self.database + ))); + } + + info!( + "Fetched schema for table '{}': {} columns", + table, + columns.len() + ); + Ok(columns) + } + + /// Insert `body` into `table` using the given ClickHouse FORMAT string. + /// + /// Retries up to `max_retries` times on transient errors (network errors, + /// HTTP 429, HTTP 5xx). Does not retry on HTTP 4xx (data errors). 
+ pub async fn insert( + &self, + table: &str, + format: &str, + body: Vec, + max_retries: u32, + retry_delay: Duration, + ) -> Result<(), Error> { + if body.is_empty() { + debug!("insert called with empty body — skipping"); + return Ok(()); + } + + let query = format!( + "INSERT INTO `{}`.`{}` FORMAT {}", + escape_backtick(&self.database), + escape_backtick(table), + format, + ); + let url = format!( + "{}/?database={}&date_time_input_format=best_effort", + self.base_url, + urlencoded(&self.database), + ); + + let mut attempts = 0u32; + loop { + let result = self + .inner + .post(&url) + .headers(self.auth_headers.clone()) + .header(CONTENT_TYPE, "application/octet-stream") + .query(&[("query", &query)]) + .body(body.clone()) + .send() + .await; + + match result { + Ok(response) => { + let status = response.status(); + if status.is_success() { + debug!( + "Inserted {} bytes into {}.{} FORMAT {}", + body.len(), + self.database, + table, + format + ); + return Ok(()); + } + + let body_text = response.text().await.unwrap_or_default(); + + if is_retryable_status(status) { + attempts += 1; + if attempts >= max_retries { + error!( + "Insert failed after {attempts} attempts (HTTP {status}): {body_text}" + ); + return Err(Error::CannotStoreData(format!( + "HTTP {status}: {body_text}" + ))); + } + warn!( + "Retryable HTTP {status} on attempt {attempts}/{max_retries}: {body_text}" + ); + tokio::time::sleep(retry_delay * attempts).await; + } else { + // Non-retryable: 4xx data error — log and fail immediately. + error!("ClickHouse insert error HTTP {status}: {body_text}"); + return Err(Error::CannotStoreData(format!( + "HTTP {status}: {body_text}" + ))); + } + } + Err(e) => { + // Network / timeout error — retryable. 
+ attempts += 1; + if attempts >= max_retries { + error!("Insert failed after {attempts} attempts: {e}"); + return Err(Error::CannotStoreData(format!( + "Network error after {attempts} attempts: {e}" + ))); + } + warn!("Network error on attempt {attempts}/{max_retries}: {e}. Retrying..."); + tokio::time::sleep(retry_delay * attempts).await; + } + } + } + } + + // ── Private helpers ─────────────────────────────────────────────────────── + + /// Run a read-only query and return the response body as a String. + async fn run_query(&self, query: &str) -> Result { + let url = format!("{}/?database={}", self.base_url, urlencoded(&self.database)); + let response = self + .inner + .post(&url) + .headers(self.auth_headers.clone()) + .body(query.to_owned()) + .send() + .await + .map_err(|e| Error::InitError(format!("Query failed: {e}")))?; + + let status = response.status(); + let body = response + .text() + .await + .map_err(|e| Error::InitError(format!("Failed to read response: {e}")))?; + + if !status.is_success() { + error!("Query returned HTTP {status}: {body}"); + return Err(Error::InitError(format!("HTTP {status}: {body}"))); + } + Ok(body) + } +} + +// ─── Helper types ───────────────────────────────────────────────────────────── + +#[derive(Deserialize)] +struct SchemaRow { + name: String, + r#type: String, + default_kind: Option, +} + +fn is_retryable_status(status: StatusCode) -> bool { + status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error() +} + +fn escape_single_quote(s: &str) -> String { + s.replace('\'', "\\'") +} + +fn escape_backtick(s: &str) -> String { + s.replace('`', "\\`") +} + +fn urlencoded(s: &str) -> String { + // Minimal percent-encoding for the database name query parameter. + let mut out = String::with_capacity(s.len()); + for ch in s.chars() { + match ch { + 'A'..='Z' | 'a'..='z' | '0'..='9' | '-' | '_' | '.' 
| '~' => out.push(ch), + other => { + let mut buf = [0u8; 4]; + for byte in other.encode_utf8(&mut buf).bytes() { + out.push_str(&format!("%{byte:02X}")); + } + } + } + } + out +} \ No newline at end of file diff --git a/core/connectors/sinks/clickhouse_sink/src/lib.rs b/core/connectors/sinks/clickhouse_sink/src/lib.rs new file mode 100644 index 0000000000..b20d26895f --- /dev/null +++ b/core/connectors/sinks/clickhouse_sink/src/lib.rs @@ -0,0 +1,286 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use humantime::Duration as HumanDuration; +use iggy_connector_sdk::{Error, sink_connector}; +use serde::{Deserialize, Serialize}; +use std::str::FromStr; +use std::time::Duration; +use tokio::sync::Mutex; + +mod binary; +mod body; +mod client; +mod schema; +mod sink; + +sink_connector!(ClickHouseSink); + +const DEFAULT_DATABASE: &str = "default"; +const DEFAULT_USERNAME: &str = "default"; +const DEFAULT_PASSWORD: &str = ""; +const DEFAULT_TIMEOUT_SECONDS: u64 = 30; +const DEFAULT_MAX_RETRIES: u32 = 3; +const DEFAULT_RETRY_DELAY: &str = "1s"; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ClickHouseSinkConfig { + pub url: String, + pub database: Option, + pub username: Option, + pub password: Option, + pub table: String, + /// "json_each_row" (default), "row_binary", or "string" + pub insert_format: Option, + /// "json_each_row" (default), "csv", or "tsv" — only used when insert_format = "string" + pub string_format: Option, + pub timeout_seconds: Option, + pub max_retries: Option, + pub retry_delay: Option, + pub verbose_logging: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum InsertFormat { + #[default] + JsonEachRow, + RowBinary, + StringPassthrough, +} + +impl InsertFormat { + fn from_config(s: Option<&str>) -> Self { + match s.map(|s| s.to_lowercase()).as_deref() { + Some("row_binary") => InsertFormat::RowBinary, + Some("string") => InsertFormat::StringPassthrough, + _ => InsertFormat::JsonEachRow, + } + } + + pub fn clickhouse_format_name(&self, string_fmt: StringFormat) -> &'static str { + match self { + InsertFormat::JsonEachRow => "JSONEachRow", + InsertFormat::RowBinary => "RowBinaryWithDefaults", + InsertFormat::StringPassthrough => string_fmt.clickhouse_format_name(), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum StringFormat { + #[default] + JsonEachRow, + Csv, + Tsv, +} + +impl StringFormat { + fn from_config(s: Option<&str>) -> Self { + match s.map(|s| 
s.to_lowercase()).as_deref() { + Some("csv") => StringFormat::Csv, + Some("tsv") => StringFormat::Tsv, + _ => StringFormat::JsonEachRow, + } + } + + pub fn clickhouse_format_name(&self) -> &'static str { + match self { + StringFormat::JsonEachRow => "JSONEachRow", + StringFormat::Csv => "CSV", + StringFormat::Tsv => "TSV", + } + } + + /// CSV and TSV rows must end with a newline; JSON rows are already delimited. + pub fn requires_newline(&self) -> bool { + matches!(self, StringFormat::Csv | StringFormat::Tsv) + } +} + +#[derive(Debug)] +struct State { + messages_processed: u64, + errors_count: u64, +} + +#[derive(Debug)] +pub struct ClickHouseSink { + id: u32, + config: ClickHouseSinkConfig, + client: Option, + table_schema: Option>, + insert_format: InsertFormat, + string_format: StringFormat, + retry_delay: Duration, + state: Mutex, +} + +impl ClickHouseSink { + pub fn new(id: u32, config: ClickHouseSinkConfig) -> Self { + let insert_format = InsertFormat::from_config(config.insert_format.as_deref()); + let string_format = StringFormat::from_config(config.string_format.as_deref()); + let delay_str = config.retry_delay.as_deref().unwrap_or(DEFAULT_RETRY_DELAY); + let retry_delay = HumanDuration::from_str(delay_str) + .map(|d| d.into()) + .unwrap_or_else(|_| Duration::from_secs(1)); + + ClickHouseSink { + id, + config, + client: None, + table_schema: None, + insert_format, + string_format, + retry_delay, + state: Mutex::new(State { + messages_processed: 0, + errors_count: 0, + }), + } + } + + pub fn database(&self) -> &str { + self.config.database.as_deref().unwrap_or(DEFAULT_DATABASE) + } + + pub fn username(&self) -> &str { + self.config.username.as_deref().unwrap_or(DEFAULT_USERNAME) + } + + pub fn password(&self) -> &str { + self.config.password.as_deref().unwrap_or(DEFAULT_PASSWORD) + } + + pub fn timeout(&self) -> Duration { + Duration::from_secs( + self.config + .timeout_seconds + .unwrap_or(DEFAULT_TIMEOUT_SECONDS), + ) + } + + pub fn max_retries(&self) -> 
u32 { + self.config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES) + } + + pub fn verbose(&self) -> bool { + self.config.verbose_logging.unwrap_or(false) + } + + fn get_client(&self) -> Result<&client::ClickHouseClient, Error> { + self.client + .as_ref() + .ok_or_else(|| Error::InitError("ClickHouse client not initialised".into())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_config() -> ClickHouseSinkConfig { + ClickHouseSinkConfig { + url: "http://localhost:8123".into(), + database: None, + username: None, + password: None, + table: "events".into(), + insert_format: None, + string_format: None, + timeout_seconds: None, + max_retries: None, + retry_delay: None, + verbose_logging: None, + } + } + + #[test] + fn given_no_insert_format_should_default_to_json_each_row() { + let sink = ClickHouseSink::new(1, test_config()); + assert_eq!(sink.insert_format, InsertFormat::JsonEachRow); + } + + #[test] + fn given_row_binary_insert_format_should_parse_correctly() { + let mut config = test_config(); + config.insert_format = Some("row_binary".into()); + let sink = ClickHouseSink::new(1, config); + assert_eq!(sink.insert_format, InsertFormat::RowBinary); + } + + #[test] + fn given_string_insert_format_should_parse_correctly() { + let mut config = test_config(); + config.insert_format = Some("string".into()); + let sink = ClickHouseSink::new(1, config); + assert_eq!(sink.insert_format, InsertFormat::StringPassthrough); + } + + #[test] + fn given_csv_string_format_should_parse_correctly() { + let mut config = test_config(); + config.insert_format = Some("string".into()); + config.string_format = Some("csv".into()); + let sink = ClickHouseSink::new(1, config); + assert_eq!(sink.string_format, StringFormat::Csv); + } + + #[test] + fn given_no_retry_delay_should_default_to_one_second() { + let sink = ClickHouseSink::new(1, test_config()); + assert_eq!(sink.retry_delay, Duration::from_secs(1)); + } + + #[test] + fn given_custom_retry_delay_should_parse_humantime() { 
+ let mut config = test_config(); + config.retry_delay = Some("500ms".into()); + let sink = ClickHouseSink::new(1, config); + assert_eq!(sink.retry_delay, Duration::from_millis(500)); + } + + #[test] + fn given_no_database_should_use_default() { + let sink = ClickHouseSink::new(1, test_config()); + assert_eq!(sink.database(), DEFAULT_DATABASE); + } + + #[test] + fn given_json_format_should_return_correct_clickhouse_name() { + assert_eq!( + InsertFormat::JsonEachRow.clickhouse_format_name(StringFormat::JsonEachRow), + "JSONEachRow" + ); + } + + #[test] + fn given_row_binary_format_should_return_row_binary_with_defaults() { + assert_eq!( + InsertFormat::RowBinary.clickhouse_format_name(StringFormat::JsonEachRow), + "RowBinaryWithDefaults" + ); + } + + #[test] + fn given_string_format_csv_should_return_csv() { + assert_eq!( + InsertFormat::StringPassthrough.clickhouse_format_name(StringFormat::Csv), + "CSV" + ); + } +} \ No newline at end of file diff --git a/core/connectors/sinks/clickhouse_sink/src/schema.rs b/core/connectors/sinks/clickhouse_sink/src/schema.rs new file mode 100644 index 0000000000..1f959554c3 --- /dev/null +++ b/core/connectors/sinks/clickhouse_sink/src/schema.rs @@ -0,0 +1,688 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//! ClickHouse column schema model and type string parser used by RowBinary mode. +//! +//! +//! # Type string grammar +//! +//! Grammar followed by ClickHouse type strings (as returned by `SELECT type FROM system.columns`): +//! ```text +//! type ::= composite | parameterised | primitive +//! +//! composite ::= "Nullable(" type ")" +//! | "Array(" type ")" +//! | "Map(" type ", " type ")" +//! | "Tuple(" tuple_fields ")" +//! +//! tuple_fields ::= type ("," type)* -- unnamed fields +//! | ident type ("," ident type)* -- named fields +//! +//! parameterised ::= "FixedString(" n ")" +//! | "DateTime64(" precision ["," tz] ")" +//! | "DateTime(" tz ")" +//! | "Decimal(" precision "," scale ")" +//! | "Decimal32(" scale ")" +//! | "Decimal64(" scale ")" +//! | "Decimal128(" scale ")" +//! | "Enum8(" enum_pairs ")" +//! | "Enum16(" enum_pairs ")" +//! +//! enum_pairs ::= "'" name "' = " int ("," "'" name "' = " int)* +//! +//! primitive ::= "String" | "Int8" | "Int16" | "Int32" | "Int64" +//! | "UInt8" | "UInt16" | "UInt32" | "UInt64" +//! | "Float32" | "Float64" | "Bool" | "Boolean" +//! | "UUID" | "Date" | "Date32" | "DateTime" +//! | "IPv4" | "IPv6" +//! ``` +//! +//! +//! ## Example +//! +//! ```text +//! Nullable(Map(String, Array(Tuple(id Int32, ts DateTime64(3, 'UTC'))))) +//! ``` +//! +//! Parses into the AST: +//! +//! ```text +//! Nullable +//! └── Map +//! ├── key: String +//! └── value: Array +//! └── Tuple +//! ├── [0] Int32 +//! └── [1] DateTime64(3) +//! ``` + +use iggy_connector_sdk::Error; +use std::collections::HashMap; +use tracing::error; + +/// A single ClickHouse table column. +#[derive(Debug, Clone)] +pub struct Column { + pub name: String, + pub ch_type: ChType, + /// True when the column has a DEFAULT or MATERIALIZED expression, meaning + /// RowBinaryWithDefaults can skip it with a 0x01 prefix byte. 
+ pub has_default: bool, +} + +/// Supported ClickHouse column types. +/// +/// Unsupported types (LowCardinality, Variant, the new JSON column type, geo +/// types) cause `parse_type` to return an error, which in turn makes `open()` +/// fail rather than silently producing corrupt data. +#[derive(Debug, Clone)] +pub enum ChType { + // ── Primitives ────────────────────────────────────────────────────────── + String, + Int8, + Int16, + Int32, + Int64, + UInt8, + UInt16, + UInt32, + UInt64, + Float32, + Float64, + Boolean, + Uuid, + /// Days since 1970-01-01 stored as UInt16. + Date, + /// Days since 1970-01-01 stored as Int32. + Date32, + /// Unix seconds stored as UInt32. Optional timezone suffix is ignored for + /// serialisation purposes. + DateTime, + /// Unix time scaled by 10^precision stored as Int64. + DateTime64(u8), + /// Fixed-width byte string padded with zeros. + FixedString(usize), + /// Decimal(precision, scale). Serialised as Int32 / Int64 / Int128. + Decimal(u8, u8), + /// IPv4 address — 4 bytes, big-endian. + IPv4, + /// IPv6 address — 16 bytes, big-endian. + IPv6, + /// Enum8: maps string → i8. Values parsed from the type definition. + Enum8(HashMap), + /// Enum16: maps string → i16. Values parsed from the type definition. + Enum16(HashMap), + + // ── Composites (recursive) ─────────────────────────────────────────────── + Nullable(Box), + Array(Box), + /// Map(key_type, value_type) + Map(Box, Box), + /// Tuple of ordered field types (named or unnamed). + Tuple(Vec), +} + +// ─── Public entry point ────────────────────────────────────────────────────── + +/// Parse a ClickHouse type string (as returned by `system.columns`) into a +/// `ChType`. Returns `Err(Error::InitError(...))` for unsupported or +/// unrecognised types. 
+pub fn parse_type(s: &str) -> Result { + parse_type_inner(s.trim()) +} + +// ─── Recursive descent parser ──────────────────────────────────────────────── + +fn parse_type_inner(s: &str) -> Result { + // Strip a single pair of outer parentheses if the entire string is wrapped. + // This shouldn't be needed for well-formed ClickHouse type strings, but is + // a defensive measure. + let s = s.trim(); + // e.g. "Nullable(Int32)" + if let Some(inner) = strip_wrapper(s, "Nullable") { + return Ok(ChType::Nullable(Box::new(parse_type_inner(inner)?))); + } + // e.g. "Array(String)" + if let Some(inner) = strip_wrapper(s, "Array") { + return Ok(ChType::Array(Box::new(parse_type_inner(inner)?))); + } + // e.g. "Map(String, Int64)" + if let Some(inner) = strip_wrapper(s, "Map") { + let (k, v) = split_two_args(inner)?; + return Ok(ChType::Map( + Box::new(parse_type_inner(k)?), + Box::new(parse_type_inner(v)?), + )); + } + // e.g. "Tuple(Int32, String)" or "Tuple(id Int32, name String)" + if let Some(inner) = strip_wrapper(s, "Tuple") { + let parts = split_args(inner)?; + // Named tuples look like `field_name Type, …`. Strip names if present. + let types: Result, Error> = parts + .iter() + .map(|p| { + let p = p.trim(); + // If the first token is an identifier followed by a space and a + // valid type keyword, treat it as a named field. + if let Some(rest) = strip_named_tuple_field(p) { + parse_type_inner(rest) + } else { + parse_type_inner(p) + } + }) + .collect(); + return Ok(ChType::Tuple(types?)); + } + // e.g. "Enum8('a' = 1, 'b' = 2)" + if let Some(inner) = strip_wrapper(s, "Enum8") { + let map = parse_enum_values_i8(inner)?; + return Ok(ChType::Enum8(map)); + } + // e.g. "Enum16('a' = 1, 'b' = 2)" + if let Some(inner) = strip_wrapper(s, "Enum16") { + let map = parse_enum_values_i16(inner)?; + return Ok(ChType::Enum16(map)); + } + // e.g. 
"FixedString(16)" + if let Some(inner) = strip_wrapper(s, "FixedString") { + let n: usize = inner + .trim() + .parse() + .map_err(|_| init_err(format!("Invalid FixedString length: {inner}")))?; + return Ok(ChType::FixedString(n)); + } + // e.g. "DateTime64(3)" or "DateTime64(3, 'UTC')" + if let Some(inner) = strip_wrapper(s, "DateTime64") { + // DateTime64(precision) or DateTime64(precision, 'timezone') + let precision_str = inner.split(',').next().unwrap_or(inner).trim(); + let precision: u8 = precision_str + .parse() + .map_err(|_| init_err(format!("Invalid DateTime64 precision: {precision_str}")))?; + return Ok(ChType::DateTime64(precision)); + } + // e.g. "DateTime('UTC')" + if let Some(inner) = strip_wrapper(s, "DateTime") { + // DateTime('timezone') — timezone is ignored for serialisation. + let _ = inner; + return Ok(ChType::DateTime); + } + // e.g. "Decimal(18, 4)" + if let Some(inner) = strip_wrapper(s, "Decimal") { + let (p_str, s_str) = split_two_args(inner)?; + let precision: u8 = p_str + .trim() + .parse() + .map_err(|_| init_err(format!("Invalid Decimal precision: {p_str}")))?; + let scale: u8 = s_str + .trim() + .parse() + .map_err(|_| init_err(format!("Invalid Decimal scale: {s_str}")))?; + return Ok(ChType::Decimal(precision, scale)); + } + // e.g. "Decimal32(4)" + if let Some(inner) = strip_wrapper(s, "Decimal32") { + let scale: u8 = inner + .trim() + .parse() + .map_err(|_| init_err(format!("Invalid Decimal32 scale: {inner}")))?; + return Ok(ChType::Decimal(9, scale)); + } + // e.g. "Decimal64(4)" + if let Some(inner) = strip_wrapper(s, "Decimal64") { + let scale: u8 = inner + .trim() + .parse() + .map_err(|_| init_err(format!("Invalid Decimal64 scale: {inner}")))?; + return Ok(ChType::Decimal(18, scale)); + } + // e.g. 
"Decimal128(4)" + if let Some(inner) = strip_wrapper(s, "Decimal128") { + let scale: u8 = inner + .trim() + .parse() + .map_err(|_| init_err(format!("Invalid Decimal128 scale: {inner}")))?; + return Ok(ChType::Decimal(38, scale)); + } + + // Primitive leaf types + match s { + "String" => Ok(ChType::String), + "Int8" => Ok(ChType::Int8), + "Int16" => Ok(ChType::Int16), + "Int32" => Ok(ChType::Int32), + "Int64" => Ok(ChType::Int64), + "UInt8" => Ok(ChType::UInt8), + "UInt16" => Ok(ChType::UInt16), + "UInt32" => Ok(ChType::UInt32), + "UInt64" => Ok(ChType::UInt64), + "Float32" => Ok(ChType::Float32), + "Float64" => Ok(ChType::Float64), + "Bool" | "Boolean" => Ok(ChType::Boolean), + "UUID" => Ok(ChType::Uuid), + "Date" => Ok(ChType::Date), + "Date32" => Ok(ChType::Date32), + "DateTime" => Ok(ChType::DateTime), + "IPv4" => Ok(ChType::IPv4), + "IPv6" => Ok(ChType::IPv6), + + // ── Explicitly unsupported ───────────────────────────────────────── + s if s.starts_with("LowCardinality") => { + error!( + "Unsupported ClickHouse type: {s}. LowCardinality uses dictionary encoding that is not supported in RowBinary mode." + ); + Err(init_err(format!("Unsupported type: {s}"))) + } + s if s.starts_with("Variant") => { + error!("Unsupported ClickHouse type: {s}. Variant is not supported in RowBinary mode."); + Err(init_err(format!("Unsupported type: {s}"))) + } + "JSON" => { + error!( + "Unsupported ClickHouse type: JSON. The native JSON column type is not supported in RowBinary mode." + ); + Err(init_err("Unsupported type: JSON".into())) + } + s if matches!( + s, + "Point" | "Ring" | "Polygon" | "MultiPolygon" | "LineString" | "MultiLineString" + ) => + { + error!( + "Unsupported ClickHouse type: {s}. Geo types are not supported in RowBinary mode." 
+ ); + Err(init_err(format!("Unsupported type: {s}"))) + } + other => { + error!("Unrecognised ClickHouse type: {other}"); + Err(init_err(format!("Unrecognised type: {other}"))) + } + } +} + +// ─── Helpers ───────────────────────────────────────────────────────────────── + +fn init_err(msg: String) -> Error { + Error::InitError(msg) +} + +/// If `s` starts with `prefix(` and ends with `)`, return the inner content. +fn strip_wrapper<'a>(s: &'a str, prefix: &str) -> Option<&'a str> { + let with_paren = format!("{prefix}("); + if s.starts_with(with_paren.as_str()) && s.ends_with(')') { + Some(&s[with_paren.len()..s.len() - 1]) + } else { + None + } +} + +/// Split a comma-separated argument list, respecting nested parentheses. +/// e.g. `"String, Map(String, Int32)"` → `["String", "Map(String, Int32)"]` +fn split_args(s: &str) -> Result, Error> { + let mut args = Vec::new(); + let mut depth = 0usize; + let mut start = 0usize; + + for (i, ch) in s.char_indices() { + match ch { + '(' => depth += 1, + ')' => { + if depth == 0 { + return Err(init_err(format!("Unmatched ')' in type string: {s}"))); + } + depth -= 1; + } + ',' if depth == 0 => { + args.push(s[start..i].trim()); + start = i + 1; + } + _ => {} + } + } + let last = s[start..].trim(); + if !last.is_empty() { + args.push(last); + } + Ok(args) +} + +/// Split exactly two comma-separated arguments (e.g. for Map or Decimal). +fn split_two_args(s: &str) -> Result<(&str, &str), Error> { + let parts = split_args(s)?; + if parts.len() != 2 { + return Err(init_err(format!( + "Expected exactly 2 arguments, got {}: {s}", + parts.len() + ))); + } + Ok((parts[0], parts[1])) +} + +/// If `s` starts with an identifier followed by a space and the rest looks +/// like a type, return the type portion. This handles named Tuple fields like +/// `id Int32`. 
+fn strip_named_tuple_field(s: &str) -> Option<&str> { + let mut chars = s.char_indices().peekable(); + // Consume identifier characters (letters, digits, underscore) + while let Some((_, ch)) = chars.peek() { + if ch.is_alphanumeric() || *ch == '_' { + chars.next(); + } else { + break; + } + } + // Expect a single space after the identifier + if let Some((idx, ' ')) = chars.next() { + let rest = s[idx + 1..].trim(); + // Sanity: the rest should start with an uppercase letter (type name) + if rest.starts_with(|c: char| c.is_uppercase()) { + return Some(rest); + } + } + None +} + +/// Parse `'name' = value, ...` pairs for Enum8. +fn parse_enum_values_i8(s: &str) -> Result, Error> { + let mut map = HashMap::new(); + for pair in split_args(s)? { + let (name, val) = parse_enum_pair(pair)?; + let v: i8 = val + .parse() + .map_err(|_| init_err(format!("Invalid Enum8 value: {val}")))?; + map.insert(name, v); + } + Ok(map) +} + +/// Parse `'name' = value, ...` pairs for Enum16. +fn parse_enum_values_i16(s: &str) -> Result, Error> { + let mut map = HashMap::new(); + for pair in split_args(s)? { + let (name, val) = parse_enum_pair(pair)?; + let v: i16 = val + .parse() + .map_err(|_| init_err(format!("Invalid Enum16 value: {val}")))?; + map.insert(name, v); + } + Ok(map) +} + +/// Parse a single `'name' = value` pair, returning (name, value_str). 
+fn parse_enum_pair(pair: &str) -> Result<(String, &str), Error> { + let pair = pair.trim(); + // Format: 'name' = value + let eq_pos = pair + .rfind('=') + .ok_or_else(|| init_err(format!("Invalid enum pair (no '='): {pair}")))?; + let name_part = pair[..eq_pos].trim(); + let val_part = pair[eq_pos + 1..].trim(); + // Strip surrounding single quotes from name + let name = if name_part.starts_with('\'') && name_part.ends_with('\'') { + name_part[1..name_part.len() - 1].to_string() + } else { + return Err(init_err(format!("Enum name not quoted: {name_part}"))); + }; + Ok((name, val_part)) +} + +// ─── Tests ─────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_primitives() { + assert!(matches!(parse_type("String").unwrap(), ChType::String)); + assert!(matches!(parse_type("Int8").unwrap(), ChType::Int8)); + assert!(matches!(parse_type("Int16").unwrap(), ChType::Int16)); + assert!(matches!(parse_type("Int32").unwrap(), ChType::Int32)); + assert!(matches!(parse_type("Int64").unwrap(), ChType::Int64)); + assert!(matches!(parse_type("UInt8").unwrap(), ChType::UInt8)); + assert!(matches!(parse_type("UInt16").unwrap(), ChType::UInt16)); + assert!(matches!(parse_type("UInt32").unwrap(), ChType::UInt32)); + assert!(matches!(parse_type("UInt64").unwrap(), ChType::UInt64)); + assert!(matches!(parse_type("Float32").unwrap(), ChType::Float32)); + assert!(matches!(parse_type("Float64").unwrap(), ChType::Float64)); + assert!(matches!(parse_type("Boolean").unwrap(), ChType::Boolean)); + assert!(matches!(parse_type("Bool").unwrap(), ChType::Boolean)); + assert!(matches!(parse_type("UUID").unwrap(), ChType::Uuid)); + assert!(matches!(parse_type("Date").unwrap(), ChType::Date)); + assert!(matches!(parse_type("Date32").unwrap(), ChType::Date32)); + assert!(matches!(parse_type("DateTime").unwrap(), ChType::DateTime)); + assert!(matches!(parse_type("IPv4").unwrap(), ChType::IPv4)); + 
assert!(matches!(parse_type("IPv6").unwrap(), ChType::IPv6)); + } + + #[test] + fn parses_nullable_string() { + let t = parse_type("Nullable(String)").unwrap(); + assert!(matches!(t, ChType::Nullable(inner) if matches!(*inner, ChType::String))); + } + + #[test] + fn parses_nullable_int32() { + let t = parse_type("Nullable(Int32)").unwrap(); + assert!(matches!(t, ChType::Nullable(inner) if matches!(*inner, ChType::Int32))); + } + + #[test] + fn parses_fixed_string() { + let t = parse_type("FixedString(16)").unwrap(); + assert!(matches!(t, ChType::FixedString(16))); + } + + #[test] + fn parses_datetime64_precision() { + let t = parse_type("DateTime64(3)").unwrap(); + assert!(matches!(t, ChType::DateTime64(3))); + } + + #[test] + fn parses_datetime64_with_timezone() { + let t = parse_type("DateTime64(6, 'UTC')").unwrap(); + assert!(matches!(t, ChType::DateTime64(6))); + } + + #[test] + fn parses_datetime_with_timezone() { + let t = parse_type("DateTime('Europe/London')").unwrap(); + assert!(matches!(t, ChType::DateTime)); + } + + #[test] + fn parses_decimal() { + let t = parse_type("Decimal(18, 4)").unwrap(); + assert!(matches!(t, ChType::Decimal(18, 4))); + } + + #[test] + fn parses_decimal32() { + let t = parse_type("Decimal32(4)").unwrap(); + assert!(matches!(t, ChType::Decimal(9, 4))); + } + + #[test] + fn parses_decimal64() { + let t = parse_type("Decimal64(6)").unwrap(); + assert!(matches!(t, ChType::Decimal(18, 6))); + } + + #[test] + fn parses_array_of_string() { + let t = parse_type("Array(String)").unwrap(); + assert!(matches!(t, ChType::Array(inner) if matches!(*inner, ChType::String))); + } + + #[test] + fn parses_array_of_nullable_int32() { + let t = parse_type("Array(Nullable(Int32))").unwrap(); + assert!(matches!( + t, + ChType::Array(inner) + if matches!(*inner, ChType::Nullable(ref i) if matches!(**i, ChType::Int32)) + )); + } + + #[test] + fn parses_map_string_int32() { + let t = parse_type("Map(String, Int32)").unwrap(); + assert!(matches!(t, 
ChType::Map(k, v) + if matches!(*k, ChType::String) && matches!(*v, ChType::Int32))); + } + + #[test] + fn parses_map_with_complex_value() { + let t = parse_type("Map(String, Array(Int64))").unwrap(); + assert!(matches!(t, ChType::Map(k, v) + if matches!(*k, ChType::String) && matches!(*v, ChType::Array(_)))); + } + + #[test] + fn parses_tuple_unnamed() { + let t = parse_type("Tuple(String, Int32)").unwrap(); + assert!(matches!(t, ChType::Tuple(fields) if fields.len() == 2)); + } + + #[test] + fn parses_tuple_named() { + let t = parse_type("Tuple(id Int32, name String)").unwrap(); + assert!(matches!(t, ChType::Tuple(fields) if fields.len() == 2)); + } + + #[test] + fn parses_enum8() { + let t = parse_type("Enum8('active' = 1, 'inactive' = 2)").unwrap(); + if let ChType::Enum8(map) = t { + assert_eq!(map["active"], 1i8); + assert_eq!(map["inactive"], 2i8); + } else { + panic!("expected Enum8"); + } + } + + #[test] + fn parses_enum16() { + let t = parse_type("Enum16('a' = 100, 'b' = 200)").unwrap(); + if let ChType::Enum16(map) = t { + assert_eq!(map["a"], 100i16); + assert_eq!(map["b"], 200i16); + } else { + panic!("expected Enum16"); + } + } + + #[test] + fn rejects_low_cardinality() { + assert!(parse_type("LowCardinality(String)").is_err()); + } + + #[test] + fn rejects_variant() { + assert!(parse_type("Variant(String, Int32)").is_err()); + } + + #[test] + fn rejects_json_column_type() { + assert!(parse_type("JSON").is_err()); + } + + #[test] + fn rejects_geo_types() { + assert!(parse_type("Point").is_err()); + assert!(parse_type("Polygon").is_err()); + } + + #[test] + fn rejects_unknown_type() { + assert!(parse_type("WeirdType").is_err()); + } + + // Parse deeply-nested expressions ────────────────────────────────── + + /// Validates the exact example shown in the module-level grammar comment. 
+ #[test] + fn parses_doc_comment_example() { + // Nullable(Map(String, Array(Tuple(id Int32, ts DateTime64(3, 'UTC'))))) + let t = + parse_type("Nullable(Map(String, Array(Tuple(id Int32, ts DateTime64(3, 'UTC')))))") + .unwrap(); + let ChType::Nullable(inner) = t else { + panic!("expected Nullable") + }; + let ChType::Map(k, v) = *inner else { + panic!("expected Map") + }; + assert!(matches!(*k, ChType::String)); + let ChType::Array(inner) = *v else { + panic!("expected Array") + }; + let ChType::Tuple(fields) = *inner else { + panic!("expected Tuple") + }; + assert_eq!(fields.len(), 2); + assert!(matches!(fields[0], ChType::Int32)); + assert!(matches!(fields[1], ChType::DateTime64(3))); + } + + /// Nullable wrapping a composite type (not just a primitive). + #[test] + fn parses_nullable_wrapping_composite() { + let t = parse_type("Nullable(Array(Int32))").unwrap(); + let ChType::Nullable(inner) = t else { + panic!("expected Nullable") + }; + assert!(matches!(*inner, ChType::Array(_))); + } + + /// Named tuple whose fields are themselves composite types. + #[test] + fn parses_named_tuple_with_composite_fields() { + // strip_named_tuple_field must correctly skip names whose type contains parens + let t = parse_type("Tuple(tags Array(String), meta Map(String, Int32))").unwrap(); + let ChType::Tuple(fields) = t else { + panic!("expected Tuple") + }; + assert_eq!(fields.len(), 2); + assert!(matches!(fields[0], ChType::Array(_))); + assert!(matches!(fields[1], ChType::Map(_, _))); + } + + /// Map whose value type is itself a Map — exercises that split_two_args + /// does not split on the comma inside the nested Map's argument list. + #[test] + fn parses_map_of_maps() { + let t = parse_type("Map(String, Map(String, Int32))").unwrap(); + assert!(matches!(t, ChType::Map(k, v) + if matches!(*k, ChType::String) && matches!(*v, ChType::Map(_, _)))); + } + + /// Array wrapping an unnamed tuple with more than two elements. 
+ #[test] + fn parses_array_of_three_element_tuple() { + let t = parse_type("Array(Tuple(Float32, Float32, Float32))").unwrap(); + let ChType::Array(inner) = t else { + panic!("expected Array") + }; + let ChType::Tuple(fields) = *inner else { + panic!("expected Tuple") + }; + assert_eq!(fields.len(), 3); + assert!(fields.iter().all(|f| matches!(f, ChType::Float32))); + } +} \ No newline at end of file diff --git a/core/connectors/sinks/clickhouse_sink/src/sink.rs b/core/connectors/sinks/clickhouse_sink/src/sink.rs new file mode 100644 index 0000000000..21a8f721ea --- /dev/null +++ b/core/connectors/sinks/clickhouse_sink/src/sink.rs @@ -0,0 +1,153 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use crate::body::{build_json_body, build_row_binary_body, build_string_body}; +use crate::{ClickHouseSink, InsertFormat, client::ClickHouseClient}; +use async_trait::async_trait; +use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Sink, TopicMetadata}; +use tracing::{debug, error, info, warn}; + +#[async_trait] +impl Sink for ClickHouseSink { + // ─── open ──────────────────────────────────────────────────────────────── + + async fn open(&mut self) -> Result<(), Error> { + info!( + "Opening ClickHouse sink connector ID: {} → {}/{} (format: {:?})", + self.id, self.config.url, self.config.table, self.insert_format, + ); + + let client = ClickHouseClient::new( + self.config.url.clone(), + self.database().to_owned(), + self.username(), + self.password(), + self.timeout(), + )?; + + client.ping().await?; + info!("ClickHouse sink ID: {} — ping OK", self.id); + + // For RowBinary mode, fetch and validate the table schema at startup. + // This fails fast if the table doesn't exist or contains unsupported types. 
+        if self.insert_format == InsertFormat::RowBinary {
+            let schema = client.fetch_schema(&self.config.table).await?;
+            info!(
+                "ClickHouse sink ID: {} — loaded schema ({} columns) for table '{}'",
+                self.id,
+                schema.len(),
+                self.config.table
+            );
+            self.table_schema = Some(schema);
+        }
+
+        self.client = Some(client);
+        Ok(())
+    }
+
+    // ─── consume ─────────────────────────────────────────────────────────────
+
+    async fn consume(
+        &self,
+        topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error> {
+        if messages.is_empty() {
+            return Ok(());
+        }
+
+        debug!(
+            "ClickHouse sink ID: {} received {} messages from {}/{} partition {} offset {}",
+            self.id,
+            messages.len(),
+            topic_metadata.stream,
+            topic_metadata.topic,
+            messages_metadata.partition_id,
+            messages_metadata.current_offset,
+        );
+
+        let client = self.get_client()?;
+        let table = &self.config.table;
+        let format_name = self
+            .insert_format
+            .clickhouse_format_name(self.string_format);
+
+        let body = match self.insert_format {
+            InsertFormat::JsonEachRow => build_json_body(&messages),
+            InsertFormat::RowBinary => {
+                let schema = self.table_schema.as_deref().ok_or_else(|| {
+                    error!("RowBinary mode but table schema is not loaded");
+                    Error::InitError("Table schema not loaded".into())
+                })?;
+                build_row_binary_body(&messages, schema)?
+ } + InsertFormat::StringPassthrough => build_string_body(&messages, self.string_format), + }; + + if body.is_empty() { + warn!( + "ClickHouse sink ID: {} — no serialisable messages in batch of {}", + self.id, + messages.len() + ); + return Ok(()); + } + + client + .insert( + table, + format_name, + body, + self.max_retries(), + self.retry_delay, + ) + .await?; + + let count = messages.len() as u64; + let mut state = self.state.lock().await; + state.messages_processed += count; + + if self.verbose() { + info!( + "ClickHouse sink ID: {} inserted {} messages into '{table}' FORMAT {format_name}", + self.id, count + ); + } else { + debug!( + "ClickHouse sink ID: {} inserted {} messages into '{table}' FORMAT {format_name}", + self.id, count + ); + } + + Ok(()) + } + + // ─── close ─────────────────────────────────────────────────────────────── + + async fn close(&mut self) -> Result<(), Error> { + let state = self.state.lock().await; + info!( + "ClickHouse sink ID: {} closed. Processed {} messages, {} errors.", + self.id, state.messages_processed, state.errors_count, + ); + self.client = None; + self.table_schema = None; + Ok(()) + } +} \ No newline at end of file diff --git a/core/integration/tests/connectors/clickhouse/clickhouse_sink.rs b/core/integration/tests/connectors/clickhouse/clickhouse_sink.rs new file mode 100644 index 0000000000..7986b0c44c --- /dev/null +++ b/core/integration/tests/connectors/clickhouse/clickhouse_sink.rs @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::TEST_MESSAGE_COUNT;
+use crate::connectors::create_test_messages;
+use crate::connectors::fixtures::{
+    ClickHouseSinkFixture, ClickHouseSinkRowBinaryFixture, ClickHouseSinkStringFixture,
+};
+use bytes::Bytes;
+use iggy::prelude::{IggyMessage, Partitioning};
+use iggy_binary_protocol::MessageClient;
+use iggy_common::Identifier;
+use integration::harness::seeds;
+use integration::iggy_harness;
+
+// ── json_each_row: basic ──────────────────────────────────────────────────────
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = "tests/connectors/clickhouse/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn clickhouse_sink_stores_json_messages(
+    harness: &TestHarness,
+    fixture: ClickHouseSinkFixture,
+) {
+    let client = harness.root_client().await.unwrap();
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+    let messages_data = create_test_messages(TEST_MESSAGE_COUNT);
+    let mut messages: Vec<IggyMessage> = messages_data
+        .iter()
+        .enumerate()
+        .map(|(i, msg)| {
+            let payload = serde_json::to_vec(msg).expect("Failed to serialize message");
+            IggyMessage::builder()
+                .id((i + 1) as u128)
+                .payload(Bytes::from(payload))
+                .build()
+                .expect("Failed to build message")
+        })
+        .collect();
+
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send messages");
+
+    fixture
+        .wait_for_rows(TEST_MESSAGE_COUNT)
+        .await
+        .expect("Timed out waiting for rows in ClickHouse");
+
+    let rows = fixture
+        .fetch_rows()
+        .await
+        .expect("Failed to fetch rows from ClickHouse");
+
+    assert_eq!(
+        rows.len(),
+        TEST_MESSAGE_COUNT,
+        "Expected {TEST_MESSAGE_COUNT} rows in ClickHouse"
+    );
+
+    for (i, row) in rows.iter().enumerate() {
+        let expected = &messages_data[i];
+
+        let id = row["id"]
+            .as_str()
+            .and_then(|s| s.parse::<u64>().ok())
+            .or_else(|| row["id"].as_u64())
+            .unwrap_or_else(|| panic!("Missing 'id' at row {i}"));
+        assert_eq!(id, expected.id, "id mismatch at row {i}");
+
+        let name = row["name"]
+            .as_str()
+            .unwrap_or_else(|| panic!("Missing 'name' at row {i}"));
+        assert_eq!(name, expected.name, "name mismatch at row {i}");
+
+        let amount = row["amount"]
+            .as_str()
+            .and_then(|s| s.parse::<f64>().ok())
+            .or_else(|| row["amount"].as_f64())
+            .unwrap_or_else(|| panic!("Missing 'amount' at row {i}"));
+        assert!(
+            (amount - expected.amount).abs() < 1e-6,
+            "amount mismatch at row {i}: got {amount}, expected {}",
+            expected.amount
+        );
+
+        // ClickHouse returns Bool columns as JSON true/false
+        let active = row["active"]
+            .as_bool()
+            .or_else(|| row["active"].as_u64().map(|v| v != 0))
+            .unwrap_or_else(|| panic!("Missing 'active' at row {i}"));
+        assert_eq!(active, expected.active, "active mismatch at row {i}");
+    }
+}
+
+// ── json_each_row: bulk ───────────────────────────────────────────────────────
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = "tests/connectors/clickhouse/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn clickhouse_sink_handles_bulk_messages(
+    harness: &TestHarness,
+    fixture: ClickHouseSinkFixture,
+) {
+    let client = harness.root_client().await.unwrap();
+    let bulk_count = 50;
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+    let messages_data = create_test_messages(bulk_count);
+    let mut messages: Vec<IggyMessage> = messages_data
+        .iter()
+        .enumerate()
+        .map(|(i, msg)| {
+            let payload = serde_json::to_vec(msg).expect("Failed to serialize message");
+            IggyMessage::builder()
+                .id((i + 1) as u128)
+                .payload(Bytes::from(payload))
+                .build()
+                .expect("Failed to build message")
+        })
+        .collect();
+
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send messages");
+
+    fixture
+        .wait_for_rows(bulk_count)
+        .await
+        .expect("Timed out waiting for bulk rows in ClickHouse");
+
+    let row_count = fixture
+        .count_rows()
+        .await
+        .expect("Failed to count rows from ClickHouse");
+
+    assert!(
+        row_count >= bulk_count,
+        "Expected at least {bulk_count} rows, got {row_count}"
+    );
+}
+
+// ── row_binary ────────────────────────────────────────────────────────────────
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = "tests/connectors/clickhouse/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn clickhouse_sink_stores_messages_with_row_binary(
+    harness: &TestHarness,
+    fixture: ClickHouseSinkRowBinaryFixture,
+) {
+    let client = harness.root_client().await.unwrap();
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+    let messages_data = create_test_messages(TEST_MESSAGE_COUNT);
+    let mut messages: Vec<IggyMessage> = messages_data
+        .iter()
+        .enumerate()
+        .map(|(i, msg)| {
+            let payload = serde_json::to_vec(msg).expect("Failed to serialize message");
+            IggyMessage::builder()
+                .id((i + 1) as u128)
+                .payload(Bytes::from(payload))
+                .build()
+                .expect("Failed to build message")
+        })
+        .collect();
+
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send messages");
+
+    fixture
+        .wait_for_rows(TEST_MESSAGE_COUNT)
+        .await
+        .expect("Timed out waiting for row_binary rows in ClickHouse");
+
+    let rows = fixture
+        .fetch_rows()
+        .await
+        .expect("Failed to fetch rows from ClickHouse");
+
+    assert_eq!(
+        rows.len(),
+        TEST_MESSAGE_COUNT,
+        "Expected {TEST_MESSAGE_COUNT} rows in ClickHouse (row_binary)"
+    );
+
+    for (i, row) in rows.iter().enumerate() {
+        let expected = &messages_data[i];
+
+        let id = row["id"]
+            .as_str()
+            .and_then(|s| s.parse::<u64>().ok())
+            .or_else(|| row["id"].as_u64())
+            .unwrap_or_else(|| panic!("Missing 'id' at row {i}"));
+        assert_eq!(id, expected.id, "id mismatch at row {i}");
+
+        let name = row["name"]
+            .as_str()
+            .unwrap_or_else(|| panic!("Missing 'name' at row {i}"));
+        assert_eq!(name, expected.name, "name mismatch at row {i}");
+
+        let count = row["count"]
+            .as_str()
+            .and_then(|s| s.parse::<u32>().ok())
+            .or_else(|| row["count"].as_u64().map(|v| v as u32))
+            .unwrap_or_else(|| panic!("Missing 'count' at row {i}"));
+        assert_eq!(count, expected.count, "count mismatch at row {i}");
+    }
+}
+
+// ── string passthrough (CSV) ──────────────────────────────────────────────────
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = "tests/connectors/clickhouse/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn clickhouse_sink_stores_string_passthrough_csv(
+    harness: &TestHarness,
+    fixture: ClickHouseSinkStringFixture,
+) {
+    let client = harness.root_client().await.unwrap();
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+    // Build CSV rows matching the iggy_messages table schema:
+    // id UInt64, name String, count UInt32, amount Float64, active UInt8, timestamp Int64
+    let messages_data = create_test_messages(TEST_MESSAGE_COUNT);
+    let mut messages: Vec<IggyMessage> = messages_data
+        .iter()
+        .enumerate()
+        .map(|(i, msg)| {
+            let csv = format!(
+                "{},{},{},{},{},{}\n",
+                msg.id, msg.name, msg.count, msg.amount, msg.active as u8, msg.timestamp,
+            );
+            IggyMessage::builder()
+                .id((i + 1) as u128)
+                .payload(Bytes::from(csv.into_bytes()))
+                .build()
+                .expect("Failed to build message")
+        })
+        .collect();
+ + client + .send_messages( + &stream_id, + &topic_id, + &Partitioning::partition_id(0), + &mut messages, + ) + .await + .expect("Failed to send messages"); + + fixture + .wait_for_rows(TEST_MESSAGE_COUNT) + .await + .expect("Timed out waiting for CSV rows in ClickHouse"); + + let row_count = fixture + .count_rows() + .await + .expect("Failed to count rows from ClickHouse"); + + assert_eq!( + row_count, TEST_MESSAGE_COUNT, + "Expected {TEST_MESSAGE_COUNT} rows in ClickHouse (string/CSV)" + ); +} \ No newline at end of file diff --git a/core/integration/tests/connectors/clickhouse/mod.rs b/core/integration/tests/connectors/clickhouse/mod.rs new file mode 100644 index 0000000000..25011fba33 --- /dev/null +++ b/core/integration/tests/connectors/clickhouse/mod.rs @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +mod clickhouse_sink; + +const TEST_MESSAGE_COUNT: usize = 3; \ No newline at end of file diff --git a/core/integration/tests/connectors/clickhouse/sink.toml b/core/integration/tests/connectors/clickhouse/sink.toml new file mode 100644 index 0000000000..7bfaf3bf7b --- /dev/null +++ b/core/integration/tests/connectors/clickhouse/sink.toml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[connectors] +config_type = "local" +config_dir = "../connectors/sinks/clickhouse_sink" \ No newline at end of file diff --git a/core/integration/tests/connectors/fixtures/clickhouse/container.rs b/core/integration/tests/connectors/fixtures/clickhouse/container.rs new file mode 100644 index 0000000000..fb3e2c7dc4 --- /dev/null +++ b/core/integration/tests/connectors/fixtures/clickhouse/container.rs @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use integration::harness::TestBinaryError; +use reqwest::Client as HttpClient; +use reqwest::header::{HeaderMap, HeaderValue}; +use serde::Deserialize; +use std::time::Duration; +use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor}; +use testcontainers_modules::testcontainers::runners::AsyncRunner; +use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, ImageExt}; +use tokio::time::sleep; +use tracing::info; + +const CLICKHOUSE_IMAGE: &str = "clickhouse/clickhouse-server"; +const CLICKHOUSE_TAG: &str = "25.1"; +const CLICKHOUSE_HTTP_PORT: u16 = 8123; + +pub const CLICKHOUSE_TEST_USER: &str = "default"; +pub const CLICKHOUSE_TEST_PASSWORD: &str = "iggy_test"; + +pub const HEALTH_CHECK_ATTEMPTS: usize = 60; +pub const HEALTH_CHECK_INTERVAL_MS: u64 = 500; +pub const DEFAULT_POLL_ATTEMPTS: usize = 100; +pub const DEFAULT_POLL_INTERVAL_MS: u64 = 100; + +pub const DEFAULT_TEST_STREAM: &str = "test_stream"; +pub const DEFAULT_TEST_TOPIC: &str = "test_topic"; +pub const DEFAULT_DATABASE: &str = "default"; +pub const DEFAULT_SINK_TABLE: &str = "iggy_messages"; + +pub const ENV_SINK_URL: &str = "IGGY_CONNECTORS_SINK_CLICKHOUSE_PLUGIN_CONFIG_URL"; +pub const ENV_SINK_DATABASE: &str = "IGGY_CONNECTORS_SINK_CLICKHOUSE_PLUGIN_CONFIG_DATABASE"; +pub const ENV_SINK_TABLE: &str = "IGGY_CONNECTORS_SINK_CLICKHOUSE_PLUGIN_CONFIG_TABLE"; +pub const ENV_SINK_USERNAME: &str = "IGGY_CONNECTORS_SINK_CLICKHOUSE_PLUGIN_CONFIG_USERNAME"; +pub const ENV_SINK_PASSWORD: &str = 
"IGGY_CONNECTORS_SINK_CLICKHOUSE_PLUGIN_CONFIG_PASSWORD"; +pub const ENV_SINK_INSERT_FORMAT: &str = + "IGGY_CONNECTORS_SINK_CLICKHOUSE_PLUGIN_CONFIG_INSERT_FORMAT"; +pub const ENV_SINK_STRING_FORMAT: &str = + "IGGY_CONNECTORS_SINK_CLICKHOUSE_PLUGIN_CONFIG_STRING_FORMAT"; +pub const ENV_SINK_STREAMS_0_STREAM: &str = "IGGY_CONNECTORS_SINK_CLICKHOUSE_STREAMS_0_STREAM"; +pub const ENV_SINK_STREAMS_0_TOPICS: &str = "IGGY_CONNECTORS_SINK_CLICKHOUSE_STREAMS_0_TOPICS"; +pub const ENV_SINK_STREAMS_0_SCHEMA: &str = "IGGY_CONNECTORS_SINK_CLICKHOUSE_STREAMS_0_SCHEMA"; +pub const ENV_SINK_STREAMS_0_CONSUMER_GROUP: &str = + "IGGY_CONNECTORS_SINK_CLICKHOUSE_STREAMS_0_CONSUMER_GROUP"; +pub const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_CLICKHOUSE_PATH"; + +/// DDL for the test table — matches the `TestMessage` struct. +pub const CREATE_TABLE_SQL: &str = " + CREATE TABLE IF NOT EXISTS iggy_messages ( + id UInt64, + name String, + count UInt32, + amount Float64, + active Bool, + timestamp Int64 + ) ENGINE = MergeTree() + ORDER BY id +"; + +#[derive(Debug, Deserialize)] +pub struct ClickHouseJsonResponse { + pub data: Vec, + #[allow(dead_code)] + pub rows: usize, +} + +pub struct ClickHouseContainer { + #[allow(dead_code)] + container: ContainerAsync, + pub base_url: String, +} + +impl ClickHouseContainer { + pub async fn start() -> Result { + let container = GenericImage::new(CLICKHOUSE_IMAGE, CLICKHOUSE_TAG) + .with_exposed_port(CLICKHOUSE_HTTP_PORT.tcp()) + .with_wait_for(WaitFor::Nothing) + .with_mapped_port(0, CLICKHOUSE_HTTP_PORT.tcp()) + .with_env_var("CLICKHOUSE_USER", CLICKHOUSE_TEST_USER) + .with_env_var("CLICKHOUSE_PASSWORD", CLICKHOUSE_TEST_PASSWORD) + .start() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "ClickHouseContainer".to_string(), + message: format!("Failed to start container: {e}"), + })?; + + info!("Started ClickHouse container"); + + let mapped_port = container + .ports() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + 
fixture_type: "ClickHouseContainer".to_string(),
+                message: format!("Failed to get ports: {e}"),
+            })?
+            .map_to_host_port_ipv4(CLICKHOUSE_HTTP_PORT)
+            .ok_or_else(|| TestBinaryError::FixtureSetup {
+                fixture_type: "ClickHouseContainer".to_string(),
+                message: "No mapping for ClickHouse HTTP port".to_string(),
+            })?;
+
+        let base_url = format!("http://localhost:{mapped_port}");
+        info!("ClickHouse container available at {base_url}");
+
+        Ok(Self {
+            container,
+            base_url,
+        })
+    }
+}
+
+pub fn create_http_client() -> HttpClient {
+    let mut headers = HeaderMap::new();
+    headers.insert(
+        "X-ClickHouse-User",
+        HeaderValue::from_static(CLICKHOUSE_TEST_USER),
+    );
+    headers.insert(
+        "X-ClickHouse-Key",
+        HeaderValue::from_static(CLICKHOUSE_TEST_PASSWORD),
+    );
+    reqwest::Client::builder()
+        .timeout(Duration::from_secs(30))
+        .default_headers(headers)
+        .build()
+        .expect("Failed to build HTTP client")
+}
+
+/// Common operations against ClickHouse via its HTTP API.
+pub trait ClickHouseOps: Sync {
+    fn container(&self) -> &ClickHouseContainer;
+    fn http_client(&self) -> &HttpClient;
+
+    /// Execute a DDL or DML statement (no result expected).
+    #[allow(dead_code)]
+    fn execute_query(
+        &self,
+        sql: &str,
+    ) -> impl std::future::Future<Output = Result<(), TestBinaryError>> + Send {
+        async move {
+            let url = &self.container().base_url;
+            let response = self
+                .http_client()
+                .post(url)
+                .body(sql.to_string())
+                .send()
+                .await
+                .map_err(|e| TestBinaryError::FixtureSetup {
+                    fixture_type: "ClickHouseOps".to_string(),
+                    message: format!("Failed to execute query: {e}"),
+                })?;
+
+            if !response.status().is_success() {
+                let status = response.status();
+                let body = response.text().await.unwrap_or_default();
+                return Err(TestBinaryError::FixtureSetup {
+                    fixture_type: "ClickHouseOps".to_string(),
+                    message: format!("Query failed: status={status}, body={body}"),
+                });
+            }
+
+            Ok(())
+        }
+    }
+
+    /// Count the rows in a table.
+    fn count_rows(
+        &self,
+        table: &str,
+    ) -> impl std::future::Future<Output = Result<usize, TestBinaryError>> + Send {
+        async move {
+            let sql = format!("SELECT count() AS c FROM {table} FORMAT JSON");
+            let url = &self.container().base_url;
+            let response = self
+                .http_client()
+                .post(url)
+                .body(sql)
+                .send()
+                .await
+                .map_err(|e| TestBinaryError::InvalidState {
+                    message: format!("Failed to count rows: {e}"),
+                })?;
+
+            if !response.status().is_success() {
+                let status = response.status();
+                let body = response.text().await.unwrap_or_default();
+                return Err(TestBinaryError::InvalidState {
+                    message: format!("count() failed: status={status}, body={body}"),
+                });
+            }
+
+            let parsed: ClickHouseJsonResponse =
+                response
+                    .json()
+                    .await
+                    .map_err(|e| TestBinaryError::InvalidState {
+                        message: format!("Failed to parse count response: {e}"),
+                    })?;
+
+            let count = parsed
+                .data
+                .first()
+                .and_then(|row| row.get("c"))
+                .and_then(|v| v.as_str())
+                .and_then(|s| s.parse::<usize>().ok())
+                .unwrap_or(0);
+
+            Ok(count)
+        }
+    }
+
+    /// Poll until the table contains at least `expected` rows.
+    fn wait_for_rows(
+        &self,
+        table: &str,
+        expected: usize,
+    ) -> impl std::future::Future<Output = Result<usize, TestBinaryError>> + Send {
+        async move {
+            for _ in 0..DEFAULT_POLL_ATTEMPTS {
+                match self.count_rows(table).await {
+                    Ok(count) if count >= expected => {
+                        info!("Found {count} rows in {table} (expected {expected})");
+                        return Ok(count);
+                    }
+                    Ok(_) => {}
+                    Err(_) => {}
+                }
+                sleep(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS)).await;
+            }
+
+            let final_count = self.count_rows(table).await.unwrap_or(0);
+            Err(TestBinaryError::InvalidState {
+                message: format!(
+                    "Expected at least {expected} rows in {table}, found {final_count} after {} attempts",
+                    DEFAULT_POLL_ATTEMPTS
+                ),
+            })
+        }
+    }
+
+    /// Fetch all rows from a table ordered by `id`, returned as JSON objects.
+    fn fetch_rows_as_json(
+        &self,
+        table: &str,
+    ) -> impl std::future::Future<Output = Result<Vec<serde_json::Value>, TestBinaryError>> + Send
+    {
+        async move {
+            let sql = format!("SELECT * FROM {table} ORDER BY id FORMAT JSON");
+            let url = &self.container().base_url;
+            let response = self
+                .http_client()
+                .post(url)
+                .body(sql)
+                .send()
+                .await
+                .map_err(|e| TestBinaryError::InvalidState {
+                    message: format!("Failed to fetch rows: {e}"),
+                })?;
+
+            if !response.status().is_success() {
+                let status = response.status();
+                let body = response.text().await.unwrap_or_default();
+                return Err(TestBinaryError::InvalidState {
+                    message: format!("SELECT failed: status={status}, body={body}"),
+                });
+            }
+
+            let parsed: ClickHouseJsonResponse =
+                response
+                    .json()
+                    .await
+                    .map_err(|e| TestBinaryError::InvalidState {
+                        message: format!("Failed to parse SELECT response: {e}"),
+                    })?;
+
+            Ok(parsed.data)
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/integration/tests/connectors/fixtures/clickhouse/mod.rs b/core/integration/tests/connectors/fixtures/clickhouse/mod.rs
new file mode 100644
index 0000000000..71a4265491
--- /dev/null
+++ b/core/integration/tests/connectors/fixtures/clickhouse/mod.rs
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +mod container; +mod sink; + +pub use sink::{ + ClickHouseSinkFixture, ClickHouseSinkRowBinaryFixture, ClickHouseSinkStringFixture, +}; \ No newline at end of file diff --git a/core/integration/tests/connectors/fixtures/clickhouse/sink.rs b/core/integration/tests/connectors/fixtures/clickhouse/sink.rs new file mode 100644 index 0000000000..ad077ce968 --- /dev/null +++ b/core/integration/tests/connectors/fixtures/clickhouse/sink.rs @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+use super::container::{
+    CLICKHOUSE_TEST_PASSWORD, CLICKHOUSE_TEST_USER, CREATE_TABLE_SQL, ClickHouseContainer,
+    ClickHouseOps, DEFAULT_DATABASE, DEFAULT_SINK_TABLE, DEFAULT_TEST_STREAM, DEFAULT_TEST_TOPIC,
+    ENV_SINK_DATABASE, ENV_SINK_INSERT_FORMAT, ENV_SINK_PASSWORD, ENV_SINK_PATH,
+    ENV_SINK_STREAMS_0_CONSUMER_GROUP, ENV_SINK_STREAMS_0_SCHEMA, ENV_SINK_STREAMS_0_STREAM,
+    ENV_SINK_STREAMS_0_TOPICS, ENV_SINK_STRING_FORMAT, ENV_SINK_TABLE, ENV_SINK_URL,
+    ENV_SINK_USERNAME, HEALTH_CHECK_ATTEMPTS, HEALTH_CHECK_INTERVAL_MS, create_http_client,
+};
+use async_trait::async_trait;
+use integration::harness::{TestBinaryError, TestFixture};
+use reqwest::Client as HttpClient;
+use std::collections::HashMap;
+use std::time::Duration;
+use tokio::time::sleep;
+use tracing::info;
+
+// ── shared helper ────────────────────────────────────────────────────────────
+
+async fn start_and_init_container(
+    table_sql: &str,
+    client: &HttpClient,
+) -> Result<ClickHouseContainer, TestBinaryError> {
+    let container = ClickHouseContainer::start().await?;
+
+    // Wait until ClickHouse accepts HTTP requests.
+    let ping_url = format!("{}/ping", container.base_url);
+    let mut ready = false;
+    for _ in 0..HEALTH_CHECK_ATTEMPTS {
+        if let Ok(resp) = client.get(&ping_url).send().await
+            && resp.status().is_success()
+        {
+            info!("ClickHouse HTTP interface is ready");
+            ready = true;
+            break;
+        }
+        sleep(Duration::from_millis(HEALTH_CHECK_INTERVAL_MS)).await;
+    }
+
+    if !ready {
+        return Err(TestBinaryError::FixtureSetup {
+            fixture_type: "ClickHouseSink".to_string(),
+            message: format!(
+                "ClickHouse did not become healthy after {} attempts",
+                HEALTH_CHECK_ATTEMPTS
+            ),
+        });
+    }
+
+    // Create the test table.
+    let response = client
+        .post(&container.base_url)
+        .body(table_sql.to_string())
+        .send()
+        .await
+        .map_err(|e| TestBinaryError::FixtureSetup {
+            fixture_type: "ClickHouseSink".to_string(),
+            message: format!("Failed to create test table: {e}"),
+        })?;
+
+    if !response.status().is_success() {
+        let status = response.status();
+        let body = response.text().await.unwrap_or_default();
+        return Err(TestBinaryError::FixtureSetup {
+            fixture_type: "ClickHouseSink".to_string(),
+            message: format!("CREATE TABLE failed: status={status}, body={body}"),
+        });
+    }
+
+    info!("Created ClickHouse test table");
+    Ok(container)
+}
+
+fn base_envs(container: &ClickHouseContainer, schema: &str) -> HashMap<String, String> {
+    let mut envs = HashMap::new();
+    envs.insert(ENV_SINK_URL.to_string(), container.base_url.clone());
+    envs.insert(ENV_SINK_DATABASE.to_string(), DEFAULT_DATABASE.to_string());
+    envs.insert(ENV_SINK_TABLE.to_string(), DEFAULT_SINK_TABLE.to_string());
+    envs.insert(
+        ENV_SINK_USERNAME.to_string(),
+        CLICKHOUSE_TEST_USER.to_string(),
+    );
+    envs.insert(
+        ENV_SINK_PASSWORD.to_string(),
+        CLICKHOUSE_TEST_PASSWORD.to_string(),
+    );
+    envs.insert(
+        ENV_SINK_STREAMS_0_STREAM.to_string(),
+        DEFAULT_TEST_STREAM.to_string(),
+    );
+    envs.insert(
+        ENV_SINK_STREAMS_0_TOPICS.to_string(),
+        format!("[{}]", DEFAULT_TEST_TOPIC),
+    );
+    envs.insert(ENV_SINK_STREAMS_0_SCHEMA.to_string(), schema.to_string());
+    envs.insert(
+        ENV_SINK_STREAMS_0_CONSUMER_GROUP.to_string(),
+        "clickhouse_sink_cg".to_string(),
+    );
+    envs.insert(
+        ENV_SINK_PATH.to_string(),
+        "../../target/debug/libiggy_connector_clickhouse_sink".to_string(),
+    );
+    envs
+}
+
+// ── JSONEachRow fixture ───────────────────────────────────────────────────────
+
+/// ClickHouse sink fixture using the default `json_each_row` insert format.
+pub struct ClickHouseSinkFixture {
+    pub container: ClickHouseContainer,
+    pub http_client: HttpClient,
+}
+
+impl ClickHouseOps for ClickHouseSinkFixture {
+    fn container(&self) -> &ClickHouseContainer {
+        &self.container
+    }
+
+    fn http_client(&self) -> &HttpClient {
+        &self.http_client
+    }
+}
+
+impl ClickHouseSinkFixture {
+    pub async fn wait_for_rows(&self, expected: usize) -> Result<usize, TestBinaryError> {
+        ClickHouseOps::wait_for_rows(self, DEFAULT_SINK_TABLE, expected).await
+    }
+
+    pub async fn fetch_rows(&self) -> Result<Vec<serde_json::Value>, TestBinaryError> {
+        ClickHouseOps::fetch_rows_as_json(self, DEFAULT_SINK_TABLE).await
+    }
+
+    pub async fn count_rows(&self) -> Result<usize, TestBinaryError> {
+        ClickHouseOps::count_rows(self, DEFAULT_SINK_TABLE).await
+    }
+}
+
+#[async_trait]
+impl TestFixture for ClickHouseSinkFixture {
+    async fn setup() -> Result<Self, TestBinaryError> {
+        let http_client = create_http_client();
+        let container = start_and_init_container(CREATE_TABLE_SQL, &http_client).await?;
+        Ok(Self {
+            container,
+            http_client,
+        })
+    }
+
+    fn connectors_runtime_envs(&self) -> HashMap<String, String> {
+        // insert_format defaults to json_each_row — no env var needed.
+        base_envs(&self.container, "json")
+    }
+}
+
+// ── RowBinary fixture ─────────────────────────────────────────────────────────
+
+/// ClickHouse sink fixture using `row_binary` insert format.
+///
+/// The connector fetches the table schema at startup and serialises each JSON
+/// message to ClickHouse's RowBinaryWithDefaults wire format.
+pub struct ClickHouseSinkRowBinaryFixture {
+    inner: ClickHouseSinkFixture,
+}
+
+impl std::ops::Deref for ClickHouseSinkRowBinaryFixture {
+    type Target = ClickHouseSinkFixture;
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+#[async_trait]
+impl TestFixture for ClickHouseSinkRowBinaryFixture {
+    async fn setup() -> Result<Self, TestBinaryError> {
+        let http_client = create_http_client();
+        let container = start_and_init_container(CREATE_TABLE_SQL, &http_client).await?;
+        Ok(Self {
+            inner: ClickHouseSinkFixture {
+                container,
+                http_client,
+            },
+        })
+    }
+
+    fn connectors_runtime_envs(&self) -> HashMap<String, String> {
+        let mut envs = base_envs(&self.inner.container, "json");
+        envs.insert(ENV_SINK_INSERT_FORMAT.to_string(), "row_binary".to_string());
+        envs
+    }
+}
+
+// ── String (CSV) passthrough fixture ─────────────────────────────────────────
+
+/// ClickHouse sink fixture using `string` insert format with CSV sub-format.
+///
+/// Payloads are raw CSV bytes forwarded directly to ClickHouse without any
+/// parsing by the connector.
+pub struct ClickHouseSinkStringFixture { + inner: ClickHouseSinkFixture, +} + +impl std::ops::Deref for ClickHouseSinkStringFixture { + type Target = ClickHouseSinkFixture; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +#[async_trait] +impl TestFixture for ClickHouseSinkStringFixture { + async fn setup() -> Result { + let http_client = create_http_client(); + let container = start_and_init_container(CREATE_TABLE_SQL, &http_client).await?; + Ok(Self { + inner: ClickHouseSinkFixture { + container, + http_client, + }, + }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + let mut envs = base_envs(&self.inner.container, "text"); + envs.insert(ENV_SINK_INSERT_FORMAT.to_string(), "string".to_string()); + envs.insert(ENV_SINK_STRING_FORMAT.to_string(), "csv".to_string()); + envs + } +} \ No newline at end of file diff --git a/core/integration/tests/connectors/fixtures/mod.rs b/core/integration/tests/connectors/fixtures/mod.rs index 3c0ac0d93b..f51d53f28b 100644 --- a/core/integration/tests/connectors/fixtures/mod.rs +++ b/core/integration/tests/connectors/fixtures/mod.rs @@ -17,12 +17,14 @@ * under the License. 
*/ +mod clickhouse; mod elasticsearch; mod iceberg; mod postgres; mod quickwit; mod wiremock; +pub use clickhouse::{ClickHouseSinkFixture, ClickHouseSinkRowBinaryFixture, ClickHouseSinkStringFixture}; pub use elasticsearch::{ElasticsearchSinkFixture, ElasticsearchSourcePreCreatedFixture}; pub use iceberg::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture}; pub use postgres::{ diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs index a85453d52c..8e134f172d 100644 --- a/core/integration/tests/connectors/mod.rs +++ b/core/integration/tests/connectors/mod.rs @@ -18,6 +18,7 @@ */ mod api; +mod clickhouse; mod elasticsearch; mod fixtures; mod http_config_provider; From 7c7430b917a892bec597d79ba1cdabf624250446 Mon Sep 17 00:00:00 2001 From: kriti-sc Date: Sat, 7 Mar 2026 01:18:07 +0530 Subject: [PATCH 2/3] fix linting errors --- DEPENDENCIES.md | 1 + .../sinks/clickhouse_sink/Cargo.toml | 20 +++++++++---------- .../sinks/clickhouse_sink/src/binary.rs | 2 +- .../sinks/clickhouse_sink/src/body.rs | 2 +- .../sinks/clickhouse_sink/src/client.rs | 2 +- .../sinks/clickhouse_sink/src/lib.rs | 2 +- .../sinks/clickhouse_sink/src/schema.rs | 2 +- .../sinks/clickhouse_sink/src/sink.rs | 2 +- .../connectors/clickhouse/clickhouse_sink.rs | 2 +- .../tests/connectors/clickhouse/mod.rs | 2 +- .../fixtures/clickhouse/container.rs | 2 +- .../connectors/fixtures/clickhouse/mod.rs | 2 +- .../connectors/fixtures/clickhouse/sink.rs | 2 +- .../tests/connectors/fixtures/mod.rs | 4 +++- 14 files changed, 25 insertions(+), 22 deletions(-) diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index e3f450a7e9..f7825c4c64 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -447,6 +447,7 @@ iggy-connectors: 0.3.1-edge.1, "Apache-2.0", iggy-mcp: 0.3.1-edge.1, "Apache-2.0", iggy_binary_protocol: 0.9.1-edge.1, "Apache-2.0", iggy_common: 0.9.1-edge.1, "Apache-2.0", +iggy_connector_clickhouse_sink: 0.1.0, "Apache-2.0", 
iggy_connector_elasticsearch_sink: 0.3.1-edge.1, "Apache-2.0", iggy_connector_elasticsearch_source: 0.3.1-edge.1, "Apache-2.0", iggy_connector_iceberg_sink: 0.3.1-edge.1, "Apache-2.0", diff --git a/core/connectors/sinks/clickhouse_sink/Cargo.toml b/core/connectors/sinks/clickhouse_sink/Cargo.toml index 28b013c348..b599e5c8c2 100644 --- a/core/connectors/sinks/clickhouse_sink/Cargo.toml +++ b/core/connectors/sinks/clickhouse_sink/Cargo.toml @@ -35,14 +35,14 @@ ignored = ["dashmap", "once_cell"] crate-type = ["cdylib", "lib"] [dependencies] -async-trait = { workspace = true } -dashmap = { workspace = true } -humantime = { workspace = true } +async-trait = { workspace = true } +dashmap = { workspace = true } +humantime = { workspace = true } iggy_connector_sdk = { workspace = true } -once_cell = { workspace = true } -reqwest = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -simd-json = { workspace = true } -tokio = { workspace = true } -tracing = { workspace = true } \ No newline at end of file +once_cell = { workspace = true } +reqwest = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +simd-json = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } diff --git a/core/connectors/sinks/clickhouse_sink/src/binary.rs b/core/connectors/sinks/clickhouse_sink/src/binary.rs index 82ef9801c4..982923e908 100644 --- a/core/connectors/sinks/clickhouse_sink/src/binary.rs +++ b/core/connectors/sinks/clickhouse_sink/src/binary.rs @@ -1092,4 +1092,4 @@ mod hex { _ => Err(()), } } -} \ No newline at end of file +} diff --git a/core/connectors/sinks/clickhouse_sink/src/body.rs b/core/connectors/sinks/clickhouse_sink/src/body.rs index 2fb58fccc7..eeef9df834 100644 --- a/core/connectors/sinks/clickhouse_sink/src/body.rs +++ b/core/connectors/sinks/clickhouse_sink/src/body.rs @@ -283,4 +283,4 @@ mod tests { // Two rows: 0x00 (value follows) + \x01x and 0x00 + \x01y assert_eq!(body, 
b"\x00\x01x\x00\x01y"); } -} \ No newline at end of file +} diff --git a/core/connectors/sinks/clickhouse_sink/src/client.rs b/core/connectors/sinks/clickhouse_sink/src/client.rs index 47588a67b8..3a95f71d2f 100644 --- a/core/connectors/sinks/clickhouse_sink/src/client.rs +++ b/core/connectors/sinks/clickhouse_sink/src/client.rs @@ -319,4 +319,4 @@ fn urlencoded(s: &str) -> String { } } out -} \ No newline at end of file +} diff --git a/core/connectors/sinks/clickhouse_sink/src/lib.rs b/core/connectors/sinks/clickhouse_sink/src/lib.rs index b20d26895f..3d434ddbde 100644 --- a/core/connectors/sinks/clickhouse_sink/src/lib.rs +++ b/core/connectors/sinks/clickhouse_sink/src/lib.rs @@ -283,4 +283,4 @@ mod tests { "CSV" ); } -} \ No newline at end of file +} diff --git a/core/connectors/sinks/clickhouse_sink/src/schema.rs b/core/connectors/sinks/clickhouse_sink/src/schema.rs index 1f959554c3..d13fe6c63e 100644 --- a/core/connectors/sinks/clickhouse_sink/src/schema.rs +++ b/core/connectors/sinks/clickhouse_sink/src/schema.rs @@ -685,4 +685,4 @@ mod tests { assert_eq!(fields.len(), 3); assert!(fields.iter().all(|f| matches!(f, ChType::Float32))); } -} \ No newline at end of file +} diff --git a/core/connectors/sinks/clickhouse_sink/src/sink.rs b/core/connectors/sinks/clickhouse_sink/src/sink.rs index 21a8f721ea..0f28e0514f 100644 --- a/core/connectors/sinks/clickhouse_sink/src/sink.rs +++ b/core/connectors/sinks/clickhouse_sink/src/sink.rs @@ -150,4 +150,4 @@ impl Sink for ClickHouseSink { self.table_schema = None; Ok(()) } -} \ No newline at end of file +} diff --git a/core/integration/tests/connectors/clickhouse/clickhouse_sink.rs b/core/integration/tests/connectors/clickhouse/clickhouse_sink.rs index 7986b0c44c..276b648a4f 100644 --- a/core/integration/tests/connectors/clickhouse/clickhouse_sink.rs +++ b/core/integration/tests/connectors/clickhouse/clickhouse_sink.rs @@ -312,4 +312,4 @@ async fn clickhouse_sink_stores_string_passthrough_csv( row_count, 
TEST_MESSAGE_COUNT, "Expected {TEST_MESSAGE_COUNT} rows in ClickHouse (string/CSV)" ); -} \ No newline at end of file +} diff --git a/core/integration/tests/connectors/clickhouse/mod.rs b/core/integration/tests/connectors/clickhouse/mod.rs index 25011fba33..d68c8a6ce9 100644 --- a/core/integration/tests/connectors/clickhouse/mod.rs +++ b/core/integration/tests/connectors/clickhouse/mod.rs @@ -19,4 +19,4 @@ mod clickhouse_sink; -const TEST_MESSAGE_COUNT: usize = 3; \ No newline at end of file +const TEST_MESSAGE_COUNT: usize = 3; diff --git a/core/integration/tests/connectors/fixtures/clickhouse/container.rs b/core/integration/tests/connectors/fixtures/clickhouse/container.rs index fb3e2c7dc4..0325dc16fe 100644 --- a/core/integration/tests/connectors/fixtures/clickhouse/container.rs +++ b/core/integration/tests/connectors/fixtures/clickhouse/container.rs @@ -294,4 +294,4 @@ pub trait ClickHouseOps: Sync { Ok(parsed.data) } } -} \ No newline at end of file +} diff --git a/core/integration/tests/connectors/fixtures/clickhouse/mod.rs b/core/integration/tests/connectors/fixtures/clickhouse/mod.rs index 71a4265491..89233a63f8 100644 --- a/core/integration/tests/connectors/fixtures/clickhouse/mod.rs +++ b/core/integration/tests/connectors/fixtures/clickhouse/mod.rs @@ -22,4 +22,4 @@ mod sink; pub use sink::{ ClickHouseSinkFixture, ClickHouseSinkRowBinaryFixture, ClickHouseSinkStringFixture, -}; \ No newline at end of file +}; diff --git a/core/integration/tests/connectors/fixtures/clickhouse/sink.rs b/core/integration/tests/connectors/fixtures/clickhouse/sink.rs index ad077ce968..d8197b1b13 100644 --- a/core/integration/tests/connectors/fixtures/clickhouse/sink.rs +++ b/core/integration/tests/connectors/fixtures/clickhouse/sink.rs @@ -244,4 +244,4 @@ impl TestFixture for ClickHouseSinkStringFixture { envs.insert(ENV_SINK_STRING_FORMAT.to_string(), "csv".to_string()); envs } -} \ No newline at end of file +} diff --git a/core/integration/tests/connectors/fixtures/mod.rs 
b/core/integration/tests/connectors/fixtures/mod.rs index f51d53f28b..01c9713122 100644 --- a/core/integration/tests/connectors/fixtures/mod.rs +++ b/core/integration/tests/connectors/fixtures/mod.rs @@ -24,7 +24,9 @@ mod postgres; mod quickwit; mod wiremock; -pub use clickhouse::{ClickHouseSinkFixture, ClickHouseSinkRowBinaryFixture, ClickHouseSinkStringFixture}; +pub use clickhouse::{ + ClickHouseSinkFixture, ClickHouseSinkRowBinaryFixture, ClickHouseSinkStringFixture, +}; pub use elasticsearch::{ElasticsearchSinkFixture, ElasticsearchSourcePreCreatedFixture}; pub use iceberg::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture}; pub use postgres::{ From fc34e4d1df2f010c5665a965c2126b1f64263dbe Mon Sep 17 00:00:00 2001 From: kriti-sc Date: Sat, 7 Mar 2026 01:22:51 +0530 Subject: [PATCH 3/3] fix linting errors --- .../runtime/example_config/connectors/clickhouse_sink.toml | 2 +- core/connectors/sinks/clickhouse_sink/config.toml | 2 +- core/integration/tests/connectors/clickhouse/sink.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/connectors/runtime/example_config/connectors/clickhouse_sink.toml b/core/connectors/runtime/example_config/connectors/clickhouse_sink.toml index 9290b2f3d5..7818baf7f2 100644 --- a/core/connectors/runtime/example_config/connectors/clickhouse_sink.toml +++ b/core/connectors/runtime/example_config/connectors/clickhouse_sink.toml @@ -53,4 +53,4 @@ insert_format = "json_each_row" timeout_seconds = 30 max_retries = 3 retry_delay = "1s" -verbose_logging = false \ No newline at end of file +verbose_logging = false diff --git a/core/connectors/sinks/clickhouse_sink/config.toml b/core/connectors/sinks/clickhouse_sink/config.toml index 2a9a3aafcd..067e690fc2 100644 --- a/core/connectors/sinks/clickhouse_sink/config.toml +++ b/core/connectors/sinks/clickhouse_sink/config.toml @@ -53,4 +53,4 @@ insert_format = "json_each_row" timeout_seconds = 30 max_retries = 3 retry_delay = "1s" -verbose_logging = 
false \ No newline at end of file +verbose_logging = false diff --git a/core/integration/tests/connectors/clickhouse/sink.toml b/core/integration/tests/connectors/clickhouse/sink.toml index 7bfaf3bf7b..df52653bf3 100644 --- a/core/integration/tests/connectors/clickhouse/sink.toml +++ b/core/integration/tests/connectors/clickhouse/sink.toml @@ -17,4 +17,4 @@ [connectors] config_type = "local" -config_dir = "../connectors/sinks/clickhouse_sink" \ No newline at end of file +config_dir = "../connectors/sinks/clickhouse_sink"