Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 107 additions & 27 deletions objectstore-service/src/backend/bigtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ fn legacy_tombstone_filter() -> v2::RowFilter {

/// Creates a row filter that matches any tombstone row, new- or legacy-format.
///
/// ## Predicate Matches
///
/// Must be used with `false_mutations` and `predicate_matched == false`.
///
/// ## Details
///
/// New format: presence of the `r` column.
/// Legacy format: `is_redirect_tombstone: true` in the `m` column JSON.
///
Expand Down Expand Up @@ -241,7 +247,13 @@ fn exact_value_regex(value: &str) -> Vec<u8> {
format!("^{}$", regex::escape(value)).into_bytes()
}

/// Creates a row filter that matches tombstones whose redirect resolves to `target`.
/// Matches tombstones whose redirect resolves to `target`.
///
/// ## Predicate Matches
///
/// Must be used with `true_mutations` and `predicate_matched == true`.
///
/// ## Details
///
/// Always includes an exact match on the `r` (redirect) column:
/// - Chain: `r` column present AND value == `target` storage path
Expand Down Expand Up @@ -293,6 +305,62 @@ fn redirect_target_filter(target: &ObjectId, own_id: &ObjectId) -> v2::RowFilter
}
}

/// Matches tombstones whose redirect resolves to either `old` or `new`.
///
/// ## Predicate Matches
///
/// Must be used with `true_mutations` and `predicate_matched == true`.
///
/// ## Details
///
/// Equivalent to `t == old || t == new`. Built as an Interleave of two
/// [`redirect_target_filter`] calls — yields cells iff at least one branch
/// matches. An absent row or non-tombstone row yields 0 cells, so
/// `predicate_matched = false` (conflict).
fn update_filter(old: &ObjectId, new: &ObjectId, own_id: &ObjectId) -> v2::RowFilter {
v2::RowFilter {
filter: Some(v2::row_filter::Filter::Interleave(
v2::row_filter::Interleave {
filters: vec![
redirect_target_filter(old, own_id),
redirect_target_filter(new, own_id),
],
},
)),
}
}

/// Matches rows where no conflicting tombstone exists:
///
/// ## Predicate Matches
///
/// Must be used with `false_mutations` and `predicate_matched == false`.
///
/// ## Details
///
/// Matches either no tombstone, or the redirect already points to `target`.
/// Built as an inverted `Condition` filter:
///
/// - Predicate: [`redirect_target_filter`]`(target)` — does the row have a
/// tombstone pointing to `target`?
/// - True branch: `BlockAllFilter` → 0 cells.
/// - False branch: [`tombstone_predicate`] → 0 cells when no tombstone
///
/// Both safe states yield 0 cells, so `predicate_matched = false` in both cases.
fn optional_target_filter(target: &ObjectId, own_id: &ObjectId) -> v2::RowFilter {
v2::RowFilter {
filter: Some(v2::row_filter::Filter::Condition(Box::new(
v2::row_filter::Condition {
predicate_filter: Some(Box::new(redirect_target_filter(target, own_id))),
true_filter: Some(Box::new(v2::RowFilter {
filter: Some(v2::row_filter::Filter::BlockAllFilter(true)),
})),
false_filter: Some(Box::new(tombstone_predicate())),
},
))),
}
}

/// Creates a row filter that reads all non-payload columns (`m`, `r`, `t`).
///
/// Used by metadata-only reads to avoid fetching the (potentially large) payload column
Expand Down Expand Up @@ -958,23 +1026,23 @@ impl HighVolumeBackend for BigTableBackend {
let path = id.as_storage_path().to_string().into_bytes();
let now = SystemTime::now();

// `invert`: filters that use false_mutations (predicate_matched == false means safe).
let (predicate_filter, invert) = match (current, write.target()) {
Comment on lines +1029 to +1030
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The invert flag existed before and is a necessary evil. I tried to get rid of it, but CheckAndMutate only returns predicate_matched if at least one row matched the predicate. So it's not possible to create a negative match. The workaround is to use false_mutations and invert the flag.

I will eventually follow up by typing this properly. The predicates will become either a trait or a struct that has the necessary methods to generate the correct CheckAndMutateRowRequest along with the correct predicate matching.

This can all be deferred to a follow-up.

(Some(old), Some(new)) => (update_filter(old, new, id), false),
(Some(target), None) => (optional_target_filter(target, id), true),
(None, Some(target)) => (optional_target_filter(target, id), true),
(None, None) => (tombstone_predicate(), true),
};

let write_mutations = match write {
TieredWrite::Tombstone(tombstone) => tombstone_mutations(&tombstone, now)?.into(),
TieredWrite::Object(m, p) => object_mutations(&m, p.to_vec(), now)?.into(),
TieredWrite::Delete => vec![delete_row_mutation()],
};

let (predicate_filter, true_mutations, false_mutations, success_on_match) = match current {
// Success = no tombstone. tombstone_predicate matches = tombstone found = fail.
// Write on false_mutations (no tombstone); noop on true_mutations (tombstone).
None => (tombstone_predicate(), vec![], write_mutations, false),
// Success = redirect matches target. Predicate match = CAS success.
Some(target) => (
redirect_target_filter(target, id),
write_mutations,
vec![],
true,
),
let (true_mutations, false_mutations) = match invert {
true => (vec![], write_mutations),
false => (write_mutations, vec![]),
};

let request = v2::CheckAndMutateRowRequest {
Expand All @@ -995,7 +1063,7 @@ impl HighVolumeBackend for BigTableBackend {
.await?
.predicate_matched;

Ok(predicate_matched == success_on_match)
Ok(predicate_matched != invert)
}
}

Expand Down Expand Up @@ -1595,7 +1663,7 @@ mod tests {

// --- Section 4: Compare-and-Write ---

/// Creating a tombstone on an empty row succeeds; a second attempt fails.
/// Creating a tombstone on an empty row succeeds; a retry of the same CAS also succeeds.
///
/// After creation, both tiered and legacy APIs reflect the tombstone.
#[tokio::test]
Expand Down Expand Up @@ -1637,11 +1705,11 @@ mod tests {
Err(Error::UnexpectedTombstone)
));

// Second create fails: tombstone already exists.
// Idempotent retry: retry with the same target succeeds
let second = backend
.compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone))
.await?;
assert!(!second, "second create must fail: tombstone already exists");
assert!(second, "idempotent retry");

