Skip to content

Commit fd1500f

Browse files
authored
refactor!: variable size writes for app storage (#24)
* rewrite storage variable size writes
* async pruning
* simplify
* add test case
1 parent 3bd8799 commit fd1500f

2 files changed

Lines changed: 208 additions & 90 deletions

File tree

crates/storage/src/qmdb_impl.rs

Lines changed: 203 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -13,35 +13,37 @@
1313

1414
use crate::cache::{CachedValue, ShardedDbCache};
1515
use crate::metrics::OptionalMetrics;
16-
use crate::types::{
17-
create_storage_key, create_storage_value_chunk, extract_value_from_chunk, StorageKey,
18-
StorageValueChunk, MAX_VALUE_DATA_SIZE,
19-
};
16+
use crate::types::{create_storage_key, StorageKey, MAX_VALUE_SIZE};
2017
use async_trait::async_trait;
18+
use commonware_codec::RangeCfg;
2119
use commonware_cryptography::sha256::Sha256;
2220
use commonware_runtime::{
2321
buffer::paged::CacheRef, BufferPooler, Clock, Metrics, Storage as RStorage,
2422
};
23+
use commonware_storage::qmdb::current::{unordered::variable::Db, VariableConfig};
2524
use commonware_storage::translator::EightCap;
26-
use commonware_storage::{
27-
qmdb::current::{unordered::fixed::Db, FixedConfig},
28-
Persistable,
29-
};
3025
use evolve_core::{ErrorCode, ReadonlyKV};
3126
use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
3227
use std::sync::Arc;
33-
use std::time::Instant;
28+
use std::time::{Duration, Instant};
3429
use thiserror::Error;
3530
use tokio::runtime::RuntimeFlavor;
36-
use tokio::sync::RwLock;
31+
use tokio::sync::{
32+
mpsc::{unbounded_channel, UnboundedSender},
33+
RwLock,
34+
};
3735

3836
/// Type alias for QMDB in Current state.
3937
/// `N = 64` because SHA256 digests are 32 bytes and QMDB expects `2 * digest_size`.
40-
type QmdbCurrent<C> = Db<C, StorageKey, StorageValueChunk, Sha256, EightCap, 64>;
38+
type QmdbCurrent<C> = Db<C, StorageKey, Vec<u8>, Sha256, EightCap, 64>;
39+
40+
const PRUNE_SCHEDULE_DELAY: Duration = Duration::from_millis(50);
41+
const PRUNE_RETRY_DELAY: Duration = Duration::from_millis(25);
42+
const PRUNE_MAX_LOCK_RETRIES: usize = 20;
4143

4244
#[derive(Debug)]
4345
struct PreparedBatch {
44-
updates: Vec<(StorageKey, Option<StorageValueChunk>)>,
46+
updates: Vec<(StorageKey, Option<Vec<u8>>)>,
4547
keys_to_invalidate: Vec<Vec<u8>>,
4648
ops_count: usize,
4749
sets: usize,
@@ -57,7 +59,7 @@ pub enum StorageError {
5759
#[error("Key error")]
5860
Key(ErrorCode),
5961

60-
#[error("Value too large for single chunk: {size} bytes (max: {max})")]
62+
#[error("Value too large: {size} bytes (max: {max})")]
6163
ValueTooLarge { size: usize, max: usize },
6264

6365
#[error("IO error: {0}")]
@@ -114,6 +116,7 @@ where
114116
C: RStorage + BufferPooler + Clock + Metrics + Clone + Send + Sync + 'static,
115117
{
116118
db: Arc<RwLock<QmdbCurrent<C>>>,
119+
prune_tx: UnboundedSender<()>,
117120
/// Read cache for fast synchronous lookups
118121
cache: Arc<ShardedDbCache>,
119122
/// Optional metrics for monitoring storage performance
@@ -127,6 +130,7 @@ where
127130
fn clone(&self) -> Self {
128131
Self {
129132
db: self.db.clone(),
133+
prune_tx: self.prune_tx.clone(),
130134
cache: self.cache.clone(),
131135
metrics: self.metrics.clone(),
132136
}
@@ -137,6 +141,46 @@ impl<C> QmdbStorage<C>
137141
where
138142
C: RStorage + BufferPooler + Clock + Metrics + Clone + Send + Sync + 'static,
139143
{
144+
fn spawn_prune_worker(db: Arc<RwLock<QmdbCurrent<C>>>) -> UnboundedSender<()> {
145+
let (prune_tx, mut prune_rx) = unbounded_channel::<()>();
146+
147+
tokio::spawn(async move {
148+
while prune_rx.recv().await.is_some() {
149+
tokio::time::sleep(PRUNE_SCHEDULE_DELAY).await;
150+
// Drain any signals that arrived during the delay.
151+
while prune_rx.try_recv().is_ok() {}
152+
153+
// Acquire write lock with bounded retries.
154+
let mut db_guard = None;
155+
for _ in 0..PRUNE_MAX_LOCK_RETRIES {
156+
match db.try_write() {
157+
Ok(guard) => {
158+
db_guard = Some(guard);
159+
break;
160+
}
161+
Err(_) => tokio::time::sleep(PRUNE_RETRY_DELAY).await,
162+
}
163+
}
164+
165+
let Some(mut db) = db_guard else {
166+
tracing::warn!("prune worker: could not acquire write lock, skipping cycle");
167+
continue;
168+
};
169+
170+
let prune_loc = db.inactivity_floor_loc();
171+
if let Err(err) = db.prune(prune_loc).await {
172+
tracing::error!("background prune failed: {err}");
173+
continue;
174+
}
175+
if let Err(err) = db.sync().await {
176+
tracing::error!("background prune sync failed: {err}");
177+
}
178+
}
179+
});
180+
181+
prune_tx
182+
}
183+
140184
/// Create a new QmdbStorage instance
141185
pub async fn new(
142186
context: C,
@@ -172,10 +216,12 @@ where
172216
StorageError::InvalidConfig("write_buffer_size must be non-zero".to_string())
173217
})?;
174218

175-
let qmdb_config = FixedConfig {
176-
log_journal_partition: format!("{}_log-journal", config.partition_prefix),
219+
let qmdb_config = VariableConfig {
220+
log_partition: format!("{}_log-journal", config.partition_prefix),
177221
log_items_per_blob: NonZeroU64::new(1000).unwrap(),
178222
log_write_buffer: write_buffer_size,
223+
log_compression: None,
224+
log_codec_config: ((), (RangeCfg::from(0..=MAX_VALUE_SIZE), ())),
179225
mmr_journal_partition: format!("{}_mmr-journal", config.partition_prefix),
180226
mmr_items_per_blob: NonZeroU64::new(1000).unwrap(),
181227
mmr_write_buffer: write_buffer_size,
@@ -189,12 +235,16 @@ where
189235
page_cache: CacheRef::from_pooler(&context, page_size, capacity),
190236
};
191237

192-
let db = Db::init(context, qmdb_config)
193-
.await
194-
.map_err(map_qmdb_error)?;
238+
let db = Arc::new(RwLock::new(
239+
Db::init(context, qmdb_config)
240+
.await
241+
.map_err(map_qmdb_error)?,
242+
));
243+
let prune_tx = Self::spawn_prune_worker(db.clone());
195244

196245
Ok(Self {
197-
db: Arc::new(RwLock::new(db)),
246+
db,
247+
prune_tx,
198248
cache: Arc::new(ShardedDbCache::with_defaults()),
199249
metrics,
200250
})
@@ -304,18 +354,17 @@ where
304354
for op in operations {
305355
match op {
306356
crate::types::Operation::Set { key, value } => {
307-
if value.len() > MAX_VALUE_DATA_SIZE {
357+
if value.len() > MAX_VALUE_SIZE {
308358
return Err(StorageError::ValueTooLarge {
309359
size: value.len(),
310-
max: MAX_VALUE_DATA_SIZE,
360+
max: MAX_VALUE_SIZE,
311361
});
312362
}
313363

314364
sets += 1;
315365
let storage_key = create_storage_key(&key)?;
316-
let storage_value = create_storage_value_chunk(&value)?;
317366
keys_to_invalidate.push(key);
318-
updates.push((storage_key, Some(storage_value)));
367+
updates.push((storage_key, Some(value)));
319368
}
320369
crate::types::Operation::Remove { key } => {
321370
deletes += 1;
@@ -339,8 +388,10 @@ where
339388
pub async fn commit_state(&self) -> Result<crate::types::CommitHash, StorageError> {
340389
let start = Instant::now();
341390
let db = self.db.read().await;
342-
db.commit().await.map_err(map_qmdb_error)?;
391+
db.sync().await.map_err(map_qmdb_error)?;
343392
let hash = root_to_commit_hash(db.root())?;
393+
drop(db);
394+
let _ = self.prune_tx.send(());
344395
self.metrics.record_commit(start.elapsed().as_secs_f64());
345396
Ok(hash)
346397
}
@@ -429,17 +480,10 @@ where
429480
}
430481

431482
fn decode_storage_value(
432-
result: Result<Option<StorageValueChunk>, impl std::fmt::Display>,
483+
result: Result<Option<Vec<u8>>, impl std::fmt::Display>,
433484
) -> Result<Option<Vec<u8>>, ErrorCode> {
434485
match result {
435-
Ok(Some(value_chunk)) => match extract_value_from_chunk(&value_chunk) {
436-
Some(data) if data.is_empty() => Ok(None),
437-
Some(data) => Ok(Some(data)),
438-
None => {
439-
tracing::warn!("Invalid value chunk format, treating as absent");
440-
Ok(None)
441-
}
442-
},
486+
Ok(Some(value)) => Ok(Some(value)),
443487
Ok(None) => Ok(None),
444488
Err(e) => {
445489
let err_str = e.to_string();
@@ -740,8 +784,8 @@ mod tests {
740784
runner.start(|context| async move {
741785
let storage = QmdbStorage::new(context, config).await.unwrap();
742786

743-
// Value exactly at MAX_VALUE_DATA_SIZE limit should work
744-
let max_value = vec![b'v'; crate::types::MAX_VALUE_DATA_SIZE];
787+
// Value exactly at MAX_VALUE_SIZE limit should work
788+
let max_value = vec![b'v'; crate::types::MAX_VALUE_SIZE];
745789
storage
746790
.apply_batch(vec![crate::types::Operation::Set {
747791
key: b"key".to_vec(),
@@ -753,8 +797,8 @@ mod tests {
753797
let retrieved = storage.get(b"key").unwrap();
754798
assert_eq!(retrieved, Some(max_value));
755799

756-
// Value exceeding MAX_VALUE_DATA_SIZE should fail
757-
let oversized_value = vec![b'v'; crate::types::MAX_VALUE_DATA_SIZE + 1];
800+
// Value exceeding MAX_VALUE_SIZE should fail
801+
let oversized_value = vec![b'v'; crate::types::MAX_VALUE_SIZE + 1];
758802
let result = storage
759803
.apply_batch(vec![crate::types::Operation::Set {
760804
key: b"key2".to_vec(),
@@ -765,8 +809,8 @@ mod tests {
765809
assert!(result.is_err());
766810
match result.unwrap_err() {
767811
StorageError::ValueTooLarge { size, max } => {
768-
assert_eq!(size, crate::types::MAX_VALUE_DATA_SIZE + 1);
769-
assert_eq!(max, crate::types::MAX_VALUE_DATA_SIZE);
812+
assert_eq!(size, crate::types::MAX_VALUE_SIZE + 1);
813+
assert_eq!(max, crate::types::MAX_VALUE_SIZE);
770814
}
771815
e => panic!("Expected ValueTooLarge, got {:?}", e),
772816
}
@@ -1043,8 +1087,6 @@ mod tests {
10431087
runner.start(|context| async move {
10441088
let storage = QmdbStorage::new(context, config).await.unwrap();
10451089

1046-
// Empty values are treated as removed (since we use all-zeros to signal removal)
1047-
// This is a known limitation
10481090
storage
10491091
.apply_batch(vec![crate::types::Operation::Set {
10501092
key: b"empty".to_vec(),
@@ -1053,9 +1095,8 @@ mod tests {
10531095
.await
10541096
.unwrap();
10551097

1056-
// Empty value should be treated as None (due to removal semantics)
10571098
let result = storage.get(b"empty").unwrap();
1058-
assert_eq!(result, None);
1099+
assert_eq!(result, Some(Vec::new()));
10591100
})
10601101
}
10611102

@@ -1325,6 +1366,125 @@ mod tests {
13251366
})
13261367
}
13271368

1369+
#[test]
1370+
fn test_commit_prunes_inactive_history() {
1371+
let temp_dir = TempDir::new().unwrap();
1372+
let config = crate::types::StorageConfig {
1373+
path: temp_dir.path().to_path_buf(),
1374+
..Default::default()
1375+
};
1376+
1377+
let runtime_config = TokioConfig::default()
1378+
.with_storage_directory(temp_dir.path())
1379+
.with_worker_threads(2);
1380+
1381+
let runner = Runner::new(runtime_config);
1382+
1383+
runner.start(|context| async move {
1384+
use commonware_storage::qmdb::store::LogStore;
1385+
1386+
let storage = QmdbStorage::new(context, config).await.unwrap();
1387+
const KEYS: usize = 1_100;
1388+
1389+
let initial_ops = (0..KEYS)
1390+
.map(|i| crate::types::Operation::Set {
1391+
key: format!("key-{i}").into_bytes(),
1392+
value: format!("value-{i}-v1").into_bytes(),
1393+
})
1394+
.collect();
1395+
storage.apply_batch(initial_ops).await.unwrap();
1396+
storage.commit_state().await.unwrap();
1397+
1398+
let start_before = {
1399+
let db = storage.db.read().await;
1400+
*db.bounds().await.start
1401+
};
1402+
1403+
let second_ops = (0..KEYS)
1404+
.map(|i| {
1405+
if i % 5 == 0 {
1406+
crate::types::Operation::Remove {
1407+
key: format!("key-{i}").into_bytes(),
1408+
}
1409+
} else {
1410+
crate::types::Operation::Set {
1411+
key: format!("key-{i}").into_bytes(),
1412+
value: format!("value-{i}-v2").into_bytes(),
1413+
}
1414+
}
1415+
})
1416+
.collect();
1417+
storage.apply_batch(second_ops).await.unwrap();
1418+
storage.commit_state().await.unwrap();
1419+
1420+
let mut start_after = start_before;
1421+
for _ in 0..50 {
1422+
start_after = {
1423+
let db = storage.db.read().await;
1424+
*db.bounds().await.start
1425+
};
1426+
if start_after > start_before {
1427+
break;
1428+
}
1429+
tokio::time::sleep(Duration::from_millis(20)).await;
1430+
}
1431+
1432+
assert!(
1433+
start_after > start_before,
1434+
"prune boundary did not advance: before={start_before}, after={start_after}"
1435+
);
1436+
assert_eq!(storage.get(b"key-1").unwrap(), Some(b"value-1-v2".to_vec()));
1437+
assert_eq!(storage.get(b"key-0").unwrap(), None);
1438+
})
1439+
}
1440+
1441+
#[test]
1442+
fn test_variable_codec_survives_reopen() {
1443+
let temp_dir = TempDir::new().unwrap();
1444+
let config = crate::types::StorageConfig {
1445+
path: temp_dir.path().to_path_buf(),
1446+
..Default::default()
1447+
};
1448+
1449+
let runtime_config = TokioConfig::default()
1450+
.with_storage_directory(temp_dir.path())
1451+
.with_worker_threads(2);
1452+
1453+
let runner = Runner::new(runtime_config);
1454+
1455+
runner.start(|context| async move {
1456+
let config2 = config.clone();
1457+
let ctx2 = context.clone();
1458+
1459+
let storage = QmdbStorage::new(context, config).await.unwrap();
1460+
storage
1461+
.apply_batch(vec![
1462+
crate::types::Operation::Set {
1463+
key: b"k1".to_vec(),
1464+
value: b"hello".to_vec(),
1465+
},
1466+
crate::types::Operation::Set {
1467+
key: b"k2".to_vec(),
1468+
value: vec![0u8; 100],
1469+
},
1470+
crate::types::Operation::Set {
1471+
key: b"k3".to_vec(),
1472+
value: Vec::new(),
1473+
},
1474+
])
1475+
.await
1476+
.unwrap();
1477+
storage.commit_state().await.unwrap();
1478+
drop(storage);
1479+
1480+
// Reopen from the same directory and verify values survived.
1481+
let reopened = QmdbStorage::new(ctx2, config2).await.unwrap();
1482+
assert_eq!(reopened.get(b"k1").unwrap(), Some(b"hello".to_vec()));
1483+
assert_eq!(reopened.get(b"k2").unwrap(), Some(vec![0u8; 100]));
1484+
assert_eq!(reopened.get(b"k3").unwrap(), Some(Vec::new()));
1485+
})
1486+
}
1487+
13281488
#[test]
13291489
fn test_preview_batch_root_matches_eventual_commit_hash() {
13301490
let temp_dir = TempDir::new().unwrap();

0 commit comments

Comments (0)