Skip to content

feat(connectors): Feature/influxdb connector Sink and Source#2933

Open
ryerraguntla wants to merge 23 commits intoapache:masterfrom
ryerraguntla:feat/influxdb-connector
Open

feat(connectors): Feature/influxdb connector Sink and Source#2933
ryerraguntla wants to merge 23 commits intoapache:masterfrom
ryerraguntla:feat/influxdb-connector

Conversation

@ryerraguntla
Copy link

Adds a native InfluxDB connector for Apache Iggy resolving #2700.

New crates:

  • iggy_connector_influxdb_sink — consumes Iggy messages and writes them to InfluxDB 2.x via line protocol (/api/v2/write)
  • iggy_connector_influxdb_source — polls InfluxDB Flux query results and produces messages to Iggy topics

Features:

  • Batch writes / cursor-based polling
  • Exponential backoff + jitter on retries
  • Circuit breaker after consecutive failures
  • HTTP 429 Retry-After header support
  • Configurable precision (ns, us, ms, s)
  • Full integration tests using Testcontainers (influxdb:2.7-alpine)

Closes #2700

What changed?

Implemented the following features based on a good connector framework
InfluxDB Sink Connector
Writes Iggy stream messages to InfluxDB v2 using the line protocol write API.
Data writing — Messages are serialised as line protocol with correct escaping for measurements, tag values, and string fields. Payloads can be written as JSON (validated and escaped), UTF-8 text, or Base64-encoded raw bytes. Timestamps are converted from Iggy's microsecond epoch to the configured InfluxDB precision (ns/us/ms/s, default µs). An offset-based nanosecond blend prevents silent deduplication when multiple messages in the same batch share the same microsecond timestamp. If a producer sets timestamp=0, the connector falls back to SystemTime::now() to avoid Year-1970 points.
Resilience — Writes are retried up to max_retries (default 3) on HTTP 429 and 5xx responses using exponential backoff with ±20% jitter. The Retry-After header is honoured in both integer-seconds and RFC 7231 HTTP-date formats before falling back to own backoff. A circuit breaker opens after circuit_breaker_threshold (default 5) consecutive batch failures and holds for circuit_breaker_cool_down (default 30s) before a half-open probe. Batch errors are captured and propagated to the runtime after processing all remaining sub-batches, preventing silent data loss.
Startup — open() retries the InfluxDB /health endpoint up to max_open_retries (default 10) with capped exponential backoff, so the connector recovers from transient InfluxDB restarts without manual intervention.
Metadata — Stream name, topic, and partition can be written as InfluxDB tags; message checksum and origin timestamp as fields. Each is individually togglable. The measurement name is configurable (default iggy_messages). Verbose logging mode promotes per-batch diagnostics from debug! to info! without code changes.

InfluxDB Source Connector
Polls InfluxDB v2 via the Flux query API and publishes results to Iggy topics as structured JSON messages.
Incremental polling — Tracks max(_time) from each Flux response as a cursor, advanced by 1 ns before storing to prevent boundary-point re-delivery. The cursor is templated into any $cursor and $limit placeholder in the user-supplied Flux query, supporting arbitrary range, filter, pivot, and aggregation chains. The cursor and row count are persisted via ConnectorState so polling resumes exactly where it left off after a restart.
CSV parsing — Handles InfluxDB annotated CSV output: skips #group, #datatype, and #default annotation rows, detects header rows by field names, and correctly handles blank-line separators between multi-table results.
Message format — Each row is published as a structured JSON envelope {measurement, field, timestamp, value, row} by default. When payload_column is set, the raw field value is extracted and emitted in the configured format (JSON, Text, or Raw/Base64).
Resilience — Same exponential backoff, jitter, Retry-After parsing, and circuit breaker as the sink. open() retries the /health endpoint before declaring ready. close() drops the reqwest client to release all connection pool resources.
Configuration — cursor_field (default _time) and initial_offset allow operators to replay from any point in the bucket. poll_interval, batch_size, and timeout are all tunable. Verbose logging toggles per-poll diagnostics between debug! and info!.

Local Execution

  • Passed
  • Pre-commit hooks ran

AI Usage

If AI tools were used, please answer:

  1. Which tools? GitHub Copilot, Claude
  2. Scope of usage? autocomplete, generated functions
  3. How did you verify the generated code works correctly? - Yes Verified by testing
  4. Can you explain every line of the code if asked? Yes