Ok(())
}
Expand All @@ -1668,7 +1736,7 @@ mod tests {
expiration_policy: ExpirationPolicy::Manual,
});
let swapped = backend
.compare_and_write(&hv_id, Some(&wrong_lt_id), write)
.compare_and_write(&hv_id, Some(&wrong_lt_id), write.clone())
.await?;
assert!(!swapped, "expected CAS failure due to wrong target");
match backend.get_tiered_metadata(&hv_id).await? {
Expand All @@ -1677,19 +1745,21 @@ mod tests {
}

// Correct target: CAS succeeds, target updated.
let write = TieredWrite::Tombstone(Tombstone {
target: new_lt_id.clone(),
expiration_policy: ExpirationPolicy::Manual,
});
let swapped = backend
.compare_and_write(&hv_id, Some(&old_lt_id), write)
.compare_and_write(&hv_id, Some(&old_lt_id), write.clone())
.await?;
assert!(swapped, "expected CAS success with correct target");
match backend.get_tiered_metadata(&hv_id).await? {
TieredMetadata::Tombstone(t) => assert_eq!(t.target, new_lt_id),
other => panic!("expected tombstone, got {other:?}"),
}

// Idempotent retry: same A→B swap returns true.
let retry = backend
.compare_and_write(&hv_id, Some(&old_lt_id), write)
.await?;
assert!(retry, "idempotent retry");

Ok(())
}

Expand Down Expand Up @@ -1722,13 +1792,19 @@ mod tests {
// Correct target: CAS succeeds, row becomes an inline object.
let payload = Bytes::from_static(b"hello inline");
let write = TieredWrite::Object(Metadata::default(), payload.clone());
let swapped = backend.compare_and_write(&id, Some(&lt_id), write).await?;
let swapped = backend
.compare_and_write(&id, Some(&lt_id), write.clone())
.await?;
assert!(swapped, "expected CAS success with correct target");
let TieredGet::Object(_, stream) = backend.get_tiered_object(&id).await? else {
panic!("expected inline object after swap");
};
assert_eq!(&stream::read_to_vec(stream).await?, payload.as_ref());

// Idempotent retry: row is already inline (no tombstone), same CAS returns true.
let retry = backend.compare_and_write(&id, Some(&lt_id), write).await?;
assert!(retry, "idempotent retry");

Ok(())
}

Expand All @@ -1752,7 +1828,6 @@ mod tests {
}

/// CAS-delete: wrong expected → false; correct expected → true, row gone.
/// CAS-delete with Some(target) against a regular object also returns false.
#[tokio::test]
async fn test_cas_delete() -> Result<()> {
let backend = create_test_backend().await?;
Expand Down Expand Up @@ -1787,16 +1862,21 @@ mod tests {
TieredMetadata::NotFound
));

// Regular object: CAS-delete with Some(target) returns false, object preserved.
// Idempotent retry: row is already absent (no tombstone), same delete returns true.
let retry = backend
.compare_and_write(&id, Some(&lt_id), TieredWrite::Delete)
.await?;
assert!(retry, "idempotent retry");

// Inline object replaced tombstone: Safe to delete since it is an idempotent operation.
let id2 = make_id();
let fake_lt_id = ObjectId::random(id2.context().clone());
let metadata = Metadata::default();
create_object(&backend, &id2, &metadata, b"data", SystemTime::now()).await?;
let deleted = backend
.compare_and_write(&id2, Some(&fake_lt_id), TieredWrite::Delete)
.await?;
assert!(!deleted, "expected false: row is not a tombstone");
assert!(backend.get_object(&id2).await?.is_some());
assert!(deleted, "expected idempotent deletion");

