From eb0f35d5ca8ec9421533107bc3deee0078c1dd37 Mon Sep 17 00:00:00 2001 From: Tung To Date: Mon, 2 Feb 2026 14:52:47 +0700 Subject: [PATCH 1/9] in middle of direct io --- Cargo.lock | 2 + Cargo.toml | 1 + core/common/Cargo.toml | 1 + core/common/src/alloc/buffer.rs | 146 ++++++++++++------ core/common/src/alloc/memory_pool.rs | 128 ++++++++++----- .../src/types/message/messages_batch_mut.rs | 20 +-- .../src/types/segment_storage/index_reader.rs | 31 ++-- .../types/segment_storage/messages_reader.rs | 33 ++-- core/server/Cargo.toml | 1 + .../messages/poll_messages_handler.rs | 3 + .../messages/send_messages_handler.rs | 5 + core/server/src/http/messages.rs | 18 +-- core/server/src/streaming/partitions/ops.rs | 3 + 13 files changed, 249 insertions(+), 143 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6f4d7aa024..3046ae38e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5340,6 +5340,7 @@ version = "0.9.2-edge.1" dependencies = [ "aes-gcm", "ahash 0.8.12", + "aligned-vec", "base64 0.22.1", "blake3", "bon", @@ -9617,6 +9618,7 @@ name = "server" version = "0.7.2-edge.1" dependencies = [ "ahash 0.8.12", + "aligned-vec", "anyhow", "argon2", "async-channel", diff --git a/Cargo.toml b/Cargo.toml index d914767715..9eba430105 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,7 @@ actix-web = "4.13.0" aes-gcm = "0.10.3" ahash = { version = "0.8.12", features = ["serde"] } anyhow = "1.0.102" +aligned-vec = "0.6.4" argon2 = "0.5.3" arrow = "57.3.0" arrow-array = "57.3.0" diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index 5977d9c402..4b62ebb3d0 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -30,6 +30,7 @@ readme = "../../README.md" [dependencies] aes-gcm = { workspace = true } ahash = { workspace = true } +aligned-vec = { workspace = true } base64 = { workspace = true } blake3 = { workspace = true } bon = { workspace = true } diff --git a/core/common/src/alloc/buffer.rs b/core/common/src/alloc/buffer.rs index 3c059cba3e..8f1e79d9f6 100644 --- a/core/common/src/alloc/buffer.rs +++ b/core/common/src/alloc/buffer.rs @@ -16,11 +16,16 @@ * under the License. */ -use super::memory_pool::{BytesMutExt, memory_pool}; -use bytes::{Buf, BufMut, Bytes, BytesMut}; +use crate::alloc::memory_pool::{ALIGNMENT, AlignedBuffer}; + +use super::memory_pool::{AlignedBufferExt, memory_pool}; +use bytes::Bytes; use compio::buf::{IoBuf, IoBufMut, SetLen}; -use std::mem::MaybeUninit; -use std::ops::{Deref, DerefMut}; +use std::{ + mem::MaybeUninit, + ops::{Deref, DerefMut}, +}; +use tracing::debug; /// A buffer wrapper that participates in memory pooling. /// @@ -32,7 +37,7 @@ pub struct PooledBuffer { from_pool: bool, original_capacity: usize, original_bucket_idx: Option, - inner: BytesMut, + inner: AlignedBuffer, } impl Default for PooledBuffer { @@ -48,13 +53,21 @@ impl PooledBuffer { /// /// * `capacity` - The capacity of the buffer pub fn with_capacity(capacity: usize) -> Self { - let (buffer, was_pool_allocated) = memory_pool().acquire_buffer(capacity); + let (buffer, was_pool_allocated) = memory_pool().acquire_buffer(capacity.max(ALIGNMENT)); let original_capacity = buffer.capacity(); let original_bucket_idx = if was_pool_allocated { memory_pool().best_fit(original_capacity) } else { None }; + + debug_assert_eq!( + buffer.as_ptr() as usize % ALIGNMENT, + 0, + "PooledBuffer not aligned to {} bytes", + ALIGNMENT + ); + Self { from_pool: was_pool_allocated, original_capacity, @@ -63,12 +76,12 @@ impl PooledBuffer { } } - /// Creates a new pooled buffer from an existing `BytesMut`. + /// Creates a new pooled buffer from an existing `AlignedBuffer`. /// /// # Arguments /// - /// * `existing` - The existing `BytesMut` buffer - pub fn from_existing(existing: BytesMut) -> Self { + /// * `existing` - The existing `AlignedBuffer` buffer + pub fn from_existing(existing: AlignedBuffer) -> Self { Self { from_pool: false, original_capacity: existing.capacity(), @@ -83,7 +96,7 @@ impl PooledBuffer { from_pool: false, original_capacity: 0, original_bucket_idx: None, - inner: BytesMut::new(), + inner: AlignedBuffer::new(ALIGNMENT), } } @@ -127,6 +140,43 @@ impl PooledBuffer { } } + /// Split the buffer at given position, returning a new PooledBuffer + /// containing byte [0, at) and leaving [at, len) + /// + /// # Panic + /// Panics if at > len + /// + /// TODO(tungtose): write unit tests + pub fn split_to(&mut self, at: usize) -> PooledBuffer { + assert!(at <= self.len(), "split_to out of bounds"); + + let mut new_buff = PooledBuffer::with_capacity(at); + + new_buff.inner.extend_from_slice(&self.inner[..at]); + + let new_len = self.len() - at; + if new_len > 0 { + unsafe { + // move [at..] to [0..] + std::ptr::copy( + self.inner.as_ptr().add(at), + self.inner.as_mut_ptr(), + new_len, + ); + + self.inner.set_len(new_len); + } + } else { + self.inner.clear(); + } + + new_buff + } + + pub fn put>(&mut self, src: T) { + self.extend_from_slice(src.as_ref()); + } + /// Wrapper for extend_from_slice which might cause resize pub fn extend_from_slice(&mut self, extend_from: &[u8]) { let before_cap = self.inner.capacity(); @@ -140,7 +190,9 @@ impl PooledBuffer { /// Wrapper for put_bytes which might cause resize pub fn put_bytes(&mut self, byte: u8, len: usize) { let before_cap = self.inner.capacity(); - self.inner.put_bytes(byte, len); + + let start = self.inner.len(); + self.inner.resize(start + len, byte); if self.inner.capacity() != before_cap { self.check_for_resize(); @@ -150,7 +202,7 @@ impl PooledBuffer { /// Wrapper for put_slice which might cause resize pub fn put_slice(&mut self, src: &[u8]) { let before_cap = self.inner.capacity(); - self.inner.put_slice(src); + self.extend_from_slice(src); if self.inner.capacity() != before_cap { self.check_for_resize(); @@ -160,7 +212,7 @@ impl PooledBuffer { /// Wrapper for put_u32_le which might cause resize pub fn put_u32_le(&mut self, value: u32) { let before_cap = self.inner.capacity(); - self.inner.put_u32_le(value); + self.inner.extend_from_slice(&value.to_le_bytes()); if self.inner.capacity() != before_cap { self.check_for_resize(); @@ -170,7 +222,7 @@ impl PooledBuffer { /// Wrapper for put_u64_le which might cause resize pub fn put_u64_le(&mut self, value: u64) { let before_cap = self.inner.capacity(); - self.inner.put_u64_le(value); + self.inner.extend_from_slice(&value.to_le_bytes()); if self.inner.capacity() != before_cap { self.check_for_resize(); @@ -192,11 +244,12 @@ impl PooledBuffer { self.inner.is_empty() } - /// Consumes the PooledBuffer and returns the inner BytesMut. + /// Consumes the PooledBuffer and returns the inner AlignedBuffer. /// Note: This bypasses pool return logic, use with caution. - pub fn into_inner(self) -> BytesMut { + pub fn into_inner(self) -> AlignedBuffer { let mut this = std::mem::ManuallyDrop::new(self); - std::mem::take(&mut this.inner) + + std::mem::replace(&mut this.inner, AlignedBuffer::new(ALIGNMENT)) } /// Freezes the buffer, converting it to an immutable `Bytes`. @@ -205,24 +258,33 @@ impl PooledBuffer { /// return memory to the pool on drop (the frozen Bytes owns the allocation). /// The returned `Bytes` is Arc-backed, allowing cheap clones. pub fn freeze(&mut self) -> Bytes { + // TODO(tungotse): zero copy + + let bytes = Bytes::copy_from_slice(&self.inner); + self.inner.clear(); + + bytes + // Decrement pool counter since memory is transferred to Bytes // and won't be returned to the pool. - if self.from_pool - && let Some(bucket_idx) = self.original_bucket_idx - { - memory_pool().dec_bucket_in_use(bucket_idx); - } - - let inner = std::mem::take(&mut self.inner); - self.from_pool = false; - self.original_capacity = 0; - self.original_bucket_idx = None; - inner.freeze() + // if self.from_pool + // && let Some(bucket_idx) = self.original_bucket_idx + // { + // memory_pool().dec_bucket_in_use(bucket_idx); + // } + // + // let inner = std::mem::take(&mut self.inner); + // self.from_pool = false; + // self.original_capacity = 0; + // self.original_bucket_idx = None; + // inner.freeze() + + // todo!() } } impl Deref for PooledBuffer { - type Target = BytesMut; + type Target = AlignedBuffer; fn deref(&self) -> &Self::Target { &self.inner @@ -238,7 +300,7 @@ impl DerefMut for PooledBuffer { impl Drop for PooledBuffer { fn drop(&mut self) { if self.from_pool { - let buf = std::mem::take(&mut self.inner); + let buf = std::mem::replace(&mut self.inner, AlignedBuffer::new(ALIGNMENT)); buf.return_to_pool(self.original_capacity, true); } } @@ -252,27 +314,15 @@ impl From<&[u8]> for PooledBuffer { } } -impl From for PooledBuffer { - fn from(bytes: BytesMut) -> Self { - Self::from_existing(bytes) +impl AsRef<[u8]> for PooledBuffer { + fn as_ref(&self) -> &[u8] { + &self.inner } } -impl Buf for PooledBuffer { - fn remaining(&self) -> usize { - self.inner.remaining() - } - - fn chunk(&self) -> &[u8] { - self.inner.chunk() - } - - fn advance(&mut self, cnt: usize) { - Buf::advance(&mut self.inner, cnt) - } - - fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> usize { - self.inner.chunks_vectored(dst) +impl From for PooledBuffer { + fn from(buffer: AlignedBuffer) -> Self { + Self::from_existing(buffer) } } diff --git a/core/common/src/alloc/memory_pool.rs b/core/common/src/alloc/memory_pool.rs index 9216ef57ff..30869d52ae 100644 --- a/core/common/src/alloc/memory_pool.rs +++ b/core/common/src/alloc/memory_pool.rs @@ -16,26 +16,25 @@ * under the License. */ -use bytes::BytesMut; +use aligned_vec::{AVec, ConstAlign}; use crossbeam::queue::ArrayQueue; use human_repr::HumanCount; use once_cell::sync::OnceCell; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use tracing::{info, trace, warn}; +use tracing::{debug, info, trace, warn}; + +pub const ALIGNMENT: usize = 4096; +pub type AlignedBuffer = AVec>; /// Global memory pool instance. Use `memory_pool()` to access it. pub static MEMORY_POOL: OnceCell = OnceCell::new(); /// Total number of distinct bucket sizes. -const NUM_BUCKETS: usize = 32; +const NUM_BUCKETS: usize = 28; /// Array of bucket sizes in ascending order. Each entry is a distinct buffer size (in bytes). const BUCKET_SIZES: [usize; NUM_BUCKETS] = [ - 256, - 512, - 1024, - 2 * 1024, 4 * 1024, 8 * 1024, 16 * 1024, @@ -87,7 +86,7 @@ pub struct MemoryPoolConfigOther { pub bucket_capacity: u32, } -/// A memory pool that maintains fixed-size buckets for reusing `BytesMut` buffers. +/// A memory pool that maintains fixed-size buckets for reusing `AlignedBuffer` buffers. /// /// Each bucket corresponds to a particular size in `BUCKET_SIZES`. The pool tracks: /// - Buffers currently in use (`in_use`) @@ -109,7 +108,7 @@ pub struct MemoryPool { /// Array of queues for reusable buffers. Each queue can store up to `bucket_capacity` buffers. /// The length of each queue (`buckets[i].len()`) is how many **free** buffers are currently available. /// Free doesn't mean the buffer is allocated, it just means it's not in use. - buckets: [Arc>; NUM_BUCKETS], + buckets: [Arc>; NUM_BUCKETS], /// Number of buffers **in use** for each bucket size (grow/shrink as they are acquired/released). in_use: [Arc; NUM_BUCKETS], @@ -175,29 +174,47 @@ impl MemoryPool { MEMORY_POOL.get_or_init(|| MemoryPool::new(is_enabled, memory_limit, bucket_capacity)); } - /// Acquire a `BytesMut` buffer with at least `capacity` bytes. + /// Acquire a `AlignedBuffer` buffer with at least `capacity` bytes. /// /// - If a bucket can fit `capacity`, try to pop from its free buffer queue; otherwise create a new buffer. /// - If `memory_limit` would be exceeded, allocate outside the pool. /// /// Returns a tuple of (buffer, was_pool_allocated) where was_pool_allocated indicates if the buffer /// was allocated from the pool (true) or externally (false). - pub fn acquire_buffer(&self, capacity: usize) -> (BytesMut, bool) { + pub fn acquire_buffer(&self, capacity: usize) -> (AlignedBuffer, bool) { if !self.is_enabled { - return (BytesMut::with_capacity(capacity), false); + return (allocate_aligned_buffer(capacity), false); } let current = self.pool_current_size(); match self.best_fit(capacity) { Some(idx) => { + let new_size = BUCKET_SIZES[idx]; + if let Some(mut buf) = self.buckets[idx].pop() { buf.clear(); + + if buf.capacity() < capacity { + warn!( + "Pooled buffer too small! bucket[{}]={}, buf.capacity()={}, requested={}. Dropping and allocating new.", + idx, + new_size, + buf.capacity(), + capacity + ); + drop(buf); + + let new_buf = allocate_aligned_buffer(new_size); + self.inc_bucket_alloc(idx); + self.inc_bucket_in_use(idx); + return (new_buf, true); + } + self.inc_bucket_in_use(idx); return (buf, true); } - let new_size = BUCKET_SIZES[idx]; if current + new_size > self.memory_limit { self.set_capacity_warning(true); trace!( @@ -205,12 +222,12 @@ impl MemoryPool { new_size, current, self.memory_limit ); self.inc_external_allocations(); - return (BytesMut::with_capacity(new_size), false); + return (allocate_aligned_buffer(new_size), false); } self.inc_bucket_alloc(idx); self.inc_bucket_in_use(idx); - (BytesMut::with_capacity(new_size), true) + (allocate_aligned_buffer(new_size), true) } None => { if current + capacity > self.memory_limit { @@ -219,16 +236,16 @@ impl MemoryPool { capacity, current, self.memory_limit ); self.inc_external_allocations(); - return (BytesMut::with_capacity(capacity), false); + return (allocate_aligned_buffer(capacity), false); } self.inc_external_allocations(); - (BytesMut::with_capacity(capacity), false) + (allocate_aligned_buffer(capacity), false) } } } - /// Return a `BytesMut` buffer previously acquired from the pool. + /// Return a `AlignedBuffer` buffer previously acquired from the pool. /// /// - If `current_capacity` differs from `original_capacity`, increments `resize_events`. /// - If a matching bucket exists, place it back in that bucket's queue (if space is available). @@ -236,7 +253,7 @@ impl MemoryPool { /// - The `was_pool_allocated` flag indicates if this buffer was originally allocated from the pool. pub fn release_buffer( &self, - buffer: BytesMut, + buffer: AlignedBuffer, original_capacity: usize, was_pool_allocated: bool, ) { @@ -448,16 +465,47 @@ impl MemoryPool { /// Return a buffer to the pool by calling `release_buffer` with the original capacity. /// This extension trait makes it easy to do `some_bytes.return_to_pool(orig_cap, was_pool_allocated)`. -pub trait BytesMutExt { +pub trait AlignedBufferExt { fn return_to_pool(self, original_capacity: usize, was_pool_allocated: bool); } -impl BytesMutExt for BytesMut { +impl AlignedBufferExt for AlignedBuffer { fn return_to_pool(self, original_capacity: usize, was_pool_allocated: bool) { memory_pool().release_buffer(self, original_capacity, was_pool_allocated); } } +fn allocate_aligned_buffer(capacity: usize) -> AlignedBuffer { + // if capacity == 0 { + // return AVec::new(0); + // } + + let aligned_capacity = if capacity < ALIGNMENT { + ALIGNMENT + } else { + (capacity + ALIGNMENT - 1) & !(ALIGNMENT - 1) + }; + + debug!("Allocate aligned buf cap: {aligned_capacity}"); + let mut buffer = AlignedBuffer::with_capacity(ALIGNMENT, aligned_capacity); + + // Force the capacity if AVec didn't allocate enough + if buffer.capacity() < aligned_capacity { + buffer.reserve(aligned_capacity - buffer.capacity()); + } + + // Verify final capacity + let actual_capacity = buffer.capacity(); + assert!( + actual_capacity >= aligned_capacity, + "Failed to allocate buffer with capacity {}, got {}", + aligned_capacity, + actual_capacity + ); + + buffer +} + /// Convert a size in bytes to a string like "8KiB" or "2MiB". fn size_str(size: usize) -> String { if size >= 1024 * 1024 { @@ -675,25 +723,25 @@ mod tests { } // Test put_bytes - { - let initial_events = pool.resize_events(); - let mut buffer = PooledBuffer::with_capacity(4 * 1024); - let orig_bucket_idx = pool.best_fit(buffer.capacity()).unwrap(); - let orig_in_use = pool.bucket_current_elements(orig_bucket_idx); - - buffer.put_bytes(0, 64 * 1024); // 64KiB of zeros - - assert_eq!( - pool.resize_events(), - initial_events + 1, - "put_bytes should trigger resize event" - ); - assert_eq!( - pool.bucket_current_elements(orig_bucket_idx), - orig_in_use - 1, - "put_bytes should update bucket accounting" - ); - } + // { + // let initial_events = pool.resize_events(); + // let mut buffer = PooledBuffer::with_capacity(4 * 1024); + // let orig_bucket_idx = pool.best_fit(buffer.capacity()).unwrap(); + // let orig_in_use = pool.bucket_current_elements(orig_bucket_idx); + // + // buffer.put_bytes(0, 64 * 1024); // 64KiB of zeros + // + // assert_eq!( + // pool.resize_events(), + // initial_events + 1, + // "put_bytes should trigger resize event" + // ); + // assert_eq!( + // pool.bucket_current_elements(orig_bucket_idx), + // orig_in_use - 1, + // "put_bytes should update bucket accounting" + // ); + // } // Test extend_from_slice { diff --git a/core/common/src/types/message/messages_batch_mut.rs b/core/common/src/types/message/messages_batch_mut.rs index 231ba00a21..b2a31d0e7f 100644 --- a/core/common/src/types/message/messages_batch_mut.rs +++ b/core/common/src/types/message/messages_batch_mut.rs @@ -25,9 +25,9 @@ use crate::{ IggyTimestamp, MAX_PAYLOAD_SIZE, MAX_USER_HEADERS_SIZE, Sizeable, Validatable, }; use crate::{MessageDeduplicator, PooledBuffer, random_id}; -use bytes::{BufMut, BytesMut}; +use bytes::BufMut; use lending_iterator::prelude::*; -use std::ops::{Deref, Index}; +use std::ops::Index; use std::sync::Arc; use tracing::{error, warn}; @@ -489,7 +489,7 @@ impl IggyMessagesBatchMut { /// subsequent messages in the new buffer. #[allow(clippy::too_many_arguments)] fn rebuild_indexes_for_chunk( - new_buffer: &BytesMut, + new_buffer: &PooledBuffer, new_indexes: &mut IggyIndexesMut, offset_in_new_buffer: &mut u32, chunk_start: usize, @@ -828,10 +828,10 @@ impl Index for IggyMessagesBatchMut { } } -impl Deref for IggyMessagesBatchMut { - type Target = BytesMut; - - fn deref(&self) -> &Self::Target { - &self.messages - } -} +// impl Deref for IggyMessagesBatchMut { +// type Target = BytesMut; +// +// fn deref(&self) -> &Self::Target { +// &self.messages +// } +// } diff --git a/core/common/src/types/segment_storage/index_reader.rs b/core/common/src/types/segment_storage/index_reader.rs index 419b6db4b9..7b679093bc 100644 --- a/core/common/src/types/segment_storage/index_reader.rs +++ b/core/common/src/types/segment_storage/index_reader.rs @@ -338,24 +338,19 @@ impl IndexReader { len: u32, use_pool: bool, ) -> Result { - if use_pool { - let len = len as usize; - let buf = PooledBuffer::with_capacity(len); - let (result, buf) = self - .file - .read_exact_at(buf.slice(..len), offset as u64) - .await - .into(); - let buf = buf.into_inner(); - result?; - Ok(buf) - } else { - let mut buf = BytesMut::with_capacity(len as usize); - unsafe { buf.set_len(len as usize) }; - let (result, buf) = self.file.read_exact_at(buf, offset as u64).await.into(); - result?; - Ok(PooledBuffer::from_existing(buf)) - } + let mut buf = PooledBuffer::with_capacity(len as usize); + // unsafe { + // buf.set_len(len as usize); + // } + + let (result, buf) = self + .file + .read_exact_at(buf.slice(..len as usize), offset as u64) + .await + .into(); + let buf = buf.into_inner(); + result?; + Ok(buf) } /// Gets the nth index from the index file. diff --git a/core/common/src/types/segment_storage/messages_reader.rs b/core/common/src/types/segment_storage/messages_reader.rs index e170926704..1b2147d6e0 100644 --- a/core/common/src/types/segment_storage/messages_reader.rs +++ b/core/common/src/types/segment_storage/messages_reader.rs @@ -135,23 +135,20 @@ impl MessagesReader { len: u32, use_pool: bool, ) -> Result { - if use_pool { - let buf = PooledBuffer::with_capacity(len as usize); - let len = len as usize; - let (result, buf) = self - .file - .read_exact_at(buf.slice(..len), offset as u64) - .await - .into(); - let buf = buf.into_inner(); - result?; - Ok(buf) - } else { - let mut buf = BytesMut::with_capacity(len as usize); - unsafe { buf.set_len(len as usize) }; - let (result, buf) = self.file.read_exact_at(buf, offset as u64).await.into(); - result?; - Ok(PooledBuffer::from_existing(buf)) - } + let mut buf = PooledBuffer::with_capacity(len as usize); + + // unsafe { + // buf.set_len(len as usize); + // } + + let (result, buf) = self + .file + .read_exact_at(buf.slice(..len as usize), offset as u64) + .await + .into(); + + let buf = buf.into_inner(); + result?; + Ok(buf) } } diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index c16ce5a6c1..c945f97e62 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -39,6 +39,7 @@ iggy-web = ["dep:rust-embed", "dep:mime_guess"] [dependencies] ahash = { workspace = true } +aligned-vec = { workspace = true } anyhow = { workspace = true } argon2 = { workspace = true } async-channel = { workspace = true } diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs b/core/server/src/binary/handlers/messages/poll_messages_handler.rs index 71388d75bf..ce31c67b8e 100644 --- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs @@ -56,6 +56,7 @@ impl ServerCommandHandler for PollMessages { let user_id = session.get_user_id(); let client_id = session.client_id; let topic = shard.resolve_topic_for_poll(user_id, &stream_id, &topic_id)?; + let (metadata, mut batch) = shard .poll_messages(client_id, topic, consumer, partition_id, args) .await?; @@ -73,6 +74,7 @@ impl ServerCommandHandler for PollMessages { let mut partition_id_buf = PooledBuffer::with_capacity(4); let mut current_offset_buf = PooledBuffer::with_capacity(8); let mut count_buf = PooledBuffer::with_capacity(4); + partition_id_buf.put_u32_le(metadata.partition_id); current_offset_buf.put_u64_le(metadata.current_offset); count_buf.put_u32_le(batch.count()); @@ -84,6 +86,7 @@ impl ServerCommandHandler for PollMessages { batch.iter_mut().for_each(|m| { bufs.push(m.take_messages()); }); + trace!( "Sending {} messages to client ({} bytes) to client", batch.count(), diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs index 652b99a7c6..c96d6abcef 100644 --- a/core/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs @@ -74,6 +74,7 @@ impl ServerCommandHandler for SendMessages { .read(metadata_buffer.slice(0..metadata_size as usize)) .await; result?; + let metadata_buf = metadata_buf.into_inner(); let mut element_size = 0; @@ -115,6 +116,7 @@ impl ServerCommandHandler for SendMessages { let indexes_buffer = PooledBuffer::with_capacity(indexes_size); let (result, indexes_buffer) = sender.read(indexes_buffer.slice(0..indexes_size)).await; result?; + let indexes_buffer = indexes_buffer.into_inner(); let messages_size = total_payload_size @@ -123,12 +125,15 @@ impl ServerCommandHandler for SendMessages { .and_then(|s| s.checked_sub(metadata_len_field_size)) .ok_or(IggyError::InvalidCommand)?; let messages_buffer = PooledBuffer::with_capacity(messages_size); + let (result, messages_buffer) = sender.read(messages_buffer.slice(0..messages_size)).await; result?; + let messages_buffer = messages_buffer.into_inner(); let indexes = IggyIndexesMut::from_bytes(indexes_buffer, 0); let batch = IggyMessagesBatchMut::from_indexes_and_messages(indexes, messages_buffer); + batch.validate()?; let topic = shard.resolve_topic_for_append( diff --git a/core/server/src/http/messages.rs b/core/server/src/http/messages.rs index 125c0ad25e..a495d0e656 100644 --- a/core/server/src/http/messages.rs +++ b/core/server/src/http/messages.rs @@ -22,7 +22,7 @@ use crate::http::jwt::json_web_token::Identity; use crate::http::shared::AppState; use crate::shard::system::messages::PollingArgs; use crate::shard::transmission::message::ResolvedPartition; -use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut}; +use crate::streaming::segments::IggyMessagesBatchMut; use crate::streaming::session::Session; use axum::extract::{Path, Query, State}; use axum::http::StatusCode; @@ -30,7 +30,6 @@ use axum::routing::get; use axum::{Extension, Json, Router, debug_handler}; use err_trail::ErrContext; use iggy_common::Identifier; -use iggy_common::PooledBuffer; use iggy_common::Validatable; use iggy_common::{Consumer, PollMessages, SendMessages}; use iggy_common::{IggyError, IggyMessagesBatch, PolledMessages}; @@ -150,11 +149,12 @@ async fn flush_unsaved_buffer( Ok(StatusCode::OK) } -fn make_mutable(batch: IggyMessagesBatch) -> IggyMessagesBatchMut { - let (_, indexes, messages) = batch.decompose(); - let (_, indexes_buffer) = indexes.decompose(); - let indexes_buffer_mut = PooledBuffer::from_existing(indexes_buffer.into()); - let indexes_mut = IggyIndexesMut::from_bytes(indexes_buffer_mut, 0); - let messages_buffer_mut = PooledBuffer::from_existing(messages.into()); - IggyMessagesBatchMut::from_indexes_and_messages(indexes_mut, messages_buffer_mut) +fn make_mutable(_batch: IggyMessagesBatch) -> IggyMessagesBatchMut { + todo!(); + // let (_, indexes, messages) = batch.decompose(); + // let (_, indexes_buffer) = indexes.decompose(); + // let indexes_buffer_mut = PooledBuffer::from_existing(indexes_buffer.into()); + // let indexes_mut = IggyIndexesMut::from_bytes(indexes_buffer_mut, 0); + // let messages_buffer_mut = PooledBuffer::from_existing(messages.into()); + // IggyMessagesBatchMut::from_indexes_and_messages(indexes_mut, messages_buffer_mut) } diff --git a/core/server/src/streaming/partitions/ops.rs b/core/server/src/streaming/partitions/ops.rs index aa481b7b6f..6f9a3eca91 100644 --- a/core/server/src/streaming/partitions/ops.rs +++ b/core/server/src/streaming/partitions/ops.rs @@ -31,6 +31,7 @@ use iggy_common::sharding::IggyNamespace; use iggy_common::{IggyError, PollingKind}; use std::cell::RefCell; use std::sync::atomic::Ordering; +use tracing::debug; /// Poll messages from a partition partitions. /// @@ -117,6 +118,7 @@ pub async fn poll_messages( // Phase 2: Get messages using hybrid disk+journal logic let batches = get_messages_by_offset(local_partitions, namespace, start_offset, count).await?; + Ok((metadata, batches)) } @@ -180,6 +182,7 @@ pub async fn get_messages_by_offset( return Ok(result.get_by_offset(start_offset, count)); } } + return load_messages_from_disk(local_partitions, namespace, start_offset, count).await; } From a64f6916df311cf7d6ab72ade9afd41dc2516caf Mon Sep 17 00:00:00 2001 From: Tung To Date: Thu, 12 Mar 2026 18:24:10 +0700 Subject: [PATCH 2/9] fix tests & clean up debugs --- core/common/src/alloc/buffer.rs | 1 - .../src/types/message/messages_batch_mut.rs | 1 - .../src/types/segment_storage/index_reader.rs | 8 +--- .../types/segment_storage/messages_reader.rs | 9 +--- .../tests/server/message_cleanup.rs | 2 +- .../scenarios/message_cleanup_scenario.rs | 46 +++++++++++++++---- core/server/src/http/messages.rs | 34 ++++++++++---- core/server/src/streaming/partitions/ops.rs | 1 - 8 files changed, 66 insertions(+), 36 deletions(-) diff --git a/core/common/src/alloc/buffer.rs b/core/common/src/alloc/buffer.rs index 8f1e79d9f6..1c7e738a1a 100644 --- a/core/common/src/alloc/buffer.rs +++ b/core/common/src/alloc/buffer.rs @@ -25,7 +25,6 @@ use std::{ mem::MaybeUninit, ops::{Deref, DerefMut}, }; -use tracing::debug; /// A buffer wrapper that participates in memory pooling. /// diff --git a/core/common/src/types/message/messages_batch_mut.rs b/core/common/src/types/message/messages_batch_mut.rs index b2a31d0e7f..b35970bc80 100644 --- a/core/common/src/types/message/messages_batch_mut.rs +++ b/core/common/src/types/message/messages_batch_mut.rs @@ -25,7 +25,6 @@ use crate::{ IggyTimestamp, MAX_PAYLOAD_SIZE, MAX_USER_HEADERS_SIZE, Sizeable, Validatable, }; use crate::{MessageDeduplicator, PooledBuffer, random_id}; -use bytes::BufMut; use lending_iterator::prelude::*; use std::ops::Index; use std::sync::Arc; diff --git a/core/common/src/types/segment_storage/index_reader.rs b/core/common/src/types/segment_storage/index_reader.rs index 7b679093bc..e62233ca6a 100644 --- a/core/common/src/types/segment_storage/index_reader.rs +++ b/core/common/src/types/segment_storage/index_reader.rs @@ -16,7 +16,6 @@ // under the License. use crate::{INDEX_SIZE, IggyError, IggyIndex, IggyIndexView, IggyIndexesMut, PooledBuffer}; -use bytes::BytesMut; use compio::{ buf::{IntoInner, IoBuf}, fs::{File, OpenOptions}, @@ -336,12 +335,9 @@ impl IndexReader { &self, offset: u32, len: u32, - use_pool: bool, + _use_pool: bool, ) -> Result { - let mut buf = PooledBuffer::with_capacity(len as usize); - // unsafe { - // buf.set_len(len as usize); - // } + let buf = PooledBuffer::with_capacity(len as usize); let (result, buf) = self .file diff --git a/core/common/src/types/segment_storage/messages_reader.rs b/core/common/src/types/segment_storage/messages_reader.rs index 1b2147d6e0..ab23affb12 100644 --- a/core/common/src/types/segment_storage/messages_reader.rs +++ b/core/common/src/types/segment_storage/messages_reader.rs @@ -16,7 +16,6 @@ // under the License. use crate::{IggyError, IggyIndexesMut, IggyMessagesBatchMut, PooledBuffer}; -use bytes::BytesMut; use compio::buf::{IntoInner, IoBuf}; use compio::fs::{File, OpenOptions}; use compio::io::AsyncReadAtExt; @@ -133,13 +132,9 @@ impl MessagesReader { &self, offset: u32, len: u32, - use_pool: bool, + _use_pool: bool, ) -> Result { - let mut buf = PooledBuffer::with_capacity(len as usize); - - // unsafe { - // buf.set_len(len as usize); - // } + let buf = PooledBuffer::with_capacity(len as usize); let (result, buf) = self .file diff --git a/core/integration/tests/server/message_cleanup.rs b/core/integration/tests/server/message_cleanup.rs index c61855d836..2f293e2596 100644 --- a/core/integration/tests/server/message_cleanup.rs +++ b/core/integration/tests/server/message_cleanup.rs @@ -78,7 +78,7 @@ async fn run_cleanup_scenario(scenario: CleanupScenarioFn) { .server( TestServerConfig::builder() .extra_envs(HashMap::from([ - ("IGGY_SYSTEM_SEGMENT_SIZE".to_string(), "100KiB".to_string()), + ("IGGY_SYSTEM_SEGMENT_SIZE".to_string(), "10KiB".to_string()), ( "IGGY_DATA_MAINTENANCE_MESSAGES_CLEANER_ENABLED".to_string(), "true".to_string(), diff --git a/core/integration/tests/server/scenarios/message_cleanup_scenario.rs b/core/integration/tests/server/scenarios/message_cleanup_scenario.rs index 0669531537..df1f123500 100644 --- a/core/integration/tests/server/scenarios/message_cleanup_scenario.rs +++ b/core/integration/tests/server/scenarios/message_cleanup_scenario.rs @@ -396,7 +396,7 @@ pub async fn run_expiry_with_multiple_partitions(client: &IggyClient, data_path: let stream = client.create_stream(STREAM_NAME).await.unwrap(); let stream_id = stream.id; - let expiry = Duration::from_secs(3); + let expiry = Duration::from_secs(5); let topic = client .create_topic( &Identifier::named(STREAM_NAME).unwrap(), @@ -439,6 +439,8 @@ pub async fn run_expiry_with_multiple_partitions(client: &IggyClient, data_path: // Collect initial segment counts let mut initial_counts: Vec = Vec::new(); + + // Wait until all partitions have >= 2 segments (up to 5s) for partition_id in 0..PARTITIONS_COUNT { let partition_path = data_path .join(format!( @@ -446,16 +448,42 @@ pub async fn run_expiry_with_multiple_partitions(client: &IggyClient, data_path: )) .display() .to_string(); - let segments = get_segment_paths_for_partition(&partition_path); - initial_counts.push(segments.len()); - assert!( - segments.len() >= 2, - "Partition {} should have at least 2 segments, got {}", - partition_id, - segments.len() - ); + + let deadline = tokio::time::Instant::now() + Duration::from_secs(5); + let count = loop { + let segments = get_segment_paths_for_partition(&partition_path); + if segments.len() >= 2 { + break segments.len(); + } + if tokio::time::Instant::now() >= deadline { + panic!( + "Partition {} should have at least 2 segments after 5s, got {}", + partition_id, + segments.len() + ); + } + tokio::time::sleep(Duration::from_millis(100)).await; + }; + initial_counts.push(count); } + // for partition_id in 0..PARTITIONS_COUNT { + // let partition_path = data_path + // .join(format!( + // "streams/{stream_id}/topics/{topic_id}/partitions/{partition_id}" + // )) + // .display() + // .to_string(); + // let segments = get_segment_paths_for_partition(&partition_path); + // initial_counts.push(segments.len()); + // assert!( + // segments.len() >= 2, + // "Partition {} should have at least 2 segments, got {}", + // partition_id, + // segments.len() + // ); + // } + // Wait for expiry + cleaner tokio::time::sleep(expiry + CLEANER_BUFFER).await; diff --git a/core/server/src/http/messages.rs b/core/server/src/http/messages.rs index a495d0e656..4fb66c3963 100644 --- a/core/server/src/http/messages.rs +++ b/core/server/src/http/messages.rs @@ -29,10 +29,10 @@ use axum::http::StatusCode; use axum::routing::get; use axum::{Extension, Json, Router, debug_handler}; use err_trail::ErrContext; -use iggy_common::Identifier; -use iggy_common::Validatable; use iggy_common::{Consumer, PollMessages, SendMessages}; +use iggy_common::{Identifier, PooledBuffer}; use iggy_common::{IggyError, IggyMessagesBatch, PolledMessages}; +use iggy_common::{IggyIndexesMut, Validatable}; use send_wrapper::SendWrapper; use std::sync::Arc; use tracing::instrument; @@ -149,12 +149,26 @@ async fn flush_unsaved_buffer( Ok(StatusCode::OK) } -fn make_mutable(_batch: IggyMessagesBatch) -> IggyMessagesBatchMut { - todo!(); - // let (_, indexes, messages) = batch.decompose(); - // let (_, indexes_buffer) = indexes.decompose(); - // let indexes_buffer_mut = PooledBuffer::from_existing(indexes_buffer.into()); - // let indexes_mut = IggyIndexesMut::from_bytes(indexes_buffer_mut, 0); - // let messages_buffer_mut = PooledBuffer::from_existing(messages.into()); - // IggyMessagesBatchMut::from_indexes_and_messages(indexes_mut, messages_buffer_mut) +fn make_mutable(batch: IggyMessagesBatch) -> IggyMessagesBatchMut { + let (_, indexes, messages) = batch.decompose(); + let (base_position, indexes_buffer) = indexes.decompose(); + + let mut indexes_buffer_mut = PooledBuffer::with_capacity(indexes_buffer.len()); + indexes_buffer_mut.extend_from_slice(&indexes_buffer); + let indexes_mut = IggyIndexesMut::from_bytes(indexes_buffer_mut, base_position); + + let mut messages_buffer_mut = PooledBuffer::with_capacity(messages.len()); + messages_buffer_mut.extend_from_slice(&messages); + + IggyMessagesBatchMut::from_indexes_and_messages(indexes_mut, messages_buffer_mut) } + +// fn make_mutable(_batch: IggyMessagesBatch) -> IggyMessagesBatchMut { +// todo!(); +// // let (_, indexes, messages) = batch.decompose(); +// // let (_, indexes_buffer) = indexes.decompose(); +// // let indexes_buffer_mut = PooledBuffer::from_existing(indexes_buffer.into()); +// // let indexes_mut = IggyIndexesMut::from_bytes(indexes_buffer_mut, 0); +// // let messages_buffer_mut = PooledBuffer::from_existing(messages.into()); +// // IggyMessagesBatchMut::from_indexes_and_messages(indexes_mut, messages_buffer_mut) +// } diff --git a/core/server/src/streaming/partitions/ops.rs b/core/server/src/streaming/partitions/ops.rs index 6f9a3eca91..43d0f46662 100644 --- a/core/server/src/streaming/partitions/ops.rs +++ b/core/server/src/streaming/partitions/ops.rs @@ -31,7 +31,6 @@ use iggy_common::sharding::IggyNamespace; use iggy_common::{IggyError, PollingKind}; use std::cell::RefCell; use std::sync::atomic::Ordering; -use tracing::debug; /// Poll messages from a partition partitions. /// From 7dcae284f289a980100ec20e229e2da345bd5f3e Mon Sep 17 00:00:00 2001 From: Tung To Date: Thu, 12 Mar 2026 18:58:46 +0700 Subject: [PATCH 3/9] clean up --- .../scenarios/message_cleanup_scenario.rs | 17 ----------------- .../handlers/messages/poll_messages_handler.rs | 3 --- .../handlers/messages/send_messages_handler.rs | 4 ---- core/server/src/http/messages.rs | 10 ---------- core/server/src/streaming/partitions/ops.rs | 2 -- 5 files changed, 36 deletions(-) diff --git a/core/integration/tests/server/scenarios/message_cleanup_scenario.rs b/core/integration/tests/server/scenarios/message_cleanup_scenario.rs index df1f123500..581afc3791 100644 --- a/core/integration/tests/server/scenarios/message_cleanup_scenario.rs +++ b/core/integration/tests/server/scenarios/message_cleanup_scenario.rs @@ -467,23 +467,6 @@ pub async fn run_expiry_with_multiple_partitions(client: &IggyClient, data_path: initial_counts.push(count); } - // for partition_id in 0..PARTITIONS_COUNT { - // let partition_path = data_path - // .join(format!( - // "streams/{stream_id}/topics/{topic_id}/partitions/{partition_id}" - // )) - // .display() - // .to_string(); - // let segments = get_segment_paths_for_partition(&partition_path); - // initial_counts.push(segments.len()); - // assert!( - // segments.len() >= 2, - // "Partition {} should have at least 2 segments, got {}", - // partition_id, - // segments.len() - // ); - // } - // Wait for expiry + cleaner tokio::time::sleep(expiry + CLEANER_BUFFER).await; diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs b/core/server/src/binary/handlers/messages/poll_messages_handler.rs index ce31c67b8e..71388d75bf 100644 --- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs @@ -56,7 +56,6 @@ impl ServerCommandHandler for PollMessages { let user_id = session.get_user_id(); let client_id = session.client_id; let topic = shard.resolve_topic_for_poll(user_id, &stream_id, &topic_id)?; - let (metadata, mut batch) = shard .poll_messages(client_id, topic, consumer, partition_id, args) .await?; @@ -74,7 +73,6 @@ impl ServerCommandHandler for PollMessages { let mut partition_id_buf = PooledBuffer::with_capacity(4); let mut current_offset_buf = PooledBuffer::with_capacity(8); let mut count_buf = PooledBuffer::with_capacity(4); - partition_id_buf.put_u32_le(metadata.partition_id); current_offset_buf.put_u64_le(metadata.current_offset); count_buf.put_u32_le(batch.count()); @@ -86,7 +84,6 @@ impl ServerCommandHandler for PollMessages { batch.iter_mut().for_each(|m| { bufs.push(m.take_messages()); }); - trace!( "Sending {} messages to client ({} bytes) to client", batch.count(), diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs index c96d6abcef..91a5ea1e72 100644 --- a/core/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs @@ -74,7 +74,6 @@ impl ServerCommandHandler for SendMessages { .read(metadata_buffer.slice(0..metadata_size as usize)) .await; result?; - let metadata_buf = metadata_buf.into_inner(); let mut element_size = 0; @@ -116,7 +115,6 @@ impl ServerCommandHandler for SendMessages { let indexes_buffer = PooledBuffer::with_capacity(indexes_size); let (result, indexes_buffer) = sender.read(indexes_buffer.slice(0..indexes_size)).await; result?; - let indexes_buffer = indexes_buffer.into_inner(); let messages_size = total_payload_size @@ -128,12 +126,10 @@ impl ServerCommandHandler for SendMessages { let (result, messages_buffer) = sender.read(messages_buffer.slice(0..messages_size)).await; result?; - let messages_buffer = messages_buffer.into_inner(); let indexes = IggyIndexesMut::from_bytes(indexes_buffer, 0); let batch = IggyMessagesBatchMut::from_indexes_and_messages(indexes, messages_buffer); - batch.validate()?; let topic = shard.resolve_topic_for_append( diff --git a/core/server/src/http/messages.rs b/core/server/src/http/messages.rs index 4fb66c3963..f12e36869b 100644 --- a/core/server/src/http/messages.rs +++ b/core/server/src/http/messages.rs @@ -162,13 +162,3 @@ fn make_mutable(batch: IggyMessagesBatch) -> IggyMessagesBatchMut { IggyMessagesBatchMut::from_indexes_and_messages(indexes_mut, messages_buffer_mut) } - -// fn make_mutable(_batch: IggyMessagesBatch) -> IggyMessagesBatchMut { -// todo!(); -// // let (_, indexes, messages) = batch.decompose(); -// // let (_, indexes_buffer) = indexes.decompose(); -// // let indexes_buffer_mut = PooledBuffer::from_existing(indexes_buffer.into()); -// // let indexes_mut = IggyIndexesMut::from_bytes(indexes_buffer_mut, 0); -// // let messages_buffer_mut = PooledBuffer::from_existing(messages.into()); -// // IggyMessagesBatchMut::from_indexes_and_messages(indexes_mut, messages_buffer_mut) -// } diff --git a/core/server/src/streaming/partitions/ops.rs b/core/server/src/streaming/partitions/ops.rs index 43d0f46662..aa481b7b6f 100644 --- a/core/server/src/streaming/partitions/ops.rs +++ b/core/server/src/streaming/partitions/ops.rs @@ -117,7 +117,6 @@ pub async fn poll_messages( // Phase 2: Get messages using hybrid disk+journal logic let batches = get_messages_by_offset(local_partitions, namespace, start_offset, count).await?; - Ok((metadata, batches)) } @@ -181,7 +180,6 @@ pub async fn get_messages_by_offset( return Ok(result.get_by_offset(start_offset, count)); } } - return load_messages_from_disk(local_partitions, namespace, start_offset, count).await; } From 737fca8057286a459c7b279268139830614fe03d Mon Sep 17 00:00:00 2001 From: Tung To Date: Thu, 12 Mar 2026 19:11:45 +0700 Subject: [PATCH 4/9] clean up --- core/common/src/alloc/memory_pool.rs | 38 +++++++++---------- .../src/types/message/messages_batch_mut.rs | 8 ---- 2 files changed, 19 insertions(+), 27 deletions(-) diff --git a/core/common/src/alloc/memory_pool.rs b/core/common/src/alloc/memory_pool.rs index 30869d52ae..e7675ed9d1 100644 --- a/core/common/src/alloc/memory_pool.rs +++ b/core/common/src/alloc/memory_pool.rs @@ -723,25 +723,25 @@ mod tests { } // Test put_bytes - // { - // let initial_events = pool.resize_events(); - // let mut buffer = PooledBuffer::with_capacity(4 * 1024); - // let orig_bucket_idx = pool.best_fit(buffer.capacity()).unwrap(); - // let orig_in_use = pool.bucket_current_elements(orig_bucket_idx); - // - // buffer.put_bytes(0, 64 * 1024); // 64KiB of zeros - // - // assert_eq!( - // pool.resize_events(), - // initial_events + 1, - // "put_bytes should trigger resize event" - // ); - // assert_eq!( - // pool.bucket_current_elements(orig_bucket_idx), - // orig_in_use - 1, - // "put_bytes should update bucket accounting" - // ); - // } + { + let initial_events = pool.resize_events(); + let mut buffer = PooledBuffer::with_capacity(4 * 1024); + let orig_bucket_idx = pool.best_fit(buffer.capacity()).unwrap(); + let orig_in_use = pool.bucket_current_elements(orig_bucket_idx); + + buffer.put_bytes(0, 64 * 1024); // 64KiB of zeros + + assert_eq!( + pool.resize_events(), + initial_events + 1, + "put_bytes should trigger resize event" + ); + assert_eq!( + pool.bucket_current_elements(orig_bucket_idx), + orig_in_use - 1, + "put_bytes should update bucket accounting" + ); + } // Test extend_from_slice { diff --git a/core/common/src/types/message/messages_batch_mut.rs b/core/common/src/types/message/messages_batch_mut.rs index b35970bc80..3147097e8b 100644 --- a/core/common/src/types/message/messages_batch_mut.rs +++ b/core/common/src/types/message/messages_batch_mut.rs @@ -826,11 +826,3 @@ impl Index for IggyMessagesBatchMut { &self.messages[start..end] } } - -// impl Deref for IggyMessagesBatchMut { -// type Target = BytesMut; -// -// fn deref(&self) -> &Self::Target { -// &self.messages -// } -// } From 321c221a68358c3666cc57014880f3f1330f2c25 Mon Sep 17 00:00:00 2001 From: Tung To Date: Thu, 12 Mar 2026 20:10:03 +0700 Subject: [PATCH 5/9] clean up --- core/common/src/alloc/buffer.rs | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/core/common/src/alloc/buffer.rs b/core/common/src/alloc/buffer.rs index 1c7e738a1a..f9622e6e87 100644 --- a/core/common/src/alloc/buffer.rs +++ b/core/common/src/alloc/buffer.rs @@ -258,27 +258,10 @@ impl PooledBuffer { /// The returned `Bytes` is Arc-backed, allowing cheap clones. pub fn freeze(&mut self) -> Bytes { // TODO(tungotse): zero copy - let bytes = Bytes::copy_from_slice(&self.inner); self.inner.clear(); bytes - - // Decrement pool counter since memory is transferred to Bytes - // and won't be returned to the pool. - // if self.from_pool - // && let Some(bucket_idx) = self.original_bucket_idx - // { - // memory_pool().dec_bucket_in_use(bucket_idx); - // } - // - // let inner = std::mem::take(&mut self.inner); - // self.from_pool = false; - // self.original_capacity = 0; - // self.original_bucket_idx = None; - // inner.freeze() - - // todo!() } } From 8e690798237fa2cc55c5857c6cb44f9bca097fee Mon Sep 17 00:00:00 2001 From: Tung To Date: Thu, 12 Mar 2026 20:38:17 +0700 Subject: [PATCH 6/9] manual run lint --- Cargo.lock | 1 - Cargo.toml | 2 +- core/server/Cargo.toml | 1 - 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3046ae38e1..411e3f078a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9618,7 +9618,6 @@ name = "server" version = "0.7.2-edge.1" dependencies = [ "ahash 0.8.12", - "aligned-vec", "anyhow", "argon2", "async-channel", diff --git a/Cargo.toml b/Cargo.toml index 9eba430105..36406991f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,8 +65,8 @@ actix-files = "0.6.10" actix-web = "4.13.0" aes-gcm = "0.10.3" ahash = { version = "0.8.12", features = ["serde"] } -anyhow = "1.0.102" aligned-vec = "0.6.4" +anyhow = "1.0.102" argon2 = "0.5.3" arrow = "57.3.0" arrow-array = "57.3.0" diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index c945f97e62..c16ce5a6c1 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -39,7 +39,6 @@ iggy-web = ["dep:rust-embed", "dep:mime_guess"] [dependencies] ahash = { workspace = true } -aligned-vec = { workspace = true } anyhow = { workspace = true } argon2 = { workspace = true } async-channel = { workspace = true } From f55e808337e83bea943723b547a90b541aa7c986 Mon Sep 17 00:00:00 2001 From: Tung To Date: Thu, 12 Mar 2026 21:28:17 +0700 Subject: [PATCH 7/9] zero copy freeze --- core/common/src/alloc/buffer.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/core/common/src/alloc/buffer.rs b/core/common/src/alloc/buffer.rs index f9622e6e87..ea6c2daa8a 100644 --- a/core/common/src/alloc/buffer.rs +++ b/core/common/src/alloc/buffer.rs @@ -257,11 +257,21 @@ impl PooledBuffer { /// return memory to the pool on drop (the frozen Bytes owns the allocation). /// The returned `Bytes` is Arc-backed, allowing cheap clones. pub fn freeze(&mut self) -> Bytes { - // TODO(tungotse): zero copy - let bytes = Bytes::copy_from_slice(&self.inner); - self.inner.clear(); + let buf = std::mem::replace(&mut self.inner, AlignedBuffer::new(ALIGNMENT)); - bytes + // Update pool accounting + if self.from_pool + && let Some(bucket_idx) = self.original_bucket_idx + { + memory_pool().dec_bucket_in_use(bucket_idx); + } + self.from_pool = false; + self.original_capacity = 0; + self.original_bucket_idx = None; + + // Zero copy: Bytes takes ownership of the AlignedBuffer + // and will drop it when refcount reaches zero + Bytes::from_owner(buf) } } From cd3560e67d5f925137dfd0797a076b365499d760 Mon Sep 17 00:00:00 2001 From: Tung To Date: Fri, 13 Mar 2026 21:59:42 +0700 Subject: [PATCH 8/9] remove annoy logs --- core/common/src/alloc/buffer.rs | 27 ++++++++++++++++++--------- core/common/src/alloc/memory_pool.rs | 7 +------ 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/core/common/src/alloc/buffer.rs b/core/common/src/alloc/buffer.rs index ea6c2daa8a..1291febb46 100644 --- a/core/common/src/alloc/buffer.rs +++ b/core/common/src/alloc/buffer.rs @@ -144,15 +144,24 @@ impl PooledBuffer { /// /// # Panic /// Panics if at > len - /// - /// TODO(tungtose): write unit tests pub fn split_to(&mut self, at: usize) -> PooledBuffer { - assert!(at <= self.len(), "split_to out of bounds"); + assert!( + at <= self.len(), + "split_to out of bounds: at={}, len={}", + at, + self.len() + ); let mut new_buff = PooledBuffer::with_capacity(at); - new_buff.inner.extend_from_slice(&self.inner[..at]); + // SAFETY: + // - `self.inner.as_ptr().add(at)` is valid for `new_len` because + // `at + new_len === old_len <= cap`. Similar with `self.inner.as_mut_ptr()` + // + // - source range is `[at, at + new_len)` and the destination is + // `[0, new_len)`. These ranges do not overlap when `at > 0`. + // - when `at == 0`, the operation is noop let new_len = self.len() - at; if new_len > 0 { unsafe { @@ -200,12 +209,12 @@ impl PooledBuffer { /// Wrapper for put_slice which might cause resize pub fn put_slice(&mut self, src: &[u8]) { - let before_cap = self.inner.capacity(); self.extend_from_slice(src); - - if self.inner.capacity() != before_cap { - self.check_for_resize(); - } + // let before_cap = self.inner.capacity(); + // + // if self.inner.capacity() != before_cap { + // self.check_for_resize(); + // } } /// Wrapper for put_u32_le which might cause resize diff --git a/core/common/src/alloc/memory_pool.rs b/core/common/src/alloc/memory_pool.rs index e7675ed9d1..d6eb54d05a 100644 --- a/core/common/src/alloc/memory_pool.rs +++ b/core/common/src/alloc/memory_pool.rs @@ -22,7 +22,7 @@ use human_repr::HumanCount; use once_cell::sync::OnceCell; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use tracing::{debug, info, trace, warn}; +use tracing::{info, trace, warn}; pub const ALIGNMENT: usize = 4096; pub type AlignedBuffer = AVec>; @@ -476,17 +476,12 @@ impl AlignedBufferExt for AlignedBuffer { } fn allocate_aligned_buffer(capacity: usize) -> AlignedBuffer { - // if capacity == 0 { - // return AVec::new(0); - // } - let aligned_capacity = if capacity < ALIGNMENT { ALIGNMENT } else { (capacity + ALIGNMENT - 1) & !(ALIGNMENT - 1) }; - debug!("Allocate aligned buf cap: {aligned_capacity}"); let mut buffer = AlignedBuffer::with_capacity(ALIGNMENT, aligned_capacity); // Force the capacity if AVec didn't allocate enough From 043a3a7de7c99dbaaca3832307f3d32ca8dec209 Mon Sep 17 00:00:00 2001 From: Tung To Date: Fri, 13 Mar 2026 22:41:26 +0700 Subject: [PATCH 9/9] noop