From 1a1cddd7db2c91b7caa909d6d6205f08ddf3657c Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Thu, 5 Mar 2026 16:06:57 +0200 Subject: [PATCH 1/3] Add integration test and fix small edge cases --- .../message_deduplication_scenario.rs | 161 ++++++++++++++++++ .../integration/tests/server/scenarios/mod.rs | 1 + core/integration/tests/server/specific.rs | 12 +- core/server/src/shard/system/messages.rs | 4 + core/server/src/shard/system/partitions.rs | 6 +- 5 files changed, 181 insertions(+), 3 deletions(-) create mode 100644 core/integration/tests/server/scenarios/message_deduplication_scenario.rs diff --git a/core/integration/tests/server/scenarios/message_deduplication_scenario.rs b/core/integration/tests/server/scenarios/message_deduplication_scenario.rs new file mode 100644 index 0000000000..2da480c7f1 --- /dev/null +++ b/core/integration/tests/server/scenarios/message_deduplication_scenario.rs @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::server::scenarios::{PARTITION_ID, STREAM_NAME, TOPIC_NAME, cleanup}; +use bytes::Bytes; +use iggy::prelude::*; +use integration::harness::{TestHarness, assert_clean_system}; + +const MESSAGES_PER_BATCH: u32 = 10; + +pub async fn run(harness: &TestHarness) { + let client = harness + .root_client() + .await + .expect("Failed to get root client"); + init_system(&client).await; + + // Step 1: Duplicate rejection + // Send messages with IDs 1..10, then resend the same IDs. + // Only the first batch should be persisted. + let mut original_messages = build_messages(1, MESSAGES_PER_BATCH, "original"); + client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(PARTITION_ID), + &mut original_messages, + ) + .await + .unwrap(); + + let mut duplicate_messages = build_messages(1, MESSAGES_PER_BATCH, "duplicate"); + client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(PARTITION_ID), + &mut duplicate_messages, + ) + .await + .unwrap(); + + let polled = poll_all(&client, MESSAGES_PER_BATCH * 2).await; + assert_eq!( + polled.messages.len() as u32, + MESSAGES_PER_BATCH, + "Duplicate messages should have been dropped" + ); + for msg in &polled.messages { + let payload = std::str::from_utf8(&msg.payload).unwrap(); + assert!( + payload.starts_with("original"), + "Expected original payload, got: {payload}" + ); + } + + // Step 2: Unique messages pass through + // Send messages with new IDs 11..20 — these should all be accepted. + let mut new_messages = build_messages(MESSAGES_PER_BATCH + 1, MESSAGES_PER_BATCH, "new-unique"); + client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(PARTITION_ID), + &mut new_messages, + ) + .await + .unwrap(); + + let polled = poll_all(&client, MESSAGES_PER_BATCH * 3).await; + assert_eq!( + polled.messages.len() as u32, + MESSAGES_PER_BATCH * 2, + "Unique messages should have been accepted" + ); + + // Step 3: TTL expiry re-accepts previously seen IDs + // The server is configured with a 2s dedup expiry. Wait for it to elapse, + // then resend the original IDs — they should be accepted again. + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + + let mut reused_messages = build_messages(1, MESSAGES_PER_BATCH, "after-ttl"); + client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(PARTITION_ID), + &mut reused_messages, + ) + .await + .unwrap(); + + let polled = poll_all(&client, MESSAGES_PER_BATCH * 4).await; + assert_eq!( + polled.messages.len() as u32, + MESSAGES_PER_BATCH * 3, + "Previously seen IDs should be accepted again after TTL expiry" + ); + + cleanup(&client, false).await; + assert_clean_system(&client).await; +} + +async fn init_system(client: &IggyClient) { + client.create_stream(STREAM_NAME).await.unwrap(); + client + .create_topic( + &Identifier::named(STREAM_NAME).unwrap(), + TOPIC_NAME, + 1, + CompressionAlgorithm::default(), + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); +} + +fn build_messages(start_id: u32, count: u32, payload_prefix: &str) -> Vec { + (0..count) + .map(|i| { + let id = (start_id + i) as u128; + IggyMessage::builder() + .id(id) + .payload(Bytes::from(format!("{payload_prefix}-{i}"))) + .build() + .expect("Failed to create message") + }) + .collect() +} + +async fn poll_all(client: &IggyClient, max_count: u32) -> PolledMessages { + let consumer = Consumer::default(); + client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + Some(PARTITION_ID), + &consumer, + &PollingStrategy::offset(0), + max_count, + false, + ) + .await + .unwrap() +} diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs index 5a3b3f7a48..b14d8f14cc 100644 --- a/core/integration/tests/server/scenarios/mod.rs +++ b/core/integration/tests/server/scenarios/mod.rs @@ -32,6 +32,7 @@ pub mod encryption_scenario; pub mod invalid_consumer_offset_scenario; pub mod log_rotation_scenario; pub mod message_cleanup_scenario; +pub mod message_deduplication_scenario; pub mod message_headers_scenario; pub mod message_size_scenario; pub mod offset_scenario; diff --git a/core/integration/tests/server/specific.rs b/core/integration/tests/server/specific.rs index ad569b1bd5..606f532806 100644 --- a/core/integration/tests/server/specific.rs +++ b/core/integration/tests/server/specific.rs @@ -18,8 +18,8 @@ */ use crate::server::scenarios::{ - message_size_scenario, segment_rotation_race_scenario, single_message_per_batch_scenario, - tcp_tls_scenario, websocket_tls_scenario, + message_deduplication_scenario, message_size_scenario, segment_rotation_race_scenario, + single_message_per_batch_scenario, tcp_tls_scenario, websocket_tls_scenario, }; use integration::iggy_harness; @@ -86,3 +86,11 @@ async fn should_handle_single_message_per_batch_with_delayed_persistence(harness async fn segment_rotation_scenario(harness: &TestHarness) { segment_rotation_race_scenario::run(harness).await; } + +#[iggy_harness(server( + system.message_deduplication.enabled = true, + system.message_deduplication.expiry = "2s" +))] +async fn message_deduplication(harness: &TestHarness) { + message_deduplication_scenario::run(harness).await; +} diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index d8ce2d891b..925d66a5fd 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -356,6 +356,10 @@ impl IggyShard { ) .await; + if batch.count() == 0 { + return Ok(()); + } + let (journal_messages_count, journal_size, is_full) = { let mut partitions = self.local_partitions.borrow_mut(); let partition = partitions diff --git a/core/server/src/shard/system/partitions.rs b/core/server/src/shard/system/partitions.rs index e281544c10..6ba79293f4 100644 --- a/core/server/src/shard/system/partitions.rs +++ b/core/server/src/shard/system/partitions.rs @@ -26,6 +26,7 @@ use crate::streaming::partitions::consumer_offsets::ConsumerOffsets; use crate::streaming::partitions::local_partition::LocalPartition; use crate::streaming::partitions::storage::create_partition_file_hierarchy; use crate::streaming::partitions::storage::delete_partitions_from_disk; +use crate::streaming::partitions::helpers::create_message_deduplicator; use crate::streaming::segments::Segment; use crate::streaming::segments::storage::create_segment_storage; use crate::streaming::stats::PartitionStats; @@ -280,13 +281,16 @@ impl IggyShard { ) }); + let message_deduplicator = + create_message_deduplicator(&self.config.system).map(Arc::new); + let partition = LocalPartition::with_log( loaded_log, stats, std::sync::Arc::new(std::sync::atomic::AtomicU64::new(current_offset)), consumer_offsets, consumer_group_offsets, - None, + message_deduplicator, created_at, revision_id, current_offset > 0, From e8ea67142a284be5aef3d6c42a19e811aa8cf9a1 Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Thu, 5 Mar 2026 18:11:49 +0200 Subject: [PATCH 2/3] Update test and linted files --- .../message_deduplication_scenario.rs | 38 ++++++++++++++++--- core/server/src/shard/system/partitions.rs | 5 +-- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/core/integration/tests/server/scenarios/message_deduplication_scenario.rs b/core/integration/tests/server/scenarios/message_deduplication_scenario.rs index 2da480c7f1..6637532b54 100644 --- a/core/integration/tests/server/scenarios/message_deduplication_scenario.rs +++ b/core/integration/tests/server/scenarios/message_deduplication_scenario.rs @@ -30,7 +30,7 @@ pub async fn run(harness: &TestHarness) { init_system(&client).await; // Step 1: Duplicate rejection - // Send messages with IDs 1..10, then resend the same IDs. + // Send messages with IDs 1..`MESSAGES_PER_BATCH`, then resend the same IDs. // Only the first batch should be persisted. let mut original_messages = build_messages(1, MESSAGES_PER_BATCH, "original"); client @@ -69,7 +69,7 @@ pub async fn run(harness: &TestHarness) { } // Step 2: Unique messages pass through - // Send messages with new IDs 11..20 — these should all be accepted. + // Send messages with new IDs (`MESSAGES_PER_BATCH`+1)..(`MESSAGES_PER_BATCH`*2) — these should all be accepted. let mut new_messages = build_messages(MESSAGES_PER_BATCH + 1, MESSAGES_PER_BATCH, "new-unique"); client .send_messages( @@ -88,11 +88,37 @@ pub async fn run(harness: &TestHarness) { "Unique messages should have been accepted" ); - // Step 3: TTL expiry re-accepts previously seen IDs + // Step 3: Partial deduplication + // Send a batch where half the IDs are duplicates and half are new. + // IDs `overlap_id_start`..(`overlap_id_start`+`MESSAGES_PER_BATCH`): + // IDs `overlap_id_start`..(`MESSAGES_PER_BATCH`*2) already exist (from Step 2), + // IDs (`MESSAGES_PER_BATCH`*2+1)..(`overlap_id_start`+`MESSAGES_PER_BATCH`) are new. + let overlap_id_start = 16; + let total_after_step3 = MESSAGES_PER_BATCH + overlap_id_start - 1; + // Ensure the `overlap_id_start` is between two batches. + assert!(overlap_id_start > MESSAGES_PER_BATCH && overlap_id_start < 2 * MESSAGES_PER_BATCH); + let mut mixed_messages = build_messages(overlap_id_start, MESSAGES_PER_BATCH, "mixed"); + client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(PARTITION_ID), + &mut mixed_messages, + ) + .await + .unwrap(); + + let polled = poll_all(&client, MESSAGES_PER_BATCH * 4).await; + assert_eq!( + polled.messages.len() as u32, + total_after_step3, + "Only new IDs from the mixed batch should have been accepted" + ); + + // Step 4: TTL expiry re-accepts previously seen IDs // The server is configured with a 2s dedup expiry. Wait for it to elapse, // then resend the original IDs — they should be accepted again. tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; - let mut reused_messages = build_messages(1, MESSAGES_PER_BATCH, "after-ttl"); client .send_messages( @@ -104,10 +130,10 @@ pub async fn run(harness: &TestHarness) { .await .unwrap(); - let polled = poll_all(&client, MESSAGES_PER_BATCH * 4).await; + let polled = poll_all(&client, total_after_step3 + MESSAGES_PER_BATCH).await; assert_eq!( polled.messages.len() as u32, - MESSAGES_PER_BATCH * 3, + total_after_step3 + MESSAGES_PER_BATCH, "Previously seen IDs should be accepted again after TTL expiry" ); diff --git a/core/server/src/shard/system/partitions.rs b/core/server/src/shard/system/partitions.rs index 6ba79293f4..c28ae49763 100644 --- a/core/server/src/shard/system/partitions.rs +++ b/core/server/src/shard/system/partitions.rs @@ -23,10 +23,10 @@ use crate::shard::transmission::event::PartitionInfo; use crate::shard::transmission::message::ResolvedTopic; use crate::streaming::partitions::consumer_group_offsets::ConsumerGroupOffsets; use crate::streaming::partitions::consumer_offsets::ConsumerOffsets; +use crate::streaming::partitions::helpers::create_message_deduplicator; use crate::streaming::partitions::local_partition::LocalPartition; use crate::streaming::partitions::storage::create_partition_file_hierarchy; use crate::streaming::partitions::storage::delete_partitions_from_disk; -use crate::streaming::partitions::helpers::create_message_deduplicator; use crate::streaming::segments::Segment; use crate::streaming::segments::storage::create_segment_storage; use crate::streaming::stats::PartitionStats; @@ -281,8 +281,7 @@ impl IggyShard { ) }); - let message_deduplicator = - create_message_deduplicator(&self.config.system).map(Arc::new); + let message_deduplicator = create_message_deduplicator(&self.config.system).map(Arc::new); let partition = LocalPartition::with_log( loaded_log, From 1111a992c236f274bcc81a436e86e7851cc9f2db Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Thu, 5 Mar 2026 20:44:28 +0200 Subject: [PATCH 3/3] Trigger Build