diff --git a/Cargo.lock b/Cargo.lock index 6f4d7aa024..411e3f078a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5340,6 +5340,7 @@ version = "0.9.2-edge.1" dependencies = [ "aes-gcm", "ahash 0.8.12", + "aligned-vec", "base64 0.22.1", "blake3", "bon", diff --git a/Cargo.toml b/Cargo.toml index d914767715..36406991f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,7 @@ actix-files = "0.6.10" actix-web = "4.13.0" aes-gcm = "0.10.3" ahash = { version = "0.8.12", features = ["serde"] } +aligned-vec = "0.6.4" anyhow = "1.0.102" argon2 = "0.5.3" arrow = "57.3.0" diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index 5977d9c402..4b62ebb3d0 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -30,6 +30,7 @@ readme = "../../README.md" [dependencies] aes-gcm = { workspace = true } ahash = { workspace = true } +aligned-vec = { workspace = true } base64 = { workspace = true } blake3 = { workspace = true } bon = { workspace = true } diff --git a/core/common/src/alloc/buffer.rs b/core/common/src/alloc/buffer.rs index 3c059cba3e..1291febb46 100644 --- a/core/common/src/alloc/buffer.rs +++ b/core/common/src/alloc/buffer.rs @@ -16,11 +16,15 @@ * under the License. */ -use super::memory_pool::{BytesMutExt, memory_pool}; -use bytes::{Buf, BufMut, Bytes, BytesMut}; +use crate::alloc::memory_pool::{ALIGNMENT, AlignedBuffer}; + +use super::memory_pool::{AlignedBufferExt, memory_pool}; +use bytes::Bytes; use compio::buf::{IoBuf, IoBufMut, SetLen}; -use std::mem::MaybeUninit; -use std::ops::{Deref, DerefMut}; +use std::{ + mem::MaybeUninit, + ops::{Deref, DerefMut}, +}; /// A buffer wrapper that participates in memory pooling. /// @@ -32,7 +36,7 @@ pub struct PooledBuffer { from_pool: bool, original_capacity: usize, original_bucket_idx: Option, - inner: BytesMut, + inner: AlignedBuffer, } impl Default for PooledBuffer { @@ -48,13 +52,21 @@ impl PooledBuffer { /// /// * `capacity` - The capacity of the buffer pub fn with_capacity(capacity: usize) -> Self { - let (buffer, was_pool_allocated) = memory_pool().acquire_buffer(capacity); + let (buffer, was_pool_allocated) = memory_pool().acquire_buffer(capacity.max(ALIGNMENT)); let original_capacity = buffer.capacity(); let original_bucket_idx = if was_pool_allocated { memory_pool().best_fit(original_capacity) } else { None }; + + debug_assert_eq!( + buffer.as_ptr() as usize % ALIGNMENT, + 0, + "PooledBuffer not aligned to {} bytes", + ALIGNMENT + ); + Self { from_pool: was_pool_allocated, original_capacity, @@ -63,12 +75,12 @@ impl PooledBuffer { } } - /// Creates a new pooled buffer from an existing `BytesMut`. + /// Creates a new pooled buffer from an existing `AlignedBuffer`. /// /// # Arguments /// - /// * `existing` - The existing `BytesMut` buffer - pub fn from_existing(existing: BytesMut) -> Self { + /// * `existing` - The existing `AlignedBuffer` buffer + pub fn from_existing(existing: AlignedBuffer) -> Self { Self { from_pool: false, original_capacity: existing.capacity(), @@ -83,7 +95,7 @@ impl PooledBuffer { from_pool: false, original_capacity: 0, original_bucket_idx: None, - inner: BytesMut::new(), + inner: AlignedBuffer::new(ALIGNMENT), } } @@ -127,6 +139,52 @@ impl PooledBuffer { } } + /// Split the buffer at given position, returning a new PooledBuffer + /// containing byte [0, at) and leaving [at, len) + /// + /// # Panic + /// Panics if at > len + pub fn split_to(&mut self, at: usize) -> PooledBuffer { + assert!( + at <= self.len(), + "split_to out of bounds: at={}, len={}", + at, + self.len() + ); + + let mut new_buff = PooledBuffer::with_capacity(at); + new_buff.inner.extend_from_slice(&self.inner[..at]); + + // SAFETY: + // - `self.inner.as_ptr().add(at)` is valid for `new_len` because + // `at + new_len === old_len <= cap`. Similar with `self.inner.as_mut_ptr()` + // + // - source range is `[at, at + new_len)` and the destination is + // `[0, new_len)`. These ranges do not overlap when `at > 0`. + // - when `at == 0`, the operation is noop + let new_len = self.len() - at; + if new_len > 0 { + unsafe { + // move [at..] to [0..] + std::ptr::copy( + self.inner.as_ptr().add(at), + self.inner.as_mut_ptr(), + new_len, + ); + + self.inner.set_len(new_len); + } + } else { + self.inner.clear(); + } + + new_buff + } + + pub fn put>(&mut self, src: T) { + self.extend_from_slice(src.as_ref()); + } + /// Wrapper for extend_from_slice which might cause resize pub fn extend_from_slice(&mut self, extend_from: &[u8]) { let before_cap = self.inner.capacity(); @@ -140,7 +198,9 @@ impl PooledBuffer { /// Wrapper for put_bytes which might cause resize pub fn put_bytes(&mut self, byte: u8, len: usize) { let before_cap = self.inner.capacity(); - self.inner.put_bytes(byte, len); + + let start = self.inner.len(); + self.inner.resize(start + len, byte); if self.inner.capacity() != before_cap { self.check_for_resize(); @@ -149,18 +209,18 @@ impl PooledBuffer { /// Wrapper for put_slice which might cause resize pub fn put_slice(&mut self, src: &[u8]) { - let before_cap = self.inner.capacity(); - self.inner.put_slice(src); - - if self.inner.capacity() != before_cap { - self.check_for_resize(); - } + self.extend_from_slice(src); + // let before_cap = self.inner.capacity(); + // + // if self.inner.capacity() != before_cap { + // self.check_for_resize(); + // } } /// Wrapper for put_u32_le which might cause resize pub fn put_u32_le(&mut self, value: u32) { let before_cap = self.inner.capacity(); - self.inner.put_u32_le(value); + self.inner.extend_from_slice(&value.to_le_bytes()); if self.inner.capacity() != before_cap { self.check_for_resize(); @@ -170,7 +230,7 @@ impl PooledBuffer { /// Wrapper for put_u64_le which might cause resize pub fn put_u64_le(&mut self, value: u64) { let before_cap = self.inner.capacity(); - self.inner.put_u64_le(value); + self.inner.extend_from_slice(&value.to_le_bytes()); if self.inner.capacity() != before_cap { self.check_for_resize(); @@ -192,11 +252,12 @@ impl PooledBuffer { self.inner.is_empty() } - /// Consumes the PooledBuffer and returns the inner BytesMut. + /// Consumes the PooledBuffer and returns the inner AlignedBuffer. /// Note: This bypasses pool return logic, use with caution. - pub fn into_inner(self) -> BytesMut { + pub fn into_inner(self) -> AlignedBuffer { let mut this = std::mem::ManuallyDrop::new(self); - std::mem::take(&mut this.inner) + + std::mem::replace(&mut this.inner, AlignedBuffer::new(ALIGNMENT)) } /// Freezes the buffer, converting it to an immutable `Bytes`. @@ -205,24 +266,26 @@ impl PooledBuffer { /// return memory to the pool on drop (the frozen Bytes owns the allocation). /// The returned `Bytes` is Arc-backed, allowing cheap clones. pub fn freeze(&mut self) -> Bytes { - // Decrement pool counter since memory is transferred to Bytes - // and won't be returned to the pool. + let buf = std::mem::replace(&mut self.inner, AlignedBuffer::new(ALIGNMENT)); + + // Update pool accounting if self.from_pool && let Some(bucket_idx) = self.original_bucket_idx { memory_pool().dec_bucket_in_use(bucket_idx); } - - let inner = std::mem::take(&mut self.inner); self.from_pool = false; self.original_capacity = 0; self.original_bucket_idx = None; - inner.freeze() + + // Zero copy: Bytes takes ownership of the AlignedBuffer + // and will drop it when refcount reaches zero + Bytes::from_owner(buf) } } impl Deref for PooledBuffer { - type Target = BytesMut; + type Target = AlignedBuffer; fn deref(&self) -> &Self::Target { &self.inner @@ -238,7 +301,7 @@ impl DerefMut for PooledBuffer { impl Drop for PooledBuffer { fn drop(&mut self) { if self.from_pool { - let buf = std::mem::take(&mut self.inner); + let buf = std::mem::replace(&mut self.inner, AlignedBuffer::new(ALIGNMENT)); buf.return_to_pool(self.original_capacity, true); } } @@ -252,27 +315,15 @@ impl From<&[u8]> for PooledBuffer { } } -impl From for PooledBuffer { - fn from(bytes: BytesMut) -> Self { - Self::from_existing(bytes) +impl AsRef<[u8]> for PooledBuffer { + fn as_ref(&self) -> &[u8] { + &self.inner } } -impl Buf for PooledBuffer { - fn remaining(&self) -> usize { - self.inner.remaining() - } - - fn chunk(&self) -> &[u8] { - self.inner.chunk() - } - - fn advance(&mut self, cnt: usize) { - Buf::advance(&mut self.inner, cnt) - } - - fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> usize { - self.inner.chunks_vectored(dst) +impl From for PooledBuffer { + fn from(buffer: AlignedBuffer) -> Self { + Self::from_existing(buffer) } } diff --git a/core/common/src/alloc/memory_pool.rs b/core/common/src/alloc/memory_pool.rs index 9216ef57ff..d6eb54d05a 100644 --- a/core/common/src/alloc/memory_pool.rs +++ b/core/common/src/alloc/memory_pool.rs @@ -16,7 +16,7 @@ * under the License. */ -use bytes::BytesMut; +use aligned_vec::{AVec, ConstAlign}; use crossbeam::queue::ArrayQueue; use human_repr::HumanCount; use once_cell::sync::OnceCell; @@ -24,18 +24,17 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use tracing::{info, trace, warn}; +pub const ALIGNMENT: usize = 4096; +pub type AlignedBuffer = AVec>; + /// Global memory pool instance. Use `memory_pool()` to access it. pub static MEMORY_POOL: OnceCell = OnceCell::new(); /// Total number of distinct bucket sizes. -const NUM_BUCKETS: usize = 32; +const NUM_BUCKETS: usize = 28; /// Array of bucket sizes in ascending order. Each entry is a distinct buffer size (in bytes). const BUCKET_SIZES: [usize; NUM_BUCKETS] = [ - 256, - 512, - 1024, - 2 * 1024, 4 * 1024, 8 * 1024, 16 * 1024, @@ -87,7 +86,7 @@ pub struct MemoryPoolConfigOther { pub bucket_capacity: u32, } -/// A memory pool that maintains fixed-size buckets for reusing `BytesMut` buffers. +/// A memory pool that maintains fixed-size buckets for reusing `AlignedBuffer` buffers. /// /// Each bucket corresponds to a particular size in `BUCKET_SIZES`. The pool tracks: /// - Buffers currently in use (`in_use`) @@ -109,7 +108,7 @@ pub struct MemoryPool { /// Array of queues for reusable buffers. Each queue can store up to `bucket_capacity` buffers. /// The length of each queue (`buckets[i].len()`) is how many **free** buffers are currently available. /// Free doesn't mean the buffer is allocated, it just means it's not in use. - buckets: [Arc>; NUM_BUCKETS], + buckets: [Arc>; NUM_BUCKETS], /// Number of buffers **in use** for each bucket size (grow/shrink as they are acquired/released). in_use: [Arc; NUM_BUCKETS], @@ -175,29 +174,47 @@ impl MemoryPool { MEMORY_POOL.get_or_init(|| MemoryPool::new(is_enabled, memory_limit, bucket_capacity)); } - /// Acquire a `BytesMut` buffer with at least `capacity` bytes. + /// Acquire a `AlignedBuffer` buffer with at least `capacity` bytes. /// /// - If a bucket can fit `capacity`, try to pop from its free buffer queue; otherwise create a new buffer. /// - If `memory_limit` would be exceeded, allocate outside the pool. /// /// Returns a tuple of (buffer, was_pool_allocated) where was_pool_allocated indicates if the buffer /// was allocated from the pool (true) or externally (false). - pub fn acquire_buffer(&self, capacity: usize) -> (BytesMut, bool) { + pub fn acquire_buffer(&self, capacity: usize) -> (AlignedBuffer, bool) { if !self.is_enabled { - return (BytesMut::with_capacity(capacity), false); + return (allocate_aligned_buffer(capacity), false); } let current = self.pool_current_size(); match self.best_fit(capacity) { Some(idx) => { + let new_size = BUCKET_SIZES[idx]; + if let Some(mut buf) = self.buckets[idx].pop() { buf.clear(); + + if buf.capacity() < capacity { + warn!( + "Pooled buffer too small! bucket[{}]={}, buf.capacity()={}, requested={}. Dropping and allocating new.", + idx, + new_size, + buf.capacity(), + capacity + ); + drop(buf); + + let new_buf = allocate_aligned_buffer(new_size); + self.inc_bucket_alloc(idx); + self.inc_bucket_in_use(idx); + return (new_buf, true); + } + self.inc_bucket_in_use(idx); return (buf, true); } - let new_size = BUCKET_SIZES[idx]; if current + new_size > self.memory_limit { self.set_capacity_warning(true); trace!( @@ -205,12 +222,12 @@ impl MemoryPool { new_size, current, self.memory_limit ); self.inc_external_allocations(); - return (BytesMut::with_capacity(new_size), false); + return (allocate_aligned_buffer(new_size), false); } self.inc_bucket_alloc(idx); self.inc_bucket_in_use(idx); - (BytesMut::with_capacity(new_size), true) + (allocate_aligned_buffer(new_size), true) } None => { if current + capacity > self.memory_limit { @@ -219,16 +236,16 @@ impl MemoryPool { capacity, current, self.memory_limit ); self.inc_external_allocations(); - return (BytesMut::with_capacity(capacity), false); + return (allocate_aligned_buffer(capacity), false); } self.inc_external_allocations(); - (BytesMut::with_capacity(capacity), false) + (allocate_aligned_buffer(capacity), false) } } } - /// Return a `BytesMut` buffer previously acquired from the pool. + /// Return a `AlignedBuffer` buffer previously acquired from the pool. /// /// - If `current_capacity` differs from `original_capacity`, increments `resize_events`. /// - If a matching bucket exists, place it back in that bucket's queue (if space is available). @@ -236,7 +253,7 @@ impl MemoryPool { /// - The `was_pool_allocated` flag indicates if this buffer was originally allocated from the pool. pub fn release_buffer( &self, - buffer: BytesMut, + buffer: AlignedBuffer, original_capacity: usize, was_pool_allocated: bool, ) { @@ -448,16 +465,42 @@ impl MemoryPool { /// Return a buffer to the pool by calling `release_buffer` with the original capacity. /// This extension trait makes it easy to do `some_bytes.return_to_pool(orig_cap, was_pool_allocated)`. -pub trait BytesMutExt { +pub trait AlignedBufferExt { fn return_to_pool(self, original_capacity: usize, was_pool_allocated: bool); } -impl BytesMutExt for BytesMut { +impl AlignedBufferExt for AlignedBuffer { fn return_to_pool(self, original_capacity: usize, was_pool_allocated: bool) { memory_pool().release_buffer(self, original_capacity, was_pool_allocated); } } +fn allocate_aligned_buffer(capacity: usize) -> AlignedBuffer { + let aligned_capacity = if capacity < ALIGNMENT { + ALIGNMENT + } else { + (capacity + ALIGNMENT - 1) & !(ALIGNMENT - 1) + }; + + let mut buffer = AlignedBuffer::with_capacity(ALIGNMENT, aligned_capacity); + + // Force the capacity if AVec didn't allocate enough + if buffer.capacity() < aligned_capacity { + buffer.reserve(aligned_capacity - buffer.capacity()); + } + + // Verify final capacity + let actual_capacity = buffer.capacity(); + assert!( + actual_capacity >= aligned_capacity, + "Failed to allocate buffer with capacity {}, got {}", + aligned_capacity, + actual_capacity + ); + + buffer +} + /// Convert a size in bytes to a string like "8KiB" or "2MiB". fn size_str(size: usize) -> String { if size >= 1024 * 1024 { diff --git a/core/common/src/types/message/messages_batch_mut.rs b/core/common/src/types/message/messages_batch_mut.rs index 231ba00a21..3147097e8b 100644 --- a/core/common/src/types/message/messages_batch_mut.rs +++ b/core/common/src/types/message/messages_batch_mut.rs @@ -25,9 +25,8 @@ use crate::{ IggyTimestamp, MAX_PAYLOAD_SIZE, MAX_USER_HEADERS_SIZE, Sizeable, Validatable, }; use crate::{MessageDeduplicator, PooledBuffer, random_id}; -use bytes::{BufMut, BytesMut}; use lending_iterator::prelude::*; -use std::ops::{Deref, Index}; +use std::ops::Index; use std::sync::Arc; use tracing::{error, warn}; @@ -489,7 +488,7 @@ impl IggyMessagesBatchMut { /// subsequent messages in the new buffer. #[allow(clippy::too_many_arguments)] fn rebuild_indexes_for_chunk( - new_buffer: &BytesMut, + new_buffer: &PooledBuffer, new_indexes: &mut IggyIndexesMut, offset_in_new_buffer: &mut u32, chunk_start: usize, @@ -827,11 +826,3 @@ impl Index for IggyMessagesBatchMut { &self.messages[start..end] } } - -impl Deref for IggyMessagesBatchMut { - type Target = BytesMut; - - fn deref(&self) -> &Self::Target { - &self.messages - } -} diff --git a/core/common/src/types/segment_storage/index_reader.rs b/core/common/src/types/segment_storage/index_reader.rs index 419b6db4b9..e62233ca6a 100644 --- a/core/common/src/types/segment_storage/index_reader.rs +++ b/core/common/src/types/segment_storage/index_reader.rs @@ -16,7 +16,6 @@ // under the License. use crate::{INDEX_SIZE, IggyError, IggyIndex, IggyIndexView, IggyIndexesMut, PooledBuffer}; -use bytes::BytesMut; use compio::{ buf::{IntoInner, IoBuf}, fs::{File, OpenOptions}, @@ -336,26 +335,18 @@ impl IndexReader { &self, offset: u32, len: u32, - use_pool: bool, + _use_pool: bool, ) -> Result { - if use_pool { - let len = len as usize; - let buf = PooledBuffer::with_capacity(len); - let (result, buf) = self - .file - .read_exact_at(buf.slice(..len), offset as u64) - .await - .into(); - let buf = buf.into_inner(); - result?; - Ok(buf) - } else { - let mut buf = BytesMut::with_capacity(len as usize); - unsafe { buf.set_len(len as usize) }; - let (result, buf) = self.file.read_exact_at(buf, offset as u64).await.into(); - result?; - Ok(PooledBuffer::from_existing(buf)) - } + let buf = PooledBuffer::with_capacity(len as usize); + + let (result, buf) = self + .file + .read_exact_at(buf.slice(..len as usize), offset as u64) + .await + .into(); + let buf = buf.into_inner(); + result?; + Ok(buf) } /// Gets the nth index from the index file. diff --git a/core/common/src/types/segment_storage/messages_reader.rs b/core/common/src/types/segment_storage/messages_reader.rs index e170926704..ab23affb12 100644 --- a/core/common/src/types/segment_storage/messages_reader.rs +++ b/core/common/src/types/segment_storage/messages_reader.rs @@ -16,7 +16,6 @@ // under the License. use crate::{IggyError, IggyIndexesMut, IggyMessagesBatchMut, PooledBuffer}; -use bytes::BytesMut; use compio::buf::{IntoInner, IoBuf}; use compio::fs::{File, OpenOptions}; use compio::io::AsyncReadAtExt; @@ -133,25 +132,18 @@ impl MessagesReader { &self, offset: u32, len: u32, - use_pool: bool, + _use_pool: bool, ) -> Result { - if use_pool { - let buf = PooledBuffer::with_capacity(len as usize); - let len = len as usize; - let (result, buf) = self - .file - .read_exact_at(buf.slice(..len), offset as u64) - .await - .into(); - let buf = buf.into_inner(); - result?; - Ok(buf) - } else { - let mut buf = BytesMut::with_capacity(len as usize); - unsafe { buf.set_len(len as usize) }; - let (result, buf) = self.file.read_exact_at(buf, offset as u64).await.into(); - result?; - Ok(PooledBuffer::from_existing(buf)) - } + let buf = PooledBuffer::with_capacity(len as usize); + + let (result, buf) = self + .file + .read_exact_at(buf.slice(..len as usize), offset as u64) + .await + .into(); + + let buf = buf.into_inner(); + result?; + Ok(buf) } } diff --git a/core/integration/tests/server/message_cleanup.rs b/core/integration/tests/server/message_cleanup.rs index c61855d836..2f293e2596 100644 --- a/core/integration/tests/server/message_cleanup.rs +++ b/core/integration/tests/server/message_cleanup.rs @@ -78,7 +78,7 @@ async fn run_cleanup_scenario(scenario: CleanupScenarioFn) { .server( TestServerConfig::builder() .extra_envs(HashMap::from([ - ("IGGY_SYSTEM_SEGMENT_SIZE".to_string(), "100KiB".to_string()), + ("IGGY_SYSTEM_SEGMENT_SIZE".to_string(), "10KiB".to_string()), ( "IGGY_DATA_MAINTENANCE_MESSAGES_CLEANER_ENABLED".to_string(), "true".to_string(), diff --git a/core/integration/tests/server/scenarios/message_cleanup_scenario.rs b/core/integration/tests/server/scenarios/message_cleanup_scenario.rs index 0669531537..581afc3791 100644 --- a/core/integration/tests/server/scenarios/message_cleanup_scenario.rs +++ b/core/integration/tests/server/scenarios/message_cleanup_scenario.rs @@ -396,7 +396,7 @@ pub async fn run_expiry_with_multiple_partitions(client: &IggyClient, data_path: let stream = client.create_stream(STREAM_NAME).await.unwrap(); let stream_id = stream.id; - let expiry = Duration::from_secs(3); + let expiry = Duration::from_secs(5); let topic = client .create_topic( &Identifier::named(STREAM_NAME).unwrap(), @@ -439,6 +439,8 @@ pub async fn run_expiry_with_multiple_partitions(client: &IggyClient, data_path: // Collect initial segment counts let mut initial_counts: Vec = Vec::new(); + + // Wait until all partitions have >= 2 segments (up to 5s) for partition_id in 0..PARTITIONS_COUNT { let partition_path = data_path .join(format!( @@ -446,14 +448,23 @@ pub async fn run_expiry_with_multiple_partitions(client: &IggyClient, data_path: )) .display() .to_string(); - let segments = get_segment_paths_for_partition(&partition_path); - initial_counts.push(segments.len()); - assert!( - segments.len() >= 2, - "Partition {} should have at least 2 segments, got {}", - partition_id, - segments.len() - ); + + let deadline = tokio::time::Instant::now() + Duration::from_secs(5); + let count = loop { + let segments = get_segment_paths_for_partition(&partition_path); + if segments.len() >= 2 { + break segments.len(); + } + if tokio::time::Instant::now() >= deadline { + panic!( + "Partition {} should have at least 2 segments after 5s, got {}", + partition_id, + segments.len() + ); + } + tokio::time::sleep(Duration::from_millis(100)).await; + }; + initial_counts.push(count); } // Wait for expiry + cleaner diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs index 652b99a7c6..91a5ea1e72 100644 --- a/core/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs @@ -123,6 +123,7 @@ impl ServerCommandHandler for SendMessages { .and_then(|s| s.checked_sub(metadata_len_field_size)) .ok_or(IggyError::InvalidCommand)?; let messages_buffer = PooledBuffer::with_capacity(messages_size); + let (result, messages_buffer) = sender.read(messages_buffer.slice(0..messages_size)).await; result?; let messages_buffer = messages_buffer.into_inner(); diff --git a/core/server/src/http/messages.rs b/core/server/src/http/messages.rs index 125c0ad25e..f12e36869b 100644 --- a/core/server/src/http/messages.rs +++ b/core/server/src/http/messages.rs @@ -22,18 +22,17 @@ use crate::http::jwt::json_web_token::Identity; use crate::http::shared::AppState; use crate::shard::system::messages::PollingArgs; use crate::shard::transmission::message::ResolvedPartition; -use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut}; +use crate::streaming::segments::IggyMessagesBatchMut; use crate::streaming::session::Session; use axum::extract::{Path, Query, State}; use axum::http::StatusCode; use axum::routing::get; use axum::{Extension, Json, Router, debug_handler}; use err_trail::ErrContext; -use iggy_common::Identifier; -use iggy_common::PooledBuffer; -use iggy_common::Validatable; use iggy_common::{Consumer, PollMessages, SendMessages}; +use iggy_common::{Identifier, PooledBuffer}; use iggy_common::{IggyError, IggyMessagesBatch, PolledMessages}; +use iggy_common::{IggyIndexesMut, Validatable}; use send_wrapper::SendWrapper; use std::sync::Arc; use tracing::instrument; @@ -152,9 +151,14 @@ async fn flush_unsaved_buffer( fn make_mutable(batch: IggyMessagesBatch) -> IggyMessagesBatchMut { let (_, indexes, messages) = batch.decompose(); - let (_, indexes_buffer) = indexes.decompose(); - let indexes_buffer_mut = PooledBuffer::from_existing(indexes_buffer.into()); - let indexes_mut = IggyIndexesMut::from_bytes(indexes_buffer_mut, 0); - let messages_buffer_mut = PooledBuffer::from_existing(messages.into()); + let (base_position, indexes_buffer) = indexes.decompose(); + + let mut indexes_buffer_mut = PooledBuffer::with_capacity(indexes_buffer.len()); + indexes_buffer_mut.extend_from_slice(&indexes_buffer); + let indexes_mut = IggyIndexesMut::from_bytes(indexes_buffer_mut, base_position); + + let mut messages_buffer_mut = PooledBuffer::with_capacity(messages.len()); + messages_buffer_mut.extend_from_slice(&messages); + IggyMessagesBatchMut::from_indexes_and_messages(indexes_mut, messages_buffer_mut) }