feat(connectors): add ClickHouse sink connector#2891

Closed
hemantkrsh wants to merge 2 commits into apache:master from hemantkrsh:feat/connector-clickhouse-sink

@hemantkrsh

Which issue does this PR close?

Addresses the ClickHouse sink connector tracked in #2539, implementing end-to-end message delivery from iggy topics into ClickHouse tables with support for JSON and RowBinary insert formats, authentication, TLS/mTLS, field mappings, metadata injection, batching, and retry.

Rationale

ClickHouse is one of the most widely used columnar databases for real-time analytics. Having a native sink connector allows iggy users to stream messages directly into ClickHouse tables without any intermediate tooling.

What changed?

  • Messages consumed from iggy topics can now be inserted into ClickHouse tables in two formats: JSON (JSONEachRow) for structured data with field-level access, and RowBinary for high-throughput payload storage
  • JSON inserts support field mappings — specific nested JSON paths can be projected to named ClickHouse columns (e.g. address.city → city)
  • Iggy message metadata (stream, topic, partition, offset, checksum, timestamps) can be optionally injected as iggy_-prefixed columns alongside the payload
  • Batches are chunked to a configurable max_batch_size and retried with exponential backoff on transient network/timeout errors; permanent errors abort immediately
  • The connector authenticates via username/password credentials, JWT token, or no auth; TLS server verification and mTLS client certificates are supported without requiring OS trust store changes
  • LZ4 compression is enabled by default to reduce network overhead on insert traffic
  • Five integration tests run against a live ClickHouse container (testcontainers) and validate end-to-end row insertion for each supported format and metadata combination
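
The field-mapping bullet above (address.city → city) can be illustrated with a small, std-only sketch. Everything here is an assumption made for illustration: the `Json` enum is a hypothetical stand-in for whatever JSON type the connector uses, and `extract_path` and `project_row` are invented names, not the PR's API.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a JSON value; the connector presumably uses a real JSON library.
#[derive(Debug, Clone, PartialEq)]
enum Json {
    Str(String),
    Num(f64),
    Obj(BTreeMap<String, Json>),
}

/// Walks a dot-separated path such as "address.city" through nested objects.
/// Returns None as soon as a segment is missing or a non-object is reached.
fn extract_path<'a>(root: &'a Json, path: &str) -> Option<&'a Json> {
    path.split('.').try_fold(root, |node, key| match node {
        Json::Obj(map) => map.get(key),
        _ => None,
    })
}

/// Projects each (source path, column name) mapping into a flat row.
/// Paths missing from the payload are skipped in this sketch; a real sink
/// might insert NULL or reject the message instead.
fn project_row(payload: &Json, mappings: &[(&str, &str)]) -> BTreeMap<String, Json> {
    mappings
        .iter()
        .filter_map(|(path, column)| {
            extract_path(payload, path).map(|value| (column.to_string(), value.clone()))
        })
        .collect()
}

fn main() {
    let address = Json::Obj(BTreeMap::from([(
        "city".to_string(),
        Json::Str("Pune".to_string()),
    )]));
    let payload = Json::Obj(BTreeMap::from([
        ("id".to_string(), Json::Num(7.0)),
        ("address".to_string(), address),
    ]));
    // Map the nested path "address.city" to a top-level "city" column.
    let row = project_row(&payload, &[("address.city", "city"), ("id", "id")]);
    println!("{row:?}");
}
```

Skipping unmapped paths keeps the sketch short; whether the connector does the same is not stated in this PR.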

Local Execution

All unit and integration tests passed locally.

The following quality checks were run:

cargo fmt --all
cargo clippy --all-targets --all-features -- -D warnings
cargo build
cargo test
cargo machete
cargo sort --workspace

Integration tests run against a live ClickHouse 25.3-alpine container via testcontainers:
cargo test -p integration -- connectors::clickhouse::clickhouse_sink --nocapture

All 5 integration test variants passed:

test_json_sink — JSON insert without metadata
test_json_sink_with_metadata — JSON insert with iggy_ metadata columns
test_json_sink_with_field_mappings — JSON insert with field-to-column mappings
test_rowbinary_sink — RowBinary insert without metadata
test_rowbinary_sink_with_metadata — RowBinary insert with iggy_ metadata columns

AI Usage

If AI tools were used, please answer:

  1. Which tools?
    Copilot (Claude Sonnet 4.6/Opus 4.6)

  2. Scope of usage?
    Autocompletion and function generation, mainly in integration tests and documentation.

  3. How did you verify the generated code works correctly?
    The unit tests and integration tests pass, and I verified that the data was present in ClickHouse (local Docker).

  4. Can you explain every line of the code if asked?
    Yes

Implements a ClickHouse sink connector for iggy supporting JSONEachRow
and RowBinary insert formats with full auth, TLS/mTLS, batching, and retry.

- Add ClickHouseSinkConfig with accessors and sane defaults (compression,
  batch size, retry, chunk size, metadata, verbose logging)
- Add ClickHouseClient::init with custom hyper+rustls connector for mTLS
  and custom CA support; falls back to default transport for public CAs
- Add ClickHouseInserter using buffered JSONEachRow writes
- Add GenericInserter trait abstracting typed Inserter<T> and ClickHouseInserter
  for polymorphic insert dispatch
- Add Sink impl with batch_insert/try_insert split, exponential backoff retry,
  field mapping, nested JSON path extraction, and iggy_ prefixed metadata injection
- Add integration tests via testcontainers (ClickHouse 25.3-alpine) covering
  JSON, JSON+metadata, JSON+field-mappings, RowBinary, and RowBinary+metadata
- Add README with config reference, schema examples, auth/TLS
  setup, performance tuning tips, and RowBinary chunk size caveat
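
The batch_insert/try_insert split listed above could look roughly like the sketch below: the batch is chunked to a maximum size, each chunk is retried on transient errors, and a permanent error aborts immediately. This is an assumption-laden illustration, not the PR's code: `InsertError`, the closure-based `try_insert` parameter, and the use of `u64` rows are all hypothetical, and the backoff sleep between attempts is omitted.

```rust
/// Hypothetical error classification; the connector's real error type is not shown here.
#[derive(Debug, PartialEq)]
enum InsertError {
    Transient,
    Permanent,
}

/// Inserts `rows` in chunks of at most `max_batch_size`.
/// Each chunk is retried up to `max_retries` times on transient errors;
/// a permanent error, or running out of retries, aborts the whole batch.
fn batch_insert<F>(
    rows: &[u64],
    max_batch_size: usize,
    max_retries: u32,
    mut try_insert: F,
) -> Result<usize, InsertError>
where
    F: FnMut(&[u64]) -> Result<(), InsertError>,
{
    let mut inserted = 0;
    // chunks() panics on a chunk size of 0, so clamp to at least 1.
    for chunk in rows.chunks(max_batch_size.max(1)) {
        let mut attempt = 0;
        loop {
            match try_insert(chunk) {
                Ok(()) => break,
                // Retry only transient failures, and only while attempts remain.
                Err(InsertError::Transient) if attempt < max_retries => attempt += 1,
                Err(err) => return Err(err),
            }
        }
        inserted += chunk.len();
    }
    Ok(inserted)
}

fn main() {
    let rows: Vec<u64> = (0..10).collect();
    let mut calls = 0;
    let result = batch_insert(&rows, 4, 3, |_chunk| {
        calls += 1;
        // Simulate one transient failure on the very first attempt.
        if calls == 1 {
            Err(InsertError::Transient)
        } else {
            Ok(())
        }
    });
    println!("{result:?} after {calls} calls");
}
```
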
@hemantkrsh hemantkrsh changed the title from "feat(connectors): add ClickHouse sink connector #2539" to "feat(connectors): add ClickHouse sink connector" on Mar 8, 2026
let state = self.state.lock().await;
info!(
"ClickHouse sink ID: {} processed {} messages with {} batch attempt failures and with {} batch failures",
self.id, state.messages_processed, state.insert_batch_failed, state.insert_batch_failed
Contributor

The third format argument should be state.insert_attempt_failed, not state.insert_batch_failed. Right now insert_attempt_failed is tracked (line 206) but never surfaced, and insert_batch_failed is printed twice.

Author

Updated the second counter argument to insert_attempt_failed so the log surfaces failed attempts, and kept the last (third) counter argument as insert_batch_failed for total batch failures.
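
A minimal sketch of the corrected argument order discussed in this thread, using `format!` instead of the `info!` macro so that it stays std-only. The `State` struct and the `close_summary` helper are hypothetical; only the field names and the message text come from the snippet under review.

```rust
/// Hypothetical subset of the sink state; field names follow the review thread.
struct State {
    messages_processed: u64,
    insert_attempt_failed: u64,
    insert_batch_failed: u64,
}

/// Builds the shutdown summary with each counter in its own placeholder:
/// attempt failures first, then total batch failures.
fn close_summary(id: u32, state: &State) -> String {
    format!(
        "ClickHouse sink ID: {} processed {} messages with {} batch attempt failures and with {} batch failures",
        id, state.messages_processed, state.insert_attempt_failed, state.insert_batch_failed
    )
}

fn main() {
    let state = State {
        messages_processed: 1000,
        insert_attempt_failed: 3,
        insert_batch_failed: 1,
    };
    println!("{}", close_summary(1, &state));
}
```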

}
//TODO: is_retryable check -- done
Err(ch_err) if retry && retry_count < max_retries && is_retryable(&ch_err) => {
let delay = base_delay * 2u32.pow(retry_count);
Contributor

retry_count is u32 (same type as max_retry() which returns u32), and max_retry is user-configurable via config with no upper bound validation. If a user sets max_retry to 32 or higher, 2u32.pow(32) overflows and panics in debug mode (or silently wraps to 0 in release mode, causing a zero-duration delay). Even values like 31 yield 2^31 = 2_147_483_648 which, multiplied by base_delay, can overflow the Duration multiplication.

Author

Handled this by capping max_retries at 10 and the maximum delay at 15 minutes, to avoid unbounded or unreasonably long delays between retries.
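
One way to make the delay computation overflow-safe along the lines of this thread is sketched below. The caps (10 retries, 15 minutes) come from the reply above. The function and constant names are assumptions, and the PR's actual implementation may differ.

```rust
use std::time::Duration;

/// Caps taken from the author's reply; the names are hypothetical.
const MAX_RETRIES: u32 = 10;
const MAX_DELAY: Duration = Duration::from_secs(15 * 60);

/// Clamps a user-configured retry count to the supported maximum.
fn clamp_max_retries(configured: u32) -> u32 {
    configured.min(MAX_RETRIES)
}

/// Computes base_delay * 2^retry_count without panicking or wrapping.
/// checked_pow returns None once 2^retry_count no longer fits in u32, and
/// checked_mul returns None if the Duration multiplication would overflow;
/// both cases fall back to the cap, and the result never exceeds MAX_DELAY.
fn backoff_delay(base_delay: Duration, retry_count: u32) -> Duration {
    let factor = 2u32.checked_pow(retry_count).unwrap_or(u32::MAX);
    base_delay
        .checked_mul(factor)
        .unwrap_or(MAX_DELAY)
        .min(MAX_DELAY)
}

fn main() {
    let base = Duration::from_millis(100);
    for retry in 0..clamp_max_retries(50) {
        println!("retry {retry}: wait {:?}", backoff_delay(base, retry));
    }
}
```

Clamping both the exponent and the product means that even an unvalidated configuration value cannot produce a zero-length or panicking delay.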

@codecov

codecov bot commented Mar 9, 2026

Codecov Report

❌ Patch coverage is 84.48980% with 152 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.48%. Comparing base (33bee3d) to head (27ca2d2).
⚠️ Report is 1 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| core/connectors/sinks/clickhouse_sink/src/sink.rs | 86.67% | 49 Missing and 22 partials ⚠️ |
| ...ors/sinks/clickhouse_sink/src/clickhouse_client.rs | 43.18% | 47 Missing and 3 partials ⚠️ |
| core/connectors/sinks/clickhouse_sink/src/lib.rs | 87.22% | 14 Missing and 9 partials ⚠️ |
| ...s/sinks/clickhouse_sink/src/clickhouse_inserter.rs | 92.53% | 2 Missing and 3 partials ⚠️ |
| ...tors/sinks/clickhouse_sink/src/generic_inserter.rs | 97.32% | 0 Missing and 3 partials ⚠️ |

Additional details and impacted files
@@             Coverage Diff              @@
##             master    #2891      +/-   ##
============================================
+ Coverage     68.33%   68.48%   +0.14%     
  Complexity      739      739              
============================================
  Files          1053     1058       +5     
  Lines         84763    85743     +980     
  Branches      61297    62289     +992     
============================================
+ Hits          57923    58717     +794     
- Misses        24468    24601     +133     
- Partials       2372     2425      +53     
| Flag | Coverage Δ |
|---|---|
| csharp | 67.43% <ø> (-0.19%) ⬇️ |
| go | 6.27% <ø> (ø) |
| java | 54.83% <ø> (ø) |
| node | 92.25% <ø> (-0.05%) ⬇️ |
| python | 81.57% <ø> (ø) |
| rust | 70.23% <84.48%> (+0.19%) ⬆️ |

| Files with missing lines | Coverage Δ |
|---|---|
| ...tors/sinks/clickhouse_sink/src/generic_inserter.rs | 97.32% <97.32%> (ø) |
| ...s/sinks/clickhouse_sink/src/clickhouse_inserter.rs | 92.53% <92.53%> (ø) |
| core/connectors/sinks/clickhouse_sink/src/lib.rs | 87.22% <87.22%> (ø) |
| ...ors/sinks/clickhouse_sink/src/clickhouse_client.rs | 43.18% <43.18%> (ø) |
| core/connectors/sinks/clickhouse_sink/src/sink.rs | 86.67% <86.67%> (ø) |

... and 16 files with indirect coverage changes

- Add retry configuration with limits and delays; update the README accordingly
- Fix the log params in close()
- Fix pre-commit/CI hooks
@hubcio
Contributor

hubcio commented Mar 10, 2026

Hey @hemantkrsh, as you probably noticed, @kriti-sc implemented the same connector. Please hop on Discord so we can discuss the future of both PRs: https://discord.com/channels/1144142576266530928/1423706109402808504/1480863201775063151

@hubcio
Contributor

hubcio commented Mar 15, 2026

Closing this in favor of @kriti-sc's ClickHouse sink connector, implemented in #2886.

@hubcio hubcio closed this Mar 15, 2026