Contributor
you never test id 0 (auto-generated ids). the server replaces 0 with a uuid before the dedup check (messages_batch_mut.rs:164-166), so multiple messages sent with id 0 should all pass through. without this, you could silently break the default send path and never know. after that, please also poll the messages to check whether they have unique message ids.
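A minimal sketch of the dedup path this comment describes, assuming the behavior in messages_batch_mut.rs (the counter here is a stand-in for the server's real UUID generation, and the function name is hypothetical):

```rust
use std::collections::HashSet;

// Sketch: id 0 is replaced with a freshly generated unique id *before* the
// duplicate check, so a batch of id-0 messages is never rejected.
// `next_auto` is a stand-in for the server's UUID generator.
fn dedup_with_auto_ids(ids: &[u128], seen: &mut HashSet<u128>, next_auto: &mut u128) -> Vec<u128> {
    let mut accepted = Vec::new();
    for &id in ids {
        let effective = if id == 0 {
            *next_auto += 1; // stand-in for uuid generation
            *next_auto
        } else {
            id
        };
        // Only ids never seen before survive the dedup check.
        if seen.insert(effective) {
            accepted.push(effective);
        }
    }
    accepted
}

fn main() {
    let mut seen = HashSet::new();
    let mut next = 1_000_000u128;
    // Two batches of id-0 messages: all pass, none deduplicated.
    let first = dedup_with_auto_ids(&[0, 0, 0], &mut seen, &mut next);
    let second = dedup_with_auto_ids(&[0, 0, 0], &mut seen, &mut next);
    assert_eq!(first.len() + second.len(), 6);
    // All assigned ids are unique, which is what the suggested poll would verify.
    let unique: HashSet<_> = first.iter().chain(second.iter()).collect();
    assert_eq!(unique.len(), 6);
    println!("all auto-id messages accepted with unique ids");
}
```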

@@ -0,0 +1,187 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::server::scenarios::{PARTITION_ID, STREAM_NAME, TOPIC_NAME, cleanup};
use bytes::Bytes;
use iggy::prelude::*;
use integration::harness::{TestHarness, assert_clean_system};

const MESSAGES_PER_BATCH: u32 = 10;

pub async fn run(harness: &TestHarness) {
    let client = harness
        .root_client()
        .await
        .expect("Failed to get root client");
    init_system(&client).await;

    // Step 1: Duplicate rejection
    // Send messages with IDs 1..=`MESSAGES_PER_BATCH`, then resend the same IDs.
    // Only the first batch should be persisted.
    let mut original_messages = build_messages(1, MESSAGES_PER_BATCH, "original");
    client
        .send_messages(
            &Identifier::named(STREAM_NAME).unwrap(),
            &Identifier::named(TOPIC_NAME).unwrap(),
            &Partitioning::partition_id(PARTITION_ID),
            &mut original_messages,
        )
        .await
        .unwrap();

    let mut duplicate_messages = build_messages(1, MESSAGES_PER_BATCH, "duplicate");
    client
        .send_messages(
            &Identifier::named(STREAM_NAME).unwrap(),
            &Identifier::named(TOPIC_NAME).unwrap(),
            &Partitioning::partition_id(PARTITION_ID),
            &mut duplicate_messages,
        )
        .await
        .unwrap();

    let polled = poll_all(&client, MESSAGES_PER_BATCH * 2).await;
    assert_eq!(
        polled.messages.len() as u32,
        MESSAGES_PER_BATCH,
        "Duplicate messages should have been dropped"
    );
Comment on lines +57 to +62
Contributor
after dedup removes messages mid-batch, offsets and indexes get rebuilt via remove_messages(). you never verify that the polled messages actually have correct, contiguous offsets. a bug in the offset recalculation would be a silent data corruption issue and this test wouldn't catch it.
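A sketch of the extra assertion this comment asks for: after dedup drops messages mid-batch, the polled offsets should still be contiguous. The helper works on a plain slice of offsets; in the real test they would come from the polled messages (the exact header field name is an assumption about the iggy message type):

```rust
// Verify offsets increase by exactly 1 with no gaps, catching a broken
// offset recalculation after remove_messages().
fn assert_contiguous_offsets(offsets: &[u64]) {
    for (i, pair) in offsets.windows(2).enumerate() {
        assert_eq!(
            pair[1],
            pair[0] + 1,
            "offset gap after index {i}: {} -> {}",
            pair[0],
            pair[1]
        );
    }
}

fn main() {
    assert_contiguous_offsets(&[0, 1, 2, 3]); // contiguous: passes
    assert_contiguous_offsets(&[]);           // empty: trivially passes
    println!("contiguous offsets verified");
}
```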

    for msg in &polled.messages {
        let payload = std::str::from_utf8(&msg.payload).unwrap();
        assert!(
            payload.starts_with("original"),
            "Expected original payload, got: {payload}"
        );
    }

    // Step 2: Unique messages pass through
    // Send messages with new IDs (`MESSAGES_PER_BATCH`+1)..=(`MESSAGES_PER_BATCH`*2);
    // these should all be accepted.
    let mut new_messages = build_messages(MESSAGES_PER_BATCH + 1, MESSAGES_PER_BATCH, "new-unique");
    client
        .send_messages(
            &Identifier::named(STREAM_NAME).unwrap(),
            &Identifier::named(TOPIC_NAME).unwrap(),
            &Partitioning::partition_id(PARTITION_ID),
            &mut new_messages,
        )
        .await
        .unwrap();

    let polled = poll_all(&client, MESSAGES_PER_BATCH * 3).await;
    assert_eq!(
        polled.messages.len() as u32,
        MESSAGES_PER_BATCH * 2,
        "Unique messages should have been accepted"
    );

    // Step 3: Partial deduplication
    // Send a batch where half the IDs are duplicates and half are new.
    // The batch covers IDs `overlap_id_start`..=(`overlap_id_start`+`MESSAGES_PER_BATCH`-1):
    // IDs `overlap_id_start`..=(`MESSAGES_PER_BATCH`*2) already exist (from Step 2),
    // while IDs (`MESSAGES_PER_BATCH`*2+1)..=(`overlap_id_start`+`MESSAGES_PER_BATCH`-1) are new.
    let overlap_id_start = 16;
    let total_after_step3 = MESSAGES_PER_BATCH + overlap_id_start - 1;
    // Ensure `overlap_id_start` falls strictly between the two batch ranges.
    assert!(overlap_id_start > MESSAGES_PER_BATCH && overlap_id_start < 2 * MESSAGES_PER_BATCH);
Comment on lines +96 to +99
Contributor
overlap_id_start = 16 is a magic number and the formula total_after_step3 = MESSAGES_PER_BATCH + overlap_id_start - 1 only works because ids happen to be contiguous from 1. consider deriving it from the actual unique id range, e.g. let new_ids_in_mixed = (overlap_id_start + MESSAGES_PER_BATCH) - (2 * MESSAGES_PER_BATCH + 1) or just computing the set difference explicitly. also build_messages uses 0-based i in the payload instead of the actual message id, which makes correlating payloads to ids harder than it needs to be.
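A sketch of the derivation the comment suggests: compute the expected total from the actual id sets rather than a formula that only holds because ids are contiguous from 1. All names here are illustrative, not part of the PR:

```rust
use std::collections::HashSet;

// Expected total after a mixed batch = already-persisted ids + ids in the
// batch that are not yet known. This stays correct even if the id layout
// changes, unlike `MESSAGES_PER_BATCH + overlap_id_start - 1`.
fn expected_total(existing: &HashSet<u128>, batch: &[u128]) -> usize {
    let new_ids = batch.iter().filter(|id| !existing.contains(id)).count();
    existing.len() + new_ids
}

fn main() {
    // Ids 1..=20 already exist after steps 1-2; the mixed batch sends 16..=25.
    let existing: HashSet<u128> = (1..=20).collect();
    let mixed: Vec<u128> = (16..=25).collect();
    // 21..=25 are new, so the total becomes 25, matching the formula for
    // this particular layout.
    assert_eq!(expected_total(&existing, &mixed), 25);
    println!("expected total derived from id sets: 25");
}
```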

    let mut mixed_messages = build_messages(overlap_id_start, MESSAGES_PER_BATCH, "mixed");
    client
        .send_messages(
            &Identifier::named(STREAM_NAME).unwrap(),
            &Identifier::named(TOPIC_NAME).unwrap(),
            &Partitioning::partition_id(PARTITION_ID),
            &mut mixed_messages,
        )
        .await
        .unwrap();

    let polled = poll_all(&client, MESSAGES_PER_BATCH * 4).await;
    assert_eq!(
        polled.messages.len() as u32,
        total_after_step3,
        "Only new IDs from the mixed batch should have been accepted"
    );
Comment on lines +111 to +116
Contributor
steps 3 and 4 only check message counts but never verify payloads. step 1 does this correctly (checks for "original" prefix), but step 3 doesn't verify the 5 surviving messages are actually the new ones (ids 21-25 with "mixed" payload) and not some other combination. same for step 4 -- it should check the "after-ttl" payloads. count-only assertions can pass even if the wrong messages are kept.
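A sketch of the payload check this comment asks for: count survivors by payload prefix so the assertion pins down which messages survived, not just how many. The helper and the simulated payloads are illustrative:

```rust
// Count payloads carrying a given prefix, so step 3 can assert that exactly
// the new "mixed" messages survived (and step 4 the "after-ttl" ones).
fn count_with_prefix<'a>(payloads: impl Iterator<Item = &'a str>, prefix: &str) -> usize {
    payloads.filter(|p| p.starts_with(prefix)).count()
}

fn main() {
    // Simulated polled payloads after step 3: 20 pre-existing + 5 "mixed".
    let polled: Vec<String> = (0..20)
        .map(|i| format!("existing-{i}"))
        .chain((5..10).map(|i| format!("mixed-{i}")))
        .collect();
    assert_eq!(count_with_prefix(polled.iter().map(String::as_str), "mixed-"), 5);
    println!("exactly 5 surviving messages carry the mixed payload");
}
```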


    // Step 4: TTL expiry re-accepts previously seen IDs
    // The server is configured with a 2s dedup expiry. Wait for it to elapse,
    // then resend the original IDs; they should be accepted again.
    tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
    let mut reused_messages = build_messages(1, MESSAGES_PER_BATCH, "after-ttl");
    client
        .send_messages(
            &Identifier::named(STREAM_NAME).unwrap(),
            &Identifier::named(TOPIC_NAME).unwrap(),
            &Partitioning::partition_id(PARTITION_ID),
            &mut reused_messages,
        )
        .await
        .unwrap();

    let polled = poll_all(&client, total_after_step3 + MESSAGES_PER_BATCH).await;
    assert_eq!(
        polled.messages.len() as u32,
        total_after_step3 + MESSAGES_PER_BATCH,
        "Previously seen IDs should be accepted again after TTL expiry"
    );

    cleanup(&client, false).await;
    assert_clean_system(&client).await;
}

async fn init_system(client: &IggyClient) {
    client.create_stream(STREAM_NAME).await.unwrap();
    client
        .create_topic(
            &Identifier::named(STREAM_NAME).unwrap(),
            TOPIC_NAME,
            1,
            CompressionAlgorithm::default(),
            None,
            IggyExpiry::NeverExpire,
            MaxTopicSize::ServerDefault,
        )
        .await
        .unwrap();
}

fn build_messages(start_id: u32, count: u32, payload_prefix: &str) -> Vec<IggyMessage> {
    (0..count)
        .map(|i| {
            let id = (start_id + i) as u128;
            IggyMessage::builder()
                .id(id)
                .payload(Bytes::from(format!("{payload_prefix}-{i}")))
                .build()
                .expect("Failed to create message")
        })
        .collect()
}

async fn poll_all(client: &IggyClient, max_count: u32) -> PolledMessages {
    let consumer = Consumer::default();
    client
        .poll_messages(
            &Identifier::named(STREAM_NAME).unwrap(),
            &Identifier::named(TOPIC_NAME).unwrap(),
            Some(PARTITION_ID),
            &consumer,
            &PollingStrategy::offset(0),
            max_count,
            false,
        )
        .await
        .unwrap()
}
1 change: 1 addition & 0 deletions core/integration/tests/server/scenarios/mod.rs
@@ -32,6 +32,7 @@ pub mod encryption_scenario;
pub mod invalid_consumer_offset_scenario;
pub mod log_rotation_scenario;
pub mod message_cleanup_scenario;
pub mod message_deduplication_scenario;
pub mod message_headers_scenario;
pub mod message_size_scenario;
pub mod offset_scenario;
12 changes: 10 additions & 2 deletions core/integration/tests/server/specific.rs
@@ -18,8 +18,8 @@
*/

use crate::server::scenarios::{
message_size_scenario, segment_rotation_race_scenario, single_message_per_batch_scenario,
tcp_tls_scenario, websocket_tls_scenario,
message_deduplication_scenario, message_size_scenario, segment_rotation_race_scenario,
single_message_per_batch_scenario, tcp_tls_scenario, websocket_tls_scenario,
};
use integration::iggy_harness;

@@ -86,3 +86,11 @@ async fn should_handle_single_message_per_batch_with_delayed_persistence(harness
async fn segment_rotation_scenario(harness: &TestHarness) {
    segment_rotation_race_scenario::run(harness).await;
}

#[iggy_harness(server(
    system.message_deduplication.enabled = true,
    system.message_deduplication.expiry = "2s"
))]
async fn message_deduplication(harness: &TestHarness) {
    message_deduplication_scenario::run(harness).await;
}
4 changes: 4 additions & 0 deletions core/server/src/shard/system/messages.rs
@@ -356,6 +356,10 @@ impl IggyShard {
)
.await;

        if batch.count() == 0 {
            return Ok(());
        }
Comment on lines +359 to +361
Contributor
i get that it's a fix, but you didn't test it. nothing in this test actually exercises it. you'd need a step that sends a batch where every message is a duplicate (e.g. resend ids 1-10 a third time without any new ids mixed in) and verify the server doesn't blow up and the message count stays the same.
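A minimal sketch of the regression this thread is about: when every message in a batch is a duplicate, dedup leaves the batch empty, and the append path must bail out instead of unwrapping on the first message. The function and its shape are illustrative, not the server's actual code:

```rust
use std::collections::HashSet;

// Simulated append path: dedup first, then touch the first survivor.
// Returns the first accepted id, or None when the batch was fully deduplicated.
fn append_batch(batch: &[u128], seen: &mut HashSet<u128>) -> Option<u128> {
    let survivors: Vec<u128> = batch.iter().copied().filter(|id| seen.insert(*id)).collect();
    if survivors.is_empty() {
        // Without this guard, `survivors.first().unwrap()` below would panic,
        // which mirrors the bug the `batch.count() == 0` early return fixes.
        return None;
    }
    Some(*survivors.first().unwrap())
}

fn main() {
    let mut seen = HashSet::new();
    assert_eq!(append_batch(&[1, 2, 3], &mut seen), Some(1));
    // Resending the exact same ids: fully deduplicated, no panic.
    assert_eq!(append_batch(&[1, 2, 3], &mut seen), None);
    println!("all-duplicate batch handled without panic");
}
```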

Author
Isn’t this exactly what the step does where all ids are overlapping?

Contributor
you're right, step 1 does hit that path - my bad, sorry. still, it'd be nice to have it as its own explicit step so it's obvious we're regression-testing that specific fix, not just as a side effect of the dedup check.


Comment on lines +359 to +362
Author
[note] This was causing a bug when all the messages had duplicate IDs, because the batch was empty and we unwrapped on `first`.

        let (journal_messages_count, journal_size, is_full) = {
            let mut partitions = self.local_partitions.borrow_mut();
            let partition = partitions
5 changes: 4 additions & 1 deletion core/server/src/shard/system/partitions.rs
@@ -23,6 +23,7 @@ use crate::shard::transmission::event::PartitionInfo;
use crate::shard::transmission::message::ResolvedTopic;
use crate::streaming::partitions::consumer_group_offsets::ConsumerGroupOffsets;
use crate::streaming::partitions::consumer_offsets::ConsumerOffsets;
use crate::streaming::partitions::helpers::create_message_deduplicator;
use crate::streaming::partitions::local_partition::LocalPartition;
use crate::streaming::partitions::storage::create_partition_file_hierarchy;
use crate::streaming::partitions::storage::delete_partitions_from_disk;
@@ -280,13 +281,15 @@ impl IggyShard {
)
});

        let message_deduplicator = create_message_deduplicator(&self.config.system).map(Arc::new);

        let partition = LocalPartition::with_log(
            loaded_log,
            stats,
            std::sync::Arc::new(std::sync::atomic::AtomicU64::new(current_offset)),
            consumer_offsets,
            consumer_group_offsets,
            None,
            message_deduplicator,
Comment on lines +284 to +292
Author
[note] I've added this here since `message_deduplicator` was previously always `None`.

            created_at,
            revision_id,
            current_offset > 0,