Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ actix-files = "0.6.10"
actix-web = "4.13.0"
aes-gcm = "0.10.3"
ahash = { version = "0.8.12", features = ["serde"] }
aligned-vec = "0.6.4"
anyhow = "1.0.102"
argon2 = "0.5.3"
arrow = "57.3.0"
Expand Down
1 change: 1 addition & 0 deletions core/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ readme = "../../README.md"
[dependencies]
aes-gcm = { workspace = true }
ahash = { workspace = true }
aligned-vec = { workspace = true }
base64 = { workspace = true }
blake3 = { workspace = true }
bon = { workspace = true }
Expand Down
145 changes: 98 additions & 47 deletions core/common/src/alloc/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
* under the License.
*/

use super::memory_pool::{BytesMutExt, memory_pool};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crate::alloc::memory_pool::{ALIGNMENT, AlignedBuffer};

use super::memory_pool::{AlignedBufferExt, memory_pool};
use bytes::Bytes;
use compio::buf::{IoBuf, IoBufMut, SetLen};
use std::mem::MaybeUninit;
use std::ops::{Deref, DerefMut};
use std::{
mem::MaybeUninit,
ops::{Deref, DerefMut},
};

/// A buffer wrapper that participates in memory pooling.
///
Expand All @@ -32,7 +36,7 @@ pub struct PooledBuffer {
from_pool: bool,
original_capacity: usize,
original_bucket_idx: Option<usize>,
inner: BytesMut,
inner: AlignedBuffer,
}

