diff --git a/objectstore-service/src/backend/bigtable.rs b/objectstore-service/src/backend/bigtable.rs index ed20935f..95a50ab1 100644 --- a/objectstore-service/src/backend/bigtable.rs +++ b/objectstore-service/src/backend/bigtable.rs @@ -212,6 +212,12 @@ fn legacy_tombstone_filter() -> v2::RowFilter { /// Creates a row filter that matches any tombstone row, new- or legacy-format. /// +/// ## Predicate Matches +/// +/// Must be used with `false_mutations` and `predicate_matched == false`. +/// +/// ## Details +/// /// New format: presence of the `r` column. /// Legacy format: `is_redirect_tombstone: true` in the `m` column JSON. /// @@ -241,7 +247,13 @@ fn exact_value_regex(value: &str) -> Vec { format!("^{}$", regex::escape(value)).into_bytes() } -/// Creates a row filter that matches tombstones whose redirect resolves to `target`. +/// Matches tombstones whose redirect resolves to `target`. +/// +/// ## Predicate Matches +/// +/// Must be used with `true_mutations` and `predicate_matched == true`. +/// +/// ## Details /// /// Always includes an exact match on the `r` (redirect) column: /// - Chain: `r` column present AND value == `target` storage path @@ -293,6 +305,62 @@ fn redirect_target_filter(target: &ObjectId, own_id: &ObjectId) -> v2::RowFilter } } +/// Matches tombstones whose redirect resolves to either `old` or `new`. +/// +/// ## Predicate Matches +/// +/// Must be used with `true_mutations` and `predicate_matched == true`. +/// +/// ## Details +/// +/// Equivalent to `t == old || t == new`. Built as an Interleave of two +/// [`redirect_target_filter`] calls — yields cells iff at least one branch +/// matches. An absent row or non-tombstone row yields 0 cells, so +/// `predicate_matched = false` (conflict). +fn update_filter(old: &ObjectId, new: &ObjectId, own_id: &ObjectId) -> v2::RowFilter { + v2::RowFilter { + filter: Some(v2::row_filter::Filter::Interleave( + v2::row_filter::Interleave { + filters: vec![ + redirect_target_filter(old, own_id), + redirect_target_filter(new, own_id), + ], + }, + )), + } +} + +/// Matches rows where no conflicting tombstone exists: +/// +/// ## Predicate Matches +/// +/// Must be used with `false_mutations` and `predicate_matched == false`. +/// +/// ## Details +/// +/// Matches either no tombstone, or the redirect already points to `target`. +/// Built as an inverted `Condition` filter: +/// +/// - Predicate: [`redirect_target_filter`]`(target)` — does the row have a +/// tombstone pointing to `target`? +/// - True branch: `BlockAllFilter` → 0 cells. +/// - False branch: [`tombstone_predicate`] → 0 cells when no tombstone +/// +/// Both safe states yield 0 cells, so `predicate_matched = false` in both cases. +fn optional_target_filter(target: &ObjectId, own_id: &ObjectId) -> v2::RowFilter { + v2::RowFilter { + filter: Some(v2::row_filter::Filter::Condition(Box::new( + v2::row_filter::Condition { + predicate_filter: Some(Box::new(redirect_target_filter(target, own_id))), + true_filter: Some(Box::new(v2::RowFilter { + filter: Some(v2::row_filter::Filter::BlockAllFilter(true)), + })), + false_filter: Some(Box::new(tombstone_predicate())), + }, + ))), + } +} + /// Creates a row filter that reads all non-payload columns (`m`, `r`, `t`). /// /// Used by metadata-only reads to avoid fetching the (potentially large) payload column @@ -958,23 +1026,23 @@ impl HighVolumeBackend for BigTableBackend { let path = id.as_storage_path().to_string().into_bytes(); let now = SystemTime::now(); + // `invert`: filters that use false_mutations (predicate_matched == false means safe). + let (predicate_filter, invert) = match (current, write.target()) { + (Some(old), Some(new)) => (update_filter(old, new, id), false), + (Some(target), None) => (optional_target_filter(target, id), true), + (None, Some(target)) => (optional_target_filter(target, id), true), + (None, None) => (tombstone_predicate(), true), + }; + let write_mutations = match write { TieredWrite::Tombstone(tombstone) => tombstone_mutations(&tombstone, now)?.into(), TieredWrite::Object(m, p) => object_mutations(&m, p.to_vec(), now)?.into(), TieredWrite::Delete => vec![delete_row_mutation()], }; - let (predicate_filter, true_mutations, false_mutations, success_on_match) = match current { - // Success = no tombstone. tombstone_predicate matches = tombstone found = fail. - // Write on false_mutations (no tombstone); noop on true_mutations (tombstone). - None => (tombstone_predicate(), vec![], write_mutations, false), - // Success = redirect matches target. Predicate match = CAS success. - Some(target) => ( - redirect_target_filter(target, id), - write_mutations, - vec![], - true, - ), + let (true_mutations, false_mutations) = match invert { + true => (vec![], write_mutations), + false => (write_mutations, vec![]), }; let request = v2::CheckAndMutateRowRequest { @@ -995,7 +1063,7 @@ impl HighVolumeBackend for BigTableBackend { .await? .predicate_matched; - Ok(predicate_matched == success_on_match) + Ok(predicate_matched != invert) } } @@ -1595,7 +1663,7 @@ mod tests { // --- Section 4: Compare-and-Write --- - /// Creating a tombstone on an empty row succeeds; a second attempt fails. + /// Creating a tombstone on an empty row succeeds; a retry of the same CAS also succeeds. /// /// After creation, both tiered and legacy APIs reflect the tombstone. #[tokio::test] @@ -1637,11 +1705,11 @@ mod tests { Err(Error::UnexpectedTombstone) )); - // Second create fails: tombstone already exists. + // Idempotent retry: retry with the same target succeeds let second = backend .compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone)) .await?; - assert!(!second, "second create must fail: tombstone already exists"); + assert!(second, "idempotent retry"); Ok(()) } @@ -1668,7 +1736,7 @@ mod tests { expiration_policy: ExpirationPolicy::Manual, }); let swapped = backend - .compare_and_write(&hv_id, Some(&wrong_lt_id), write) + .compare_and_write(&hv_id, Some(&wrong_lt_id), write.clone()) .await?; assert!(!swapped, "expected CAS failure due to wrong target"); match backend.get_tiered_metadata(&hv_id).await? { @@ -1677,12 +1745,8 @@ mod tests { } // Correct target: CAS succeeds, target updated. - let write = TieredWrite::Tombstone(Tombstone { - target: new_lt_id.clone(), - expiration_policy: ExpirationPolicy::Manual, - }); let swapped = backend - .compare_and_write(&hv_id, Some(&old_lt_id), write) + .compare_and_write(&hv_id, Some(&old_lt_id), write.clone()) .await?; assert!(swapped, "expected CAS success with correct target"); match backend.get_tiered_metadata(&hv_id).await? { @@ -1690,6 +1754,12 @@ mod tests { other => panic!("expected tombstone, got {other:?}"), } + // Idempotent retry: same A→B swap returns true. + let retry = backend + .compare_and_write(&hv_id, Some(&old_lt_id), write) + .await?; + assert!(retry, "idempotent retry"); + Ok(()) } @@ -1722,13 +1792,19 @@ mod tests { // Correct target: CAS succeeds, row becomes an inline object. let payload = Bytes::from_static(b"hello inline"); let write = TieredWrite::Object(Metadata::default(), payload.clone()); - let swapped = backend.compare_and_write(&id, Some(<_id), write).await?; + let swapped = backend + .compare_and_write(&id, Some(<_id), write.clone()) + .await?; assert!(swapped, "expected CAS success with correct target"); let TieredGet::Object(_, stream) = backend.get_tiered_object(&id).await? else { panic!("expected inline object after swap"); }; assert_eq!(&stream::read_to_vec(stream).await?, payload.as_ref()); + // Idempotent retry: row is already inline (no tombstone), same CAS returns true. + let retry = backend.compare_and_write(&id, Some(<_id), write).await?; + assert!(retry, "idempotent retry"); + Ok(()) } @@ -1752,7 +1828,6 @@ mod tests { } /// CAS-delete: wrong expected → false; correct expected → true, row gone. - /// CAS-delete with Some(target) against a regular object also returns false. #[tokio::test] async fn test_cas_delete() -> Result<()> { let backend = create_test_backend().await?; @@ -1787,7 +1862,13 @@ mod tests { TieredMetadata::NotFound )); - // Regular object: CAS-delete with Some(target) returns false, object preserved. + // Idempotent retry: row is already absent (no tombstone), same delete returns true. + let retry = backend + .compare_and_write(&id, Some(<_id), TieredWrite::Delete) + .await?; + assert!(retry, "idempotent retry"); + + // Inline object replaced tombstone: Safe to delete since it is an idempotent operation. let id2 = make_id(); let fake_lt_id = ObjectId::random(id2.context().clone()); let metadata = Metadata::default(); @@ -1795,8 +1876,7 @@ mod tests { let deleted = backend .compare_and_write(&id2, Some(&fake_lt_id), TieredWrite::Delete) .await?; - assert!(!deleted, "expected false: row is not a tombstone"); - assert!(backend.get_object(&id2).await?.is_some()); + assert!(deleted, "expected idempotent deletion"); Ok(()) } diff --git a/objectstore-service/src/backend/common.rs b/objectstore-service/src/backend/common.rs index a950e47b..0c9f3263 100644 --- a/objectstore-service/src/backend/common.rs +++ b/objectstore-service/src/backend/common.rs @@ -105,8 +105,12 @@ pub trait HighVolumeBackend: Backend { /// - `Some(target)`: succeeds only if a tombstone exists whose redirect /// resolves to `target`. /// - /// On match, applies `write`. Returns `true` on success, `false` if the - /// precondition was not met (row state changed concurrently). + /// **This operation is idempotent:** if the object is already in the target + /// state, it returns `true`. Whether the mutation runs again is up to the + /// implementation. + /// + /// Returns `true` on success or idempotent match, `false` if a conflicting + /// state was found (another writer won the race). async fn compare_and_write( &self, id: &ObjectId, @@ -163,7 +167,7 @@ pub enum TieredMetadata { } /// The write operation performed by [`HighVolumeBackend::compare_and_write`]. -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum TieredWrite { /// Write a redirect tombstone. Tombstone(Tombstone), @@ -173,6 +177,16 @@ pub enum TieredWrite { Delete, } +impl TieredWrite { + /// Returns the tombstone target if this is a tombstone write, or `None` otherwise. + pub fn target(&self) -> Option<&ObjectId> { + match self { + TieredWrite::Tombstone(t) => Some(&t.target), + _ => None, + } + } +} + /// Creates a reqwest client with required defaults. /// /// Automatic decompression is disabled because backends store pre-compressed diff --git a/objectstore-service/src/backend/in_memory.rs b/objectstore-service/src/backend/in_memory.rs index 46dfcd07..5536f3b4 100644 --- a/objectstore-service/src/backend/in_memory.rs +++ b/objectstore-service/src/backend/in_memory.rs @@ -172,17 +172,12 @@ impl HighVolumeBackend for InMemoryBackend { write: TieredWrite, ) -> Result { let mut store = self.store.lock().unwrap(); - let actual = store.get(id); - let matches = match current { - None => !matches!(actual, Some(StoreEntry::Tombstone(_))), - Some(target) => matches!( - actual, - Some(StoreEntry::Tombstone(t)) if t.target == *target - ), - }; + let actual = store.get(id); + let matches_current = matches_redirect(actual, current); + let matches_next = matches_redirect(actual, write.target()); - if matches { + if matches_current { match write { TieredWrite::Tombstone(tombstone) => { store.insert(id.clone(), StoreEntry::Tombstone(tombstone)); @@ -196,7 +191,18 @@ impl HighVolumeBackend for InMemoryBackend { } } - Ok(matches) + Ok(matches_current || matches_next) + } +} + +/// Returns `true` if `entry` matches the expected tombstone redirect state. +/// +/// - `expected = None`: matches any non-tombstone (absent or inline object). +/// - `expected = Some(target)`: matches a tombstone whose redirect target equals `target`. +fn matches_redirect(entry: Option<&StoreEntry>, expected: Option<&ObjectId>) -> bool { + match expected { + None => matches!(entry, Some(StoreEntry::Object { .. }) | None), + Some(target) => matches!(entry, Some(StoreEntry::Tombstone(t)) if t.target == *target), } } diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 94432e44..43889968 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -81,6 +81,15 @@ //! CAS conflicts are therefore **not errors**: the losing writer's data is //! cleaned up and `Ok` is returned, because the result is indistinguishable //! from having succeeded a moment earlier and then been overwritten. +//! +//! ### Idempotency +//! +//! `compare_and_write` is idempotent: if the row is already in the target state, it +//! returns `true` without re-applying the mutation. This is critical for retry +//! safety. If the server commits a write but the response is lost, a retry sees the +//! already-mutated state and still returns `true` — so callers do not mistakenly +//! treat a successful commit as a lost race and clean up data that was actually +//! persisted. use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering};