From ae1732221643afc9272ff918fdbcad84e0640302 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Fri, 20 Mar 2026 16:32:01 +0100 Subject: [PATCH 1/3] fix(service): Make compare_and_write idempotent --- objectstore-service/src/backend/bigtable.rs | 150 ++++++++++++++++--- objectstore-service/src/backend/common.rs | 10 +- objectstore-service/src/backend/in_memory.rs | 29 ++-- objectstore-service/src/backend/tiered.rs | 9 ++ 4 files changed, 164 insertions(+), 34 deletions(-) diff --git a/objectstore-service/src/backend/bigtable.rs b/objectstore-service/src/backend/bigtable.rs index ed20935f..d5644714 100644 --- a/objectstore-service/src/backend/bigtable.rs +++ b/objectstore-service/src/backend/bigtable.rs @@ -293,6 +293,76 @@ fn redirect_target_filter(target: &ObjectId, own_id: &ObjectId) -> v2::RowFilter } } +/// Combines multiple filters with OR semantics: passes cells if any sub-filter matches. +fn interleave_filter(filters: Vec) -> v2::RowFilter { + v2::RowFilter { + filter: Some(v2::row_filter::Filter::Interleave( + v2::row_filter::Interleave { filters }, + )), + } +} + +/// Matches rows where no conflicting tombstone exists: either no tombstone, +/// or the redirect already points to `target`. +/// +/// Equivalent to `!t || t == target`. Both safe states produce 0 output cells, +/// so `predicate_matched` is `false` in both cases — pair this filter with +/// `false_mutations`. +/// +/// Note: `!t` includes both absent rows and inline-object rows. This is safe +/// for tombstone and object writes (write is a no-op or overwrites with the +/// same data), but NOT for deletes — use [`delete_compatible_filter`] instead. 
+fn redirect_compatible_filter(target: &ObjectId, own_id: &ObjectId) -> v2::RowFilter { + v2::RowFilter { + filter: Some(v2::row_filter::Filter::Condition(Box::new( + v2::row_filter::Condition { + predicate_filter: Some(Box::new(redirect_target_filter(target, own_id))), + // t == target → block all → 0 cells (safe, already in target state) + true_filter: Some(Box::new(v2::RowFilter { + filter: Some(v2::row_filter::Filter::BlockAllFilter(true)), + })), + // !t → tombstone_predicate → 0 cells (safe, no tombstone) + // t != target → tombstone_predicate → 1+ cells (conflict) + false_filter: Some(Box::new(tombstone_predicate())), + }, + ))), + } +} + +/// Matches rows that are safe targets for a CAS-delete when the expected state +/// is a tombstone pointing to `target`. +/// +/// Safe states (produce 0 cells, write fires): +/// - `t == target`: the expected tombstone exists — delete it. +/// - Row absent: the tombstone was already deleted — delete is a no-op. +/// +/// Conflict states (produce 1+ cells, write does not fire): +/// - Inline object (`m` column present): another writer replaced the tombstone. +/// - Wrong tombstone (`r` column present): a different redirect is in place. +/// +/// Unlike [`redirect_compatible_filter`], this never fires on inline objects, +/// making it safe to use with `TieredWrite::Delete`. +fn delete_compatible_filter(target: &ObjectId, own_id: &ObjectId) -> v2::RowFilter { + v2::RowFilter { + filter: Some(v2::row_filter::Filter::Condition(Box::new( + v2::row_filter::Condition { + predicate_filter: Some(Box::new(redirect_target_filter(target, own_id))), + // t == target → block all → 0 cells (safe, delete should fire) + true_filter: Some(Box::new(v2::RowFilter { + filter: Some(v2::row_filter::Filter::BlockAllFilter(true)), + })), + // Not t == target: check whether the row has any real content. + // Inline objects have `m`; wrong tombstones have `r`. + // Absent rows have neither → 0 cells → delete fires as a no-op. 
+ false_filter: Some(Box::new(interleave_filter(vec![ + column_filter(COLUMN_METADATA), + column_filter(COLUMN_REDIRECT), + ]))), + }, + ))), + } +} + /// Creates a row filter that reads all non-payload columns (`m`, `r`, `t`). /// /// Used by metadata-only reads to avoid fetching the (potentially large) payload column @@ -958,23 +1028,45 @@ impl HighVolumeBackend for BigTableBackend { let path = id.as_storage_path().to_string().into_bytes(); let now = SystemTime::now(); + // Capture write characteristics before consuming `write` for mutations. + let next_target = match &write { + TieredWrite::Tombstone(t) => Some(t.target.clone()), + _ => None, + }; + let is_delete = matches!(write, TieredWrite::Delete); + let write_mutations = match write { TieredWrite::Tombstone(tombstone) => tombstone_mutations(&tombstone, now)?.into(), TieredWrite::Object(m, p) => object_mutations(&m, p.to_vec(), now)?.into(), TieredWrite::Delete => vec![delete_row_mutation()], }; - let (predicate_filter, true_mutations, false_mutations, success_on_match) = match current { - // Success = no tombstone. tombstone_predicate matches = tombstone found = fail. - // Write on false_mutations (no tombstone); noop on true_mutations (tombstone). - None => (tombstone_predicate(), vec![], write_mutations, false), - // Success = redirect matches target. Predicate match = CAS success. - Some(target) => ( - redirect_target_filter(target, id), - write_mutations, - vec![], - true, - ), + // An empty row always yields 0 cells, so predicate_matched is always false for + // absent rows. When that is the safe state, write in false_mutations; when it is a + // conflict, write in true_mutations. 
+ let empty_row_is_safe = current.is_none() || next_target.is_none(); + + let predicate_filter = match (current, next_target.as_ref()) { + // t == old || t == new (empty row → conflict) + (Some(old), Some(new)) => interleave_filter(vec![ + redirect_target_filter(old, id), + redirect_target_filter(new, id), + ]), + // !t || t == new (empty row → safe) + (None, Some(new)) => redirect_compatible_filter(new, id), + // t == old OR absent row (inline object or wrong tombstone → conflict). + // Uses delete_compatible_filter to avoid firing on inline objects. + (Some(old), None) if is_delete => delete_compatible_filter(old, id), + // !t || t == old (empty row → safe; inline object → safe, overwrite) + (Some(target), None) => redirect_compatible_filter(target, id), + // !t (empty row → safe) + (None, None) => tombstone_predicate(), + }; + + let (true_mutations, false_mutations) = if empty_row_is_safe { + (vec![], write_mutations) + } else { + (write_mutations, vec![]) }; let request = v2::CheckAndMutateRowRequest { @@ -995,7 +1087,7 @@ impl HighVolumeBackend for BigTableBackend { .await? .predicate_matched; - Ok(predicate_matched == success_on_match) + Ok(predicate_matched != empty_row_is_safe) } } @@ -1595,7 +1687,7 @@ mod tests { // --- Section 4: Compare-and-Write --- - /// Creating a tombstone on an empty row succeeds; a second attempt fails. + /// Creating a tombstone on an empty row succeeds; a retry of the same CAS also succeeds. /// /// After creation, both tiered and legacy APIs reflect the tombstone. #[tokio::test] @@ -1637,11 +1729,11 @@ mod tests { Err(Error::UnexpectedTombstone) )); - // Second create fails: tombstone already exists. 
+ // Idempotent retry: retry with the same target succeeds let second = backend .compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone)) .await?; - assert!(!second, "second create must fail: tombstone already exists"); + assert!(second, "idempotent retry"); Ok(()) } @@ -1668,7 +1760,7 @@ mod tests { expiration_policy: ExpirationPolicy::Manual, }); let swapped = backend - .compare_and_write(&hv_id, Some(&wrong_lt_id), write) + .compare_and_write(&hv_id, Some(&wrong_lt_id), write.clone()) .await?; assert!(!swapped, "expected CAS failure due to wrong target"); match backend.get_tiered_metadata(&hv_id).await? { @@ -1677,12 +1769,8 @@ mod tests { } // Correct target: CAS succeeds, target updated. - let write = TieredWrite::Tombstone(Tombstone { - target: new_lt_id.clone(), - expiration_policy: ExpirationPolicy::Manual, - }); let swapped = backend - .compare_and_write(&hv_id, Some(&old_lt_id), write) + .compare_and_write(&hv_id, Some(&old_lt_id), write.clone()) .await?; assert!(swapped, "expected CAS success with correct target"); match backend.get_tiered_metadata(&hv_id).await? { @@ -1690,6 +1778,12 @@ mod tests { other => panic!("expected tombstone, got {other:?}"), } + // Idempotent retry: same A→B swap returns true. + let retry = backend + .compare_and_write(&hv_id, Some(&old_lt_id), write) + .await?; + assert!(retry, "idempotent retry"); + Ok(()) } @@ -1722,13 +1816,19 @@ mod tests { // Correct target: CAS succeeds, row becomes an inline object. let payload = Bytes::from_static(b"hello inline"); let write = TieredWrite::Object(Metadata::default(), payload.clone()); - let swapped = backend.compare_and_write(&id, Some(<_id), write).await?; + let swapped = backend + .compare_and_write(&id, Some(<_id), write.clone()) + .await?; assert!(swapped, "expected CAS success with correct target"); let TieredGet::Object(_, stream) = backend.get_tiered_object(&id).await? 
else { panic!("expected inline object after swap"); }; assert_eq!(&stream::read_to_vec(stream).await?, payload.as_ref()); + // Idempotent retry: row is already inline (no tombstone), same CAS returns true. + let retry = backend.compare_and_write(&id, Some(<_id), write).await?; + assert!(retry, "idempotent retry"); + Ok(()) } @@ -1787,6 +1887,12 @@ mod tests { TieredMetadata::NotFound )); + // Idempotent retry: row is already absent (no tombstone), same delete returns true. + let retry = backend + .compare_and_write(&id, Some(<_id), TieredWrite::Delete) + .await?; + assert!(retry, "idempotent retry"); + // Regular object: CAS-delete with Some(target) returns false, object preserved. let id2 = make_id(); let fake_lt_id = ObjectId::random(id2.context().clone()); diff --git a/objectstore-service/src/backend/common.rs b/objectstore-service/src/backend/common.rs index a950e47b..75167499 100644 --- a/objectstore-service/src/backend/common.rs +++ b/objectstore-service/src/backend/common.rs @@ -105,8 +105,12 @@ pub trait HighVolumeBackend: Backend { /// - `Some(target)`: succeeds only if a tombstone exists whose redirect /// resolves to `target`. /// - /// On match, applies `write`. Returns `true` on success, `false` if the - /// precondition was not met (row state changed concurrently). + /// **This operation is idempotent:** if the object is already in the target + /// state, it returns `true`. Whether the mutation runs again is up to the + /// implementation. + /// + /// Returns `true` on success or idempotent match, `false` if a conflicting + /// state was found (another writer won the race). async fn compare_and_write( &self, id: &ObjectId, @@ -163,7 +167,7 @@ pub enum TieredMetadata { } /// The write operation performed by [`HighVolumeBackend::compare_and_write`]. -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum TieredWrite { /// Write a redirect tombstone. 
Tombstone(Tombstone), diff --git a/objectstore-service/src/backend/in_memory.rs b/objectstore-service/src/backend/in_memory.rs index 46dfcd07..753e1624 100644 --- a/objectstore-service/src/backend/in_memory.rs +++ b/objectstore-service/src/backend/in_memory.rs @@ -172,17 +172,17 @@ impl HighVolumeBackend for InMemoryBackend { write: TieredWrite, ) -> Result { let mut store = self.store.lock().unwrap(); - let actual = store.get(id); - let matches = match current { - None => !matches!(actual, Some(StoreEntry::Tombstone(_))), - Some(target) => matches!( - actual, - Some(StoreEntry::Tombstone(t)) if t.target == *target - ), + let actual = store.get(id); + let next = match write { + TieredWrite::Tombstone(ref t) => Some(&t.target), + _ => None, }; - if matches { + let matches_current = matches_redirect(actual, current); + let matches_next = matches_redirect(actual, next); + + if matches_current { match write { TieredWrite::Tombstone(tombstone) => { store.insert(id.clone(), StoreEntry::Tombstone(tombstone)); @@ -196,7 +196,18 @@ impl HighVolumeBackend for InMemoryBackend { } } - Ok(matches) + Ok(matches_current || matches_next) + } +} + +/// Returns `true` if `entry` matches the expected tombstone redirect state. +/// +/// - `expected = None`: matches any non-tombstone (absent or inline object). +/// - `expected = Some(target)`: matches a tombstone whose redirect target equals `target`. +fn matches_redirect(entry: Option<&StoreEntry>, expected: Option<&ObjectId>) -> bool { + match expected { + None => matches!(entry, Some(StoreEntry::Object { .. }) | None), + Some(target) => matches!(entry, Some(StoreEntry::Tombstone(t)) if t.target == *target), } } diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 94432e44..43889968 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -81,6 +81,15 @@ //! CAS conflicts are therefore **not errors**: the losing writer's data is //! 
cleaned up and `Ok` is returned, because the result is indistinguishable //! from having succeeded a moment earlier and then been overwritten. +//! +//! ### Idempotency +//! +//! `compare_and_write` is idempotent: if the row is already in the target state, it +//! returns `true`; whether the mutation is re-applied is up to the backend. This is critical for retry +//! safety. If the server commits a write but the response is lost, a retry sees the +//! already-mutated state and still returns `true` — so callers do not mistakenly +//! treat a successful commit as a lost race and clean up data that was actually +//! persisted. use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; From 4043539410c7ba8cfd2d8a78d7f314a6e4c89f90 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Fri, 20 Mar 2026 16:47:17 +0100 Subject: [PATCH 2/3] ref(service): Simplify CAS predicate selection --- objectstore-service/src/backend/bigtable.rs | 56 ++------------------- 1 file changed, 5 insertions(+), 51 deletions(-) diff --git a/objectstore-service/src/backend/bigtable.rs b/objectstore-service/src/backend/bigtable.rs index d5644714..ec1e08cc 100644 --- a/objectstore-service/src/backend/bigtable.rs +++ b/objectstore-service/src/backend/bigtable.rs @@ -308,10 +308,6 @@ fn interleave_filter(filters: Vec) -> v2::RowFilter { /// Equivalent to `!t || t == target`. Both safe states produce 0 output cells, /// so `predicate_matched` is `false` in both cases — pair this filter with /// `false_mutations`. -/// -/// Note: `!t` includes both absent rows and inline-object rows. This is safe -/// for tombstone and object writes (write is a no-op or overwrites with the -/// same data), but NOT for deletes — use [`delete_compatible_filter`] instead. 
fn redirect_compatible_filter(target: &ObjectId, own_id: &ObjectId) -> v2::RowFilter { v2::RowFilter { filter: Some(v2::row_filter::Filter::Condition(Box::new( @@ -329,40 +325,6 @@ fn redirect_compatible_filter(target: &ObjectId, own_id: &ObjectId) -> v2::RowFi } } -/// Matches rows that are safe targets for a CAS-delete when the expected state -/// is a tombstone pointing to `target`. -/// -/// Safe states (produce 0 cells, write fires): -/// - `t == target`: the expected tombstone exists — delete it. -/// - Row absent: the tombstone was already deleted — delete is a no-op. -/// -/// Conflict states (produce 1+ cells, write does not fire): -/// - Inline object (`m` column present): another writer replaced the tombstone. -/// - Wrong tombstone (`r` column present): a different redirect is in place. -/// -/// Unlike [`redirect_compatible_filter`], this never fires on inline objects, -/// making it safe to use with `TieredWrite::Delete`. -fn delete_compatible_filter(target: &ObjectId, own_id: &ObjectId) -> v2::RowFilter { - v2::RowFilter { - filter: Some(v2::row_filter::Filter::Condition(Box::new( - v2::row_filter::Condition { - predicate_filter: Some(Box::new(redirect_target_filter(target, own_id))), - // t == target → block all → 0 cells (safe, delete should fire) - true_filter: Some(Box::new(v2::RowFilter { - filter: Some(v2::row_filter::Filter::BlockAllFilter(true)), - })), - // Not t == target: check whether the row has any real content. - // Inline objects have `m`; wrong tombstones have `r`. - // Absent rows have neither → 0 cells → delete fires as a no-op. - false_filter: Some(Box::new(interleave_filter(vec![ - column_filter(COLUMN_METADATA), - column_filter(COLUMN_REDIRECT), - ]))), - }, - ))), - } -} - /// Creates a row filter that reads all non-payload columns (`m`, `r`, `t`). 
/// /// Used by metadata-only reads to avoid fetching the (potentially large) payload column @@ -1028,12 +990,11 @@ impl HighVolumeBackend for BigTableBackend { let path = id.as_storage_path().to_string().into_bytes(); let now = SystemTime::now(); - // Capture write characteristics before consuming `write` for mutations. + // Clone the next target before consuming `write` for mutations. let next_target = match &write { TieredWrite::Tombstone(t) => Some(t.target.clone()), _ => None, }; - let is_delete = matches!(write, TieredWrite::Delete); let write_mutations = match write { TieredWrite::Tombstone(tombstone) => tombstone_mutations(&tombstone, now)?.into(), @@ -1052,13 +1013,8 @@ impl HighVolumeBackend for BigTableBackend { redirect_target_filter(old, id), redirect_target_filter(new, id), ]), - // !t || t == new (empty row → safe) - (None, Some(new)) => redirect_compatible_filter(new, id), - // t == old OR absent row (inline object or wrong tombstone → conflict). - // Uses delete_compatible_filter to avoid firing on inline objects. - (Some(old), None) if is_delete => delete_compatible_filter(old, id), - // !t || t == old (empty row → safe; inline object → safe, overwrite) - (Some(target), None) => redirect_compatible_filter(target, id), + // !t || t == target (empty row → safe) + (Some(target), None) | (None, Some(target)) => redirect_compatible_filter(target, id), // !t (empty row → safe) (None, None) => tombstone_predicate(), }; @@ -1852,7 +1808,6 @@ mod tests { } /// CAS-delete: wrong expected → false; correct expected → true, row gone. - /// CAS-delete with Some(target) against a regular object also returns false. #[tokio::test] async fn test_cas_delete() -> Result<()> { let backend = create_test_backend().await?; @@ -1893,7 +1848,7 @@ mod tests { .await?; assert!(retry, "idempotent retry"); - // Regular object: CAS-delete with Some(target) returns false, object preserved. + // Inline object replaced tombstone: Safe to delete since it is an idempotent operation. 
let id2 = make_id(); let fake_lt_id = ObjectId::random(id2.context().clone()); let metadata = Metadata::default(); @@ -1901,8 +1856,7 @@ mod tests { let deleted = backend .compare_and_write(&id2, Some(&fake_lt_id), TieredWrite::Delete) .await?; - assert!(!deleted, "expected false: row is not a tombstone"); - assert!(backend.get_object(&id2).await?.is_some()); + assert!(deleted, "expected idempotent deletion"); Ok(()) } From 71523ca7861702a214400929a440412c512b120d Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Fri, 20 Mar 2026 17:16:50 +0100 Subject: [PATCH 3/3] ref(service): Rename and document CAS predicate filters MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Give the two CAS predicate builders semantic names that reflect what they test rather than how they're built: `interleave_filter` → `update_filter`, `redirect_compatible_filter` → `optional_target_filter`. Add `## Predicate Matches` / `## Details` doc sections to all four filter helpers so callers know which mutations branch to pair them with. Extract `TieredWrite::target()` to avoid duplicating the match in every call site. --- objectstore-service/src/backend/bigtable.rs | 98 ++++++++++++-------- objectstore-service/src/backend/common.rs | 10 ++ objectstore-service/src/backend/in_memory.rs | 7 +- 3 files changed, 70 insertions(+), 45 deletions(-) diff --git a/objectstore-service/src/backend/bigtable.rs b/objectstore-service/src/backend/bigtable.rs index ec1e08cc..95a50ab1 100644 --- a/objectstore-service/src/backend/bigtable.rs +++ b/objectstore-service/src/backend/bigtable.rs @@ -212,6 +212,12 @@ fn legacy_tombstone_filter() -> v2::RowFilter { /// Creates a row filter that matches any tombstone row, new- or legacy-format. /// +/// ## Predicate Matches +/// +/// Must be used with `false_mutations` and `predicate_matched == false`. +/// +/// ## Details +/// /// New format: presence of the `r` column. 
/// Legacy format: `is_redirect_tombstone: true` in the `m` column JSON. /// @@ -241,7 +247,13 @@ fn exact_value_regex(value: &str) -> Vec { format!("^{}$", regex::escape(value)).into_bytes() } -/// Creates a row filter that matches tombstones whose redirect resolves to `target`. +/// Matches tombstones whose redirect resolves to `target`. +/// +/// ## Predicate Matches +/// +/// Must be used with `true_mutations` and `predicate_matched == true`. +/// +/// ## Details /// /// Always includes an exact match on the `r` (redirect) column: /// - Chain: `r` column present AND value == `target` storage path @@ -293,32 +305,56 @@ fn redirect_target_filter(target: &ObjectId, own_id: &ObjectId) -> v2::RowFilter } } -/// Combines multiple filters with OR semantics: passes cells if any sub-filter matches. -fn interleave_filter(filters: Vec) -> v2::RowFilter { +/// Matches tombstones whose redirect resolves to either `old` or `new`. +/// +/// ## Predicate Matches +/// +/// Must be used with `true_mutations` and `predicate_matched == true`. +/// +/// ## Details +/// +/// Equivalent to `t == old || t == new`. Built as an Interleave of two +/// [`redirect_target_filter`] calls — yields cells iff at least one branch +/// matches. An absent row or non-tombstone row yields 0 cells, so +/// `predicate_matched = false` (conflict). +fn update_filter(old: &ObjectId, new: &ObjectId, own_id: &ObjectId) -> v2::RowFilter { v2::RowFilter { filter: Some(v2::row_filter::Filter::Interleave( - v2::row_filter::Interleave { filters }, + v2::row_filter::Interleave { + filters: vec![ + redirect_target_filter(old, own_id), + redirect_target_filter(new, own_id), + ], + }, )), } } -/// Matches rows where no conflicting tombstone exists: either no tombstone, -/// or the redirect already points to `target`. +/// Matches rows where no conflicting tombstone exists: +/// +/// ## Predicate Matches +/// +/// Must be used with `false_mutations` and `predicate_matched == false`. 
+/// +/// ## Details /// -/// Equivalent to `!t || t == target`. Both safe states produce 0 output cells, -/// so `predicate_matched` is `false` in both cases — pair this filter with -/// `false_mutations`. -fn redirect_compatible_filter(target: &ObjectId, own_id: &ObjectId) -> v2::RowFilter { +/// Matches either no tombstone, or the redirect already points to `target`. +/// Built as an inverted `Condition` filter: +/// +/// - Predicate: [`redirect_target_filter`]`(target)` — does the row have a +/// tombstone pointing to `target`? +/// - True branch: `BlockAllFilter` → 0 cells. +/// - False branch: [`tombstone_predicate`] → 0 cells when no tombstone, 1+ cells on a conflicting one. +/// +/// Both safe states yield 0 cells, so `predicate_matched = false` in both cases. +fn optional_target_filter(target: &ObjectId, own_id: &ObjectId) -> v2::RowFilter { v2::RowFilter { filter: Some(v2::row_filter::Filter::Condition(Box::new( v2::row_filter::Condition { predicate_filter: Some(Box::new(redirect_target_filter(target, own_id))), - // t == target → block all → 0 cells (safe, already in target state) true_filter: Some(Box::new(v2::RowFilter { filter: Some(v2::row_filter::Filter::BlockAllFilter(true)), })), - // !t → tombstone_predicate → 0 cells (safe, no tombstone) - // t != target → tombstone_predicate → 1+ cells (conflict) false_filter: Some(Box::new(tombstone_predicate())), }, ))), @@ -990,10 +1026,12 @@ impl HighVolumeBackend for BigTableBackend { let path = id.as_storage_path().to_string().into_bytes(); let now = SystemTime::now(); - // Clone the next target before consuming `write` for mutations. - let next_target = match &write { - TieredWrite::Tombstone(t) => Some(t.target.clone()), - _ => None, + // `invert`: filters that use false_mutations (predicate_matched == false means safe). 
+ let (predicate_filter, invert) = match (current, write.target()) { + (Some(old), Some(new)) => (update_filter(old, new, id), false), + (Some(target), None) => (optional_target_filter(target, id), true), + (None, Some(target)) => (optional_target_filter(target, id), true), + (None, None) => (tombstone_predicate(), true), }; let write_mutations = match write { @@ -1002,27 +1040,9 @@ impl HighVolumeBackend for BigTableBackend { TieredWrite::Delete => vec![delete_row_mutation()], }; - // An empty row always yields 0 cells, so predicate_matched is always false for - // absent rows. When that is the safe state, write in false_mutations; when it is a - // conflict, write in true_mutations. - let empty_row_is_safe = current.is_none() || next_target.is_none(); - - let predicate_filter = match (current, next_target.as_ref()) { - // t == old || t == new (empty row → conflict) - (Some(old), Some(new)) => interleave_filter(vec![ - redirect_target_filter(old, id), - redirect_target_filter(new, id), - ]), - // !t || t == target (empty row → safe) - (Some(target), None) | (None, Some(target)) => redirect_compatible_filter(target, id), - // !t (empty row → safe) - (None, None) => tombstone_predicate(), - }; - - let (true_mutations, false_mutations) = if empty_row_is_safe { - (vec![], write_mutations) - } else { - (write_mutations, vec![]) + let (true_mutations, false_mutations) = match invert { + true => (vec![], write_mutations), + false => (write_mutations, vec![]), }; let request = v2::CheckAndMutateRowRequest { @@ -1043,7 +1063,7 @@ impl HighVolumeBackend for BigTableBackend { .await? 
.predicate_matched; - Ok(predicate_matched != empty_row_is_safe) + Ok(predicate_matched != invert) } } diff --git a/objectstore-service/src/backend/common.rs b/objectstore-service/src/backend/common.rs index 75167499..0c9f3263 100644 --- a/objectstore-service/src/backend/common.rs +++ b/objectstore-service/src/backend/common.rs @@ -177,6 +177,16 @@ pub enum TieredWrite { Delete, } +impl TieredWrite { + /// Returns the tombstone target if this is a tombstone write, or `None` otherwise. + pub fn target(&self) -> Option<&ObjectId> { + match self { + TieredWrite::Tombstone(t) => Some(&t.target), + _ => None, + } + } +} + /// Creates a reqwest client with required defaults. /// /// Automatic decompression is disabled because backends store pre-compressed diff --git a/objectstore-service/src/backend/in_memory.rs b/objectstore-service/src/backend/in_memory.rs index 753e1624..5536f3b4 100644 --- a/objectstore-service/src/backend/in_memory.rs +++ b/objectstore-service/src/backend/in_memory.rs @@ -174,13 +174,8 @@ impl HighVolumeBackend for InMemoryBackend { let mut store = self.store.lock().unwrap(); let actual = store.get(id); - let next = match write { - TieredWrite::Tombstone(ref t) => Some(&t.target), - _ => None, - }; - let matches_current = matches_redirect(actual, current); - let matches_next = matches_redirect(actual, next); + let matches_next = matches_redirect(actual, write.target()); if matches_current { match write {