feat(connectors): Feature/influxdb connector Sink and Source #2933
Open
ryerraguntla wants to merge 23 commits into apache:master from
Conversation
Add example TOML configuration files for InfluxDB source and sink connectors under core/connectors/runtime/example_config/connectors/influx. Each file includes connector metadata (type, key, name, path, plugin format) and stream/plugin_config sections. The sink config defines connection (url, org, bucket, token), measurement/precision, payload and metadata/tag options, batching, retries and timeouts. The source config provides connection and token, a Flux query template using $cursor and $limit, polling, batch/cursor settings, initial offset, and retry/timeout settings. These serve as runtime examples for connector development and testing.
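Based on the fields listed above, the sink example could look roughly like the sketch below. Section names and values are illustrative, reconstructed from the commit message, not copied from the actual files:

```toml
# Illustrative sketch of an InfluxDB sink connector config; the real
# example files under core/connectors/runtime/example_config may differ.
type = "sink"
key = "influxdb"
name = "InfluxDB sink"
path = "target/release/libiggy_connector_influxdb_sink"

[[streams]]
stream = "example_stream"
topics = ["example_topic"]

[plugin_config]
url = "http://localhost:8086"
org = "iggy"
bucket = "metrics"
token = "dev-token"
measurement = "iggy_messages"
precision = "us"
max_retries = 3
timeout = "30s"
```

The source example would follow the same shape, swapping the write settings for the Flux query template, polling, and cursor options described above.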
Improve resilience of InfluxDB source and sink connectors. Adds exponential backoff with ±20% jitter for open/poll/write retries, honours Retry-After on HTTP 429, and caps backoff delays. Introduces a simple consecutive-failure circuit breaker (threshold + cool-down) to avoid hammering unavailable InfluxDB instances and new config fields (max_open_retries, open_retry_max_delay, circuit_breaker_threshold, circuit_breaker_cool_down). Sink now propagates batch write errors (preventing silent data loss) and resets/records circuit state on success/failure. Consolidates helper functions (duration parsing, backoff, jitter, retry-after parsing, escaping), minor logging improvements, and type fixes. Adds integration test scaffolding for InfluxDB (docker-compose, sink/source tests) and updates connector Cargo.toml entries (rand/workspace and small feature tweaks in integration Cargo.toml).
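The retry scheme described above (exponential backoff, ±20% jitter, capped delays) can be sketched roughly as follows. Helper names are illustrative, and a real implementation would sample the jitter factor from a RNG such as `rand` rather than take it as a parameter:

```rust
use std::time::Duration;

// Exponential backoff capped at `max`; the shift count is clamped so
// large attempt numbers cannot overflow the multiplier.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    base.checked_mul(1u32 << attempt.min(16))
        .unwrap_or(max)
        .min(max)
}

// Apply +/-20% jitter; `factor` is a value in [-1.0, 1.0] that a real
// implementation would draw randomly per retry.
fn with_jitter(delay: Duration, factor: f64) -> Duration {
    let jitter = delay.as_secs_f64() * 0.2 * factor.clamp(-1.0, 1.0);
    Duration::from_secs_f64((delay.as_secs_f64() + jitter).max(0.0))
}
```

With a 100 ms base, attempt 3 yields 800 ms before jitter, and the cap keeps late attempts bounded instead of growing without limit.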
Add dedicated InfluxDB source and sink TOML fixtures for integration tests (core/integration/tests/connectors/influxdb_source.toml and influxdb_sink.toml) with default plugin settings. Also normalize trailing newlines in existing sink/source.toml files and update core/integration/tests/connectors/fixtures/mod.rs to (re-)export the InfluxDb sink and source fixtures so they are available to tests.
…us brace escaping
Change InfluxDB connector defaults to nanosecond precision and increase sink default timeout to 30s. Updated DEFAULT_PRECISION and DEFAULT_TIMEOUT in the sink implementation, adjusted example connector TOML files and test runtime configs to set precision = "ns", and updated integration tests (influxdb_source.rs) to fix formatting, polling behavior, and assertions. These changes standardize precision handling and give the sink more time for network operations.
Insert a small dummy comment in the file header and consolidate two separate `pub use influxdb::...` lines into a single grouped import (`pub use influxdb::{InfluxDbSinkFixture, InfluxDbSourceFixture};`). No functional changes—just minor cleanup for clarity.
Change example and test configs to use microsecond precision (precision = "us") instead of nanoseconds. Update InfluxDbSink formatting to emit numeric fields without the integer suffix (`i`) and adjust timestamp handling/comment for clarity. Comment out the source crate's dependency on the influxdb sink and remove the corresponding entry from Cargo.lock to avoid the circular/implicit dependency. These changes align line protocol output and configs with the chosen precision and simplify crate dependencies.
Adjust InfluxDB sink/source behavior and tests: treat message.timestamp as microseconds, provide fallback to current time when timestamp is unset, and add per-message offset to ensure unique timestamps to avoid deduplication. Change default precision handling (config -> ns, sink default constant -> us) and update timestamp conversion logic to operate on microseconds. Fix source polling to avoid unconditional sleep and to sleep when circuit breaker is open. Improve test fixtures: use ns precision for write URL, refine CSV query row counting logic, update sink test queries to check the offset field, and make tests use dynamic timestamps (Utc::now) instead of fixed constants. Added logging/debugging to help diagnose InfluxDB responses and sink point timestamps.
Remove the unused ONE_DAY_MICROS constant and change test message timestamp increments from one day to 1000 microseconds (1ms). This makes generated TestMessage timestamps closely spaced for integration tests and avoids large time deltas that were previously used.
Codecov Report — ✅ All modified and coverable lines are covered by tests.

@@ Coverage Diff @@
##             master    #2933      +/-   ##
============================================
- Coverage     70.27%   70.26%   -0.02%
  Complexity      862      862
============================================
  Files          1028     1028
  Lines         85279    85279
  Branches      62656    62666      +10
============================================
- Hits          59932    59923       -9
  Misses        22833    22833
- Partials       2514     2523       +9

Flags with carried forward coverage won't be shown.
Contributor
@ryerraguntla you said that precommit hooks were run, yet clippy is failing. how is that possible? ;)
Adds a native InfluxDB connector for Apache Iggy resolving #2700.
New crates:
- iggy_connector_influxdb_sink — consumes Iggy messages and writes them to InfluxDB 2.x via line protocol (/api/v2/write)
- iggy_connector_influxdb_source — polls InfluxDB Flux query results and produces messages to Iggy topics

Features:
- configurable timestamp precision (ns, us, ms, s)

Closes #2700
What changed?
Implemented the following features on top of the existing connector framework.
InfluxDB Sink Connector
Writes Iggy stream messages to InfluxDB v2 using the line protocol write API.
Data writing — Messages are serialised as line protocol with correct escaping for measurements, tag values, and string fields. Payloads can be written as JSON (validated and escaped), UTF-8 text, or Base64-encoded raw bytes. Timestamps are converted from Iggy's microsecond epoch to the configured InfluxDB precision (ns/us/ms/s, default µs). An offset-based nanosecond blend prevents silent deduplication when multiple messages in the same batch share the same microsecond timestamp. If a producer sets timestamp=0, the connector falls back to SystemTime::now() to avoid Year-1970 points.
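The timestamp handling described above can be sketched roughly as follows; the function name and exact offset handling are assumptions, not the PR's code:

```rust
// Iggy message timestamps are microseconds since the epoch; InfluxDB
// expects the configured precision. At ns precision, a per-message
// offset blended into the sub-microsecond range keeps points unique so
// InfluxDB does not silently deduplicate same-timestamp writes.
fn to_influx_timestamp(timestamp_us: u64, precision: &str, offset_ns: u64) -> u64 {
    match precision {
        "ns" => timestamp_us * 1_000 + offset_ns, // nanosecond blend
        "ms" => timestamp_us / 1_000,
        "s" => timestamp_us / 1_000_000,
        _ => timestamp_us, // "us" and default: pass through
    }
}
```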
Resilience — Writes are retried up to max_retries (default 3) on HTTP 429 and 5xx responses using exponential backoff with ±20% jitter. The Retry-After header is honoured in both integer-seconds and RFC 7231 HTTP-date formats before falling back to its own backoff. A circuit breaker opens after circuit_breaker_threshold (default 5) consecutive batch failures and holds for circuit_breaker_cool_down (default 30s) before a half-open probe. Batch errors are captured and propagated to the runtime after processing all remaining sub-batches, preventing silent data loss.
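A minimal consecutive-failure circuit breaker of the kind described above might look like this; the type and method names are illustrative, not the PR's actual implementation:

```rust
use std::time::{Duration, Instant};

// Opens after `threshold` consecutive failures and stays open for
// `cool_down`, after which callers may attempt a half-open probe.
struct CircuitBreaker {
    threshold: u32,
    cool_down: Duration,
    failures: u32,
    opened_at: Option<Instant>,
}

impl CircuitBreaker {
    fn new(threshold: u32, cool_down: Duration) -> Self {
        Self { threshold, cool_down, failures: 0, opened_at: None }
    }

    // True while the breaker is open and the cool-down has not elapsed.
    fn is_open(&self) -> bool {
        matches!(self.opened_at, Some(t) if t.elapsed() < self.cool_down)
    }

    // Any success fully resets the breaker.
    fn record_success(&mut self) {
        self.failures = 0;
        self.opened_at = None;
    }

    fn record_failure(&mut self) {
        self.failures += 1;
        if self.failures >= self.threshold {
            self.opened_at = Some(Instant::now());
        }
    }
}
```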
Startup — open() retries the InfluxDB /health endpoint up to max_open_retries (default 10) with capped exponential backoff, so the connector recovers from transient InfluxDB restarts without manual intervention.
Metadata — Stream name, topic, and partition can be written as InfluxDB tags; message checksum and origin timestamp as fields. Each is individually togglable. The measurement name is configurable (default iggy_messages). Verbose logging mode promotes per-batch diagnostics from debug! to info! without code changes.
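The line-protocol escaping mentioned earlier (measurements, tag values, string fields) follows InfluxDB's escaping rules and can be sketched as below; helper names are assumptions, not the PR's actual functions:

```rust
// Measurements escape commas and spaces.
fn escape_measurement(s: &str) -> String {
    s.replace(',', "\\,").replace(' ', "\\ ")
}

// Tag keys/values additionally escape '='.
fn escape_tag(s: &str) -> String {
    s.replace(',', "\\,").replace('=', "\\=").replace(' ', "\\ ")
}

// String field values escape backslashes first, then double quotes,
// so already-escaped characters are not double-processed incorrectly.
fn escape_string_field(s: &str) -> String {
    s.replace('\\', "\\\\").replace('"', "\\\"")
}
```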
InfluxDB Source Connector
Polls InfluxDB v2 via the Flux query API and publishes results to Iggy topics as structured JSON messages.
Incremental polling — Tracks max(_time) from each Flux response as a cursor, advanced by 1 ns before storing to prevent boundary-point re-delivery. The cursor is templated into any $cursor and $limit placeholder in the user-supplied Flux query, supporting arbitrary range, filter, pivot, and aggregation chains. The cursor and row count are persisted via ConnectorState so polling resumes exactly where it left off after a restart.
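Templating the cursor and limit into the user-supplied Flux query, as described above, amounts to simple placeholder substitution; the function name here is illustrative:

```rust
// Substitute $cursor and $limit placeholders into the user's Flux
// query template; everything else (range, filter, pivot, aggregation)
// is passed through untouched.
fn render_query(template: &str, cursor: &str, limit: usize) -> String {
    template
        .replace("$cursor", cursor)
        .replace("$limit", &limit.to_string())
}
```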
CSV parsing — Handles InfluxDB annotated CSV output: skips #group, #datatype, and #default annotation rows, detects header rows by field names, and correctly handles blank-line separators between multi-table results.
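The annotation filtering step can be sketched as follows; this only shows skipping annotation rows and blank table separators, while the real parser also re-detects header rows per table:

```rust
// InfluxDB annotated CSV prefixes #group, #datatype, and #default
// annotation rows with '#', and separates multi-table results with
// blank lines; both are dropped before header/row processing.
fn data_rows(csv: &str) -> Vec<&str> {
    csv.lines()
        .filter(|l| !l.trim().is_empty() && !l.starts_with('#'))
        .collect()
}
```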
Message format — Each row is published as a structured JSON envelope {measurement, field, timestamp, value, row} by default. When payload_column is set, the raw field value is extracted and emitted in the configured format (JSON, Text, or Raw/Base64).
Resilience — Same exponential backoff, jitter, Retry-After parsing, and circuit breaker as the sink. open() retries the /health endpoint before declaring ready. close() drops the reqwest client to release all connection pool resources.
Configuration — cursor_field (default _time) and initial_offset allow operators to replay from any point in the bucket. poll_interval, batch_size, and timeout are all tunable. Verbose logging toggles per-poll diagnostics between debug! and info!.
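Putting the source tunables above together, a plugin_config fragment might look like the sketch below; keys mirror the prose, values are examples only:

```toml
# Illustrative source connector settings; not the PR's actual fixture.
[plugin_config]
url = "http://localhost:8086"
token = "dev-token"
query = "from(bucket: \"metrics\") |> range(start: $cursor) |> limit(n: $limit)"
cursor_field = "_time"
initial_offset = "-1h"
poll_interval = "5s"
batch_size = 1000
timeout = "30s"
```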
Local Execution
AI Usage