Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
749613f
Add InfluxDB source and sink example configs
ryerraguntla Mar 9, 2026
29bdf19
InfluxDB: retries, jitter, circuit breaker
ryerraguntla Mar 11, 2026
8bf7a29
Adding test fixtures
ryerraguntla Mar 11, 2026
677450b
Updating InfluxDB test container configuration
ryerraguntla Mar 11, 2026
625eac6
Create sink.toml for InfluxDB connector
ryerraguntla Mar 11, 2026
d38563f
Add influxdb source configuration file
ryerraguntla Mar 11, 2026
a739b39
Add InfluxDB connector test configs
ryerraguntla Mar 11, 2026
a6c284f
Add InfluxDB connector test configs
ryerraguntla Mar 11, 2026
aa3c662
Update influxdb_sink.rs with new content
ryerraguntla Mar 11, 2026
cb80cfe
Update influxdb_source.rs
ryerraguntla Mar 11, 2026
aabaafd
Update the InfluxDB source implementation
ryerraguntla Mar 11, 2026
ba9cdf5
Fix influxdb_source.rs: restore correct Rust format string interpolation
ryerraguntla Mar 11, 2026
c8b8c02
Fix influxdb_source.rs: correct all format strings and remove erroneo…
ryerraguntla Mar 11, 2026
e14c0f4
InfluxDB: switch to ns precision and increase timeout
ryerraguntla Mar 11, 2026
8ba2253
Commit core/connectors/sinks/mongodb_sink/config.toml
ryerraguntla Mar 14, 2026
65ed00d
Add dummy comment and group influxdb imports
ryerraguntla Mar 12, 2026
59fec00
Removing the duplicates
ryerraguntla Mar 14, 2026
1b2f22d
committing Cargo.lock
ryerraguntla Mar 14, 2026
f046963
InfluxDB: use microsecond precision
ryerraguntla Mar 12, 2026
1d4e2fe
InfluxDB: fix timestamps, precision, and queries
ryerraguntla Mar 12, 2026
d69a4e8
Use 1ms increments for test timestamps
ryerraguntla Mar 13, 2026
e135eef
docs: remove promotional AI-generated language from README
ryerraguntla Mar 13, 2026
f86d30a
Commit Cargo.lock
ryerraguntla Mar 14, 2026
3b4814e
Merge branch 'master' into feat/influxdb-connector
ryerraguntla Mar 15, 2026
2416e62
Checking in the cleanup files
ryerraguntla Mar 15, 2026
0ce1951
Add InfluxDB connectors and improve sink reliability
ryerraguntla Mar 15, 2026
9e02044
Bump csv crate to 1.3.1
ryerraguntla Mar 15, 2026
1f5aa30
fix: restore mongodb fixtures, ONE_DAY_MICROS constant, and fix pub m…
ryerraguntla Mar 15, 2026
69bbb88
chore: remove accidental .elastic-copilot directory
ryerraguntla Mar 15, 2026
3719a44
Add csv and influxdb dependencies
ryerraguntla Mar 15, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ members = [
"core/connectors/sdk",
"core/connectors/sinks/elasticsearch_sink",
"core/connectors/sinks/iceberg_sink",
"core/connectors/sinks/influxdb_sink",
"core/connectors/sinks/mongodb_sink",
"core/connectors/sinks/postgres_sink",
"core/connectors/sinks/quickwit_sink",
"core/connectors/sinks/stdout_sink",
"core/connectors/sources/elasticsearch_source",
"core/connectors/sources/influxdb_source",
"core/connectors/sources/postgres_source",
"core/connectors/sources/random_source",
"core/consensus",
Expand Down Expand Up @@ -118,6 +120,7 @@ consensus = { path = "core/consensus" }
console-subscriber = "0.5.0"
crossbeam = "0.8.4"
crossfire = "3.1.5"
csv = "1.3.1"
ctor = "0.6.3"
ctrlc = { version = "3.5", features = ["termination"] }
cucumber = "0.22"
Expand Down
4 changes: 4 additions & 0 deletions DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ crossterm_winapi: 0.9.1, "MIT",
crunchy: 0.2.4, "MIT",
crypto-bigint: 0.5.5, "Apache-2.0 OR MIT",
crypto-common: 0.1.7, "Apache-2.0 OR MIT",
csv: 1.4.0, "MIT OR Unlicense",
csv-core: 0.1.13, "MIT OR Unlicense",
ctor: 0.6.3, "Apache-2.0 OR MIT",
ctor-proc-macro: 0.0.7, "Apache-2.0 OR MIT",
ctr: 0.9.2, "Apache-2.0 OR MIT",
Expand Down Expand Up @@ -457,6 +459,8 @@ iggy_common: 0.9.2-edge.1, "Apache-2.0",
iggy_connector_elasticsearch_sink: 0.3.2-edge.1, "Apache-2.0",
iggy_connector_elasticsearch_source: 0.3.2-edge.1, "Apache-2.0",
iggy_connector_iceberg_sink: 0.3.2-edge.1, "Apache-2.0",
iggy_connector_influxdb_sink: 0.2.2-edge.1, "Apache-2.0",
iggy_connector_influxdb_source: 0.2.2-edge.1, "Apache-2.0",
iggy_connector_mongodb_sink: 0.3.0, "Apache-2.0",
iggy_connector_postgres_sink: 0.3.2-edge.1, "Apache-2.0",
iggy_connector_postgres_source: 0.3.2-edge.1, "Apache-2.0",
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@

---

**Iggy** is a persistent message streaming platform written in Rust, supporting QUIC, WebSocket, TCP (custom binary specification) and HTTP (regular REST API) transport protocols, **capable of processing millions of messages per second at ultra-low latency**.
**Iggy** is a persistent message streaming platform written in Rust, supporting QUIC, WebSocket, TCP (custom binary specification) and HTTP (regular REST API) transport protocols, capable of processing millions of messages per second with low latency.

Iggy provides **exceptionally high throughput and performance** while utilizing minimal computing resources.
Iggy provides high throughput and predictable performance while utilizing minimal computing resources.

This is **not yet another extension** running on top of existing infrastructure, such as Kafka or SQL database.

Expand Down Expand Up @@ -174,7 +174,7 @@ fields = ["email", "created_at"]

## Model Context Protocol

The [Model Context Protocol](https://modelcontextprotocol.io) (MCP) is an open protocol that standardizes how applications provide context to LLMs. The **[Iggy MCP Server](https://github.com/apache/iggy/tree/master/core/ai/mcp)** is an implementation of the MCP protocol for the message streaming infrastructure. It can be used to provide context to LLMs in real-time, allowing for more accurate and relevant responses.
The [Model Context Protocol](https://modelcontextprotocol.io) (MCP) is an open protocol that standardizes how applications provide context to LLMs. The **[Iggy MCP Server](https://github.com/apache/iggy/tree/master/core/ai/mcp)** is an implementation of the MCP protocol for the message streaming infrastructure. It can be used to provide context to LLMs in real-time.

![server](assets/iggy_mcp_server.png)

Expand Down Expand Up @@ -399,7 +399,7 @@ while let Some(message) = consumer.next().await {

## Benchmarks

**Benchmarks should be the first-class citizens**. We believe that performance is crucial for any system, and we strive to provide the best possible performance for our users. Please check, why we believe that the **[transparent
Please check, why we believe that the **[transparent
benchmarking](https://iggy.apache.org/blogs/2025/02/17/transparent-benchmarks)** is so important.

We've also built the **[benchmarking platform](https://benchmarks.iggy.apache.org)** where anyone can upload the benchmarks and compare the results with others. Source code for the platform is available in the `core/bench/dashboard` directory.
Expand Down Expand Up @@ -464,7 +464,7 @@ These benchmarks would start the server with the default configuration, create a

For example, to run the benchmark for the already started server, provide the additional argument `--server-address 0.0.0.0:8090`.

**Iggy is already capable of processing millions of messages per second at the microseconds range for p99+ latency** Depending on the hardware, transport protocol (`quic`, `websocket`, `tcp` or `http`) and payload size (`messages-per-batch * message-size`) you might expect **over 5000 MB/s (e.g. 5M of 1 KB msg/sec) throughput for writes and reads**.
Depending on the hardware, transport protocol (`quic`, `websocket`, `tcp` or `http`) and payload size (`messages-per-batch * message-size`) you might expect **over 5000 MB/s (e.g. 5M of 1 KB msg/sec) throughput for writes and reads**.

Please refer to the mentioned [benchmarking platform](https://benchmarks.iggy.apache.org) where you can browse the results achieved on the different hardware configurations, using the different Iggy server versions.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

type = "sink"
key = "influxdb"
enabled = true
version = 0
name = "InfluxDB sink"
path = "<BASE_DIR>/target/release/libiggy_connector_influxdb_sink"
plugin_config_format = "toml"
verbose = false

[[streams]]
stream = "events"
topics = ["influx_events"]
schema = "json"
batch_length = 100
poll_interval = "5ms"
consumer_group = "influxdb_sink"

[plugin_config]
url = "http://localhost:8086"
org = "iggy"
bucket = "events"
token = "my_super_secret_token_123"
measurement = "iggy_messages"
precision = "ns"
batch_size = 500
include_metadata = true
include_checksum = true
include_origin_timestamp = true
include_stream_tag = true
include_topic_tag = true
include_partition_tag = true
payload_format = "json"
max_retries = 3
retry_delay = "1s"
timeout = "10s"
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

type = "source"
key = "influxdb"
enabled = true
version = 0
name = "InfluxDB source"
path = "<BASE_DIR>/target/release/libiggy_connector_influxdb_source"
plugin_config_format = "toml"
verbose = false

[[streams]]
stream = "events"
topic = "influx_events"
schema = "json"
batch_length = 100

[plugin_config]
url = "http://localhost:8086"
org = "iggy_org"
token = "my_super_secret_token_123"
query = '''
from(bucket: "iggy_bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "iggy_messages")
|> filter(fn: (r) => r._time > time(v: "$cursor"))
|> sort(columns: ["_time"])
|> limit(n: $limit)
'''
poll_interval = "5s"
batch_size = 500
cursor_field = "_time"
initial_offset = "1970-01-01T00:00:00Z"
include_metadata = true
payload_format = "json"
max_retries = 3
retry_delay = "1s"
timeout = "10s"
precision = "us"
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

type = "source"
key = "influxdb"
enabled = true
version = 0
name = "InfluxDB source"
path = "target/release/libiggy_connector_influxdb_source"
plugin_config_format = "toml"
verbose = false

[[streams]]
stream = "events"
topic = "influx_events"
schema = "json"
batch_length = 100

[plugin_config]
url = "http://localhost:8086"
org = "iggy"
token = "replace-with-token"
query = '''
from(bucket: "events")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "iggy_messages")
|> filter(fn: (r) => r._time > time(v: "$cursor"))
|> sort(columns: ["_time"])
|> limit(n: $limit)
'''
poll_interval = "5s"
batch_size = 500
cursor_field = "_time"
initial_offset = "1970-01-01T00:00:00Z"
include_metadata = true
payload_format = "json"
max_retries = 3
retry_delay = "1s"
timeout = "10s"
precision = "us"
4 changes: 4 additions & 0 deletions core/connectors/sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,11 @@ pub enum Error {
#[error("Invalid config")]
InvalidConfig,
#[error("Invalid record")]
InvalidConfigValue(String),
#[error("Invalid record")]
InvalidRecord,
#[error("Invalid record value : {0}")]
InvalidRecordValue(String),
#[error("Invalid transformer")]
InvalidTransformer,
#[error("HTTP request failed: {0}")]
Expand Down
50 changes: 50 additions & 0 deletions core/connectors/sinks/influxdb_sink/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "iggy_connector_influxdb_sink"
version = "0.2.2-edge.1"
description = "Iggy InfluxDB sink connector for storing stream messages as line protocol"
edition = "2024"
license = "Apache-2.0"
keywords = ["iggy", "messaging", "streaming", "influxdb", "sink"]
categories = ["command-line-utilities", "database", "network-programming"]
homepage = "https://iggy.apache.org"
documentation = "https://iggy.apache.org/docs"
repository = "https://github.com/apache/iggy"
readme = "../../README.md"

[package.metadata.cargo-machete]
ignored = ["dashmap", "once_cell", "futures"]

[lib]
crate-type = ["cdylib", "lib"]

[dependencies]
async-trait = { workspace = true }
base64 = { workspace = true }
dashmap = { workspace = true }
futures = { workspace = true }
humantime = { workspace = true }
iggy_connector_sdk = { workspace = true }
once_cell = { workspace = true }
rand.workspace = true
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
Loading
Loading