Ok(())
}
Expand Down
20 changes: 17 additions & 3 deletions objectstore-service/src/backend/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,12 @@ pub trait HighVolumeBackend: Backend {
/// - `Some(target)`: succeeds only if a tombstone exists whose redirect
/// resolves to `target`.
///
/// On match, applies `write`. Returns `true` on success, `false` if the
/// precondition was not met (row state changed concurrently).
/// **This operation is idempotent:** if the object is already in the target
/// state, it returns `true`. Whether the mutation runs again is up to the
/// implementation.
///
/// Returns `true` on success or idempotent match, `false` if a conflicting
/// state was found (another writer won the race).
async fn compare_and_write(
&self,
id: &ObjectId,
Expand Down Expand Up @@ -163,7 +167,7 @@ pub enum TieredMetadata {
}

/// The write operation performed by [`HighVolumeBackend::compare_and_write`].
#[derive(Debug)]
#[derive(Clone, Debug)]
pub enum TieredWrite {
/// Write a redirect tombstone.
Tombstone(Tombstone),
Expand All @@ -173,6 +177,16 @@ pub enum TieredWrite {
Delete,
}

impl TieredWrite {
/// Returns the tombstone target if this is a tombstone write, or `None` otherwise.
pub fn target(&self) -> Option<&ObjectId> {
match self {
TieredWrite::Tombstone(t) => Some(&t.target),
_ => None,
}
}
}

/// Creates a reqwest client with required defaults.
///
/// Automatic decompression is disabled because backends store pre-compressed
Expand Down
26 changes: 16 additions & 10 deletions objectstore-service/src/backend/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,17 +172,12 @@ impl HighVolumeBackend for InMemoryBackend {
write: TieredWrite,
) -> Result<bool> {
let mut store = self.store.lock().unwrap();
let actual = store.get(id);

let matches = match current {
None => !matches!(actual, Some(StoreEntry::Tombstone(_))),
Some(target) => matches!(
actual,
Some(StoreEntry::Tombstone(t)) if t.target == *target
),
};
let actual = store.get(id);
let matches_current = matches_redirect(actual, current);
let matches_next = matches_redirect(actual, write.target());

if matches {
if matches_current {
match write {
TieredWrite::Tombstone(tombstone) => {
store.insert(id.clone(), StoreEntry::Tombstone(tombstone));
Expand All @@ -196,7 +191,18 @@ impl HighVolumeBackend for InMemoryBackend {
}
}

Ok(matches)
Ok(matches_current || matches_next)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InMemory backend skips writes Bigtable would apply

Medium Severity

The InMemoryBackend only applies the write when matches_current is true, but returns true whenever matches_current || matches_next. In Bigtable, the write mutation always fires when the predicate indicates a "safe" state (including the idempotent/matches_next path). For the (Some(old), None) transition where the row holds an unrelated object, Bigtable's delete mutation fires and removes it, while InMemoryBackend preserves it — both returning true. This means the test double has different side effects from the real backend, which can mask data-loss scenarios in TieredStorage integration tests that rely on InMemoryBackend.

Additional Locations (1)
Fix in Cursor Fix in Web

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TL;DR: Both backends adhere to the spec. The detail is in the check_and_write docs:

Whether the mutation runs again is up to the implementation.

Effectively, this is a race and the trait leaves this as implementation-defined behavior (not undefined behavior!). The contract that matters is that cleanup can happen, hence the trait leaves this up to implementors:

  • InMemoryBackend chooses the most correct implementation and skips the side-effect since the precondition is violated.
  • BigTable opts for a simpler filter chain and does run the mutation. It may overwrite a concurrent inline-insert, but this is explicitly OK for the contract (both are no tombstone, which is what matters to idempotency).

}
}

/// Returns `true` if `entry` matches the expected tombstone redirect state.
///
/// - `expected = None`: matches any non-tombstone (absent or inline object).
/// - `expected = Some(target)`: matches a tombstone whose redirect target equals `target`.
fn matches_redirect(entry: Option<&StoreEntry>, expected: Option<&ObjectId>) -> bool {
match expected {
None => matches!(entry, Some(StoreEntry::Object { .. }) | None),
Some(target) => matches!(entry, Some(StoreEntry::Tombstone(t)) if t.target == *target),
}
}

Expand Down
9 changes: 9 additions & 0 deletions objectstore-service/src/backend/tiered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@
//! CAS conflicts are therefore **not errors**: the losing writer's data is
//! cleaned up and `Ok` is returned, because the result is indistinguishable
//! from having succeeded a moment earlier and then been overwritten.
//!
//! ### Idempotency
//!
//! `compare_and_write` is idempotent: if the row is already in the target state, it
//! returns `true` without re-applying the mutation. This is critical for retry
//! safety. If the server commits a write but the response is lost, a retry sees the
//! already-mutated state and still returns `true` — so callers do not mistakenly
//! treat a successful commit as a lost race and clean up data that was actually
//! persisted.

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
Expand Down
Loading