impl Default for PooledBuffer {
Expand All @@ -48,13 +52,21 @@ impl PooledBuffer {
///
/// * `capacity` - The capacity of the buffer
pub fn with_capacity(capacity: usize) -> Self {
let (buffer, was_pool_allocated) = memory_pool().acquire_buffer(capacity);
let (buffer, was_pool_allocated) = memory_pool().acquire_buffer(capacity.max(ALIGNMENT));
let original_capacity = buffer.capacity();
let original_bucket_idx = if was_pool_allocated {
memory_pool().best_fit(original_capacity)
} else {
None
};

debug_assert_eq!(
buffer.as_ptr() as usize % ALIGNMENT,
0,
"PooledBuffer not aligned to {} bytes",
ALIGNMENT
);

Self {
from_pool: was_pool_allocated,
original_capacity,
Expand All @@ -63,12 +75,12 @@ impl PooledBuffer {
}
}

/// Creates a new pooled buffer from an existing `BytesMut`.
/// Creates a new pooled buffer from an existing `AlignedBuffer`.
///
/// # Arguments
///
/// * `existing` - The existing `BytesMut` buffer
pub fn from_existing(existing: BytesMut) -> Self {
/// * `existing` - The existing `AlignedBuffer` buffer
pub fn from_existing(existing: AlignedBuffer) -> Self {
Self {
from_pool: false,
original_capacity: existing.capacity(),
Expand All @@ -83,7 +95,7 @@ impl PooledBuffer {
from_pool: false,
original_capacity: 0,
original_bucket_idx: None,
inner: BytesMut::new(),
inner: AlignedBuffer::new(ALIGNMENT),
}
}

Expand Down Expand Up @@ -127,6 +139,52 @@ impl PooledBuffer {
}
}

/// Split the buffer at given position, returning a new PooledBuffer
/// containing bytes [0, at) and leaving [at, len) in `self`.
///
/// Note: unlike `BytesMut::split_to`, this is not zero-copy — the front
/// bytes are copied into a freshly acquired pooled buffer and the tail
/// is shifted down to offset 0 in place.
///
/// # Panics
/// Panics if `at > len`
pub fn split_to(&mut self, at: usize) -> PooledBuffer {
    assert!(
        at <= self.len(),
        "split_to out of bounds: at={}, len={}",
        at,
        self.len()
    );

    // Copy the front [0, at) into a new pooled buffer.
    let mut new_buff = PooledBuffer::with_capacity(at);
    new_buff.inner.extend_from_slice(&self.inner[..at]);

    // SAFETY:
    // - `self.inner.as_ptr().add(at)` is valid for `new_len` reads and
    //   `self.inner.as_mut_ptr()` is valid for `new_len` writes, because
    //   `at + new_len == old_len <= cap`.
    // - The source range `[at, at + new_len)` and the destination range
    //   `[0, new_len)` MAY overlap (whenever `at < new_len`); `ptr::copy`
    //   has memmove semantics and handles overlapping ranges correctly,
    //   which is why `copy_nonoverlapping` is not used here.
    // - When `at == 0`, the copy moves the data onto itself (a no-op).
    let new_len = self.len() - at;
    if new_len > 0 {
        unsafe {
            // move [at..] to [0..]
            std::ptr::copy(
                self.inner.as_ptr().add(at),
                self.inner.as_mut_ptr(),
                new_len,
            );

            self.inner.set_len(new_len);
        }
    } else {
        self.inner.clear();
    }

    new_buff
}

/// Appends the bytes of `src` to the buffer.
///
/// Generic convenience wrapper around [`Self::extend_from_slice`]
/// accepting anything byte-slice-like (`&[u8]`, `Vec<u8>`, `Bytes`, …);
/// resize/pool accounting is handled by the delegate.
pub fn put<T: AsRef<[u8]>>(&mut self, src: T) {
    self.extend_from_slice(src.as_ref());
}

/// Wrapper for extend_from_slice which might cause resize
pub fn extend_from_slice(&mut self, extend_from: &[u8]) {
let before_cap = self.inner.capacity();
Expand All @@ -140,7 +198,9 @@ impl PooledBuffer {
/// Wrapper for put_bytes which might cause resize
pub fn put_bytes(&mut self, byte: u8, len: usize) {
let before_cap = self.inner.capacity();
self.inner.put_bytes(byte, len);

let start = self.inner.len();
self.inner.resize(start + len, byte);

if self.inner.capacity() != before_cap {
self.check_for_resize();
Expand All @@ -149,18 +209,18 @@ impl PooledBuffer {

/// Wrapper for put_slice which might cause resize.
///
/// Delegates to [`Self::extend_from_slice`], which already performs the
/// pool resize accounting (`check_for_resize`) when the capacity grows,
/// so no extra bookkeeping is needed here.
pub fn put_slice(&mut self, src: &[u8]) {
    self.extend_from_slice(src);
}

/// Wrapper for put_u32_le which might cause resize
pub fn put_u32_le(&mut self, value: u32) {
let before_cap = self.inner.capacity();
self.inner.put_u32_le(value);
self.inner.extend_from_slice(&value.to_le_bytes());

if self.inner.capacity() != before_cap {
self.check_for_resize();
Expand All @@ -170,7 +230,7 @@ impl PooledBuffer {
/// Wrapper for put_u64_le which might cause resize
pub fn put_u64_le(&mut self, value: u64) {
let before_cap = self.inner.capacity();
self.inner.put_u64_le(value);
self.inner.extend_from_slice(&value.to_le_bytes());

if self.inner.capacity() != before_cap {
self.check_for_resize();
Expand All @@ -192,11 +252,12 @@ impl PooledBuffer {
self.inner.is_empty()
}

/// Consumes the PooledBuffer and returns the inner BytesMut.
/// Consumes the PooledBuffer and returns the inner AlignedBuffer.
/// Note: This bypasses pool return logic, use with caution.
pub fn into_inner(self) -> BytesMut {
pub fn into_inner(self) -> AlignedBuffer {
let mut this = std::mem::ManuallyDrop::new(self);
std::mem::take(&mut this.inner)

std::mem::replace(&mut this.inner, AlignedBuffer::new(ALIGNMENT))
}

/// Freezes the buffer, converting it to an immutable `Bytes`.
Expand All @@ -205,24 +266,26 @@ impl PooledBuffer {
/// return memory to the pool on drop (the frozen Bytes owns the allocation).
/// The returned `Bytes` is Arc-backed, allowing cheap clones.
pub fn freeze(&mut self) -> Bytes {
    // Detach the buffer; `self` is left holding an empty placeholder so
    // a later drop (or reuse) of `self` touches only the placeholder.
    let buf = std::mem::replace(&mut self.inner, AlignedBuffer::new(ALIGNMENT));

    // Update pool accounting: the memory is being handed off to `Bytes`
    // and will never return to the pool, so release the bucket slot now.
    if self.from_pool
        && let Some(bucket_idx) = self.original_bucket_idx
    {
        memory_pool().dec_bucket_in_use(bucket_idx);
    }

    // Reset bookkeeping so Drop does not try to return the (already
    // transferred) allocation to the pool.
    self.from_pool = false;
    self.original_capacity = 0;
    self.original_bucket_idx = None;

    // Zero copy: `Bytes::from_owner` takes ownership of the
    // `AlignedBuffer` and drops it when the refcount reaches zero.
    Bytes::from_owner(buf)
}
}

impl Deref for PooledBuffer {
type Target = BytesMut;
type Target = AlignedBuffer;

fn deref(&self) -> &Self::Target {
&self.inner
Expand All @@ -238,7 +301,7 @@ impl DerefMut for PooledBuffer {
impl Drop for PooledBuffer {
fn drop(&mut self) {
if self.from_pool {
let buf = std::mem::take(&mut self.inner);
let buf = std::mem::replace(&mut self.inner, AlignedBuffer::new(ALIGNMENT));
buf.return_to_pool(self.original_capacity, true);
}
}
Expand All @@ -252,27 +315,15 @@ impl From<&[u8]> for PooledBuffer {
}
}

impl From<BytesMut> for PooledBuffer {
fn from(bytes: BytesMut) -> Self {
Self::from_existing(bytes)
impl AsRef<[u8]> for PooledBuffer {
    /// Borrows the buffer's initialized bytes as a plain slice.
    fn as_ref(&self) -> &[u8] {
        // Coerce through the inner buffer's slice view explicitly.
        let bytes: &[u8] = &self.inner;
        bytes
    }
}

impl Buf for PooledBuffer {
fn remaining(&self) -> usize {
self.inner.remaining()
}

fn chunk(&self) -> &[u8] {
self.inner.chunk()
}

fn advance(&mut self, cnt: usize) {
Buf::advance(&mut self.inner, cnt)
}

fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> usize {
self.inner.chunks_vectored(dst)
impl From<AlignedBuffer> for PooledBuffer {
fn from(buffer: AlignedBuffer) -> Self {
Self::from_existing(buffer)
}
}

Expand Down
Loading
Loading