Skip to content
Open
118 changes: 60 additions & 58 deletions crates/loro-internal/src/arena.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod str_arena;
use self::str_arena::StrArena;
use crate::sync::MutexExt as _;
use crate::sync::{Mutex, MutexGuard};
use crate::{
change::Lamport,
Expand Down Expand Up @@ -141,17 +142,17 @@ impl SharedArena {
Self {
inner: Arc::new(InnerSharedArena {
container_idx_to_id: Mutex::new(
self.inner.container_idx_to_id.lock().unwrap().clone(),
self.inner.container_idx_to_id.lock_unpoisoned().clone(),
),
depth: Mutex::new(self.inner.depth.lock().unwrap().clone()),
depth: Mutex::new(self.inner.depth.lock_unpoisoned().clone()),
container_id_to_idx: Mutex::new(
self.inner.container_id_to_idx.lock().unwrap().clone(),
self.inner.container_id_to_idx.lock_unpoisoned().clone(),
),
parents: Mutex::new(self.inner.parents.lock().unwrap().clone()),
values: Mutex::new(self.inner.values.lock().unwrap().clone()),
root_c_idx: Mutex::new(self.inner.root_c_idx.lock().unwrap().clone()),
parents: Mutex::new(self.inner.parents.lock_unpoisoned().clone()),
values: Mutex::new(self.inner.values.lock_unpoisoned().clone()),
root_c_idx: Mutex::new(self.inner.root_c_idx.lock_unpoisoned().clone()),
str: self.inner.str.clone(),
parent_resolver: Mutex::new(self.inner.parent_resolver.lock().unwrap().clone()),
parent_resolver: Mutex::new(self.inner.parent_resolver.lock_unpoisoned().clone()),
}),
}
}
Expand All @@ -163,38 +164,38 @@ impl SharedArena {

fn get_arena_guards(&self) -> ArenaGuards<'_> {
ArenaGuards {
container_id_to_idx: self.inner.container_id_to_idx.lock().unwrap(),
container_idx_to_id: self.inner.container_idx_to_id.lock().unwrap(),
depth: self.inner.depth.lock().unwrap(),
parents: self.inner.parents.lock().unwrap(),
root_c_idx: self.inner.root_c_idx.lock().unwrap(),
parent_resolver: self.inner.parent_resolver.lock().unwrap(),
container_id_to_idx: self.inner.container_id_to_idx.lock_unpoisoned(),
container_idx_to_id: self.inner.container_idx_to_id.lock_unpoisoned(),
depth: self.inner.depth.lock_unpoisoned(),
parents: self.inner.parents.lock_unpoisoned(),
root_c_idx: self.inner.root_c_idx.lock_unpoisoned(),
parent_resolver: self.inner.parent_resolver.lock_unpoisoned(),
}
}

pub fn register_container(&self, id: &ContainerID) -> ContainerIdx {
let mut container_id_to_idx = self.inner.container_id_to_idx.lock().unwrap();
let mut container_id_to_idx = self.inner.container_id_to_idx.lock_unpoisoned();
if let Some(&idx) = container_id_to_idx.get(id) {
return idx;
}

let mut container_idx_to_id = self.inner.container_idx_to_id.lock().unwrap();
let mut container_idx_to_id = self.inner.container_idx_to_id.lock_unpoisoned();
let idx = container_idx_to_id.len();
container_idx_to_id.push(id.clone());
let idx = ContainerIdx::from_index_and_type(idx as u32, id.container_type());
container_id_to_idx.insert(id.clone(), idx);
if id.is_root() {
self.inner.root_c_idx.lock().unwrap().push(idx);
self.inner.parents.lock().unwrap().insert(idx, None);
self.inner.depth.lock().unwrap().push(NonZeroU16::new(1));
self.inner.root_c_idx.lock_unpoisoned().push(idx);
self.inner.parents.lock_unpoisoned().insert(idx, None);
self.inner.depth.lock_unpoisoned().push(NonZeroU16::new(1));
} else {
self.inner.depth.lock().unwrap().push(None);
self.inner.depth.lock_unpoisoned().push(None);
}
idx
}

pub fn get_container_id(&self, idx: ContainerIdx) -> Option<ContainerID> {
let lock = self.inner.container_idx_to_id.lock().unwrap();
let lock = self.inner.container_idx_to_id.lock_unpoisoned();
lock.get(idx.to_index() as usize).cloned()
}

Expand All @@ -218,64 +219,64 @@ impl SharedArena {

#[inline]
pub fn idx_to_id(&self, id: ContainerIdx) -> Option<ContainerID> {
let lock = self.inner.container_idx_to_id.lock().unwrap();
let lock = self.inner.container_idx_to_id.lock_unpoisoned();
lock.get(id.to_index() as usize).cloned()
}

#[inline]
pub fn with_idx_to_id<R>(&self, f: impl FnOnce(&Vec<ContainerID>) -> R) -> R {
let lock = self.inner.container_idx_to_id.lock().unwrap();
let lock = self.inner.container_idx_to_id.lock_unpoisoned();
f(&lock)
}

pub fn alloc_str(&self, str: &str) -> StrAllocResult {
let mut text_lock = self.inner.str.lock().unwrap();
let mut text_lock = self.inner.str.lock_unpoisoned();
_alloc_str(&mut text_lock, str)
}

/// return slice and unicode index
pub fn alloc_str_with_slice(&self, str: &str) -> (BytesSlice, StrAllocResult) {
let mut text_lock = self.inner.str.lock().unwrap();
let mut text_lock = self.inner.str.lock_unpoisoned();
_alloc_str_with_slice(&mut text_lock, str)
}

/// alloc str without extra info
pub fn alloc_str_fast(&self, bytes: &[u8]) {
let mut text_lock = self.inner.str.lock().unwrap();
let mut text_lock = self.inner.str.lock_unpoisoned();
text_lock.alloc(std::str::from_utf8(bytes).unwrap());
}

#[inline]
pub fn utf16_len(&self) -> usize {
self.inner.str.lock().unwrap().len_utf16()
self.inner.str.lock_unpoisoned().len_utf16()
}

#[inline]
pub fn alloc_value(&self, value: LoroValue) -> usize {
let mut values_lock = self.inner.values.lock().unwrap();
let mut values_lock = self.inner.values.lock_unpoisoned();
_alloc_value(&mut values_lock, value)
}

#[inline]
pub fn alloc_values(&self, values: impl Iterator<Item = LoroValue>) -> std::ops::Range<usize> {
let mut values_lock = self.inner.values.lock().unwrap();
let mut values_lock = self.inner.values.lock_unpoisoned();
_alloc_values(&mut values_lock, values)
}

#[inline]
pub fn set_parent(&self, child: ContainerIdx, parent: Option<ContainerIdx>) {
let mut parents = self.inner.parents.lock().unwrap();
let mut parents = self.inner.parents.lock_unpoisoned();
parents.insert(child, parent);
let mut depth = self.inner.depth.lock().unwrap();
let mut root_c_idx = self.inner.root_c_idx.lock().unwrap();
let mut depth = self.inner.depth.lock_unpoisoned();
let mut root_c_idx = self.inner.root_c_idx.lock_unpoisoned();

match parent {
Some(p) => {
// Acquire the two maps as mutable guards so we can lazily register
// unknown parents while computing depth.
let mut idx_to_id_guard = self.inner.container_idx_to_id.lock().unwrap();
let mut id_to_idx_guard = self.inner.container_id_to_idx.lock().unwrap();
let parent_resolver_guard = self.inner.parent_resolver.lock().unwrap();
let mut idx_to_id_guard = self.inner.container_idx_to_id.lock_unpoisoned();
let mut id_to_idx_guard = self.inner.container_id_to_idx.lock_unpoisoned();
let parent_resolver_guard = self.inner.parent_resolver.lock_unpoisoned();
if let Some(d) = get_depth(
p,
&mut depth,
Expand All @@ -298,7 +299,7 @@ impl SharedArena {

pub fn log_hierarchy(&self) {
if cfg!(debug_assertions) {
for (c, p) in self.inner.parents.lock().unwrap().iter() {
for (c, p) in self.inner.parents.lock_unpoisoned().iter() {
tracing::info!(
"container {:?} {:?} {:?}",
c,
Expand Down Expand Up @@ -337,12 +338,12 @@ impl SharedArena {
}

// Try fast path first
if let Some(p) = self.inner.parents.lock().unwrap().get(&child).copied() {
if let Some(p) = self.inner.parents.lock_unpoisoned().get(&child).copied() {
return p;
}

// Fallback: try to resolve parent lazily via the resolver if provided.
let resolver = self.inner.parent_resolver.lock().unwrap().clone();
let resolver = self.inner.parent_resolver.lock_unpoisoned().clone();
if let Some(resolver) = resolver {
let child_id = self.get_container_id(child).unwrap();
if let Some(parent_id) = resolver(child_id.clone()) {
Expand Down Expand Up @@ -370,17 +371,17 @@ impl SharedArena {

#[inline]
pub fn slice_by_unicode(&self, range: impl RangeBounds<usize>) -> BytesSlice {
self.inner.str.lock().unwrap().slice_by_unicode(range)
self.inner.str.lock_unpoisoned().slice_by_unicode(range)
}

#[inline]
pub fn slice_by_utf8(&self, range: impl RangeBounds<usize>) -> BytesSlice {
self.inner.str.lock().unwrap().slice_bytes(range)
self.inner.str.lock_unpoisoned().slice_bytes(range)
}

#[inline]
pub fn slice_str_by_unicode_range(&self, range: Range<usize>) -> String {
let mut s = self.inner.str.lock().unwrap();
let mut s = self.inner.str.lock_unpoisoned();
let s: &mut StrArena = &mut s;
let mut ans = String::with_capacity(range.len());
ans.push_str(s.slice_str_by_unicode(range));
Expand All @@ -389,17 +390,17 @@ impl SharedArena {

#[inline]
pub fn with_text_slice(&self, range: Range<usize>, mut f: impl FnMut(&str)) {
f(self.inner.str.lock().unwrap().slice_str_by_unicode(range))
f(self.inner.str.lock_unpoisoned().slice_str_by_unicode(range))
}

#[inline]
pub fn get_value(&self, idx: usize) -> Option<LoroValue> {
self.inner.values.lock().unwrap().get(idx).cloned()
self.inner.values.lock_unpoisoned().get(idx).cloned()
}

#[inline]
pub fn get_values(&self, range: Range<usize>) -> Vec<LoroValue> {
(self.inner.values.lock().unwrap()[range]).to_vec()
(self.inner.values.lock_unpoisoned()[range]).to_vec()
}

pub fn convert_single_op(
Expand All @@ -415,7 +416,8 @@ impl SharedArena {
}

pub fn can_import_snapshot(&self) -> bool {
self.inner.str.lock().unwrap().is_empty() && self.inner.values.lock().unwrap().is_empty()
self.inner.str.lock_unpoisoned().is_empty()
&& self.inner.values.lock_unpoisoned().is_empty()
}

fn inner_convert_op(
Expand Down Expand Up @@ -540,12 +542,12 @@ impl SharedArena {

#[inline]
pub fn export_containers(&self) -> Vec<ContainerID> {
self.inner.container_idx_to_id.lock().unwrap().clone()
self.inner.container_idx_to_id.lock_unpoisoned().clone()
}

pub fn export_parents(&self) -> Vec<Option<ContainerIdx>> {
let parents = self.inner.parents.lock().unwrap();
let containers = self.inner.container_idx_to_id.lock().unwrap();
let parents = self.inner.parents.lock_unpoisoned();
let containers = self.inner.container_idx_to_id.lock_unpoisoned();
containers
.iter()
.enumerate()
Expand All @@ -563,17 +565,17 @@ impl SharedArena {
/// So we need the flag type here.
#[inline]
pub(crate) fn root_containers(&self, _f: LoadAllFlag) -> Vec<ContainerIdx> {
self.inner.root_c_idx.lock().unwrap().clone()
self.inner.root_c_idx.lock_unpoisoned().clone()
}

// TODO: this can return a u16 directly now, since the depths are always valid
pub(crate) fn get_depth(&self, container: ContainerIdx) -> Option<NonZeroU16> {
let mut depth_guard = self.inner.depth.lock().unwrap();
let mut parents_guard = self.inner.parents.lock().unwrap();
let mut root_c_idx_guard = self.inner.root_c_idx.lock().unwrap();
let resolver_guard = self.inner.parent_resolver.lock().unwrap();
let mut idx_to_id_guard = self.inner.container_idx_to_id.lock().unwrap();
let mut id_to_idx_guard = self.inner.container_id_to_idx.lock().unwrap();
let mut depth_guard = self.inner.depth.lock_unpoisoned();
let mut parents_guard = self.inner.parents.lock_unpoisoned();
let mut root_c_idx_guard = self.inner.root_c_idx.lock_unpoisoned();
let resolver_guard = self.inner.parent_resolver.lock_unpoisoned();
let mut idx_to_id_guard = self.inner.container_idx_to_id.lock_unpoisoned();
let mut id_to_idx_guard = self.inner.container_id_to_idx.lock_unpoisoned();
get_depth(
container,
&mut depth_guard,
Expand All @@ -589,7 +591,7 @@ impl SharedArena {
&self,
range: Range<usize>,
) -> impl Iterator<Item = LoroValue> + '_ {
let values = self.inner.values.lock().unwrap();
let values = self.inner.values.lock_unpoisoned();
range
.into_iter()
.map(move |i| values.get(i).unwrap().clone())
Expand All @@ -599,7 +601,7 @@ impl SharedArena {
&self,
root_index: &loro_common::InternalString,
) -> Option<ContainerIdx> {
let inner = self.inner.container_id_to_idx.lock().unwrap();
let inner = self.inner.container_id_to_idx.lock_unpoisoned();
for t in loro_common::ContainerType::ALL_TYPES.iter() {
let cid = ContainerID::Root {
name: root_index.clone(),
Expand All @@ -614,7 +616,7 @@ impl SharedArena {

#[allow(unused)]
pub(crate) fn log_all_values(&self) {
let values = self.inner.values.lock().unwrap();
let values = self.inner.values.lock_unpoisoned();
for (i, v) in values.iter().enumerate() {
loro_common::debug!("value {} {:?}", i, v);
}
Expand Down Expand Up @@ -760,7 +762,7 @@ impl SharedArena {
where
F: Fn(ContainerID) -> Option<ContainerID> + Send + Sync + 'static,
{
let mut slot = self.inner.parent_resolver.lock().unwrap();
let mut slot = self.inner.parent_resolver.lock_unpoisoned();
*slot = resolver.map(|f| Arc::new(f) as Arc<ParentResolver>);
}
}
Loading
Loading