ryerraguntla and others added 23 commits March 14, 2026 14:21
Add example TOML configuration files for InfluxDB source and sink connectors under core/connectors/runtime/example_config/connectors/influx. Each file includes connector metadata (type, key, name, path, plugin format) and stream/plugin_config sections. The sink config defines connection (url, org, bucket, token), measurement/precision, payload and metadata/tag options, batching, retries and timeouts. The source config provides connection and token, a Flux query template using $cursor and $limit, polling, batch/cursor settings, initial offset, and retry/timeout settings. These serve as runtime examples for connector development and testing.
Improve resilience of InfluxDB source and sink connectors. Adds exponential backoff with ±20% jitter for open/poll/write retries, honours Retry-After on HTTP 429, and caps backoff delays. Introduces a simple consecutive-failure circuit breaker (threshold + cool-down) to avoid hammering unavailable InfluxDB instances and new config fields (max_open_retries, open_retry_max_delay, circuit_breaker_threshold, circuit_breaker_cool_down). Sink now propagates batch write errors (preventing silent data loss) and resets/records circuit state on success/failure. Consolidates helper functions (duration parsing, backoff, jitter, retry-after parsing, escaping), minor logging improvements, and type fixes. Adds integration test scaffolding for InfluxDB (docker-compose, sink/source tests) and updates connector Cargo.toml entries (rand/workspace and small feature tweaks in integration Cargo.toml).
Add dedicated InfluxDB source and sink TOML fixtures for integration tests (core/integration/tests/connectors/influxdb_source.toml and influxdb_sink.toml) with default plugin settings. Also normalize trailing newlines in existing sink/source.toml files and update core/integration/tests/connectors/fixtures/mod.rs to (re-)export the InfluxDb sink and source fixtures so they are available to tests.
Add dedicated InfluxDB source and sink TOML fixtures for integration tests (core/integration/tests/connectors/influxdb_source.toml and influxdb_sink.toml) with default plugin settings. Also normalize trailing newlines in existing sink/source.toml files and update core/integration/tests/connectors/fixtures/mod.rs to (re-)export the InfluxDb sink and source fixtures so they are available to tests.
Change InfluxDB connector defaults to nanosecond precision and increase sink default timeout to 30s. Updated DEFAULT_PRECISION and DEFAULT_TIMEOUT in the sink implementation, adjusted example connector TOML files and test runtime configs to set precision = "ns", and updated integration tests (influxdb_source.rs) to fix formatting, polling behavior, and assertions.  These changes standardize precision handling and give the sink more time for network operations.
Insert a small dummy comment in the file header and consolidate two separate `pub use influxdb::...` lines into a single grouped import (`pub use influxdb::{InfluxDbSinkFixture, InfluxDbSourceFixture};`). No functional changes—just minor cleanup for clarity.
Change example and test configs to use microsecond precision (precision = "us") instead of nanoseconds. Update InfluxDbSink formatting to emit numeric fields without the integer suffix (`i`) and adjust timestamp handling/comment for clarity. Comment out the source crate's dependency on the influxdb sink and remove the corresponding entry from Cargo.lock to avoid the circular/implicit dependency. These changes align line protocol output and configs with the chosen precision and simplify crate dependencies.
Adjust InfluxDB sink/source behavior and tests: treat message.timestamp as microseconds, provide fallback to current time when timestamp is unset, and add per-message offset to ensure unique timestamps to avoid deduplication. Change default precision handling (config -> ns, sink default constant -> us) and update timestamp conversion logic to operate on microseconds. Fix source polling to avoid unconditional sleep and to sleep when circuit breaker is open. Improve test fixtures: use ns precision for write URL, refine CSV query row counting logic, update sink test queries to check the offset field, and make tests use dynamic timestamps (Utc::now) instead of fixed constants. Added logging/debugging to help diagnose InfluxDB responses and sink point timestamps.
Remove the unused ONE_DAY_MICROS constant and change test message timestamp increments from one day to 1000 microseconds (1ms). This makes generated TestMessage timestamps closely spaced for integration tests and avoids large time deltas that were previously used.
@codecov
Copy link

codecov bot commented Mar 15, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 70.26%. Comparing base (3f651e0) to head (f86d30a).

Additional details and impacted files
@@             Coverage Diff              @@
##             master    #2933      +/-   ##
============================================
- Coverage     70.27%   70.26%   -0.02%     
  Complexity      862      862              
============================================
  Files          1028     1028              
  Lines         85279    85279              
  Branches      62656    62666      +10     
============================================
- Hits          59932    59923       -9     
  Misses        22833    22833              
- Partials       2514     2523       +9     
Flag Coverage Δ
csharp 67.43% <ø> (-0.19%) ⬇️
go 36.37% <ø> (ø)
java 59.87% <ø> (ø)
node 91.37% <ø> (-0.15%) ⬇️
python 81.43% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.
see 6 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@hubcio
Copy link
Contributor

hubcio commented Mar 15, 2026

@ryerraguntla you said that precommit hooks were ran, yet clippy is failing. how is that possible? ;)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

InfluxDB Connector (Sink, Source) for Apache Iggy

2 participants