Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ of panicking drop implementations.
- Added `from_bytes_truncating_at_nul` to `CString`
- Added `CString::{into_bytes, into_bytes_with_nul, into_string}`
- Added `pop_front_if` and `pop_back_if` to `Deque`
- Fixed long division being instroduced by the const-erasure in spsc

## [v0.9.2] 2025-11-12

Expand Down
102 changes: 87 additions & 15 deletions src/spsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,6 @@ pub struct QueueInner<T, S: Storage> {
/// A statically allocated single-producer, single-consumer queue with a capacity of `N - 1`
/// elements.
///
/// <div class="warning">
///
/// To get better performance, use a value for `N` that is a power of 2.
///
/// </div>
///
/// You will likely want to use [`split`](QueueInner::split) to create a producer-consumer pair.
pub type Queue<T, const N: usize> = QueueInner<T, OwnedStorage<N>>;

Expand Down Expand Up @@ -182,7 +176,12 @@ impl<T, S: Storage> QueueInner<T, S> {

#[inline]
fn increment(&self, val: usize) -> usize {
(val + 1) % self.n()
let val = val + 1;
if val >= self.n() {
val - self.n()
} else {
val
}
}

#[inline]
Expand All @@ -202,10 +201,13 @@ impl<T, S: Storage> QueueInner<T, S> {
let current_head = self.head.load(Ordering::Relaxed);
let current_tail = self.tail.load(Ordering::Relaxed);

current_tail
.wrapping_sub(current_head)
.wrapping_add(self.n())
% self.n()
if current_tail >= current_head {
current_tail - current_head
} else {
current_tail
.wrapping_sub(current_head)
.wrapping_add(self.n())
}
}

/// Returns whether the queue is empty.
Expand Down Expand Up @@ -626,7 +628,8 @@ impl<'a, T> Iterator for Iter<'a, T> {
if self.index < self.len {
let head = self.rb.head.load(Ordering::Relaxed);

let i = (head + self.index) % self.rb.n();
let i = head + self.index;
let i = if i >= self.rb.n() { i - self.rb.n() } else { i };
self.index += 1;

Some(unsafe { &*(self.rb.buffer.borrow().get_unchecked(i).get() as *const T) })
Expand All @@ -643,7 +646,8 @@ impl<'a, T> Iterator for IterMut<'a, T> {
if self.index < self.len {
let head = self.rb.head.load(Ordering::Relaxed);

let i = (head + self.index) % self.rb.n();
let i = head + self.index;
let i = if i >= self.rb.n() { i - self.rb.n() } else { i };
self.index += 1;

Some(unsafe { &mut *self.rb.buffer.borrow().get_unchecked(i).get().cast::<T>() })
Expand All @@ -659,7 +663,8 @@ impl<T> DoubleEndedIterator for Iter<'_, T> {
let head = self.rb.head.load(Ordering::Relaxed);

// self.len > 0, since it's larger than self.index > 0
let i = (head + self.len - 1) % self.rb.n();
let i = head + self.len - 1;
let i = if i >= self.rb.n() { i - self.rb.n() } else { i };
self.len -= 1;
Some(unsafe { &*(self.rb.buffer.borrow().get_unchecked(i).get() as *const T) })
} else {
Expand All @@ -674,7 +679,8 @@ impl<T> DoubleEndedIterator for IterMut<'_, T> {
let head = self.rb.head.load(Ordering::Relaxed);

// self.len > 0, since it's larger than self.index > 0
let i = (head + self.len - 1) % self.rb.n();
let i = head + self.len - 1;
let i = if i >= self.rb.n() { i - self.rb.n() } else { i };
self.len -= 1;
Some(unsafe { &mut *self.rb.buffer.borrow().get_unchecked(i).get().cast::<T>() })
} else {
Expand Down Expand Up @@ -1076,6 +1082,28 @@ mod tests {
assert_eq!(items.next(), None);
}

/// Exercise the modulo self.n() operation in next()
#[test]
fn iter_modulo() {
let mut rb: Queue<i32, 4> = Queue::new();

for _ in 0..2 {
rb.enqueue(0).unwrap();
rb.dequeue().unwrap();
}
rb.enqueue(1).unwrap();
rb.enqueue(2).unwrap();
rb.enqueue(3).unwrap();

let mut items = rb.iter();

// assert_eq!(items.next(), Some(&0));
assert_eq!(items.next(), Some(&1));
assert_eq!(items.next(), Some(&2));
assert_eq!(items.next(), Some(&3));
assert_eq!(items.next(), None);
}

#[test]
fn iter_double_ended() {
let mut rb: Queue<i32, 4> = Queue::new();
Expand All @@ -1093,6 +1121,28 @@ mod tests {
assert_eq!(items.next_back(), None);
}

/// Test that the modulo in next_back works as expected
#[test]
fn iter_double_ended_modulo() {
let mut rb: Queue<i32, 4> = Queue::new();

for _ in 0..2 {
rb.enqueue(0).unwrap();
rb.dequeue().unwrap();
}
rb.enqueue(0).unwrap();
rb.enqueue(1).unwrap();
rb.enqueue(2).unwrap();

let mut items = rb.iter();

assert_eq!(items.next(), Some(&0));
assert_eq!(items.next_back(), Some(&2));
assert_eq!(items.next(), Some(&1));
assert_eq!(items.next(), None);
assert_eq!(items.next_back(), None);
}

#[test]
fn iter_mut() {
let mut rb: Queue<i32, 4> = Queue::new();
Expand Down Expand Up @@ -1126,6 +1176,28 @@ mod tests {
assert_eq!(items.next_back(), None);
}

/// Test that the modulo in next_back works as expected
#[test]
fn iter_mut_double_ended_modulo() {
let mut rb: Queue<i32, 4> = Queue::new();

for _ in 0..2 {
rb.enqueue(0).unwrap();
rb.dequeue().unwrap();
}
rb.enqueue(0).unwrap();
rb.enqueue(1).unwrap();
rb.enqueue(2).unwrap();

let mut items = rb.iter_mut();

assert_eq!(items.next(), Some(&mut 0));
assert_eq!(items.next_back(), Some(&mut 2));
assert_eq!(items.next(), Some(&mut 1));
assert_eq!(items.next(), None);
assert_eq!(items.next_back(), None);
}

#[test]
fn wrap_around() {
let mut rb: Queue<i32, 4> = Queue::new();
Expand Down
Loading