-
Notifications
You must be signed in to change notification settings - Fork 287
feat(server): Add integration test for message deduplicator #2879
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,187 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| use crate::server::scenarios::{PARTITION_ID, STREAM_NAME, TOPIC_NAME, cleanup}; | ||
| use bytes::Bytes; | ||
| use iggy::prelude::*; | ||
| use integration::harness::{TestHarness, assert_clean_system}; | ||
|
|
||
| const MESSAGES_PER_BATCH: u32 = 10; | ||
|
|
||
| pub async fn run(harness: &TestHarness) { | ||
| let client = harness | ||
| .root_client() | ||
| .await | ||
| .expect("Failed to get root client"); | ||
| init_system(&client).await; | ||
|
|
||
| // Step 1: Duplicate rejection | ||
| // Send messages with IDs 1..`MESSAGES_PER_BATCH`, then resend the same IDs. | ||
| // Only the first batch should be persisted. | ||
| let mut original_messages = build_messages(1, MESSAGES_PER_BATCH, "original"); | ||
| client | ||
| .send_messages( | ||
| &Identifier::named(STREAM_NAME).unwrap(), | ||
| &Identifier::named(TOPIC_NAME).unwrap(), | ||
| &Partitioning::partition_id(PARTITION_ID), | ||
| &mut original_messages, | ||
| ) | ||
| .await | ||
| .unwrap(); | ||
|
|
||
| let mut duplicate_messages = build_messages(1, MESSAGES_PER_BATCH, "duplicate"); | ||
| client | ||
| .send_messages( | ||
| &Identifier::named(STREAM_NAME).unwrap(), | ||
| &Identifier::named(TOPIC_NAME).unwrap(), | ||
| &Partitioning::partition_id(PARTITION_ID), | ||
| &mut duplicate_messages, | ||
| ) | ||
| .await | ||
| .unwrap(); | ||
|
|
||
| let polled = poll_all(&client, MESSAGES_PER_BATCH * 2).await; | ||
| assert_eq!( | ||
| polled.messages.len() as u32, | ||
| MESSAGES_PER_BATCH, | ||
| "Duplicate messages should have been dropped" | ||
| ); | ||
|
Comment on lines
+57
to
+62
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. after dedup removes messages mid-batch, offsets and indexes get rebuilt via remove_messages(). you never verify that the polled messages actually have correct, contiguous offsets. a bug in the offset recalculation would be a silent data corruption issue and this test wouldn't catch it. |
||
| for msg in &polled.messages { | ||
| let payload = std::str::from_utf8(&msg.payload).unwrap(); | ||
| assert!( | ||
| payload.starts_with("original"), | ||
| "Expected original payload, got: {payload}" | ||
| ); | ||
| } | ||
|
|
||
| // Step 2: Unique messages pass through | ||
| // Send messages with new IDs (`MESSAGES_PER_BATCH`+1)..(`MESSAGES_PER_BATCH`*2) — these should all be accepted. | ||
| let mut new_messages = build_messages(MESSAGES_PER_BATCH + 1, MESSAGES_PER_BATCH, "new-unique"); | ||
| client | ||
| .send_messages( | ||
| &Identifier::named(STREAM_NAME).unwrap(), | ||
| &Identifier::named(TOPIC_NAME).unwrap(), | ||
| &Partitioning::partition_id(PARTITION_ID), | ||
| &mut new_messages, | ||
| ) | ||
| .await | ||
| .unwrap(); | ||
|
|
||
| let polled = poll_all(&client, MESSAGES_PER_BATCH * 3).await; | ||
| assert_eq!( | ||
| polled.messages.len() as u32, | ||
| MESSAGES_PER_BATCH * 2, | ||
| "Unique messages should have been accepted" | ||
| ); | ||
|
|
||
| // Step 3: Partial deduplication | ||
| // Send a batch where half the IDs are duplicates and half are new. | ||
| // IDs `overlap_id_start`..(`overlap_id_start`+`MESSAGES_PER_BATCH`): | ||
| // IDs `overlap_id_start`..(`MESSAGES_PER_BATCH`*2) already exist (from Step 2), | ||
| // IDs (`MESSAGES_PER_BATCH`*2+1)..(`overlap_id_start`+`MESSAGES_PER_BATCH`) are new. | ||
| let overlap_id_start = 16; | ||
| let total_after_step3 = MESSAGES_PER_BATCH + overlap_id_start - 1; | ||
| // Ensure the `overlap_id_start` is between two batches. | ||
| assert!(overlap_id_start > MESSAGES_PER_BATCH && overlap_id_start < 2 * MESSAGES_PER_BATCH); | ||
|
Comment on lines
+96
to
+99
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. overlap_id_start = 16 is a magic number and the formula |
||
| let mut mixed_messages = build_messages(overlap_id_start, MESSAGES_PER_BATCH, "mixed"); | ||
| client | ||
| .send_messages( | ||
| &Identifier::named(STREAM_NAME).unwrap(), | ||
| &Identifier::named(TOPIC_NAME).unwrap(), | ||
| &Partitioning::partition_id(PARTITION_ID), | ||
| &mut mixed_messages, | ||
| ) | ||
| .await | ||
| .unwrap(); | ||
|
|
||
| let polled = poll_all(&client, MESSAGES_PER_BATCH * 4).await; | ||
| assert_eq!( | ||
| polled.messages.len() as u32, | ||
| total_after_step3, | ||
| "Only new IDs from the mixed batch should have been accepted" | ||
| ); | ||
|
Comment on lines
+111
to
+116
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. steps 3 and 4 only check message counts but never verify payloads. step 1 does this correctly (checks for "original" prefix), but step 3 doesn't verify the 5 surviving messages are actually the new ones (ids 21-25 with "mixed" payload) andnot some other combination. same for step 4 -- should check "after-ttl" payloads. count-only assertions can pass even if the wrong messages are kept. |
||
|
|
||
| // Step 4: TTL expiry re-accepts previously seen IDs | ||
| // The server is configured with a 2s dedup expiry. Wait for it to elapse, | ||
| // then resend the original IDs — they should be accepted again. | ||
| tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; | ||
| let mut reused_messages = build_messages(1, MESSAGES_PER_BATCH, "after-ttl"); | ||
| client | ||
| .send_messages( | ||
| &Identifier::named(STREAM_NAME).unwrap(), | ||
| &Identifier::named(TOPIC_NAME).unwrap(), | ||
| &Partitioning::partition_id(PARTITION_ID), | ||
| &mut reused_messages, | ||
| ) | ||
| .await | ||
| .unwrap(); | ||
|
|
||
| let polled = poll_all(&client, total_after_step3 + MESSAGES_PER_BATCH).await; | ||
| assert_eq!( | ||
| polled.messages.len() as u32, | ||
| total_after_step3 + MESSAGES_PER_BATCH, | ||
| "Previously seen IDs should be accepted again after TTL expiry" | ||
| ); | ||
|
|
||
| cleanup(&client, false).await; | ||
| assert_clean_system(&client).await; | ||
| } | ||
|
|
||
| async fn init_system(client: &IggyClient) { | ||
| client.create_stream(STREAM_NAME).await.unwrap(); | ||
| client | ||
| .create_topic( | ||
| &Identifier::named(STREAM_NAME).unwrap(), | ||
| TOPIC_NAME, | ||
| 1, | ||
| CompressionAlgorithm::default(), | ||
| None, | ||
| IggyExpiry::NeverExpire, | ||
| MaxTopicSize::ServerDefault, | ||
| ) | ||
| .await | ||
| .unwrap(); | ||
| } | ||
|
|
||
| fn build_messages(start_id: u32, count: u32, payload_prefix: &str) -> Vec<IggyMessage> { | ||
| (0..count) | ||
| .map(|i| { | ||
| let id = (start_id + i) as u128; | ||
| IggyMessage::builder() | ||
| .id(id) | ||
| .payload(Bytes::from(format!("{payload_prefix}-{i}"))) | ||
| .build() | ||
| .expect("Failed to create message") | ||
| }) | ||
| .collect() | ||
| } | ||
|
|
||
| async fn poll_all(client: &IggyClient, max_count: u32) -> PolledMessages { | ||
| let consumer = Consumer::default(); | ||
| client | ||
| .poll_messages( | ||
| &Identifier::named(STREAM_NAME).unwrap(), | ||
| &Identifier::named(TOPIC_NAME).unwrap(), | ||
| Some(PARTITION_ID), | ||
| &consumer, | ||
| &PollingStrategy::offset(0), | ||
| max_count, | ||
| false, | ||
| ) | ||
| .await | ||
| .unwrap() | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -356,6 +356,10 @@ impl IggyShard { | |
| ) | ||
| .await; | ||
|
|
||
| if batch.count() == 0 { | ||
| return Ok(()); | ||
| } | ||
|
Comment on lines
+359
to
+361
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i get that it's a fix, but you didn't test it. nothing in this test actually exercises it. you'd need a step that sends a batch where every message is a duplicate (e.g. resend ids 1-10 a third time without any new ids mixed in) and verify the server doesn't blow up and the message count stays the same.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn’t this exactly what the step does where all ids are overlapping?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You're right, step 1 does hit that path — my bad, sorry. Still, it'd be nice to have it as its own explicit step so it's obvious we're regression-testing that specific fix, not just as a side effect of the dedup check. |
||
|
|
||
|
Comment on lines
+359
to
+362
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [note] This was causing a bug when all the messages had duplicate IDs, because the batch was empty and we unwrapped on the first message. |
||
| let (journal_messages_count, journal_size, is_full) = { | ||
| let mut partitions = self.local_partitions.borrow_mut(); | ||
| let partition = partitions | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,7 @@ use crate::shard::transmission::event::PartitionInfo; | |
| use crate::shard::transmission::message::ResolvedTopic; | ||
| use crate::streaming::partitions::consumer_group_offsets::ConsumerGroupOffsets; | ||
| use crate::streaming::partitions::consumer_offsets::ConsumerOffsets; | ||
| use crate::streaming::partitions::helpers::create_message_deduplicator; | ||
| use crate::streaming::partitions::local_partition::LocalPartition; | ||
| use crate::streaming::partitions::storage::create_partition_file_hierarchy; | ||
| use crate::streaming::partitions::storage::delete_partitions_from_disk; | ||
|
|
@@ -280,13 +281,15 @@ impl IggyShard { | |
| ) | ||
| }); | ||
|
|
||
| let message_deduplicator = create_message_deduplicator(&self.config.system).map(Arc::new); | ||
|
|
||
| let partition = LocalPartition::with_log( | ||
| loaded_log, | ||
| stats, | ||
| std::sync::Arc::new(std::sync::atomic::AtomicU64::new(current_offset)), | ||
| consumer_offsets, | ||
| consumer_group_offsets, | ||
| None, | ||
| message_deduplicator, | ||
|
Comment on lines
+284
to
+292
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [note] I've added this here since the message_deduplicator was always None |
||
| created_at, | ||
| revision_id, | ||
| current_offset > 0, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You never test ID 0 (auto-generated IDs). The server replaces 0 with a UUID before the dedup check (messages_batch_mut.rs:164-166), so sending multiple messages with ID 0 should all pass through. Without this, you could silently break the default send path and never know. After that, please also poll the messages to check whether they have unique message IDs.