feat(connectors): add ClickHouse sink connector#2891
hemantkrsh wants to merge 2 commits into apache:master
Conversation
Implements a ClickHouse sink connector for iggy supporting JSONEachRow and RowBinary insert formats with full auth, TLS/mTLS, batching, and retry.

- Add ClickHouseSinkConfig with accessors and sane defaults (compression, batch size, retry, chunk size, metadata, verbose logging)
- Add ClickHouseClient::init with a custom hyper+rustls connector for mTLS and custom CA support; falls back to the default transport for public CAs
- Add ClickHouseInserter using buffered JSONEachRow writes
- Add a GenericInserter trait abstracting the typed Inserter<T> and ClickHouseInserter for polymorphic insert dispatch
- Add the Sink impl with a batch_insert/try_insert split, exponential backoff retry, field mapping, nested JSON path extraction, and iggy_-prefixed metadata injection
- Add integration tests via testcontainers (ClickHouse 25.3-alpine) covering JSON, JSON+metadata, JSON+field-mappings, RowBinary, and RowBinary+metadata
- Add a README with config reference, schema examples, auth/TLS setup, performance tuning tips, and the RowBinary chunk size caveat
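The nested JSON path extraction mentioned above can be sketched roughly as follows. This is a hypothetical, self-contained illustration, not the PR's actual code: the `Json` enum stands in for whatever JSON value type the connector uses, and the dotted-path syntax is an assumption.

```rust
use std::collections::HashMap;

/// Minimal stand-in for a JSON value, just to keep the sketch self-contained
/// (the real connector presumably works on a serde-style value type).
#[derive(Debug, PartialEq)]
enum Json {
    Num(i64),
    Obj(HashMap<String, Json>),
}

/// Hypothetical helper: resolve a dotted field path such as "payload.user.id"
/// against a message, the way a field-mapping step might pull out a nested column.
fn extract_path<'a>(root: &'a Json, dotted: &str) -> Option<&'a Json> {
    // Walk one path segment at a time; any non-object or missing key ends the walk.
    dotted.split('.').try_fold(root, |node, key| match node {
        Json::Obj(map) => map.get(key),
        _ => None,
    })
}

fn main() {
    let mut user = HashMap::new();
    user.insert("id".to_string(), Json::Num(42));
    let mut payload = HashMap::new();
    payload.insert("user".to_string(), Json::Obj(user));
    let mut root_map = HashMap::new();
    root_map.insert("payload".to_string(), Json::Obj(payload));
    let root = Json::Obj(root_map);

    assert_eq!(extract_path(&root, "payload.user.id"), Some(&Json::Num(42)));
    assert_eq!(extract_path(&root, "payload.missing"), None);
    println!("ok");
}
```

A missing segment simply yields `None`, which a sink can then treat as a NULL column or a skipped field depending on configuration.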
```rust
let state = self.state.lock().await;
info!(
    "ClickHouse sink ID: {} processed {} messages with {} batch attempt failures and with {} batch failures",
    self.id, state.messages_processed, state.insert_batch_failed, state.insert_batch_failed
```
the third format argument should be state.insert_attempt_failed, not state.insert_batch_failed. Right now insert_attempt_failed is tracked (line 206) but never surfaced, and insert_batch_failed is printed twice.
Updated the second counter argument to insert_attempt_failed so the attempt failures are surfaced, keeping the last (third) argument for total batch failures.
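The corrected log line would then pass each counter exactly once. A sketch, using plain `format!` instead of tracing's `info!` macro and field names inferred from this thread, not the merged code:

```rust
// Hypothetical counters mirroring the state fields discussed in the review.
struct State {
    messages_processed: u64,
    insert_attempt_failed: u64,
    insert_batch_failed: u64,
}

/// Build the close() summary line; each placeholder now gets its own counter,
/// with attempt failures third and total batch failures last.
fn close_log_line(id: u32, state: &State) -> String {
    format!(
        "ClickHouse sink ID: {} processed {} messages with {} batch attempt failures and with {} batch failures",
        id, state.messages_processed, state.insert_attempt_failed, state.insert_batch_failed
    )
}

fn main() {
    let state = State {
        messages_processed: 100,
        insert_attempt_failed: 3,
        insert_batch_failed: 1,
    };
    let line = close_log_line(7, &state);
    assert!(line.contains("3 batch attempt failures"));
    assert!(line.contains("with 1 batch failures"));
    println!("{line}");
}
```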
```rust
}
//TODO: is_retryable check -- done
Err(ch_err) if retry && retry_count < max_retries && is_retryable(&ch_err) => {
    let delay = base_delay * 2u32.pow(retry_count);
```
retry_count is u32 (the same type max_retry() returns), and max_retry is user-configurable with no upper-bound validation. If a user sets max_retry to 32 or higher, 2u32.pow(32) overflows and panics in debug mode (or silently wraps to 0 in release mode, producing a zero-duration delay). Even 31 yields 2^31 = 2_147_483_648, which, multiplied by base_delay, can overflow the Duration multiplication.
Handled this by capping max_retries at 10 and the maximum delay at 15 minutes, avoiding unbounded or unreasonably long delays between retries.
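The capped backoff described above can be sketched with saturating arithmetic so that even an out-of-range config can neither panic nor wrap to a zero delay. The function name, cap constants, and signature here are assumptions based on this thread, not the merged code:

```rust
use std::time::Duration;

// Caps mentioned in the thread: at most 10 doublings and a 15-minute ceiling.
const MAX_RETRIES_CAP: u32 = 10;
const MAX_DELAY: Duration = Duration::from_secs(15 * 60);

/// Exponential backoff with hard ceilings. saturating_pow/saturating_mul make
/// overflow impossible regardless of the user-supplied retry count.
fn backoff_delay(base_delay: Duration, retry_count: u32) -> Duration {
    let attempt = retry_count.min(MAX_RETRIES_CAP);
    let factor = 2u32.saturating_pow(attempt);
    base_delay.saturating_mul(factor).min(MAX_DELAY)
}

fn main() {
    let base = Duration::from_millis(500);
    assert_eq!(backoff_delay(base, 0), Duration::from_millis(500));
    assert_eq!(backoff_delay(base, 3), Duration::from_secs(4));
    // An absurd retry_count behaves exactly like the capped one...
    assert_eq!(backoff_delay(base, 64), backoff_delay(base, 10));
    // ...and a large base delay still respects the 15-minute ceiling.
    assert_eq!(backoff_delay(Duration::from_secs(5), 10), MAX_DELAY);
    println!("ok");
}
```

Clamping the exponent rather than validating the config at load time is the simpler fix; validating in ClickHouseSinkConfig would surface the misconfiguration to the user earlier.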
Codecov Report

❌ Patch coverage is … Additional details and impacted files:

```
@@           Coverage Diff            @@
##            master    #2891   +/-  ##
============================================
+ Coverage    68.33%   68.48%   +0.14%
  Complexity     739      739
============================================
  Files         1053     1058       +5
  Lines        84763    85743     +980
  Branches     61297    62289     +992
============================================
+ Hits         57923    58717     +794
- Misses       24468    24601     +133
- Partials      2372     2425      +53
```

Flags with carried forward coverage won't be shown.
- retry configuration with limits and delays; README updated for the same
- fix the log params in close()
- pre-commit/CI hooks fix
hey @hemantkrsh, as you probably noticed, @kriti-sc implemented the same connector. Please hop on Discord so we can discuss the future of both PRs: https://discord.com/channels/1144142576266530928/1423706109402808504/1480863201775063151
Which issue does this PR close?
Addresses the ClickHouse sink connector tracked in #2539, implementing end-to-end message delivery from iggy topics into ClickHouse tables with support for JSON and RowBinary insert formats, authentication, TLS/mTLS, field mappings, metadata injection, batching, and retry.
Rationale
ClickHouse is one of the most widely used columnar databases for real-time analytics. Having a native sink connector allows iggy users to stream messages directly into ClickHouse tables without any intermediate tooling.
What changed?
Local Execution
All unit and integration tests passed locally.
All quality checks were executed.
Integration tests run against a live ClickHouse 25.3-alpine container via testcontainers:
```
cargo test -p integration -- connectors::clickhouse::clickhouse_sink --nocapture
```

All 5 integration test variants passed:

- test_json_sink — JSON insert without metadata
- test_json_sink_with_metadata — JSON insert with iggy_ metadata columns
- test_json_sink_with_field_mappings — JSON insert with field-to-column mappings
- test_rowbinary_sink — RowBinary insert without metadata
- test_rowbinary_sink_with_metadata — RowBinary insert with iggy_ metadata columns

AI Usage
If AI tools were used, please answer:
Which tools?
Copilot (Claude Sonnet 4.6/Opus 4.6)
Scope of usage?
Autocomplete and function generation, mainly used for integration tests and documentation.
How did you verify the generated code works correctly?
The unit and integration tests pass. Verified the data is present in ClickHouse (local Docker).
Can you explain every line of the code if asked?
Yes