diff --git a/Cargo.lock b/Cargo.lock index 6f4d7aa024..63100a61c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2520,6 +2520,27 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52cd9d68cf7efc6ddfaaee42e7288d3a99d613d4b50f76ce9827ae0c6e14f938" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde_core", +] + +[[package]] +name = "csv-core" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704a3c26996a80471189265814dbc2c257598b96b8a7feae2d31ace646bb9782" +dependencies = [ + "memchr", +] + [[package]] name = "ctor" version = "0.6.3" @@ -5433,6 +5454,47 @@ dependencies = [ "uuid", ] +[[package]] +name = "iggy_connector_influxdb_sink" +version = "0.2.2-edge.1" +dependencies = [ + "async-trait", + "base64 0.22.1", + "dashmap", + "futures", + "humantime", + "iggy_connector_sdk", + "once_cell", + "rand 0.10.0", + "reqwest 0.13.2", + "serde", + "serde_json", + "tokio", + "tracing", +] + +[[package]] +name = "iggy_connector_influxdb_source" +version = "0.2.2-edge.1" +dependencies = [ + "async-trait", + "base64 0.22.1", + "csv", + "dashmap", + "futures", + "humantime", + "iggy_common", + "iggy_connector_sdk", + "once_cell", + "rand 0.10.0", + "reqwest 0.13.2", + "serde", + "serde_json", + "tokio", + "tracing", + "uuid", +] + [[package]] name = "iggy_connector_mongodb_sink" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index d914767715..a92b9b34cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,11 +35,13 @@ members = [ "core/connectors/sdk", "core/connectors/sinks/elasticsearch_sink", "core/connectors/sinks/iceberg_sink", + "core/connectors/sinks/influxdb_sink", "core/connectors/sinks/mongodb_sink", "core/connectors/sinks/postgres_sink", "core/connectors/sinks/quickwit_sink", "core/connectors/sinks/stdout_sink", "core/connectors/sources/elasticsearch_source", + "core/connectors/sources/influxdb_source", 
"core/connectors/sources/postgres_source", "core/connectors/sources/random_source", "core/consensus", @@ -118,6 +120,7 @@ consensus = { path = "core/consensus" } console-subscriber = "0.5.0" crossbeam = "0.8.4" crossfire = "3.1.5" +csv = "1.3.1" ctor = "0.6.3" ctrlc = { version = "3.5", features = ["termination"] } cucumber = "0.22" diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index a4069999ca..e03299f12a 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -218,6 +218,8 @@ crossterm_winapi: 0.9.1, "MIT", crunchy: 0.2.4, "MIT", crypto-bigint: 0.5.5, "Apache-2.0 OR MIT", crypto-common: 0.1.7, "Apache-2.0 OR MIT", +csv: 1.4.0, "MIT OR Unlicense", +csv-core: 0.1.13, "MIT OR Unlicense", ctor: 0.6.3, "Apache-2.0 OR MIT", ctor-proc-macro: 0.0.7, "Apache-2.0 OR MIT", ctr: 0.9.2, "Apache-2.0 OR MIT", @@ -457,6 +459,8 @@ iggy_common: 0.9.2-edge.1, "Apache-2.0", iggy_connector_elasticsearch_sink: 0.3.2-edge.1, "Apache-2.0", iggy_connector_elasticsearch_source: 0.3.2-edge.1, "Apache-2.0", iggy_connector_iceberg_sink: 0.3.2-edge.1, "Apache-2.0", +iggy_connector_influxdb_sink: 0.2.2-edge.1, "Apache-2.0", +iggy_connector_influxdb_source: 0.2.2-edge.1, "Apache-2.0", iggy_connector_mongodb_sink: 0.3.0, "Apache-2.0", iggy_connector_postgres_sink: 0.3.2-edge.1, "Apache-2.0", iggy_connector_postgres_source: 0.3.2-edge.1, "Apache-2.0", diff --git a/README.md b/README.md index c6ad969517..eac0063a57 100644 --- a/README.md +++ b/README.md @@ -26,9 +26,9 @@ --- -**Iggy** is a persistent message streaming platform written in Rust, supporting QUIC, WebSocket, TCP (custom binary specification) and HTTP (regular REST API) transport protocols, **capable of processing millions of messages per second at ultra-low latency**. +**Iggy** is a persistent message streaming platform written in Rust, supporting QUIC, WebSocket, TCP (custom binary specification) and HTTP (regular REST API) transport protocols, capable of processing millions of messages per second with low latency. 
-Iggy provides **exceptionally high throughput and performance** while utilizing minimal computing resources. +Iggy provides high throughput and predictable performance while utilizing minimal computing resources. This is **not yet another extension** running on top of existing infrastructure, such as Kafka or SQL database. @@ -174,7 +174,7 @@ fields = ["email", "created_at"] ## Model Context Protocol -The [Model Context Protocol](https://modelcontextprotocol.io) (MCP) is an open protocol that standardizes how applications provide context to LLMs. The **[Iggy MCP Server](https://github.com/apache/iggy/tree/master/core/ai/mcp)** is an implementation of the MCP protocol for the message streaming infrastructure. It can be used to provide context to LLMs in real-time, allowing for more accurate and relevant responses. +The [Model Context Protocol](https://modelcontextprotocol.io) (MCP) is an open protocol that standardizes how applications provide context to LLMs. The **[Iggy MCP Server](https://github.com/apache/iggy/tree/master/core/ai/mcp)** is an implementation of the MCP protocol for the message streaming infrastructure. It can be used to provide context to LLMs in real-time. ![server](assets/iggy_mcp_server.png) @@ -399,7 +399,7 @@ while let Some(message) = consumer.next().await { ## Benchmarks -**Benchmarks should be the first-class citizens**. We believe that performance is crucial for any system, and we strive to provide the best possible performance for our users. Please check, why we believe that the **[transparent +Please check, why we believe that the **[transparent benchmarking](https://iggy.apache.org/blogs/2025/02/17/transparent-benchmarks)** is so important. We've also built the **[benchmarking platform](https://benchmarks.iggy.apache.org)** where anyone can upload the benchmarks and compare the results with others. Source code for the platform is available in the `core/bench/dashboard` directory. 
@@ -464,7 +464,7 @@ These benchmarks would start the server with the default configuration, create a For example, to run the benchmark for the already started server, provide the additional argument `--server-address 0.0.0.0:8090`. - **Iggy is already capable of processing millions of messages per second at the microseconds range for p99+ latency** Depending on the hardware, transport protocol (`quic`, `websocket`, `tcp` or `http`) and payload size (`messages-per-batch * message-size`) you might expect **over 5000 MB/s (e.g. 5M of 1 KB msg/sec) throughput for writes and reads**. + Depending on the hardware, transport protocol (`quic`, `websocket`, `tcp` or `http`) and payload size (`messages-per-batch * message-size`) you might expect **over 5000 MB/s (e.g. 5M of 1 KB msg/sec) throughput for writes and reads**. Please refer to the mentioned [benchmarking platform](https://benchmarks.iggy.apache.org) where you can browse the results achieved on the different hardware configurations, using the different Iggy server versions. diff --git a/core/connectors/runtime/example_config/connectors/influx/influxdb_sink.toml b/core/connectors/runtime/example_config/connectors/influx/influxdb_sink.toml new file mode 100644 index 0000000000..1378a9d34a --- /dev/null +++ b/core/connectors/runtime/example_config/connectors/influx/influxdb_sink.toml @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "sink" +key = "influxdb" +enabled = true +version = 0 +name = "InfluxDB sink" +path = "/target/release/libiggy_connector_influxdb_sink" +plugin_config_format = "toml" +verbose = false + +[[streams]] +stream = "events" +topics = ["influx_events"] +schema = "json" +batch_length = 100 +poll_interval = "5ms" +consumer_group = "influxdb_sink" + +[plugin_config] +url = "http://localhost:8086" +org = "iggy" +bucket = "events" +token = "my_super_secret_token_123" +measurement = "iggy_messages" +precision = "ns" +batch_size = 500 +include_metadata = true +include_checksum = true +include_origin_timestamp = true +include_stream_tag = true +include_topic_tag = true +include_partition_tag = true +payload_format = "json" +max_retries = 3 +retry_delay = "1s" +timeout = "10s" diff --git a/core/connectors/runtime/example_config/connectors/influx/influxdb_source.toml b/core/connectors/runtime/example_config/connectors/influx/influxdb_source.toml new file mode 100644 index 0000000000..8422482564 --- /dev/null +++ b/core/connectors/runtime/example_config/connectors/influx/influxdb_source.toml @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "source" +key = "influxdb" +enabled = true +version = 0 +name = "InfluxDB source" +path = "/target/release/libiggy_connector_influxdb_source" +plugin_config_format = "toml" +verbose = false + +[[streams]] +stream = "events" +topic = "influx_events" +schema = "json" +batch_length = 100 + +[plugin_config] +url = "http://localhost:8086" +org = "iggy_org" +token = "my_super_secret_token_123" +query = ''' +from(bucket: "iggy_bucket") + |> range(start: -1h) + |> filter(fn: (r) => r._measurement == "iggy_messages") + |> filter(fn: (r) => r._time > time(v: "$cursor")) + |> sort(columns: ["_time"]) + |> limit(n: $limit) +''' +poll_interval = "5s" +batch_size = 500 +cursor_field = "_time" +initial_offset = "1970-01-01T00:00:00Z" +include_metadata = true +payload_format = "json" +max_retries = 3 +retry_delay = "1s" +timeout = "10s" +precision = "us" diff --git a/core/connectors/runtime/example_config/connectors/influxdb_source.toml b/core/connectors/runtime/example_config/connectors/influxdb_source.toml new file mode 100644 index 0000000000..55233294e8 --- /dev/null +++ b/core/connectors/runtime/example_config/connectors/influxdb_source.toml @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "source" +key = "influxdb" +enabled = true +version = 0 +name = "InfluxDB source" +path = "target/release/libiggy_connector_influxdb_source" +plugin_config_format = "toml" +verbose = false + +[[streams]] +stream = "events" +topic = "influx_events" +schema = "json" +batch_length = 100 + +[plugin_config] +url = "http://localhost:8086" +org = "iggy" +token = "replace-with-token" +query = ''' +from(bucket: "events") + |> range(start: -1h) + |> filter(fn: (r) => r._measurement == "iggy_messages") + |> filter(fn: (r) => r._time > time(v: "$cursor")) + |> sort(columns: ["_time"]) + |> limit(n: $limit) +''' +poll_interval = "5s" +batch_size = 500 +cursor_field = "_time" +initial_offset = "1970-01-01T00:00:00Z" +include_metadata = true +payload_format = "json" +max_retries = 3 +retry_delay = "1s" +timeout = "10s" +precision = "us" diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs index 8ba37a0830..43c5cada6e 100644 --- a/core/connectors/sdk/src/lib.rs +++ b/core/connectors/sdk/src/lib.rs @@ -341,7 +341,11 @@ pub enum Error { #[error("Invalid config")] InvalidConfig, #[error("Invalid record")] + InvalidConfigValue(String), + #[error("Invalid record")] InvalidRecord, + #[error("Invalid record value : {0}")] + InvalidRecordValue(String), #[error("Invalid transformer")] InvalidTransformer, #[error("HTTP request failed: {0}")] diff --git a/core/connectors/sinks/influxdb_sink/Cargo.toml b/core/connectors/sinks/influxdb_sink/Cargo.toml new file mode 100644 index 0000000000..0917bb4fdd --- /dev/null +++ 
b/core/connectors/sinks/influxdb_sink/Cargo.toml @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iggy_connector_influxdb_sink" +version = "0.2.2-edge.1" +description = "Iggy InfluxDB sink connector for storing stream messages as line protocol" +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming", "influxdb", "sink"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" + +[package.metadata.cargo-machete] +ignored = ["dashmap", "once_cell", "futures"] + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +base64 = { workspace = true } +dashmap = { workspace = true } +futures = { workspace = true } +humantime = { workspace = true } +iggy_connector_sdk = { workspace = true } +once_cell = { workspace = true } +rand.workspace = true +reqwest = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } diff --git 
a/core/connectors/sinks/influxdb_sink/config.toml b/core/connectors/sinks/influxdb_sink/config.toml new file mode 100644 index 0000000000..621c6e7f77 --- /dev/null +++ b/core/connectors/sinks/influxdb_sink/config.toml @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +type = "sink" +key = "influxdb" +enabled = true +version = 0 +name = "InfluxDB sink" +path = "../../target/release/libiggy_connector_influxdb_sink" +verbose = false + +[[streams]] +stream = "user_events" +topics = ["users", "orders"] +schema = "json" +batch_length = 100 +poll_interval = "5ms" +consumer_group = "influxdb_sink" + +[plugin_config] +url = "http://localhost:8086" +org = "iggy" +bucket = "events" +token = "replace-with-token" +measurement = "iggy_messages" +precision = "ns" +batch_size = 500 +include_metadata = true +include_checksum = true +include_origin_timestamp = true +include_stream_tag = true +include_topic_tag = true +include_partition_tag = true +payload_format = "json" +max_retries = 3 +retry_delay = "1s" +timeout = "10s" diff --git a/core/connectors/sinks/influxdb_sink/src/lib.rs b/core/connectors/sinks/influxdb_sink/src/lib.rs new file mode 100644 index 0000000000..cc9b95c07b --- /dev/null +++ b/core/connectors/sinks/influxdb_sink/src/lib.rs @@ -0,0 +1,774 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use async_trait::async_trait; +use base64::{Engine as _, engine::general_purpose}; +use humantime::Duration as HumanDuration; +use iggy_connector_sdk::{ + ConsumedMessage, Error, MessagesMetadata, Sink, TopicMetadata, sink_connector, +}; +use rand::RngExt; +use reqwest::{Client, StatusCode, Url}; +use serde::{Deserialize, Serialize}; +use std::str::FromStr; +use std::sync::Arc; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::time::Duration; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; +use tokio::sync::Mutex; +use tracing::{debug, error, info, warn}; + +sink_connector!(InfluxDbSink); + +const DEFAULT_MAX_RETRIES: u32 = 3; +const DEFAULT_RETRY_DELAY: &str = "1s"; +const DEFAULT_TIMEOUT: &str = "30s"; +const DEFAULT_PRECISION: &str = "us"; +// Maximum attempts for open() connectivity retries +const DEFAULT_MAX_OPEN_RETRIES: u32 = 10; +// Cap for exponential backoff in open() — never wait longer than this +const DEFAULT_OPEN_RETRY_MAX_DELAY: &str = "60s"; +// How many consecutive batch failures open the circuit breaker +const DEFAULT_CIRCUIT_BREAKER_THRESHOLD: u32 = 5; +// How long the circuit stays open before allowing a probe attempt +const DEFAULT_CIRCUIT_COOL_DOWN: &str = "30s"; + +// --------------------------------------------------------------------------- +// Simple consecutive-failure circuit breaker +// --------------------------------------------------------------------------- +#[derive(Debug)] +struct CircuitBreaker { + threshold: u32, + consecutive_failures: AtomicU32, + open_until: Mutex>, + cool_down: Duration, +} + +impl CircuitBreaker { + fn new(threshold: u32, cool_down: Duration) -> Self { + CircuitBreaker { + threshold, + consecutive_failures: AtomicU32::new(0), + open_until: Mutex::new(None), + cool_down, + } + } + + /// Call when a batch write succeeds — resets failure count and closes circuit. 
+ fn record_success(&self) { + self.consecutive_failures.store(0, Ordering::SeqCst); + } + + /// Call when a batch write fails after all retries — may open the circuit. + async fn record_failure(&self) { + let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1; + if failures >= self.threshold { + let mut guard = self.open_until.lock().await; + let deadline = tokio::time::Instant::now() + self.cool_down; + *guard = Some(deadline); + warn!( + "Circuit breaker OPENED after {failures} consecutive batch failures. \ + Pausing writes for {:?}.", + self.cool_down + ); + } + } + + /// Returns true if the circuit is currently open (writes should be skipped). + async fn is_open(&self) -> bool { + let mut guard = self.open_until.lock().await; + if let Some(deadline) = *guard { + if tokio::time::Instant::now() < deadline { + return true; + } + // Cool-down elapsed — allow one probe attempt (half-open state) + *guard = None; + self.consecutive_failures.store(0, Ordering::SeqCst); + info!("Circuit breaker entering HALF-OPEN state — probing InfluxDB."); + } + false + } +} + +// --------------------------------------------------------------------------- +// Main connector structs +// --------------------------------------------------------------------------- + +#[derive(Debug)] +pub struct InfluxDbSink { + pub id: u32, + config: InfluxDbSinkConfig, + client: Option, + state: Mutex, + verbose: bool, + retry_delay: Duration, + circuit_breaker: Arc, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct InfluxDbSinkConfig { + pub url: String, + pub org: String, + pub bucket: String, + pub token: String, + pub measurement: Option, + pub precision: Option, + pub batch_size: Option, + pub include_metadata: Option, + pub include_checksum: Option, + pub include_origin_timestamp: Option, + pub include_stream_tag: Option, + pub include_topic_tag: Option, + pub include_partition_tag: Option, + pub payload_format: Option, + pub verbose_logging: Option, + pub 
max_retries: Option, + pub retry_delay: Option, + pub timeout: Option, + // How many times open() will retry before giving up + pub max_open_retries: Option, + // Upper cap on open() backoff delay + pub open_retry_max_delay: Option, + // Circuit breaker configuration + pub circuit_breaker_threshold: Option, + pub circuit_breaker_cool_down: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +enum PayloadFormat { + #[default] + Json, + Text, + Base64, +} + +impl PayloadFormat { + fn from_config(value: Option<&str>) -> Self { + match value.map(|v| v.to_ascii_lowercase()).as_deref() { + Some("text") | Some("utf8") => PayloadFormat::Text, + Some("base64") | Some("raw") => PayloadFormat::Base64, + _ => PayloadFormat::Json, + } + } +} + +#[derive(Debug)] +struct State { + messages_processed: u64, + write_errors: u64, +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +fn parse_duration(value: Option<&str>, default_value: &str) -> Duration { + let raw = value.unwrap_or(default_value); + HumanDuration::from_str(raw) + .map(|d| d.into()) + .unwrap_or_else(|_| Duration::from_secs(1)) +} + +fn is_transient_status(status: StatusCode) -> bool { + status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error() +} + +// Apply ±20% random jitter to a duration to spread retry storms +fn jitter(base: Duration) -> Duration { + let millis = base.as_millis() as u64; + let jitter_range = millis / 5; // 20% of base + if jitter_range == 0 { + return base; + } + let delta = rand::rng().random_range(0..=jitter_range * 2); + Duration::from_millis(millis.saturating_sub(jitter_range) + delta) +} + +// True exponential backoff: base * 2^attempt, capped at max_delay +fn exponential_backoff(base: Duration, attempt: u32, max_delay: Duration) -> Duration { + let factor = 2u64.saturating_pow(attempt); + let raw = 
Duration::from_millis(base.as_millis().saturating_mul(factor as u128) as u64); + raw.min(max_delay) +} + +// Parse Retry-After header value (integer seconds or HTTP date) +fn parse_retry_after(value: &str) -> Option { + if let Ok(secs) = value.trim().parse::() { + return Some(Duration::from_secs(secs)); + } + // HTTP-date fallback would require httpdate crate; return None to use own backoff + None +} + +fn escape_measurement(value: &str) -> String { + value + .replace('\\', "\\\\") + .replace(',', "\\,") + .replace(' ', "\\ ") +} + +fn escape_tag_value(value: &str) -> String { + value + .replace('\\', "\\\\") + .replace(',', "\\,") + .replace('=', "\\=") + .replace(' ', "\\ ") +} + +fn escape_field_string(value: &str) -> String { + value.replace('\\', "\\\\").replace('"', "\\\"") +} + +// --------------------------------------------------------------------------- +// InfluxDbSink implementation +// --------------------------------------------------------------------------- + +impl InfluxDbSink { + pub fn new(id: u32, config: InfluxDbSinkConfig) -> Self { + let verbose = config.verbose_logging.unwrap_or(false); + let retry_delay = parse_duration(config.retry_delay.as_deref(), DEFAULT_RETRY_DELAY); + + // Build circuit breaker from config + let cb_threshold = config + .circuit_breaker_threshold + .unwrap_or(DEFAULT_CIRCUIT_BREAKER_THRESHOLD); + let cb_cool_down = parse_duration( + config.circuit_breaker_cool_down.as_deref(), + DEFAULT_CIRCUIT_COOL_DOWN, + ); + + InfluxDbSink { + id, + config, + client: None, + state: Mutex::new(State { + messages_processed: 0, + write_errors: 0, + }), + verbose, + retry_delay, + circuit_breaker: Arc::new(CircuitBreaker::new(cb_threshold, cb_cool_down)), + } + } + + fn build_client(&self) -> Result { + let timeout = parse_duration(self.config.timeout.as_deref(), DEFAULT_TIMEOUT); + Client::builder() + .timeout(timeout) + .build() + .map_err(|e| Error::InitError(format!("Failed to create HTTP client: {e}"))) + } + + fn 
build_write_url(&self) -> Result { + let base = self.config.url.trim_end_matches('/'); + let mut url = Url::parse(&format!("{base}/api/v2/write")) + .map_err(|e| Error::InvalidConfigValue(format!("Invalid InfluxDB URL: {e}")))?; + + let precision = self + .config + .precision + .as_deref() + .unwrap_or(DEFAULT_PRECISION); + url.query_pairs_mut() + .append_pair("org", &self.config.org) + .append_pair("bucket", &self.config.bucket) + .append_pair("precision", precision); + + Ok(url) + } + + fn build_health_url(&self) -> Result { + let base = self.config.url.trim_end_matches('/'); + Url::parse(&format!("{base}/health")) + .map_err(|e| Error::InvalidConfigValue(format!("Invalid InfluxDB URL: {e}"))) + } + + async fn check_connectivity(&self) -> Result<(), Error> { + let client = self.get_client()?; + let url = self.build_health_url()?; + + let response = client + .get(url) + .send() + .await + .map_err(|e| Error::Connection(format!("InfluxDB health check failed: {e}")))?; + + if !response.status().is_success() { + let status = response.status(); + let body = response + .text() + .await + .unwrap_or_else(|_| "failed to read response body".to_string()); + return Err(Error::Connection(format!( + "InfluxDB health check returned status {status}: {body}" + ))); + } + + Ok(()) + } + + // Retry connectivity check with exponential backoff + jitter + // instead of failing hard on the first attempt. 
+ async fn check_connectivity_with_retry(&self) -> Result<(), Error> { + let max_open_retries = self + .config + .max_open_retries + .unwrap_or(DEFAULT_MAX_OPEN_RETRIES) + .max(1); + + let max_delay = parse_duration( + self.config.open_retry_max_delay.as_deref(), + DEFAULT_OPEN_RETRY_MAX_DELAY, + ); + + let mut attempt = 0u32; + loop { + match self.check_connectivity().await { + Ok(()) => { + if attempt > 0 { + info!( + "InfluxDB connectivity established after {attempt} retries \ + for sink connector ID: {}", + self.id + ); + } + return Ok(()); + } + Err(e) => { + attempt += 1; + if attempt >= max_open_retries { + error!( + "InfluxDB connectivity check failed after {attempt} attempts \ + for sink connector ID: {}. Giving up: {e}", + self.id + ); + return Err(e); + } + // Exponential backoff, with jitter + let backoff = jitter(exponential_backoff(self.retry_delay, attempt, max_delay)); + warn!( + "InfluxDB health check failed (attempt {attempt}/{max_open_retries}) \ + for sink connector ID: {}. 
Retrying in {backoff:?}: {e}", + self.id + ); + tokio::time::sleep(backoff).await; + } + } + } + } + + fn get_client(&self) -> Result<&Client, Error> { + self.client + .as_ref() + .ok_or_else(|| Error::Connection("InfluxDB client is not initialized".to_string())) + } + + fn measurement(&self) -> &str { + self.config + .measurement + .as_deref() + .unwrap_or("iggy_messages") + } + + fn payload_format(&self) -> PayloadFormat { + PayloadFormat::from_config(self.config.payload_format.as_deref()) + } + + fn timestamp_precision(&self) -> &str { + self.config + .precision + .as_deref() + .unwrap_or(DEFAULT_PRECISION) + } + + fn get_max_retries(&self) -> u32 { + self.config + .max_retries + .unwrap_or(DEFAULT_MAX_RETRIES) + .max(1) + } + + fn to_precision_timestamp(&self, micros: u64) -> u64 { + match self.timestamp_precision() { + "ns" => micros.saturating_mul(1_000), + "us" => micros, + "ms" => micros / 1_000, + "s" => micros / 1_000_000, + _ => micros, + } + } + fn line_from_message( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + message: &ConsumedMessage, + ) -> Result { + let include_metadata = self.config.include_metadata.unwrap_or(true); + let include_checksum = self.config.include_checksum.unwrap_or(true); + let include_origin_timestamp = self.config.include_origin_timestamp.unwrap_or(true); + + let mut tags = Vec::new(); + if include_metadata && self.config.include_stream_tag.unwrap_or(true) { + tags.push(format!( + "stream={}", + escape_tag_value(&topic_metadata.stream) + )); + } + if include_metadata && self.config.include_topic_tag.unwrap_or(true) { + tags.push(format!("topic={}", escape_tag_value(&topic_metadata.topic))); + } + if include_metadata && self.config.include_partition_tag.unwrap_or(true) { + tags.push(format!("partition={}", messages_metadata.partition_id)); + } + + let mut fields = vec![ + format!( + "message_id=\"{}\"", + escape_field_string(&message.id.to_string()) + ), + format!("offset={}i", message.offset 
as i64), + ]; + + if include_metadata && !self.config.include_stream_tag.unwrap_or(true) { + fields.push(format!( + "iggy_stream=\"{}\"", + escape_field_string(&topic_metadata.stream) + )); + } + if include_metadata && !self.config.include_topic_tag.unwrap_or(true) { + fields.push(format!( + "iggy_topic=\"{}\"", + escape_field_string(&topic_metadata.topic) + )); + } + if include_metadata && !self.config.include_partition_tag.unwrap_or(true) { + fields.push(format!( + "iggy_partition={}", + messages_metadata.partition_id as i64 + )); + } + + if include_checksum { + fields.push(format!("iggy_checksum={}", message.checksum as i64)); + } + if include_origin_timestamp { + fields.push(format!( + "iggy_origin_timestamp={}", + message.origin_timestamp as i64 + )); + } + + let payload_bytes = message.payload.clone().try_into_vec().map_err(|e| { + Error::CannotStoreData(format!("Failed to convert payload to bytes: {e}")) + })?; + + match self.payload_format() { + PayloadFormat::Json => { + let value: serde_json::Value = + serde_json::from_slice(&payload_bytes).map_err(|e| { + Error::CannotStoreData(format!( + "Payload format is json but payload is invalid JSON: {e}" + )) + })?; + let compact = serde_json::to_string(&value).map_err(|e| { + Error::CannotStoreData(format!("Failed to serialize JSON payload: {e}")) + })?; + fields.push(format!( + "payload_json=\"{}\"", + escape_field_string(&compact) + )); + } + PayloadFormat::Text => { + let text = String::from_utf8(payload_bytes).map_err(|e| { + Error::CannotStoreData(format!( + "Payload format is text but payload is invalid UTF-8: {e}" + )) + })?; + fields.push(format!("payload_text=\"{}\"", escape_field_string(&text))); + } + PayloadFormat::Base64 => { + let encoded = general_purpose::STANDARD.encode(payload_bytes); + fields.push(format!( + "payload_base64=\"{}\"", + escape_field_string(&encoded) + )); + } + } + + let measurement = escape_measurement(self.measurement()); + let tags_fragment = if tags.is_empty() { + 
String::new() + } else { + format!(",{}", tags.join(",")) + }; + + // message.timestamp is microseconds since Unix epoch. + // If it is 0 (unset by the producer), fall back to now() so points are + // not stored at Unix epoch (year 1970), which falls outside every + // range(start: -1h) query window. + // We also blend the message offset as sub-microsecond nanoseconds so + // that multiple messages in the same batch get distinct timestamps and + // are not deduplicated by InfluxDB (same measurement+tags+time = 1 row). + let base_micros = if message.timestamp == 0 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_micros() as u64 + } else { + message.timestamp + }; + // Add offset mod 1000 as extra nanoseconds — shifts timestamp by at + // most 999 ns, which is imperceptible but unique per message. + let unique_micros = base_micros.saturating_add(message.offset % 1_000); + let ts = self.to_precision_timestamp(unique_micros); + + debug!( + "InfluxDB sink ID: {} point — offset={}, raw_ts={}, influx_ts={ts}", + self.id, message.offset, message.timestamp + ); + + Ok(format!( + "{measurement}{tags_fragment} {} {ts}", + fields.join(",") + )) + } + + async fn process_batch( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + messages: &[ConsumedMessage], + ) -> Result<(), Error> { + if messages.is_empty() { + return Ok(()); + } + + let mut lines = Vec::with_capacity(messages.len()); + for message in messages { + lines.push(self.line_from_message(topic_metadata, messages_metadata, message)?); + } + + let body = lines.join("\n"); + self.write_with_retry(body).await + } + + async fn write_with_retry(&self, body: String) -> Result<(), Error> { + let client = self.get_client()?; + let url = self.build_write_url()?; + let max_retries = self.get_max_retries(); + let token = self.config.token.clone(); + + // Cap for per-write backoff + let max_delay = parse_duration( + self.config.open_retry_max_delay.as_deref(), + 
DEFAULT_OPEN_RETRY_MAX_DELAY, + ); + + let mut attempts = 0u32; + loop { + let response_result = client + .post(url.clone()) + .header("Authorization", format!("Token {token}")) + .header("Content-Type", "text/plain; charset=utf-8") + .body(body.clone()) + .send() + .await; + + match response_result { + Ok(response) => { + let status = response.status(); + if status == StatusCode::NO_CONTENT || status == StatusCode::OK { + return Ok(()); + } + + // Honour Retry-After on 429 before our own backoff + let retry_after = if status == StatusCode::TOO_MANY_REQUESTS { + response + .headers() + .get("Retry-After") + .and_then(|v| v.to_str().ok()) + .and_then(parse_retry_after) + } else { + None + }; + + let body_text = response + .text() + .await + .unwrap_or_else(|_| "failed to read response body".to_string()); + + attempts += 1; + if is_transient_status(status) && attempts < max_retries { + // Use server-supplied delay when available + let delay = retry_after.unwrap_or_else(|| { + // Exponential, with jitter + jitter(exponential_backoff(self.retry_delay, attempts, max_delay)) + }); + warn!( + "Transient InfluxDB write error (attempt {attempts}/{max_retries}): \ + {status}. Retrying in {delay:?}..." + ); + tokio::time::sleep(delay).await; + continue; + } + + return Err(Error::CannotStoreData(format!( + "InfluxDB write failed with status {status}: {body_text}" + ))); + } + Err(e) => { + attempts += 1; + if attempts < max_retries { + // Exponential, with jitter + let delay = + jitter(exponential_backoff(self.retry_delay, attempts, max_delay)); + warn!( + "Failed to send write request to InfluxDB \ + (attempt {attempts}/{max_retries}): {e}. Retrying in {delay:?}..." 
+ ); + tokio::time::sleep(delay).await; + continue; + } + + return Err(Error::CannotStoreData(format!( + "InfluxDB write failed after {attempts} attempts: {e}" + ))); + } + } + } + } +} + +// --------------------------------------------------------------------------- +// Sink trait implementation +// --------------------------------------------------------------------------- + +#[async_trait] +impl Sink for InfluxDbSink { + async fn open(&mut self) -> Result<(), Error> { + info!( + "Opening InfluxDB sink connector with ID: {}. Bucket: {}, org: {}", + self.id, self.config.bucket, self.config.org + ); + + self.client = Some(self.build_client()?); + + // Use retrying connectivity check instead of hard-fail + self.check_connectivity_with_retry().await?; + + info!( + "InfluxDB sink connector with ID: {} opened successfully", + self.id + ); + Ok(()) + } + + async fn consume( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: MessagesMetadata, + messages: Vec, + ) -> Result<(), Error> { + let batch_size = self.config.batch_size.unwrap_or(500) as usize; + let total_messages = messages.len(); + + // Skip writes entirely if circuit breaker is open + if self.circuit_breaker.is_open().await { + warn!( + "InfluxDB sink ID: {} — circuit breaker is OPEN. 
\ + Skipping {} messages to avoid hammering a down InfluxDB.", + self.id, total_messages + ); + // Return an error so the runtime knows messages were not written + return Err(Error::CannotStoreData( + "Circuit breaker is open — InfluxDB write skipped".to_string(), + )); + } + + // Collect the first batch error rather than silently dropping + let mut first_error: Option = None; + + for batch in messages.chunks(batch_size.max(1)) { + match self + .process_batch(topic_metadata, &messages_metadata, batch) + .await + { + Ok(()) => { + // Successful write — reset circuit breaker + self.circuit_breaker.record_success(); + } + Err(e) => { + // Failed write — notify circuit breaker + self.circuit_breaker.record_failure().await; + + let mut state = self.state.lock().await; + state.write_errors += batch.len() as u64; + error!( + "InfluxDB sink ID: {} failed to write batch of {} messages: {e}", + self.id, + batch.len() + ); + drop(state); + + // Capture first error; continue attempting remaining + // batches to maximise data delivery, but record the failure. + if first_error.is_none() { + first_error = Some(e); + } + } + } + } + + let mut state = self.state.lock().await; + state.messages_processed += total_messages as u64; + + if self.verbose { + info!( + "InfluxDB sink ID: {} processed {} messages. \ + Total processed: {}, write errors: {}", + self.id, total_messages, state.messages_processed, state.write_errors + ); + } else { + debug!( + "InfluxDB sink ID: {} processed {} messages. \ + Total processed: {}, write errors: {}", + self.id, total_messages, state.messages_processed, state.write_errors + ); + } + + // ropagate the first batch error to the runtime so it can + // decide whether to retry, halt, or dead-letter — instead of returning Ok(()) + // and silently losing messages. 
+ if let Some(err) = first_error { + return Err(err); + } + + Ok(()) + } + + async fn close(&mut self) -> Result<(), Error> { + self.client = None; // release connection pool + let state = self.state.lock().await; + info!( + "InfluxDB sink connector with ID: {} closed. Processed: {}, errors: {}", + self.id, state.messages_processed, state.write_errors + ); + Ok(()) + } +} diff --git a/core/connectors/sources/influxdb_source/Cargo.toml b/core/connectors/sources/influxdb_source/Cargo.toml new file mode 100644 index 0000000000..97a55da601 --- /dev/null +++ b/core/connectors/sources/influxdb_source/Cargo.toml @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "iggy_connector_influxdb_source" +version = "0.2.2-edge.1" +description = "Iggy InfluxDB source connector for polling Flux query results" +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming", "influxdb", "source"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" + +[package.metadata.cargo-machete] +ignored = ["dashmap", "once_cell", "futures"] + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +base64 = { workspace = true } +csv = { workspace = true } +dashmap = { workspace = true } +futures = { workspace = true } +humantime = { workspace = true } +iggy_common = { workspace = true } +iggy_connector_sdk = { workspace = true } +once_cell = { workspace = true } +rand.workspace = true +reqwest = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +uuid = { workspace = true } diff --git a/core/connectors/sources/influxdb_source/config.toml b/core/connectors/sources/influxdb_source/config.toml new file mode 100644 index 0000000000..3f7cd14980 --- /dev/null +++ b/core/connectors/sources/influxdb_source/config.toml @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "source" +key = "influxdb" +enabled = true +version = 0 +name = "InfluxDB source" +path = "../../target/release/libiggy_connector_influxdb_source" +verbose = false + +[[streams]] +stream = "user_events" +topic = "users" +schema = "json" +batch_length = 100 + +[plugin_config] +url = "http://localhost:8086" +org = "iggy" +token = "replace-with-token" +query = ''' +from(bucket: "events") + |> range(start: -1h) + |> filter(fn: (r) => r._measurement == "iggy_messages") + |> filter(fn: (r) => r._time > time(v: "$cursor")) + |> sort(columns: ["_time"]) + |> limit(n: $limit) +''' +poll_interval = "5s" +batch_size = 500 +cursor_field = "_time" +initial_offset = "1970-01-01T00:00:00Z" +include_metadata = true +payload_format = "json" +max_retries = 3 +retry_delay = "1s" +timeout = "10s" diff --git a/core/connectors/sources/influxdb_source/src/lib.rs b/core/connectors/sources/influxdb_source/src/lib.rs new file mode 100644 index 0000000000..98dc2c5f68 --- /dev/null +++ b/core/connectors/sources/influxdb_source/src/lib.rs @@ -0,0 +1,807 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// ============================================================================= +// CHANGES FROM ORIGINAL — all fixes are marked with [FIX-SRC-N] comments: +// +// [FIX-SRC-1] open() now retries connectivity with exponential backoff+jitter +// instead of failing hard when InfluxDB is unavailable at startup. +// [FIX-SRC-2] run_query_with_retry() uses true exponential backoff (2^attempt) +// instead of linear (delay * attempt). +// [FIX-SRC-3] Added random jitter (±20%) to every retry delay to avoid +// thundering herd across multiple connector instances. +// [FIX-SRC-4] On HTTP 429 Too Many Requests, the Retry-After response header +// is parsed and honoured instead of using the fixed retry_delay. +// [FIX-SRC-5] Added a circuit breaker (ConsecutiveFailureBreaker) that opens +// after max_retries consecutive poll failures, pausing queries for +// a configurable cool-down before attempting again. +// [FIX-SRC-6] Added DEFAULT_MAX_OPEN_RETRIES / max_open_retries config field +// to control how many times open() retries before giving up. +// [FIX-SRC-7] Added DEFAULT_OPEN_RETRY_MAX_DELAY cap so backoff in open() +// doesn't grow unboundedly. 
+// ============================================================================= + +use async_trait::async_trait; +use base64::{Engine as _, engine::general_purpose}; +use csv::StringRecord; +use humantime::Duration as HumanDuration; +use iggy_common::{DateTime, Utc}; +use iggy_connector_sdk::{ + ConnectorState, Error, ProducedMessage, ProducedMessages, Schema, Source, source_connector, +}; +use rand::RngExt as _; +use reqwest::{Client, StatusCode, Url}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use std::collections::HashMap; +use std::str::FromStr; +use std::sync::Arc; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::time::Duration; +use tokio::sync::Mutex; +use tracing::{debug, error, info, warn}; +use uuid::Uuid; + +source_connector!(InfluxDbSource); + +const CONNECTOR_NAME: &str = "InfluxDB source"; +const DEFAULT_MAX_RETRIES: u32 = 3; +const DEFAULT_RETRY_DELAY: &str = "1s"; +const DEFAULT_POLL_INTERVAL: &str = "5s"; +const DEFAULT_TIMEOUT: &str = "10s"; +const DEFAULT_CURSOR: &str = "1970-01-01T00:00:00Z"; +// [FIX-SRC-6] Maximum attempts for open() connectivity retries +const DEFAULT_MAX_OPEN_RETRIES: u32 = 10; +// [FIX-SRC-7] Cap for exponential backoff in open() — never wait longer than this +const DEFAULT_OPEN_RETRY_MAX_DELAY: &str = "60s"; +// [FIX-SRC-5] How many consecutive poll failures open the circuit breaker +const DEFAULT_CIRCUIT_BREAKER_THRESHOLD: u32 = 5; +// [FIX-SRC-5] How long the circuit stays open before allowing a probe attempt +const DEFAULT_CIRCUIT_COOL_DOWN: &str = "30s"; + +// --------------------------------------------------------------------------- +// [FIX-SRC-5] Simple consecutive-failure circuit breaker +// --------------------------------------------------------------------------- +#[derive(Debug)] +struct CircuitBreaker { + threshold: u32, + consecutive_failures: AtomicU32, + open_until: Mutex>, + cool_down: Duration, +} + +impl CircuitBreaker { + fn new(threshold: u32, cool_down: Duration) -> Self 
{ + CircuitBreaker { + threshold, + consecutive_failures: AtomicU32::new(0), + open_until: Mutex::new(None), + cool_down, + } + } + + /// Call when a poll attempt succeeds — resets failure count and closes circuit. + fn record_success(&self) { + self.consecutive_failures.store(0, Ordering::SeqCst); + } + + /// Call when a poll attempt fails after all retries — may open the circuit. + async fn record_failure(&self) { + let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1; + if failures >= self.threshold { + let mut guard = self.open_until.lock().await; + let deadline = tokio::time::Instant::now() + self.cool_down; + *guard = Some(deadline); + warn!( + "Circuit breaker OPENED after {failures} consecutive failures. \ + Pausing queries for {:?}.", + self.cool_down + ); + } + } + + /// Returns true if the circuit is currently open (queries should be skipped). + async fn is_open(&self) -> bool { + let mut guard = self.open_until.lock().await; + if let Some(deadline) = *guard { + if tokio::time::Instant::now() < deadline { + return true; + } + // Cool-down elapsed — allow one probe attempt (half-open state) + *guard = None; + self.consecutive_failures.store(0, Ordering::SeqCst); + info!("Circuit breaker entering HALF-OPEN state — probing InfluxDB."); + } + false + } +} + +// --------------------------------------------------------------------------- +// Main connector structs +// --------------------------------------------------------------------------- + +#[derive(Debug)] +pub struct InfluxDbSource { + pub id: u32, + config: InfluxDbSourceConfig, + client: Option, + state: Mutex, + verbose: bool, + retry_delay: Duration, + poll_interval: Duration, + // [FIX-SRC-5] + circuit_breaker: Arc, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct InfluxDbSourceConfig { + pub url: String, + pub org: String, + pub token: String, + pub query: String, + pub poll_interval: Option, + pub batch_size: Option, + pub cursor_field: Option, + pub 
initial_offset: Option, + pub payload_column: Option, + pub payload_format: Option, + pub include_metadata: Option, + pub verbose_logging: Option, + pub max_retries: Option, + pub retry_delay: Option, + pub timeout: Option, + // [FIX-SRC-6] How many times open() will retry before giving up + pub max_open_retries: Option, + // [FIX-SRC-7] Upper cap on open() backoff delay + pub open_retry_max_delay: Option, + // [FIX-SRC-5] Circuit breaker configuration + pub circuit_breaker_threshold: Option, + pub circuit_breaker_cool_down: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +enum PayloadFormat { + #[default] + Json, + Text, + Raw, +} + +impl PayloadFormat { + fn from_config(value: Option<&str>) -> Self { + match value.map(|v| v.to_ascii_lowercase()).as_deref() { + Some("text") | Some("utf8") => PayloadFormat::Text, + Some("raw") | Some("base64") => PayloadFormat::Raw, + _ => PayloadFormat::Json, + } + } + + fn schema(self) -> Schema { + match self { + PayloadFormat::Json => Schema::Json, + PayloadFormat::Text => Schema::Text, + PayloadFormat::Raw => Schema::Raw, + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +struct State { + last_poll_time: DateTime, + last_timestamp: Option, + processed_rows: u64, +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +fn parse_duration(value: Option<&str>, default_value: &str) -> Duration { + let raw = value.unwrap_or(default_value); + HumanDuration::from_str(raw) + .map(|d| d.into()) + .unwrap_or_else(|_| Duration::from_secs(1)) +} + +fn parse_scalar(value: &str) -> serde_json::Value { + if value.is_empty() { + return serde_json::Value::Null; + } + if let Ok(v) = value.parse::() { + return serde_json::Value::Bool(v); + } + if let Ok(v) = value.parse::() { + return serde_json::Value::Number(v.into()); + } + if let Ok(v) = value.parse::() + && let Some(number) = 
serde_json::Number::from_f64(v) + { + return serde_json::Value::Number(number); + } + serde_json::Value::String(value.to_string()) +} + +fn is_header_record(record: &StringRecord) -> bool { + record.iter().any(|v| v == "_time") && record.iter().any(|v| v == "_value") +} + +fn is_transient_status(status: StatusCode) -> bool { + status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error() +} + +// [FIX-SRC-3] Apply ±20% random jitter to a duration to spread retry storms +fn jitter(base: Duration) -> Duration { + let millis = base.as_millis() as u64; + let jitter_range = millis / 5; // 20% of base + if jitter_range == 0 { + return base; + } + let delta = rand::rng().random_range(0..=jitter_range * 2); + Duration::from_millis(millis.saturating_sub(jitter_range) + delta) +} + +// [FIX-SRC-2] True exponential backoff: base * 2^attempt, capped at max_delay +fn exponential_backoff(base: Duration, attempt: u32, max_delay: Duration) -> Duration { + let factor = 2u64.saturating_pow(attempt); + let raw = Duration::from_millis(base.as_millis().saturating_mul(factor as u128) as u64); + raw.min(max_delay) +} + +// [FIX-SRC-4] Parse Retry-After header value (integer seconds or HTTP date) +fn parse_retry_after(value: &str) -> Option { + // First try plain integer seconds + if let Ok(secs) = value.trim().parse::() { + return Some(Duration::from_secs(secs)); + } + // Then try HTTP-date (best-effort via httpdate crate if available, + // otherwise fall back to None so caller uses its own backoff) + None +} + +// --------------------------------------------------------------------------- +// InfluxDbSource implementation +// --------------------------------------------------------------------------- + +impl InfluxDbSource { + pub fn new(id: u32, config: InfluxDbSourceConfig, state: Option) -> Self { + let verbose = config.verbose_logging.unwrap_or(false); + let retry_delay = parse_duration(config.retry_delay.as_deref(), DEFAULT_RETRY_DELAY); + let poll_interval = 
parse_duration(config.poll_interval.as_deref(), DEFAULT_POLL_INTERVAL); + + // [FIX-SRC-5] Build circuit breaker from config + let cb_threshold = config + .circuit_breaker_threshold + .unwrap_or(DEFAULT_CIRCUIT_BREAKER_THRESHOLD); + let cb_cool_down = parse_duration( + config.circuit_breaker_cool_down.as_deref(), + DEFAULT_CIRCUIT_COOL_DOWN, + ); + + let restored_state = state + .and_then(|s| s.deserialize::(CONNECTOR_NAME, id)) + .inspect(|s| { + info!( + "Restored state for {CONNECTOR_NAME} connector with ID: {id}. \ + Last timestamp: {:?}, processed rows: {}", + s.last_timestamp, s.processed_rows + ); + }); + + InfluxDbSource { + id, + config, + client: None, + state: Mutex::new(restored_state.unwrap_or(State { + last_poll_time: Utc::now(), + last_timestamp: None, + processed_rows: 0, + })), + verbose, + retry_delay, + poll_interval, + circuit_breaker: Arc::new(CircuitBreaker::new(cb_threshold, cb_cool_down)), + } + } + + fn serialize_state(&self, state: &State) -> Option { + ConnectorState::serialize(state, CONNECTOR_NAME, self.id) + } + + fn payload_format(&self) -> PayloadFormat { + PayloadFormat::from_config(self.config.payload_format.as_deref()) + } + + fn cursor_field(&self) -> &str { + self.config.cursor_field.as_deref().unwrap_or("_time") + } + + fn get_max_retries(&self) -> u32 { + self.config + .max_retries + .unwrap_or(DEFAULT_MAX_RETRIES) + .max(1) + } + + fn build_client(&self) -> Result { + let timeout = parse_duration(self.config.timeout.as_deref(), DEFAULT_TIMEOUT); + Client::builder() + .timeout(timeout) + .build() + .map_err(|e| Error::InitError(format!("Failed to create HTTP client: {e}"))) + } + + fn get_client(&self) -> Result<&Client, Error> { + self.client + .as_ref() + .ok_or_else(|| Error::Connection("InfluxDB client is not initialized".to_string())) + } + + fn build_health_url(&self) -> Result { + let base = self.config.url.trim_end_matches('/'); + Url::parse(&format!("{base}/health")) + .map_err(|e| 
Error::InvalidConfigValue(format!("Invalid InfluxDB URL: {e}"))) + } + + fn build_query_url(&self) -> Result { + let base = self.config.url.trim_end_matches('/'); + let mut url = Url::parse(&format!("{base}/api/v2/query")) + .map_err(|e| Error::InvalidConfigValue(format!("Invalid InfluxDB URL: {e}")))?; + url.query_pairs_mut().append_pair("org", &self.config.org); + Ok(url) + } + + async fn check_connectivity(&self) -> Result<(), Error> { + let client = self.get_client()?; + let url = self.build_health_url()?; + let response = client + .get(url) + .send() + .await + .map_err(|e| Error::Connection(format!("InfluxDB health check failed: {e}")))?; + + if !response.status().is_success() { + let status = response.status(); + let body = response + .text() + .await + .unwrap_or_else(|_| "failed to read response body".to_string()); + return Err(Error::Connection(format!( + "InfluxDB health check returned status {status}: {body}" + ))); + } + Ok(()) + } + + // [FIX-SRC-1] Retry connectivity check with exponential backoff + jitter + // instead of failing hard on the first attempt. + async fn check_connectivity_with_retry(&self) -> Result<(), Error> { + let max_open_retries = self + .config + .max_open_retries + .unwrap_or(DEFAULT_MAX_OPEN_RETRIES) + .max(1); + + let max_delay = parse_duration( + self.config.open_retry_max_delay.as_deref(), + DEFAULT_OPEN_RETRY_MAX_DELAY, + ); + + let mut attempt = 0u32; + loop { + match self.check_connectivity().await { + Ok(()) => { + if attempt > 0 { + info!( + "InfluxDB connectivity established after {attempt} retries \ + for connector ID: {}", + self.id + ); + } + return Ok(()); + } + Err(e) => { + attempt += 1; + if attempt >= max_open_retries { + error!( + "InfluxDB connectivity check failed after {attempt} attempts \ + for connector ID: {}. 
Giving up: {e}", + self.id + ); + return Err(e); + } + // [FIX-SRC-2] Exponential backoff, [FIX-SRC-3] with jitter + let backoff = jitter(exponential_backoff(self.retry_delay, attempt, max_delay)); + warn!( + "InfluxDB health check failed (attempt {attempt}/{max_open_retries}) \ + for connector ID: {}. Retrying in {backoff:?}: {e}", + self.id + ); + tokio::time::sleep(backoff).await; + } + } + } + } + + async fn current_cursor(&self) -> String { + let state = self.state.lock().await; + state + .last_timestamp + .clone() + .or_else(|| self.config.initial_offset.clone()) + .unwrap_or_else(|| DEFAULT_CURSOR.to_string()) + } + + fn query_with_params(&self, cursor: &str) -> String { + let mut query = self.config.query.clone(); + if query.contains("$cursor") { + query = query.replace("$cursor", cursor); + } + if query.contains("$limit") { + query = query.replace("$limit", &self.config.batch_size.unwrap_or(500).to_string()); + } + query + } + + async fn run_query_with_retry(&self, query: &str) -> Result { + let client = self.get_client()?; + let url = self.build_query_url()?; + let max_retries = self.get_max_retries(); + let token = self.config.token.clone(); + + // [FIX-SRC-7] Cap for per-query backoff (reuse open_retry_max_delay config) + let max_delay = parse_duration( + self.config.open_retry_max_delay.as_deref(), + DEFAULT_OPEN_RETRY_MAX_DELAY, + ); + + let body = json!({ + "query": query, + "dialect": { + "annotations": [], + "delimiter": ",", + "header": true, + "commentPrefix": "#" + } + }); + + let mut attempts = 0u32; + loop { + let response_result = client + .post(url.clone()) + .header("Authorization", format!("Token {token}")) + .header("Content-Type", "application/json") + .header("Accept", "text/csv") + .json(&body) + .send() + .await; + + match response_result { + Ok(response) => { + let status = response.status(); + + if status.is_success() { + return response.text().await.map_err(|e| { + Error::Storage(format!("Failed to read query response: {e}")) + }); 
+ } + + // [FIX-SRC-4] Honour Retry-After on 429 before our own backoff + let retry_after = if status == StatusCode::TOO_MANY_REQUESTS { + response + .headers() + .get("Retry-After") + .and_then(|v| v.to_str().ok()) + .and_then(parse_retry_after) + } else { + None + }; + + let body_text = response + .text() + .await + .unwrap_or_else(|_| "failed to read response body".to_string()); + + attempts += 1; + if is_transient_status(status) && attempts < max_retries { + // [FIX-SRC-4] Use server-supplied delay when available + let delay = retry_after.unwrap_or_else(|| { + // [FIX-SRC-2] Exponential, [FIX-SRC-3] with jitter + jitter(exponential_backoff(self.retry_delay, attempts, max_delay)) + }); + warn!( + "Transient InfluxDB query error (attempt {attempts}/{max_retries}): \ + {status}. Retrying in {delay:?}..." + ); + tokio::time::sleep(delay).await; + continue; + } + + return Err(Error::Storage(format!( + "InfluxDB query failed with status {status}: {body_text}" + ))); + } + Err(e) => { + attempts += 1; + if attempts < max_retries { + // [FIX-SRC-2] Exponential, [FIX-SRC-3] with jitter + let delay = + jitter(exponential_backoff(self.retry_delay, attempts, max_delay)); + warn!( + "Failed to query InfluxDB (attempt {attempts}/{max_retries}): \ + {e}. Retrying in {delay:?}..." 
+ ); + tokio::time::sleep(delay).await; + continue; + } + + return Err(Error::Storage(format!( + "InfluxDB query failed after {attempts} attempts: {e}" + ))); + } + } + } + } + + fn parse_csv_rows(&self, csv_text: &str) -> Result>, Error> { + let mut reader = csv::ReaderBuilder::new() + .has_headers(false) + .from_reader(csv_text.as_bytes()); + + let mut headers: Option = None; + let mut rows = Vec::new(); + + for result in reader.records() { + let record = result + .map_err(|e| Error::InvalidRecordValue(format!("Invalid CSV record: {e}")))?; + + if record.is_empty() { + continue; + } + + if let Some(first) = record.get(0) + && first.starts_with('#') + { + continue; + } + + if is_header_record(&record) { + headers = Some(record.clone()); + continue; + } + + let Some(active_headers) = headers.as_ref() else { + continue; + }; + + if record == *active_headers { + continue; + } + + let mut mapped = HashMap::new(); + for (idx, key) in active_headers.iter().enumerate() { + if key.is_empty() { + continue; + } + let value = record.get(idx).unwrap_or("").to_string(); + mapped.insert(key.to_string(), value); + } + + if !mapped.is_empty() { + rows.push(mapped); + } + } + + Ok(rows) + } + + fn build_payload( + &self, + row: &HashMap, + include_metadata: bool, + ) -> Result, Error> { + if let Some(payload_column) = self.config.payload_column.as_deref() { + let raw_value = row.get(payload_column).cloned().ok_or_else(|| { + Error::InvalidRecordValue(format!("Missing payload column '{payload_column}'")) + })?; + + return match self.payload_format() { + PayloadFormat::Json => { + let value: serde_json::Value = + serde_json::from_str(&raw_value).map_err(|e| { + Error::InvalidRecordValue(format!( + "Payload column '{payload_column}' is not valid JSON: {e}" + )) + })?; + serde_json::to_vec(&value).map_err(|e| { + Error::Serialization(format!("JSON serialization failed: {e}")) + }) + } + PayloadFormat::Text => Ok(raw_value.into_bytes()), + PayloadFormat::Raw => 
general_purpose::STANDARD + .decode(raw_value.as_bytes()) + .or_else(|_| Ok(raw_value.into_bytes())) + .map_err(|e: base64::DecodeError| { + Error::InvalidRecordValue(format!( + "Failed to decode payload as base64: {e}" + )) + }), + }; + } + + let mut json_row = serde_json::Map::new(); + for (key, value) in row { + if include_metadata || key == "_value" || key == "_time" || key == "_measurement" { + json_row.insert(key.clone(), parse_scalar(value)); + } + } + + let wrapped = json!({ + "measurement": row.get("_measurement").cloned().unwrap_or_default(), + "field": row.get("_field").cloned().unwrap_or_default(), + "timestamp": row.get("_time").cloned().unwrap_or_default(), + "value": row.get("_value").map(|v| parse_scalar(v)).unwrap_or(serde_json::Value::Null), + "row": json_row, + }); + + serde_json::to_vec(&wrapped) + .map_err(|e| Error::Serialization(format!("JSON serialization failed: {e}"))) + } + + async fn poll_messages(&self) -> Result<(Vec, Option), Error> { + let cursor = self.current_cursor().await; + let query = self.query_with_params(&cursor); + let csv_data = self.run_query_with_retry(&query).await?; + + let rows = self.parse_csv_rows(&csv_data)?; + let include_metadata = self.config.include_metadata.unwrap_or(true); + let cursor_field = self.cursor_field().to_string(); + + let mut messages = Vec::with_capacity(rows.len()); + let mut max_cursor: Option = None; + + for row in rows { + if let Some(cursor_value) = row.get(&cursor_field) + && max_cursor + .as_ref() + .is_none_or(|current| cursor_value > current) + { + max_cursor = Some(cursor_value.clone()); + } + + let payload = self.build_payload(&row, include_metadata)?; + messages.push(ProducedMessage { + id: Some(Uuid::new_v4().as_u128()), + checksum: None, + timestamp: Some(Utc::now().timestamp_millis() as u64), + origin_timestamp: Some(Utc::now().timestamp_millis() as u64), + headers: None, + payload, + }); + } + + Ok((messages, max_cursor)) + } +} + +// 
--------------------------------------------------------------------------- +// Source trait implementation +// --------------------------------------------------------------------------- + +#[async_trait] +impl Source for InfluxDbSource { + async fn open(&mut self) -> Result<(), Error> { + info!( + "Opening InfluxDB source connector with ID: {}. Org: {}", + self.id, self.config.org + ); + + self.client = Some(self.build_client()?); + + // [FIX-SRC-1] Use retrying connectivity check instead of hard-fail + self.check_connectivity_with_retry().await?; + + info!( + "InfluxDB source connector with ID: {} opened successfully", + self.id + ); + Ok(()) + } + + async fn poll(&self) -> Result { + // [FIX-SRC-5] Skip query if circuit breaker is open + if self.circuit_breaker.is_open().await { + warn!( + "InfluxDB source ID: {} — circuit breaker is OPEN. Skipping poll.", + self.id + ); + return Ok(ProducedMessages { + schema: Schema::Json, + messages: vec![], + state: None, + }); + } + + match self.poll_messages().await { + Ok((messages, max_cursor)) => { + // [FIX-SRC-5] Successful poll — reset circuit breaker + self.circuit_breaker.record_success(); + + let mut state = self.state.lock().await; + state.last_poll_time = Utc::now(); + state.processed_rows += messages.len() as u64; + if let Some(cursor) = max_cursor { + state.last_timestamp = Some(cursor); + } + + if self.verbose { + info!( + "InfluxDB source ID: {} produced {} messages. \ + Total processed: {}. Cursor: {:?}", + self.id, + messages.len(), + state.processed_rows, + state.last_timestamp + ); + } else { + debug!( + "InfluxDB source ID: {} produced {} messages. \ + Total processed: {}. 
Cursor: {:?}", + self.id, + messages.len(), + state.processed_rows, + state.last_timestamp + ); + } + + let schema = if self.config.payload_column.is_some() { + self.payload_format().schema() + } else { + Schema::Json + }; + + let persisted_state = self.serialize_state(&state); + + Ok(ProducedMessages { + schema, + messages, + state: persisted_state, + }) + } + Err(e) => { + // [FIX-SRC-5] Failed poll — notify circuit breaker + self.circuit_breaker.record_failure().await; + error!( + "InfluxDB source ID: {} poll failed: {e}. \ + Consecutive failures tracked by circuit breaker.", + self.id + ); + tokio::time::sleep(self.poll_interval).await; + Err(e) + } + } + } + + async fn close(&mut self) -> Result<(), Error> { + let state = self.state.lock().await; + info!( + "InfluxDB source connector ID: {} closed. Total rows processed: {}", + self.id, state.processed_rows + ); + Ok(()) + } +} diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml index 4c58bcb26c..20bf0cd9d2 100644 --- a/core/integration/Cargo.toml +++ b/core/integration/Cargo.toml @@ -52,7 +52,7 @@ once_cell = { workspace = true } predicates = { workspace = true } rand = { workspace = true } rcgen = { workspace = true } -reqwest = { workspace = true } +reqwest = { workspace = true, features = ["json"] } reqwest-middleware = { workspace = true } reqwest-retry = { workspace = true } rmcp = { workspace = true, features = [ @@ -61,7 +61,7 @@ rmcp = { workspace = true, features = [ "transport-streamable-http-client", "transport-streamable-http-client-reqwest", ] } -serde = { workspace = true } +serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } serial_test = { workspace = true } server = { workspace = true } @@ -72,7 +72,7 @@ sysinfo = { workspace = true } tempfile = { workspace = true } test-case = { workspace = true } testcontainers-modules = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true, features = ["full", "test-util"] } toml 
= { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/core/integration/tests/connectors/fixtures/elasticsearch/mod.rs b/core/integration/tests/connectors/fixtures/elasticsearch/mod.rs index b0281f4924..b48c6790f4 100644 --- a/core/integration/tests/connectors/fixtures/elasticsearch/mod.rs +++ b/core/integration/tests/connectors/fixtures/elasticsearch/mod.rs @@ -17,9 +17,9 @@ * under the License. */ -mod container; -mod sink; -mod source; +pub mod container; +pub mod sink; +pub mod source; pub use sink::ElasticsearchSinkFixture; pub use source::ElasticsearchSourcePreCreatedFixture; diff --git a/core/integration/tests/connectors/fixtures/influxdb/container.rs b/core/integration/tests/connectors/fixtures/influxdb/container.rs new file mode 100644 index 0000000000..5ad727453c --- /dev/null +++ b/core/integration/tests/connectors/fixtures/influxdb/container.rs @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use integration::harness::TestBinaryError; +use reqwest_middleware::ClientWithMiddleware as HttpClient; +use reqwest_retry::RetryTransientMiddleware; +use reqwest_retry::policies::ExponentialBackoff; +use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor}; +use testcontainers_modules::testcontainers::runners::AsyncRunner; +use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, ImageExt}; +use tracing::info; + +const INFLUXDB_IMAGE: &str = "influxdb"; +const INFLUXDB_TAG: &str = "2.7-alpine"; +const INFLUXDB_PORT: u16 = 8086; + +pub const INFLUXDB_ORG: &str = "iggy-test-org"; +pub const INFLUXDB_BUCKET: &str = "iggy-test-bucket"; +pub const INFLUXDB_TOKEN: &str = "iggy-test-secret-token"; +pub const INFLUXDB_USERNAME: &str = "iggy-admin"; +pub const INFLUXDB_PASSWORD: &str = "iggy-password"; + +/// Number of attempts to poll `/ping` before giving up. +pub const HEALTH_CHECK_ATTEMPTS: usize = 60; +/// Milliseconds between each `/ping` attempt. 
+pub const HEALTH_CHECK_INTERVAL_MS: u64 = 1_000; + +pub const DEFAULT_TEST_STREAM: &str = "test_stream"; +pub const DEFAULT_TEST_TOPIC: &str = "test_topic"; + +// ── env-var keys injected into the connectors runtime ──────────────────────── + +pub const ENV_SOURCE_URL: &str = "IGGY_CONNECTORS_SOURCE_INFLUXDB_PLUGIN_CONFIG_URL"; +pub const ENV_SOURCE_ORG: &str = "IGGY_CONNECTORS_SOURCE_INFLUXDB_PLUGIN_CONFIG_ORG"; +pub const ENV_SOURCE_TOKEN: &str = "IGGY_CONNECTORS_SOURCE_INFLUXDB_PLUGIN_CONFIG_TOKEN"; +pub const ENV_SOURCE_BUCKET: &str = "IGGY_CONNECTORS_SOURCE_INFLUXDB_PLUGIN_CONFIG_BUCKET"; +pub const ENV_SOURCE_QUERY: &str = "IGGY_CONNECTORS_SOURCE_INFLUXDB_PLUGIN_CONFIG_QUERY"; +pub const ENV_SOURCE_POLL_INTERVAL: &str = + "IGGY_CONNECTORS_SOURCE_INFLUXDB_PLUGIN_CONFIG_POLL_INTERVAL"; +pub const ENV_SOURCE_BATCH_SIZE: &str = "IGGY_CONNECTORS_SOURCE_INFLUXDB_PLUGIN_CONFIG_BATCH_SIZE"; +pub const ENV_SOURCE_STREAMS_0_STREAM: &str = "IGGY_CONNECTORS_SOURCE_INFLUXDB_STREAMS_0_STREAM"; +pub const ENV_SOURCE_STREAMS_0_TOPIC: &str = "IGGY_CONNECTORS_SOURCE_INFLUXDB_STREAMS_0_TOPIC"; +pub const ENV_SOURCE_STREAMS_0_SCHEMA: &str = "IGGY_CONNECTORS_SOURCE_INFLUXDB_STREAMS_0_SCHEMA"; +pub const ENV_SOURCE_PATH: &str = "IGGY_CONNECTORS_SOURCE_INFLUXDB_PATH"; + +pub const ENV_SINK_URL: &str = "IGGY_CONNECTORS_SINK_INFLUXDB_PLUGIN_CONFIG_URL"; +pub const ENV_SINK_ORG: &str = "IGGY_CONNECTORS_SINK_INFLUXDB_PLUGIN_CONFIG_ORG"; +pub const ENV_SINK_TOKEN: &str = "IGGY_CONNECTORS_SINK_INFLUXDB_PLUGIN_CONFIG_TOKEN"; +pub const ENV_SINK_BUCKET: &str = "IGGY_CONNECTORS_SINK_INFLUXDB_PLUGIN_CONFIG_BUCKET"; +pub const ENV_SINK_STREAMS_0_STREAM: &str = "IGGY_CONNECTORS_SINK_INFLUXDB_STREAMS_0_STREAM"; +pub const ENV_SINK_STREAMS_0_TOPICS: &str = "IGGY_CONNECTORS_SINK_INFLUXDB_STREAMS_0_TOPICS"; +pub const ENV_SINK_STREAMS_0_SCHEMA: &str = "IGGY_CONNECTORS_SINK_INFLUXDB_STREAMS_0_SCHEMA"; +pub const ENV_SINK_STREAMS_0_CONSUMER_GROUP: &str = + 
"IGGY_CONNECTORS_SINK_INFLUXDB_STREAMS_0_CONSUMER_GROUP"; +pub const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_INFLUXDB_PATH"; + +// ── Container ──────────────────────────────────────────────────────────────── + +pub struct InfluxDbContainer { + #[allow(dead_code)] + container: ContainerAsync, + pub base_url: String, +} + +impl InfluxDbContainer { + pub async fn start() -> Result { + let container: ContainerAsync = + GenericImage::new(INFLUXDB_IMAGE, INFLUXDB_TAG) + .with_exposed_port(INFLUXDB_PORT.tcp()) + // "Listening" appears in stdout before the HTTP API is ready on + // aarch64/Apple Silicon; we add a real /ping probe below. + .with_wait_for(WaitFor::message_on_stdout("Listening")) + .with_mapped_port(0, INFLUXDB_PORT.tcp()) + .with_env_var("DOCKER_INFLUXDB_INIT_MODE", "setup") + .with_env_var("DOCKER_INFLUXDB_INIT_USERNAME", INFLUXDB_USERNAME) + .with_env_var("DOCKER_INFLUXDB_INIT_PASSWORD", INFLUXDB_PASSWORD) + .with_env_var("DOCKER_INFLUXDB_INIT_ORG", INFLUXDB_ORG) + .with_env_var("DOCKER_INFLUXDB_INIT_BUCKET", INFLUXDB_BUCKET) + .with_env_var("DOCKER_INFLUXDB_INIT_ADMIN_TOKEN", INFLUXDB_TOKEN) + .start() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "InfluxDbContainer".to_string(), + message: format!("Failed to start container: {e}"), + })?; + + let mapped_port = container + .ports() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "InfluxDbContainer".to_string(), + message: format!("Failed to get ports: {e}"), + })? 
+ .map_to_host_port_ipv4(INFLUXDB_PORT) + .ok_or_else(|| TestBinaryError::FixtureSetup { + fixture_type: "InfluxDbContainer".to_string(), + message: "No mapping for InfluxDB port".to_string(), + })?; + + let base_url = format!("http://localhost:{mapped_port}"); + info!("InfluxDB container available at {base_url}"); + + Ok(Self { + container, + base_url, + }) + } +} + +// ── HTTP client ─────────────────────────────────────────────────────────────── + +pub fn create_http_client() -> HttpClient { + let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3); + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(30)) + .build() + .expect("Failed to build HTTP client"); + reqwest_middleware::ClientBuilder::new(client) + .with(RetryTransientMiddleware::new_with_policy(retry_policy)) + .build() +} + +// ── Shared InfluxDB operations ──────────────────────────────────────────────── + +pub trait InfluxDbOps: Sync { + fn container(&self) -> &InfluxDbContainer; + fn http_client(&self) -> &HttpClient; + + /// Write line-protocol lines into the test bucket. 
+ fn write_lines( + &self, + lines: &[&str], + ) -> impl std::future::Future> + Send { + async move { + let url = format!( + "{}/api/v2/write?org={}&bucket={}&precision=ns", + self.container().base_url, + INFLUXDB_ORG, + INFLUXDB_BUCKET, + ); + let body = lines.join("\n"); + + let response = self + .http_client() + .post(&url) + .header("Authorization", format!("Token {INFLUXDB_TOKEN}")) + .header("Content-Type", "text/plain; charset=utf-8") + .body(body) + .send() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "InfluxDbOps".to_string(), + message: format!("Failed to write lines: {e}"), + })?; + + let status = response.status(); + if !status.is_success() && status.as_u16() != 204 { + let body = response.text().await.unwrap_or_default(); + return Err(TestBinaryError::FixtureSetup { + fixture_type: "InfluxDbOps".to_string(), + message: format!("Write error: status={status}, body={body}"), + }); + } + + Ok(()) + } + } + + fn query_count( + &self, + flux: &str, + ) -> impl std::future::Future> + Send { + async move { + let url = format!( + "{}/api/v2/query?org={}", + self.container().base_url, + INFLUXDB_ORG + ); + let body = serde_json::json!({ "query": flux, "type": "flux" }); + + let response = self + .http_client() + .post(&url) + .header("Authorization", format!("Token {INFLUXDB_TOKEN}")) + .header("Content-Type", "application/json") + .header("Accept", "application/csv") + .json(&body) + .send() + .await + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to query InfluxDB: {e}"), + })?; + + let text = response.text().await.unwrap_or_default(); + eprintln!( + "DEBUG influxdb csv response:\n{}", + &text[..text.len().min(2000)] + ); + // InfluxDB annotated CSV format: + // - Annotation rows start with '#' + // - Header row starts with ',result,table,...' (empty first field) + // - Data rows start with an empty annotation field, e.g. ',_result,0,...' + // where the THIRD field (index 2) is the numeric table index. 
+ // - Empty lines separate tables + // Count lines whose third CSV field is a non-negative integer (data rows). + let count = text + .lines() + .filter(|l| { + let mut fields = l.splitn(4, ','); + let annotation = fields.next().unwrap_or(""); + // Data rows have an empty first field (the annotation column) + if !annotation.is_empty() { + return false; + } + fields.next(); // skip _result + let table = fields.next().unwrap_or(""); + table.parse::<u32>().is_ok() + }) + .count(); + Ok(count) + } + } +} diff --git a/core/integration/tests/connectors/fixtures/influxdb/mod.rs b/core/integration/tests/connectors/fixtures/influxdb/mod.rs new file mode 100644 index 0000000000..44ac7b1182 --- /dev/null +++ b/core/integration/tests/connectors/fixtures/influxdb/mod.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +pub mod container; +pub mod sink; +pub mod source; + +pub use sink::InfluxDbSinkFixture; +pub use source::InfluxDbSourceFixture; diff --git a/core/integration/tests/connectors/fixtures/influxdb/sink.rs b/core/integration/tests/connectors/fixtures/influxdb/sink.rs new file mode 100644 index 0000000000..702b5dbbb8 --- /dev/null +++ b/core/integration/tests/connectors/fixtures/influxdb/sink.rs @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use super::container::{ + DEFAULT_TEST_STREAM, DEFAULT_TEST_TOPIC, ENV_SINK_BUCKET, ENV_SINK_ORG, ENV_SINK_PATH, + ENV_SINK_STREAMS_0_CONSUMER_GROUP, ENV_SINK_STREAMS_0_SCHEMA, ENV_SINK_STREAMS_0_STREAM, + ENV_SINK_STREAMS_0_TOPICS, ENV_SINK_TOKEN, ENV_SINK_URL, HEALTH_CHECK_ATTEMPTS, + HEALTH_CHECK_INTERVAL_MS, INFLUXDB_BUCKET, INFLUXDB_ORG, INFLUXDB_TOKEN, InfluxDbContainer, + InfluxDbOps, create_http_client, +}; +use async_trait::async_trait; +use integration::harness::{TestBinaryError, TestFixture}; +use reqwest_middleware::ClientWithMiddleware as HttpClient; +use std::collections::HashMap; +use std::time::Duration; +use tokio::time::sleep; +use tracing::info; + +const POLL_ATTEMPTS: usize = 100; +const POLL_INTERVAL_MS: u64 = 50; + +pub struct InfluxDbSinkFixture { + container: InfluxDbContainer, + http_client: HttpClient, +} + +impl InfluxDbOps for InfluxDbSinkFixture { + fn container(&self) -> &InfluxDbContainer { + &self.container + } + fn http_client(&self) -> &HttpClient { + &self.http_client + } +} + +impl InfluxDbSinkFixture { + /// Poll until at least `expected` points exist in the bucket. 
+ pub async fn wait_for_points( + &self, + measurement: &str, + expected: usize, + ) -> Result { + let flux = format!( + r#"from(bucket:"{b}") |> range(start:-1h) |> filter(fn:(r)=>r._measurement=="{m}")"#, + b = INFLUXDB_BUCKET, + m = measurement, + ); + info!("Flux Query {} ", flux); + for _ in 0..POLL_ATTEMPTS { + match self.query_count(&flux).await { + Ok(n) if n >= expected => { + info!("Found {n} points in InfluxDB (expected {expected})"); + return Ok(n); + } + Ok(_) | Err(_) => {} + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + Err(TestBinaryError::InvalidState { + message: format!("Expected at least {expected} points after {POLL_ATTEMPTS} attempts"), + }) + } +} + +#[async_trait] +impl TestFixture for InfluxDbSinkFixture { + async fn setup() -> Result { + let container = InfluxDbContainer::start().await?; + let http_client = create_http_client(); + + let fixture = Self { + container, + http_client, + }; + + // Same /ping readiness probe as the source fixture. + for attempt in 0..HEALTH_CHECK_ATTEMPTS { + let url = format!("{}/ping", fixture.container.base_url); + match fixture.http_client.get(&url).send().await { + Ok(resp) if resp.status().as_u16() == 204 => { + info!("InfluxDB /ping OK after {} attempts", attempt + 1); + return Ok(fixture); + } + Ok(resp) => { + info!( + "InfluxDB /ping status {} (attempt {})", + resp.status(), + attempt + 1 + ); + } + Err(e) => { + info!("InfluxDB /ping error on attempt {}: {e}", attempt + 1); + } + } + sleep(Duration::from_millis(HEALTH_CHECK_INTERVAL_MS)).await; + } + + Err(TestBinaryError::FixtureSetup { + fixture_type: "InfluxDbSink".to_string(), + message: format!( + "InfluxDB /ping did not return 204 after {HEALTH_CHECK_ATTEMPTS} attempts" + ), + }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + let mut envs = HashMap::new(); + envs.insert(ENV_SINK_URL.to_string(), self.container.base_url.clone()); + envs.insert(ENV_SINK_ORG.to_string(), INFLUXDB_ORG.to_string()); + 
envs.insert(ENV_SINK_TOKEN.to_string(), INFLUXDB_TOKEN.to_string()); + envs.insert(ENV_SINK_BUCKET.to_string(), INFLUXDB_BUCKET.to_string()); + envs.insert( + ENV_SINK_STREAMS_0_STREAM.to_string(), + DEFAULT_TEST_STREAM.to_string(), + ); + envs.insert( + ENV_SINK_STREAMS_0_TOPICS.to_string(), + format!("[{}]", DEFAULT_TEST_TOPIC), + ); + envs.insert(ENV_SINK_STREAMS_0_SCHEMA.to_string(), "json".to_string()); + envs.insert( + ENV_SINK_STREAMS_0_CONSUMER_GROUP.to_string(), + "influxdb_sink_cg".to_string(), + ); + envs.insert( + ENV_SINK_PATH.to_string(), + "../../target/debug/libiggy_connector_influxdb_sink".to_string(), + ); + envs + } +} diff --git a/core/integration/tests/connectors/fixtures/influxdb/source.rs b/core/integration/tests/connectors/fixtures/influxdb/source.rs new file mode 100644 index 0000000000..d45f6ee6c8 --- /dev/null +++ b/core/integration/tests/connectors/fixtures/influxdb/source.rs @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use super::container::{ + DEFAULT_TEST_STREAM, DEFAULT_TEST_TOPIC, ENV_SOURCE_BATCH_SIZE, ENV_SOURCE_BUCKET, + ENV_SOURCE_ORG, ENV_SOURCE_PATH, ENV_SOURCE_POLL_INTERVAL, ENV_SOURCE_QUERY, + ENV_SOURCE_STREAMS_0_SCHEMA, ENV_SOURCE_STREAMS_0_STREAM, ENV_SOURCE_STREAMS_0_TOPIC, + ENV_SOURCE_TOKEN, ENV_SOURCE_URL, HEALTH_CHECK_ATTEMPTS, HEALTH_CHECK_INTERVAL_MS, + INFLUXDB_BUCKET, INFLUXDB_ORG, INFLUXDB_TOKEN, InfluxDbContainer, InfluxDbOps, + create_http_client, +}; +use async_trait::async_trait; +use integration::harness::{TestBinaryError, TestFixture}; +use reqwest_middleware::ClientWithMiddleware as HttpClient; +use std::collections::HashMap; +use std::time::Duration; +use tokio::time::sleep; +use tracing::info; + +pub struct InfluxDbSourceFixture { + pub(super) container: InfluxDbContainer, + pub(super) http_client: HttpClient, +} + +impl InfluxDbOps for InfluxDbSourceFixture { + fn container(&self) -> &InfluxDbContainer { + &self.container + } + fn http_client(&self) -> &HttpClient { + &self.http_client + } +} + +impl InfluxDbSourceFixture { + /// Write line-protocol lines into the test bucket. + pub async fn write_lines(&self, lines: &[&str]) -> Result<(), TestBinaryError> { + InfluxDbOps::write_lines(self, lines).await + } +} + +#[async_trait] +impl TestFixture for InfluxDbSourceFixture { + async fn setup() -> Result { + let container = InfluxDbContainer::start().await?; + let http_client = create_http_client(); + + let fixture = Self { + container, + http_client, + }; + + // Poll /ping until InfluxDB HTTP API is truly ready to accept writes. + // The "Listening" log fires before the API finishes initialisation on + // Apple Silicon / aarch64, causing Connection-reset-by-peer on the + // first /api/v2/write call. /ping returning 204 is the authoritative + // signal that the API is ready. 
+ for attempt in 0..HEALTH_CHECK_ATTEMPTS { + let url = format!("{}/ping", fixture.container.base_url); + match fixture.http_client.get(&url).send().await { + Ok(resp) if resp.status().as_u16() == 204 => { + info!("InfluxDB /ping OK after {} attempts", attempt + 1); + return Ok(fixture); + } + Ok(resp) => { + info!( + "InfluxDB /ping status {} (attempt {})", + resp.status(), + attempt + 1 + ); + } + Err(e) => { + info!("InfluxDB /ping error on attempt {}: {e}", attempt + 1); + } + } + sleep(Duration::from_millis(HEALTH_CHECK_INTERVAL_MS)).await; + } + + Err(TestBinaryError::FixtureSetup { + fixture_type: "InfluxDbSource".to_string(), + message: format!( + "InfluxDB /ping did not return 204 after {HEALTH_CHECK_ATTEMPTS} attempts" + ), + }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + let default_flux = format!( + r#"from(bucket:"{b}") |> range(start: -1h) |> filter(fn: (r) => r._time > time(v: "$cursor")) |> sort(columns: ["_time"]) |> limit(n: $limit)"#, + b = INFLUXDB_BUCKET, + ); + + let mut envs = HashMap::new(); + envs.insert(ENV_SOURCE_URL.to_string(), self.container.base_url.clone()); + envs.insert(ENV_SOURCE_ORG.to_string(), INFLUXDB_ORG.to_string()); + envs.insert(ENV_SOURCE_TOKEN.to_string(), INFLUXDB_TOKEN.to_string()); + envs.insert(ENV_SOURCE_BUCKET.to_string(), INFLUXDB_BUCKET.to_string()); + envs.insert(ENV_SOURCE_QUERY.to_string(), default_flux); + envs.insert(ENV_SOURCE_POLL_INTERVAL.to_string(), "100ms".to_string()); + envs.insert(ENV_SOURCE_BATCH_SIZE.to_string(), "100".to_string()); + envs.insert( + ENV_SOURCE_STREAMS_0_STREAM.to_string(), + DEFAULT_TEST_STREAM.to_string(), + ); + envs.insert( + ENV_SOURCE_STREAMS_0_TOPIC.to_string(), + DEFAULT_TEST_TOPIC.to_string(), + ); + envs.insert(ENV_SOURCE_STREAMS_0_SCHEMA.to_string(), "json".to_string()); + envs.insert( + ENV_SOURCE_PATH.to_string(), + "../../target/debug/libiggy_connector_influxdb_source".to_string(), + ); + envs + } +} diff --git 
a/core/integration/tests/connectors/fixtures/mod.rs b/core/integration/tests/connectors/fixtures/mod.rs index 6deae48664..2f3fcda054 100644 --- a/core/integration/tests/connectors/fixtures/mod.rs +++ b/core/integration/tests/connectors/fixtures/mod.rs @@ -19,6 +19,7 @@ mod elasticsearch; mod iceberg; +mod influxdb; mod mongodb; mod postgres; mod quickwit; @@ -26,6 +27,7 @@ mod wiremock; pub use elasticsearch::{ElasticsearchSinkFixture, ElasticsearchSourcePreCreatedFixture}; pub use iceberg::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture}; +pub use influxdb::{InfluxDbSinkFixture, InfluxDbSourceFixture}; pub use mongodb::{ MongoDbOps, MongoDbSinkAutoCreateFixture, MongoDbSinkBatchFixture, MongoDbSinkFailpointFixture, MongoDbSinkFixture, MongoDbSinkJsonFixture, MongoDbSinkWriteConcernFixture, diff --git a/core/integration/tests/connectors/influxdb/docker-compose.yml b/core/integration/tests/connectors/influxdb/docker-compose.yml new file mode 100644 index 0000000000..73ae7e0df1 --- /dev/null +++ b/core/integration/tests/connectors/influxdb/docker-compose.yml @@ -0,0 +1,77 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# docker-compose for InfluxDB connector integration tests. 
+# +# File location: +# core/integration/tests/connectors/influxdb/docker-compose.yml +# +# Used as a fallback when running tests without a Docker daemon that supports +# testcontainers auto-launch (e.g. some CI environments). Start manually with: +# docker compose -f core/integration/tests/connectors/influxdb/docker-compose.yml up -d +# then run: +# INFLUXDB_URL=http://localhost:8086 cargo test -p iggy-integration-tests influxdb + +version: "3.8" + +services: + influxdb: + image: influxdb:2.7-alpine + container_name: iggy-test-influxdb + ports: + - "8086:8086" + environment: + DOCKER_INFLUXDB_INIT_MODE: setup + DOCKER_INFLUXDB_INIT_USERNAME: iggy-admin + DOCKER_INFLUXDB_INIT_PASSWORD: iggy-password + DOCKER_INFLUXDB_INIT_ORG: iggy-test-org + DOCKER_INFLUXDB_INIT_BUCKET: iggy-test-bucket + DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: iggy-super-secret-test-token + healthcheck: + test: ["CMD", "influx", "ping"] + interval: 5s + timeout: 5s + retries: 10 + start_period: 10s + volumes: + - iggy-influxdb-data:/var/lib/influxdb2 + + # InfluxDB instance for the SOURCE connector tests (separate bucket/org) + influxdb-source: + image: influxdb:2.7-alpine + container_name: iggy-test-influxdb-source + ports: + - "8087:8086" + environment: + DOCKER_INFLUXDB_INIT_MODE: setup + DOCKER_INFLUXDB_INIT_USERNAME: iggy-admin + DOCKER_INFLUXDB_INIT_PASSWORD: iggy-password + DOCKER_INFLUXDB_INIT_ORG: iggy-src-org + DOCKER_INFLUXDB_INIT_BUCKET: iggy-src-bucket + DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: iggy-src-secret-token + healthcheck: + test: ["CMD", "influx", "ping"] + interval: 5s + timeout: 5s + retries: 10 + start_period: 10s + volumes: + - iggy-influxdb-source-data:/var/lib/influxdb2 + +volumes: + iggy-influxdb-data: + iggy-influxdb-source-data: diff --git a/core/integration/tests/connectors/influxdb/influxdb_sink.rs b/core/integration/tests/connectors/influxdb/influxdb_sink.rs new file mode 100644 index 0000000000..dcf462cd3e --- /dev/null +++ 
b/core/integration/tests/connectors/influxdb/influxdb_sink.rs @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use super::TEST_MESSAGE_COUNT; +use crate::connectors::fixtures::InfluxDbSinkFixture; +use bytes::Bytes; +use iggy::prelude::IggyMessage; +use iggy::prelude::Partitioning; +use iggy_binary_protocol::MessageClient; +use iggy_common::Identifier; +use integration::harness::seeds; +use integration::iggy_harness; +use serde_json::json; + +// seeds::connector_stream creates the topic with 1 partition (Iggy partition IDs are 1-based). +// Use Partitioning::balanced() so the runtime picks partition 1 automatically. 
+ +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/influxdb/sink.toml")), + seed = seeds::connector_stream +)] +async fn influxdb_sink_writes_messages_to_bucket( + harness: &TestHarness, + fixture: InfluxDbSinkFixture, +) { + let client = harness.root_client().await.unwrap(); + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + + let mut messages: Vec = (1u32..=TEST_MESSAGE_COUNT as u32) + .map(|i| { + let payload = serde_json::to_vec(&json!({"sensor_id": i, "temp": 20.0 + i as f64})) + .expect("Failed to serialize"); + IggyMessage::builder() + .id(i as u128) + .payload(Bytes::from(payload)) + .build() + .expect("Failed to build message") + }) + .collect(); + + client + .send_messages( + &stream_id, + &topic_id, + &Partitioning::balanced(), + &mut messages, + ) + .await + .expect("Failed to send messages"); + + fixture + .wait_for_points("iggy_messages", TEST_MESSAGE_COUNT) + .await + .expect("Failed to wait for InfluxDB points"); +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/influxdb/sink.toml")), + seed = seeds::connector_stream +)] +async fn influxdb_sink_handles_bulk_messages(harness: &TestHarness, fixture: InfluxDbSinkFixture) { + let client = harness.root_client().await.unwrap(); + let bulk_count = 50; + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + + let mut messages: Vec = (0..bulk_count) + .map(|i| { + let payload = serde_json::to_vec(&json!({"seq": i})).expect("Failed to serialize"); + IggyMessage::builder() + .id((i + 1) as u128) + .payload(Bytes::from(payload)) + .build() + .expect("Failed to build message") + }) + .collect(); + + client + .send_messages( + &stream_id, + &topic_id, + &Partitioning::balanced(), + &mut messages, + ) + .await + .expect("Failed to send messages"); + + fixture + 
.wait_for_points("iggy_messages", bulk_count) + .await + .expect("Failed to wait for InfluxDB points"); +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/influxdb/sink.toml")), + seed = seeds::connector_stream +)] +async fn influxdb_sink_payload_fields_stored_correctly( + harness: &TestHarness, + fixture: InfluxDbSinkFixture, +) { + let client = harness.root_client().await.unwrap(); + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + + let payload = serde_json::to_vec(&json!({"device": "sensor-42", "reading": 99.5})).unwrap(); + let mut messages = vec![ + IggyMessage::builder() + .id(1u128) + .payload(Bytes::from(payload)) + .build() + .unwrap(), + ]; + + client + .send_messages( + &stream_id, + &topic_id, + &Partitioning::balanced(), + &mut messages, + ) + .await + .expect("Failed to send messages"); + + fixture + .wait_for_points("iggy_messages", 1) + .await + .expect("Failed to wait for InfluxDB points"); +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/influxdb/sink.toml")), + seed = seeds::connector_stream +)] +async fn influxdb_sink_large_batch(harness: &TestHarness, fixture: InfluxDbSinkFixture) { + let client = harness.root_client().await.unwrap(); + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + + for chunk_start in (0..500usize).step_by(100) { + let mut chunk: Vec = (chunk_start..chunk_start + 100) + .map(|i| { + let payload = serde_json::to_vec(&json!({"seq": i})).expect("Failed to serialize"); + IggyMessage::builder() + .id((i + 1) as u128) + .payload(Bytes::from(payload)) + .build() + .expect("Failed to build message") + }) + .collect(); + + client + .send_messages(&stream_id, &topic_id, &Partitioning::balanced(), &mut chunk) + .await + .expect("Failed to send messages"); + } + + fixture + 
.wait_for_points("iggy_messages", 500)
+        .await
+        .expect("Failed to wait for 500 InfluxDB points");
+}
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = "tests/connectors/influxdb/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn influxdb_sink_recovers_backlogged_messages(
+    harness: &TestHarness,
+    fixture: InfluxDbSinkFixture,
+) {
+    let client = harness.root_client().await.unwrap();
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+    let mut messages: Vec<IggyMessage> = (0..10)
+        .map(|i| {
+            let payload = serde_json::to_vec(&json!({"i": i})).expect("Failed to serialize");
+            IggyMessage::builder()
+                .id((i + 1) as u128)
+                .payload(Bytes::from(payload))
+                .build()
+                .expect("Failed to build message")
+        })
+        .collect();
+
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::balanced(),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send messages");
+
+    fixture
+        .wait_for_points("iggy_messages", 10)
+        .await
+        .expect("Failed to wait for 10 backlogged InfluxDB points");
+}
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = "tests/connectors/influxdb/sink.toml")),
+    seed = seeds::connector_stream
+)]
+async fn influxdb_sink_multiple_partitions(harness: &TestHarness, fixture: InfluxDbSinkFixture) {
+    let client = harness.root_client().await.unwrap();
+
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+    // Topic has only 1 partition — send 3 messages via balanced partitioning
+    // (they all go to partition 1, which is correct for a 1-partition topic).
+
+    for i in 1u32..=3 {
+        let payload = serde_json::to_vec(&json!({"msg_index": i})).expect("Failed to serialize");
+        let mut messages = vec![
+            IggyMessage::builder()
+                .id(i as u128)
+                .payload(Bytes::from(payload))
+                .build()
+                .unwrap(),
+        ];
+
+        client
+            .send_messages(
+                &stream_id,
+                &topic_id,
+                &Partitioning::balanced(),
+                &mut messages,
+            )
+            .await
+            .expect("Failed to send messages");
+    }
+
+    fixture
+        .wait_for_points("iggy_messages", 3)
+        .await
+        .expect("Failed to wait for 3 InfluxDB points");
+}
diff --git a/core/integration/tests/connectors/influxdb/influxdb_source.rs b/core/integration/tests/connectors/influxdb/influxdb_source.rs
new file mode 100644
index 0000000000..ea9ccc403a
--- /dev/null
+++ b/core/integration/tests/connectors/influxdb/influxdb_source.rs
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::TEST_MESSAGE_COUNT;
+use crate::connectors::fixtures::InfluxDbSourceFixture;
+use iggy_binary_protocol::MessageClient;
+use iggy_common::Utc;
+use iggy_common::{Consumer, Identifier, PollingStrategy};
+use integration::harness::seeds;
+use integration::iggy_harness;
+use serde_json::Value;
+use tracing::info;
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = "tests/connectors/influxdb/source.toml")),
+    seed = seeds::connector_stream
+)]
+async fn influxdb_source_polls_and_produces_messages(
+    harness: &TestHarness,
+    fixture: InfluxDbSourceFixture,
+) {
+    let base_ts: u64 = Utc::now().timestamp_nanos_opt().unwrap_or(0) as u64;
+    let lines: Vec<String> = (0..TEST_MESSAGE_COUNT)
+        .map(|i| {
+            format!(
+                "sensor_readings,loc=lab v={v} {base_ts}",
+                v = 20.0 + i as f64,
+                base_ts = base_ts + i as u64 * 1000,
+            )
+        })
+        .collect();
+    let line_refs: Vec<&str> = lines.iter().map(String::as_str).collect();
+
+    fixture
+        .write_lines(&line_refs)
+        .await
+        .expect("Failed to write lines to InfluxDB");
+
+    let client = harness.root_client().await.unwrap();
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+    let consumer = Consumer::default();
+
+    let mut total = 0usize;
+    for _ in 0..100 {
+        let polled = client
+            .poll_messages(
+                &stream_id,
+                &topic_id,
+                None,
+                &consumer,
+                &PollingStrategy::next(),
+                100,
+                true,
+            )
+            .await
+            .expect("poll_messages failed");
+
+        total += polled.messages.len();
+        if total >= TEST_MESSAGE_COUNT {
+            break;
+        }
+        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+    }
+
+    assert!(
+        total >= TEST_MESSAGE_COUNT,
+        "Expected {TEST_MESSAGE_COUNT} messages, got {total}"
+    );
+}
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = "tests/connectors/influxdb/source.toml")),
+    seed = seeds::connector_stream
+)]
+async fn influxdb_source_message_payload_structure(
+    harness: &TestHarness,
+    fixture:
InfluxDbSourceFixture,
+) {
+    let base_ts: u64 = Utc::now().timestamp_nanos_opt().unwrap_or(0) as u64;
+    fixture
+        .write_lines(&[&format!("sensor_readings,loc=roof humidity=78.5 {base_ts}")])
+        .await
+        .expect("Failed to write line");
+
+    let client = harness.root_client().await.unwrap();
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+    let consumer = Consumer::default();
+
+    let mut msgs: Vec<Value> = Vec::new();
+    for _ in 0..100 {
+        let polled = client
+            .poll_messages(
+                &stream_id,
+                &topic_id,
+                None,
+                &consumer,
+                &PollingStrategy::next(),
+                10,
+                true,
+            )
+            .await
+            .expect("poll_messages failed");
+
+        for m in polled.messages {
+            if let Ok(v) = serde_json::from_slice::<Value>(&m.payload) {
+                msgs.push(v);
+            }
+        }
+        if !msgs.is_empty() {
+            break;
+        }
+        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+    }
+
+    info!("Received messages {:?}", msgs);
+    assert_eq!(msgs.len(), 1, "Expected 1 message, got {}", msgs.len());
+    let m = &msgs[0];
+    assert!(
+        m.get("measurement").is_some(),
+        "missing 'measurement': {m}"
+    );
+    assert!(m.get("timestamp").is_some(), "missing 'timestamp': {m}");
+    assert!(m.get("value").is_some(), "missing 'value': {m}");
+}
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = "tests/connectors/influxdb/source.toml")),
+    seed = seeds::connector_stream
+)]
+async fn influxdb_source_empty_bucket_produces_no_messages(
+    harness: &TestHarness,
+    fixture: InfluxDbSourceFixture,
+) {
+    // Write nothing — bucket intentionally empty for this measurement.
+    let _ = &fixture;
+
+    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+
+    let client = harness.root_client().await.unwrap();
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+    let consumer = Consumer::default();
+
+    let polled = client
+        .poll_messages(
+            &stream_id,
+            &topic_id,
+            None,
+            &consumer,
+            &PollingStrategy::next(),
+            100,
+            false,
+        )
+        .await
+        .expect("poll_messages failed");
+
+    assert_eq!(
+        polled.messages.len(),
+        0,
+        "Expected 0 messages for empty bucket, got {}",
+        polled.messages.len()
+    );
+}
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = "tests/connectors/influxdb/source.toml")),
+    seed = seeds::connector_stream
+)]
+async fn influxdb_source_multiple_measurements(
+    harness: &TestHarness,
+    fixture: InfluxDbSourceFixture,
+) {
+    let base_ts: u64 = Utc::now().timestamp_nanos_opt().unwrap_or(0) as u64;
+    fixture
+        .write_lines(&[
+            &format!("temperature,room=living v=21.5 {base_ts}"),
+            &format!("humidity,room=living v=55.0 {}", base_ts + 1000),
+            &format!("pressure,room=living v=1013.25 {}", base_ts + 2000),
+        ])
+        .await
+        .expect("Failed to write lines");
+
+    let client = harness.root_client().await.unwrap();
+    let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+    let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+    let consumer = Consumer::default();
+
+    let mut msgs: Vec<Value> = Vec::new();
+    for _ in 0..100 {
+        let polled = client
+            .poll_messages(
+                &stream_id,
+                &topic_id,
+                None,
+                &consumer,
+                &PollingStrategy::next(),
+                100,
+                true,
+            )
+            .await
+            .expect("poll_messages failed");
+
+        for m in polled.messages {
+            if let Ok(v) = serde_json::from_slice::<Value>(&m.payload) {
+                msgs.push(v);
+            }
+        }
+        if msgs.len() >= 3 {
+            break;
+        }
+        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+    }
+    info!("influxdb_source_multiple_measurements Received {:#?}", msgs);
+
assert_eq!(msgs.len(), 3, "Expected 3 messages, got {}", msgs.len()); + + let measurements: Vec<&str> = msgs + .iter() + .filter_map(|m| m["measurement"].as_str()) + .collect(); + assert!(measurements.contains(&"temperature"), "missing temperature"); + assert!(measurements.contains(&"humidity"), "missing humidity"); + assert!(measurements.contains(&"pressure"), "missing pressure"); +} diff --git a/core/integration/tests/connectors/influxdb/mod.rs b/core/integration/tests/connectors/influxdb/mod.rs new file mode 100644 index 0000000000..e283df885c --- /dev/null +++ b/core/integration/tests/connectors/influxdb/mod.rs @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +mod influxdb_sink; +mod influxdb_source; + +const TEST_MESSAGE_COUNT: usize = 3; diff --git a/core/integration/tests/connectors/influxdb/sink.toml b/core/integration/tests/connectors/influxdb/sink.toml new file mode 100644 index 0000000000..68f80b2a9b --- /dev/null +++ b/core/integration/tests/connectors/influxdb/sink.toml @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[connectors] +config_type = "local" +config_dir = "../connectors/sinks/influxdb_sink" +[plugin_config] +precision = "us" +measurement = "iggy_messages" +batch_size = 100 +payload_format = "json" +max_retries = 3 +retry_delay = "200ms" +timeout = "10s" +max_open_retries = 5 +open_retry_max_delay = "10s" +circuit_breaker_threshold = 10 +circuit_breaker_cool_down = "5s" diff --git a/core/integration/tests/connectors/influxdb/source.toml b/core/integration/tests/connectors/influxdb/source.toml new file mode 100644 index 0000000000..672b838ea8 --- /dev/null +++ b/core/integration/tests/connectors/influxdb/source.toml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[connectors] +config_type = "local" +config_dir = "../connectors/sources/influxdb_source" +[plugin_config] +precision = "us" diff --git a/core/integration/tests/connectors/influxdb_sink.toml b/core/integration/tests/connectors/influxdb_sink.toml new file mode 100644 index 0000000000..7894320cbb --- /dev/null +++ b/core/integration/tests/connectors/influxdb_sink.toml @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +type = "sink" +key = "influxdb" +enabled = true +version = 0 +name = "InfluxDB sink (test)" +path = "target/release/libiggy_connector_influxdb_sink" +verbose = false + +[[streams]] +stream = "connector_stream" +topics = ["connector_topic"] +schema = "json" +batch_length = 10 +poll_interval = "100ms" +consumer_group = "influxdb_sink_test" + +[plugin_config] +url = "http://localhost:8086" +org = "test-org" +bucket = "test-bucket" +token = "test-token" +measurement = "iggy_messages" +precision = "ms" +batch_size = 100 +include_metadata = true +include_checksum = false +include_origin_timestamp = false +include_stream_tag = true +include_topic_tag = true +include_partition_tag = true +payload_format = "json" +max_retries = 3 +retry_delay = "200ms" +timeout = "5s" +max_open_retries = 3 +open_retry_max_delay = "5s" +circuit_breaker_threshold = 10 +circuit_breaker_cool_down = "5s" diff --git a/core/integration/tests/connectors/influxdb_source.toml b/core/integration/tests/connectors/influxdb_source.toml new file mode 100644 index 0000000000..4a5d985cc5 --- /dev/null +++ b/core/integration/tests/connectors/influxdb_source.toml @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +type = "source" +key = "influxdb" +enabled = true +version = 0 +name = "InfluxDB source (test)" +path = "target/release/libiggy_connector_influxdb_source" +verbose = false + +[[streams]] +stream = "connector_stream" +topic = "connector_topic" +schema = "json" +batch_length = 100 + +[plugin_config] +url = "http://localhost:8086" +org = "test-org" +token = "test-token" +query = ''' +from(bucket: "test-bucket") + |> range(start: 2023-01-01T00:00:00Z) + |> filter(fn: (r) => r._time > time(v: "$cursor")) + |> sort(columns: ["_time"]) + |> limit(n: $limit) +''' +poll_interval = "200ms" +batch_size = 500 +cursor_field = "_time" +initial_offset = "2023-01-01T00:00:00Z" +include_metadata = true +payload_format = "json" +max_retries = 3 +retry_delay = "200ms" +timeout = "5s" +max_open_retries = 3 +open_retry_max_delay = "5s" +circuit_breaker_threshold = 10 +circuit_breaker_cool_down = "5s" diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs index 0d93529049..9e71e6545f 100644 --- a/core/integration/tests/connectors/mod.rs +++ b/core/integration/tests/connectors/mod.rs @@ -22,6 +22,7 @@ mod elasticsearch; mod fixtures; mod http_config_provider; mod iceberg; +mod influxdb; mod mongodb; mod postgres; mod quickwit;