From 34f34b049e450b3b4b61a50c144783a6ea4415c5 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Thu, 19 Mar 2026 18:02:33 +0100 Subject: [PATCH 1/8] ref(service): Add CAS commit points for atomic tombstone writes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace `create_tombstone` with a single `cas_put` method on `HighVolumeBackend`. The method takes a `CasMutation` (WriteTombstone, WriteInline, or Delete) and an optional expected redirect target, and applies the mutation only if the current row state matches. Large-object writes now store the payload at a unique revision key (`{key}/{uuid_v7}`) before CAS-committing the tombstone, so concurrent writers don't silently overwrite each other. Small objects that land on an existing tombstone are CAS-swapped inline (fixing the expiry-mismatch TODO). Deletes CAS-remove the tombstone first, then clean up GCS best-effort — reversing the previous ordering. Co-Authored-By: Claude --- objectstore-service/src/backend/bigtable.rs | 544 ++++++++++++++++++- objectstore-service/src/backend/common.rs | 29 +- objectstore-service/src/backend/in_memory.rs | 40 +- objectstore-service/src/backend/tiered.rs | 285 +++++++--- objectstore-service/src/service.rs | 29 +- 5 files changed, 807 insertions(+), 120 deletions(-) diff --git a/objectstore-service/src/backend/bigtable.rs b/objectstore-service/src/backend/bigtable.rs index ad3cac9b..e6d28f08 100644 --- a/objectstore-service/src/backend/bigtable.rs +++ b/objectstore-service/src/backend/bigtable.rs @@ -43,8 +43,8 @@ use tonic::Code; use bytes::Bytes; use crate::backend::common::{ - Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse, - TieredGet, TieredMetadata, Tombstone, + Backend, CasMutation, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, + PutResponse, TieredGet, TieredMetadata, Tombstone, }; use crate::error::{Error, Result}; use crate::gcp_auth::PrefetchingTokenProvider; @@ 
-226,6 +226,76 @@ fn tombstone_predicate() -> v2::RowFilter { } } +/// Creates a row filter that matches tombstones whose redirect resolves to `expected_target`. +/// +/// Combines an exact value match on the `r` column with additional fallbacks when +/// `expected_target == hv_id` (i.e., the caller expects the legacy identity target): +/// - `r == b""` (empty-sentinel format written before the redirect column stored the path) +/// - legacy `m` column format (`is_redirect_tombstone: true`) +fn redirect_target_filter(expected_target: &ObjectId, hv_id: &ObjectId) -> v2::RowFilter { + let target_path = expected_target.as_storage_path().to_string().into_bytes(); + + let exact_match = v2::RowFilter { + filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain { + filters: vec![ + column_filter(COLUMN_REDIRECT), + v2::RowFilter { + filter: Some(v2::row_filter::Filter::ValueRangeFilter(v2::ValueRange { + start_value: Some(v2::value_range::StartValue::StartValueClosed( + target_path.clone(), + )), + end_value: Some(v2::value_range::EndValue::EndValueClosed(target_path)), + })), + }, + ], + })), + }; + + if expected_target == hv_id { + // Also match legacy tombstones that resolve to the HV id: + // - empty `r` value (written before the redirect column stored the path) + // - legacy `m` column format (`is_redirect_tombstone: true`) + let empty_redirect_match = v2::RowFilter { + filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain { + filters: vec![ + column_filter(COLUMN_REDIRECT), + v2::RowFilter { + filter: Some(v2::row_filter::Filter::ValueRangeFilter(v2::ValueRange { + start_value: Some(v2::value_range::StartValue::StartValueClosed( + vec![], + )), + end_value: Some(v2::value_range::EndValue::EndValueClosed(vec![])), + })), + }, + ], + })), + }; + + let legacy_meta_match = v2::RowFilter { + filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain { + filters: vec![ + column_filter(COLUMN_METADATA), + v2::RowFilter { + filter: 
Some(v2::row_filter::Filter::ValueRegexFilter( + b"^\\{\"is_redirect_tombstone\":true[,}].*".to_vec(), + )), + }, + ], + })), + }; + + v2::RowFilter { + filter: Some(v2::row_filter::Filter::Interleave( + v2::row_filter::Interleave { + filters: vec![exact_match, empty_redirect_match, legacy_meta_match], + }, + )), + } + } else { + exact_match + } +} + /// Creates a row filter that reads all non-payload columns (`m`, `r`, `t`). /// /// Used by metadata-only reads to avoid fetching the (potentially large) payload column @@ -834,12 +904,77 @@ impl HighVolumeBackend for BigTableBackend { } #[tracing::instrument(level = "trace", fields(?id), skip_all)] - async fn create_tombstone(&self, id: &ObjectId, tombstone: Tombstone) -> Result<()> { - tracing::debug!("Writing tombstone to Bigtable backend"); + async fn cas_put( + &self, + id: &ObjectId, + expected_redirect: Option<&ObjectId>, + mutation: CasMutation, + ) -> Result { + tracing::debug!("CAS put to Bigtable backend"); + let path = id.as_storage_path().to_string().into_bytes(); - self.put_tombstone_row(path, &tombstone, "create_tombstone") - .await?; - Ok(()) + let now = SystemTime::now(); + + let write_mutations: Vec = match mutation { + CasMutation::WriteTombstone(tombstone) => { + let mutations = build_tombstone_mutations(&tombstone, now)?; + mutations + .into_iter() + .map(|m| v2::Mutation { mutation: Some(m) }) + .collect() + } + CasMutation::WriteInline(metadata, payload) => { + let mutations = build_write_mutations(&metadata, payload.to_vec(), now)?; + mutations + .into_iter() + .map(|m| v2::Mutation { mutation: Some(m) }) + .collect() + } + CasMutation::Delete => { + let delete = mutation::Mutation::DeleteFromRow(mutation::DeleteFromRow {}); + vec![v2::Mutation { + mutation: Some(delete), + }] + } + }; + + let (predicate_filter, true_mutations, false_mutations, success_on_match) = + match expected_redirect { + None => { + // Success = no tombstone. tombstone_predicate matches = tombstone found = fail. 
+ // Write on false_mutations (no tombstone); noop on true_mutations (tombstone). + (tombstone_predicate(), vec![], write_mutations, false) + } + Some(target) => { + // Success = redirect matches target. Predicate match = CAS success. + ( + redirect_target_filter(target, id), + write_mutations, + vec![], + true, + ) + } + }; + + let request = v2::CheckAndMutateRowRequest { + table_name: self.table_path.clone(), + row_key: path, + predicate_filter: Some(predicate_filter), + true_mutations, + false_mutations, + ..Default::default() + }; + + let predicate_matched = retry("cas_put", || async { + self.bigtable + .client() + .check_and_mutate_row(request.clone()) + .await + }) + .await? + .predicate_matched; + + Ok(predicate_matched == success_on_match) } #[tracing::instrument(level = "trace", fields(?id), skip_all)] @@ -1209,12 +1344,13 @@ mod tests { let id = make_id(); backend - .create_tombstone( + .cas_put( &id, - Tombstone { + None, + CasMutation::WriteTombstone(Tombstone { target: id.clone(), expiration_policy: ExpirationPolicy::Manual, - }, + }), ) .await?; @@ -1357,12 +1493,16 @@ mod tests { let backend = create_test_backend().await?; let id = make_id(); - let tombstone = Tombstone { - target: id.clone(), - expiration_policy: ExpirationPolicy::Manual, - }; - - backend.create_tombstone(&id, tombstone).await?; + backend + .cas_put( + &id, + None, + CasMutation::WriteTombstone(Tombstone { + target: id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }), + ) + .await?; let result = backend.delete_non_tombstone(&id).await?; let tombstone = result.expect("Some(tombstone)"); @@ -1467,13 +1607,19 @@ mod tests { TieredGet::Tombstone(_) )); + // Recreate a fresh tombstone to test the other conditional operations. 
+ write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?; + let t_opt = backend .put_non_tombstone(&id, &Metadata::default(), bytes::Bytes::new()) .await?; - assert_eq!(t_opt.as_ref(), Some(&t)); + // Legacy tombstones resolve to hv_id; target should match id. + assert_eq!(t_opt.as_ref().map(|t| &t.target), Some(&id)); + + write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?; let t_opt = backend.delete_non_tombstone(&id).await?; - assert_eq!(t_opt, Some(t)); + assert_eq!(t_opt.as_ref().map(|t| &t.target), Some(&id)); Ok(()) } @@ -1547,16 +1693,22 @@ mod tests { } #[tokio::test] - async fn test_create_tombstone_round_trip() -> Result<()> { + async fn test_cas_put_create_tombstone() -> Result<()> { let backend = create_test_backend().await?; let id = make_id(); let expiration_policy = ExpirationPolicy::TimeToLive(Duration::from_secs(3600)); - let tombstone = Tombstone { - target: id.clone(), - expiration_policy, - }; - backend.create_tombstone(&id, tombstone).await?; + let committed = backend + .cas_put( + &id, + None, + CasMutation::WriteTombstone(Tombstone { + target: id.clone(), + expiration_policy, + }), + ) + .await?; + assert!(committed, "expected CAS success on empty row"); // Both hv methods must surface the tombstone with the correct expiration_policy. let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&id).await? else { @@ -1581,6 +1733,346 @@ mod tests { Ok(()) } + /// Attempting to create a tombstone when one already exists returns false. 
+ #[tokio::test] + async fn test_cas_put_create_tombstone_conflict() -> Result<()> { + let backend = create_test_backend().await?; + + let id = make_id(); + let tombstone = Tombstone { + target: id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }; + let first = backend + .cas_put(&id, None, CasMutation::WriteTombstone(tombstone.clone())) + .await?; + assert!(first, "first write should succeed"); + + let second = backend + .cas_put(&id, None, CasMutation::WriteTombstone(tombstone)) + .await?; + assert!( + !second, + "second write should fail: tombstone already exists" + ); + + Ok(()) + } + + /// CAS-swapping an existing tombstone for a new one succeeds when the expected target matches. + #[tokio::test] + async fn test_cas_put_swap_tombstone() -> Result<()> { + let backend = create_test_backend().await?; + + let hv_id = make_id(); + let old_lt_id = ObjectId::random(hv_id.context().clone()); + let new_lt_id = ObjectId::random(hv_id.context().clone()); + + let old_tombstone = Tombstone { + target: old_lt_id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }; + backend + .cas_put(&hv_id, None, CasMutation::WriteTombstone(old_tombstone)) + .await?; + + let new_tombstone = Tombstone { + target: new_lt_id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }; + let swapped = backend + .cas_put( + &hv_id, + Some(&old_lt_id), + CasMutation::WriteTombstone(new_tombstone), + ) + .await?; + assert!(swapped, "expected CAS success"); + + let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&hv_id).await? else { + panic!("expected tombstone"); + }; + assert_eq!(t.target, new_lt_id, "tombstone target must be updated"); + + Ok(()) + } + + /// CAS-swapping fails when the expected target does not match the current tombstone. 
+ #[tokio::test] + async fn test_cas_put_swap_tombstone_mismatch() -> Result<()> { + let backend = create_test_backend().await?; + + let hv_id = make_id(); + let actual_lt_id = ObjectId::random(hv_id.context().clone()); + let wrong_lt_id = ObjectId::random(hv_id.context().clone()); + let new_lt_id = ObjectId::random(hv_id.context().clone()); + + backend + .cas_put( + &hv_id, + None, + CasMutation::WriteTombstone(Tombstone { + target: actual_lt_id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }), + ) + .await?; + + let swapped = backend + .cas_put( + &hv_id, + Some(&wrong_lt_id), + CasMutation::WriteTombstone(Tombstone { + target: new_lt_id, + expiration_policy: ExpirationPolicy::Manual, + }), + ) + .await?; + assert!(!swapped, "expected CAS failure due to wrong target"); + + // Row unchanged. + let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&hv_id).await? else { + panic!("expected tombstone"); + }; + assert_eq!(t.target, actual_lt_id, "tombstone target must be unchanged"); + + Ok(()) + } + + /// CAS-swapping a tombstone for inline data succeeds when the target matches. + #[tokio::test] + async fn test_cas_put_swap_inline() -> Result<()> { + let backend = create_test_backend().await?; + + let id = make_id(); + let lt_id = ObjectId::random(id.context().clone()); + + backend + .cas_put( + &id, + None, + CasMutation::WriteTombstone(Tombstone { + target: lt_id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }), + ) + .await?; + + let metadata = Metadata::default(); + let payload = bytes::Bytes::from(b"hello inline".to_vec()); + let swapped = backend + .cas_put( + &id, + Some(<_id), + CasMutation::WriteInline(metadata, payload.clone()), + ) + .await?; + assert!(swapped, "expected CAS success"); + + // Row is now an inline object. + let TieredGet::Object(_, stream) = backend.get_tiered_object(&id).await? 
else { + panic!("expected inline object after swap"); + }; + let body = crate::stream::read_to_vec(stream).await?; + assert_eq!(body, payload.as_ref()); + + Ok(()) + } + + /// Inline-swap fails when the expected target does not match. + #[tokio::test] + async fn test_cas_put_swap_inline_mismatch() -> Result<()> { + let backend = create_test_backend().await?; + + let id = make_id(); + let lt_id = ObjectId::random(id.context().clone()); + let wrong_id = ObjectId::random(id.context().clone()); + + backend + .cas_put( + &id, + None, + CasMutation::WriteTombstone(Tombstone { + target: lt_id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }), + ) + .await?; + + let swapped = backend + .cas_put( + &id, + Some(&wrong_id), + CasMutation::WriteInline(Metadata::default(), bytes::Bytes::new()), + ) + .await?; + assert!(!swapped, "expected CAS failure"); + + // Tombstone still present. + assert!(matches!( + backend.get_tiered_metadata(&id).await?, + TieredMetadata::Tombstone(_) + )); + + Ok(()) + } + + /// CAS-delete succeeds when the expected target matches. + #[tokio::test] + async fn test_cas_put_delete() -> Result<()> { + let backend = create_test_backend().await?; + + let id = make_id(); + let lt_id = ObjectId::random(id.context().clone()); + + backend + .cas_put( + &id, + None, + CasMutation::WriteTombstone(Tombstone { + target: lt_id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }), + ) + .await?; + + let deleted = backend + .cas_put(&id, Some(<_id), CasMutation::Delete) + .await?; + assert!(deleted, "expected CAS delete success"); + + assert!(matches!( + backend.get_tiered_metadata(&id).await?, + TieredMetadata::NotFound + )); + + Ok(()) + } + + /// CAS-delete fails when the expected target does not match. 
+ #[tokio::test] + async fn test_cas_put_delete_mismatch() -> Result<()> { + let backend = create_test_backend().await?; + + let id = make_id(); + let lt_id = ObjectId::random(id.context().clone()); + let wrong_id = ObjectId::random(id.context().clone()); + + backend + .cas_put( + &id, + None, + CasMutation::WriteTombstone(Tombstone { + target: lt_id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }), + ) + .await?; + + let deleted = backend + .cas_put(&id, Some(&wrong_id), CasMutation::Delete) + .await?; + assert!(!deleted, "expected CAS failure"); + + // Row preserved. + assert!(matches!( + backend.get_tiered_metadata(&id).await?, + TieredMetadata::Tombstone(_) + )); + + Ok(()) + } + + /// CAS-delete with Some(target) against a regular object returns false. + #[tokio::test] + async fn test_cas_put_delete_regular_object() -> Result<()> { + let backend = create_test_backend().await?; + + let id = make_id(); + let fake_lt_id = ObjectId::random(id.context().clone()); + + backend + .put_object(&id, &Metadata::default(), crate::stream::single("data")) + .await?; + + let deleted = backend + .cas_put(&id, Some(&fake_lt_id), CasMutation::Delete) + .await?; + assert!(!deleted, "expected false: row is not a tombstone"); + + // Object preserved. + assert!(backend.get_object(&id).await?.is_some()); + + Ok(()) + } + + /// Legacy empty-redirect tombstone (`r=b""`) is matched by `cas_put` when `expected=Some(id)`. + #[tokio::test] + async fn test_cas_put_legacy_empty_redirect() -> Result<()> { + let backend = create_test_backend().await?; + let id = make_id(); + let path = id.as_storage_path().to_string().into_bytes(); + + // Write a tombstone with an empty `r` value (legacy format). 
+ backend + .mutate( + path, + [ + mutation::Mutation::SetCell(mutation::SetCell { + family_name: FAMILY_MANUAL.to_owned(), + column_qualifier: COLUMN_REDIRECT.to_owned(), + timestamp_micros: -1, + value: b"".to_vec(), + }), + mutation::Mutation::SetCell(mutation::SetCell { + family_name: FAMILY_MANUAL.to_owned(), + column_qualifier: COLUMN_TOMBSTONE_META.to_owned(), + timestamp_micros: -1, + value: serde_json::to_vec(&TombstoneMeta { + expiration_policy: ExpirationPolicy::Manual, + }) + .unwrap(), + }), + ], + "test-setup", + ) + .await?; + + // Expected target = id (the legacy fallback resolves to hv_id). + let deleted = backend.cas_put(&id, Some(&id), CasMutation::Delete).await?; + assert!( + deleted, + "cas_put should match legacy empty-redirect tombstone" + ); + + assert!(matches!( + backend.get_tiered_metadata(&id).await?, + TieredMetadata::NotFound + )); + + Ok(()) + } + + /// Legacy metadata-format tombstone is matched by `cas_put` when `expected=Some(id)`. + #[tokio::test] + async fn test_cas_put_legacy_metadata_format() -> Result<()> { + let backend = create_test_backend().await?; + let id = make_id(); + + write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?; + + // Legacy tombstones resolve to hv_id, so expected target = id. + let deleted = backend.cas_put(&id, Some(&id), CasMutation::Delete).await?; + assert!(deleted, "cas_put should match legacy-metadata tombstone"); + + assert!(matches!( + backend.get_tiered_metadata(&id).await?, + TieredMetadata::NotFound + )); + + Ok(()) + } + /// The redirect pointer stored in the `r` column survives a Bigtable write and read: /// `target` on write equals `target` on read, and the parsed `ObjectId` matches. #[tokio::test] @@ -1595,7 +2087,9 @@ mod tests { expiration_policy: ExpirationPolicy::Manual, }; - backend.create_tombstone(&hv_id, tombstone).await?; + backend + .cas_put(&hv_id, None, CasMutation::WriteTombstone(tombstone)) + .await?; match backend.get_tiered_metadata(&hv_id).await? 
{ TieredMetadata::Tombstone(t) => assert_eq!(t.target, lt_id, "target must match"), diff --git a/objectstore-service/src/backend/common.rs b/objectstore-service/src/backend/common.rs index 96991f14..2fc1feff 100644 --- a/objectstore-service/src/backend/common.rs +++ b/objectstore-service/src/backend/common.rs @@ -100,6 +100,17 @@ pub trait Backend: fmt::Debug + Send + Sync + 'static { async fn delete_object(&self, id: &ObjectId) -> Result; } +/// What to write when a [`cas_put`](HighVolumeBackend::cas_put) condition is met. +#[derive(Debug)] +pub enum CasMutation { + /// Write a redirect tombstone. + WriteTombstone(Tombstone), + /// Write inline object data. + WriteInline(Metadata, Bytes), + /// Delete the row entirely. + Delete, +} + /// Trait for backends that support tombstone-conditional operations. /// /// Only backends suitable for the high-volume tier of @@ -145,11 +156,21 @@ pub trait HighVolumeBackend: Backend { /// without a second round trip. async fn delete_non_tombstone(&self, id: &ObjectId) -> Result>; - /// Writes a redirect tombstone for the given object. + /// Atomically mutates the row if the current redirect state matches. /// - /// A tombstone signals that the real object lives in the long-term backend - /// identified by the tombstone's [`target`](Tombstone::target) ID. - async fn create_tombstone(&self, id: &ObjectId, tombstone: Tombstone) -> Result<()>; + /// `expected_redirect` determines the precondition: + /// - `None`: succeeds only if no tombstone exists (row absent or inline). + /// - `Some(target)`: succeeds only if a tombstone exists whose redirect + /// resolves to `target` (handles modern, empty-sentinel, and legacy formats). + /// + /// On match, applies `mutation`. Returns `true` on success, `false` if the + /// precondition was not met (row state changed concurrently). 
+ async fn cas_put( + &self, + id: &ObjectId, + expected_redirect: Option<&ObjectId>, + mutation: CasMutation, + ) -> Result; } /// Creates a reqwest client with required defaults. diff --git a/objectstore-service/src/backend/in_memory.rs b/objectstore-service/src/backend/in_memory.rs index 0597cf14..3a7fd08b 100644 --- a/objectstore-service/src/backend/in_memory.rs +++ b/objectstore-service/src/backend/in_memory.rs @@ -13,7 +13,7 @@ use futures_util::TryStreamExt; use objectstore_types::metadata::Metadata; use super::common::{ - DeleteResponse, GetResponse, PutResponse, TieredGet, TieredMetadata, Tombstone, + CasMutation, DeleteResponse, GetResponse, PutResponse, TieredGet, TieredMetadata, Tombstone, }; use crate::error::{Error, Result}; use crate::id::ObjectId; @@ -143,12 +143,38 @@ impl super::common::HighVolumeBackend for InMemoryBackend { Ok(None) } - async fn create_tombstone(&self, id: &ObjectId, tombstone: Tombstone) -> Result<()> { - self.store - .lock() - .unwrap() - .insert(id.clone(), StoreEntry::Tombstone(tombstone)); - Ok(()) + async fn cas_put( + &self, + id: &ObjectId, + expected_redirect: Option<&ObjectId>, + mutation: CasMutation, + ) -> Result { + let mut store = self.store.lock().unwrap(); + let current = store.get(id); + + let matches = match expected_redirect { + None => !matches!(current, Some(StoreEntry::Tombstone(_))), + Some(target) => matches!( + current, + Some(StoreEntry::Tombstone(t)) if t.target == *target + ), + }; + + if matches { + match mutation { + CasMutation::WriteTombstone(tombstone) => { + store.insert(id.clone(), StoreEntry::Tombstone(tombstone)); + } + CasMutation::WriteInline(metadata, payload) => { + store.insert(id.clone(), StoreEntry::Object(metadata, payload)); + } + CasMutation::Delete => { + store.remove(id); + } + } + } + + Ok(matches) } async fn get_tiered_object(&self, id: &ObjectId) -> Result { diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 
ad30f3f1..c5760921 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -13,8 +13,8 @@ use objectstore_types::metadata::Metadata; use serde::{Deserialize, Serialize}; use crate::backend::common::{ - Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse, - TieredGet, TieredMetadata, Tombstone, + Backend, CasMutation, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, + PutResponse, TieredGet, TieredMetadata, Tombstone, }; use crate::backend::{HighVolumeStorageConfig, StorageConfig}; use crate::error::Result; @@ -24,6 +24,18 @@ use crate::stream::{ClientStream, SizedPeek}; /// The threshold up until which we will go to the "high volume" backend. const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB +/// Creates a new [`ObjectId`] with the same context but a unique revision key. +/// +/// The new key has the format `{original_key}/{uuid_v7}`, producing a distinct +/// storage path for each large-object write. [`ObjectId::from_storage_path`] parses +/// the result back correctly because the key portion may contain `/`. +fn revision_id(id: &ObjectId) -> ObjectId { + ObjectId { + context: id.context.clone(), + key: format!("{}/{}", id.key, uuid::Uuid::now_v7()), + } +} + /// Configuration for [`TieredStorage`]. /// /// Composes two backends into a tiered routing setup: `high_volume` for small @@ -188,17 +200,23 @@ impl Backend for TieredStorage { .put_non_tombstone(id, metadata, payload.clone()) .await?; - if let Some(Tombstone { target, .. }) = tombstone_opt { - // Tombstone already exists in HV — write to long-term instead. - // TODO: The new object's expiry may differ from the tombstone's, - // leaving them inconsistent. This is a known gap and will be fixed - // in a follow-up. 
- let stream = crate::stream::single(payload).boxed(); - self.long_term.put_object(&target, metadata, stream).await?; - (BackendChoice::LongTerm, stored_size) - } else { - (BackendChoice::HighVolume, stored_size) + if let Some(tombstone) = tombstone_opt { + // Tombstone exists — CAS-swap it for inline data so the object + // ends up in HV with consistent expiry. + let cas_mutation = CasMutation::WriteInline(metadata.clone(), payload.clone()); + let swapped = self + .high_volume + .cas_put(id, Some(&tombstone.target), cas_mutation) + .await?; + + if swapped { + // Commit succeeded. Best-effort cleanup of old GCS blob. + let _ = self.long_term.delete_object(&tombstone.target).await; + } + // If CAS failed, someone else won the race — their write is committed. } + + (BackendChoice::HighVolume, stored_size) } BackendChoice::LongTerm => { let stored_size = Arc::new(AtomicU64::new(0)); @@ -214,22 +232,45 @@ impl Backend for TieredStorage { }) .boxed(); - // First write the object to long-term. - let long_term_id = id.clone(); - self.long_term - .put_object(&long_term_id, metadata, stream) - .await?; + // 1. Read current HV state to establish the CAS precondition. + let expected_old = match self.high_volume.get_tiered_metadata(id).await? { + TieredMetadata::Tombstone(t) => Some(t), + _ => None, + }; - // Then write the redirect tombstone to high-volume. - let tombstone = Tombstone { - target: long_term_id.clone(), + // 2. Write payload to GCS at a unique revision key. + let lt_id = revision_id(id); + self.long_term.put_object(<_id, metadata, stream).await?; + + // 3. CAS commit: write tombstone only if HV state matches what we saw. + let expected_target = expected_old.as_ref().map(|t| &t.target); + let new_tombstone = Tombstone { + target: lt_id.clone(), expiration_policy: metadata.expiration_policy, }; - - if let Err(e) = self.high_volume.create_tombstone(id, tombstone).await { - // Clean up on any kind of error. 
- self.long_term.delete_object(&long_term_id).await?; - return Err(e); + let cas_mutation = CasMutation::WriteTombstone(new_tombstone); + let commit_result = self + .high_volume + .cas_put(id, expected_target, cas_mutation) + .await; + + match commit_result { + Ok(true) => { + // Tombstone committed. Clean up old GCS blob if overwriting. + if let Some(old) = expected_old { + let _ = self.long_term.delete_object(&old.target).await; + } + } + Ok(false) => { + // Someone else won the race. Clean up our GCS blob. + let _ = self.long_term.delete_object(<_id).await; + // Return OK — from the caller's perspective, a write happened. + } + Err(e) => { + // CAS error. Clean up our GCS blob before propagating. + let _ = self.long_term.delete_object(<_id).await; + return Err(e); + } } (BackendChoice::LongTerm, stored_size.load(Ordering::Acquire)) @@ -324,11 +365,20 @@ impl Backend for TieredStorage { if let Some(tombstone) = self.high_volume.delete_non_tombstone(id).await? { backend_choice = BackendChoice::LongTerm; - // Delete the long-term object first, then clean up the tombstone. - // This ordering ensures that if the long-term delete fails, the - // tombstone remains and the data is still reachable (not orphaned). - self.long_term.delete_object(&tombstone.target).await?; - self.high_volume.delete_object(id).await?; + // CAS-delete the tombstone first (commit point), then clean up GCS. + // This is the inverse of the old ordering: if GCS cleanup fails, + // an orphan blob remains (accepted) but the tombstone is gone. + let cas_mutation = CasMutation::Delete; + let deleted = self + .high_volume + .cas_put(id, Some(&tombstone.target), cas_mutation) + .await?; + + if deleted { + // Tombstone removed. Best-effort GCS cleanup. + let _ = self.long_term.delete_object(&tombstone.target).await; + } + // If CAS failed, someone else changed the tombstone — nothing to do. 
} objectstore_metrics::record!( @@ -357,6 +407,60 @@ mod tests { use crate::id::ObjectContext; use crate::stream::{self}; + // --- revision_id tests --- + + #[test] + fn revision_id_preserves_context() { + let id = ObjectId { + context: ObjectContext { + usecase: "testing".to_string(), + scopes: Scopes::from_iter([Scope::create("org", "17").unwrap()]), + }, + key: "my-key".to_string(), + }; + + let revised = revision_id(&id); + assert_eq!(revised.context, id.context); + assert!( + revised.key.starts_with("my-key/"), + "revised key should have / suffix, got: {}", + revised.key + ); + } + + #[test] + fn revision_id_roundtrips_storage_path() { + use crate::id::ObjectId; + let id = ObjectId { + context: ObjectContext { + usecase: "attachments".to_string(), + scopes: Scopes::from_iter([Scope::create("org", "42").unwrap()]), + }, + key: "original".to_string(), + }; + + let revised = revision_id(&id); + let path = revised.as_storage_path().to_string(); + let parsed = ObjectId::from_storage_path(&path) + .unwrap_or_else(|| panic!("failed to parse '{path}'")); + assert_eq!(parsed, revised); + } + + #[test] + fn revision_id_is_unique() { + let id = ObjectId { + context: ObjectContext { + usecase: "testing".to_string(), + scopes: Scopes::empty(), + }, + key: "base-key".to_string(), + }; + + let a = revision_id(&id); + let b = revision_id(&id); + assert_ne!(a.key, b.key, "two calls should produce different keys"); + } + fn make_context() -> ObjectContext { ObjectContext { usecase: "testing".into(), @@ -434,42 +538,46 @@ mod tests { .await .unwrap(); - // Real payload should be in long-term - let (_, lt_bytes) = lt.get(&id).expect_object(); - assert_eq!(lt_bytes.len(), payload_len); + // A redirect tombstone should exist in high-volume pointing to a revision key. 
+ let tombstone = hv.get(&id).expect_tombstone(); + let lt_id = tombstone.target; + assert!( + lt_id.key().starts_with(id.key()), + "tombstone target key should be a revision of the HV key" + ); - // A redirect tombstone should exist in high-volume - hv.get(&id).expect_tombstone(); + // Real payload should be in long-term at the revision key. + let (_, lt_bytes) = lt.get(<_id).expect_object(); + assert_eq!(lt_bytes.len(), payload_len); } #[tokio::test] - async fn reinsert_with_existing_tombstone_routes_to_long_term() { + async fn reinsert_small_over_large_swaps_to_inline() { let (storage, hv, lt) = make_tiered_storage(); let id = ObjectId::new(make_context(), "reinsert-key".into()); - // First: insert a large object → creates tombstone in hv, payload in lt + // First: insert a large object → creates tombstone in hv, payload in lt at lt_id let large_payload = vec![0xABu8; 2 * 1024 * 1024]; storage .put_object(&id, &Default::default(), stream::single(large_payload)) .await .unwrap(); - hv.get(&id).expect_tombstone(); + let lt_id = hv.get(&id).expect_tombstone().target; - // Now re-insert a SMALL payload with the same key. The service should - // detect the existing tombstone and route to long-term anyway. + // Re-insert a SMALL payload with the same key. + // The CAS-swap puts the small object inline in HV and cleans up the old GCS blob. let small_payload = vec![0xCDu8; 100]; // well under 1 MiB threshold storage .put_object(&id, &Default::default(), stream::single(small_payload)) .await .unwrap(); - // The small object should be in long-term (not high-volume) - let (_, lt_bytes) = lt.get(&id).expect_object(); - assert_eq!(lt_bytes.len(), 100); + // The small object is now inline in high-volume. + hv.get(&id).expect_object(); - // The tombstone in hv should still be present - hv.get(&id).expect_tombstone(); + // The old long-term blob was cleaned up. 
+ lt.get(<_id).expect_not_found(); } #[tokio::test] @@ -490,16 +598,22 @@ mod tests { .await .unwrap(); - // The tombstone in hv should have expiration_policy inherited and target set to id. + // The tombstone in hv should have expiration_policy inherited. + // The target is a unique revision key, not `id` itself. let tombstone = hv.get(&id).expect_tombstone(); assert_eq!(tombstone.expiration_policy, metadata_in.expiration_policy); - assert_eq!( - tombstone.target, id, - "tombstone target must point to the LT ObjectId (identical to HV id for now)" + let lt_id = tombstone.target.clone(); + assert_ne!( + lt_id, id, + "tombstone target must be a unique revision, not the HV id" + ); + assert!( + lt_id.key().starts_with(id.key()), + "revision key should have original key as prefix" ); - // The long-term object should have the full metadata - let (lt_meta, _) = lt.get(&id).expect_object(); + // The long-term object should be at the revision key with the full metadata. + let (lt_meta, _) = lt.get(<_id).expect_object(); assert_eq!(lt_meta.content_type, "image/png"); assert_eq!(lt_meta.expiration_policy, metadata_in.expiration_policy); } @@ -534,7 +648,7 @@ mod tests { // --- Tombstone inconsistency tests --- - /// A backend where `create_tombstone` always fails, but all other operations work normally. + /// A backend where `cas_put` always fails, but all other operations work normally. #[derive(Debug)] struct FailingTombstoneBackend(InMemoryBackend); @@ -577,7 +691,12 @@ mod tests { self.0.delete_non_tombstone(id).await } - async fn create_tombstone(&self, _id: &ObjectId, _tombstone: Tombstone) -> Result<()> { + async fn cas_put( + &self, + _id: &ObjectId, + _expected_redirect: Option<&ObjectId>, + _mutation: CasMutation, + ) -> Result { Err(Error::Io(std::io::Error::new( std::io::ErrorKind::ConnectionRefused, "simulated tombstone write failure", @@ -617,7 +736,7 @@ mod tests { /// cleanup), reads should gracefully return None rather than error. 
#[tokio::test] async fn orphan_tombstone_returns_none() { - let (storage, _hv, lt) = make_tiered_storage(); + let (storage, hv, lt) = make_tiered_storage(); let id = ObjectId::new(make_context(), "orphan-tombstone".into()); let payload = vec![0xCDu8; 2 * 1024 * 1024]; // 2 MiB @@ -626,8 +745,11 @@ mod tests { .await .unwrap(); + // The object is at the revision key in LT, not at id. + let lt_id = hv.get(&id).expect_tombstone().target; + // Remove the long-term object, leaving an orphan tombstone in hv - lt.remove(&id); + lt.remove(<_id); assert!( storage.get_object(&id).await.unwrap().is_none(), @@ -652,10 +774,13 @@ mod tests { .await .unwrap(); + // Capture lt_id before deleting (it lives at the revision key, not at id). + let lt_id = hv.get(&id).expect_tombstone().target; + storage.delete_object(&id).await.unwrap(); assert!(!hv.contains(&id), "tombstone not cleaned up"); - assert!(!lt.contains(&id), "object not cleaned up"); + assert!(!lt.contains(<_id), "long-term object not cleaned up"); } /// A backend wrapper that delegates everything except `delete_object`, which always fails. @@ -689,10 +814,11 @@ mod tests { } } - /// When the long-term delete fails, the tombstone must be preserved so the - /// object remains reachable and no data is orphaned. + /// When the long-term GCS cleanup fails after the tombstone is deleted, the + /// delete still succeeds (GCS cleanup is best-effort). An orphan blob may + /// remain in LT storage, which is accepted. #[tokio::test] - async fn tombstone_preserved_when_long_term_delete_fails() { + async fn delete_succeeds_when_gcs_cleanup_fails() { let hv = InMemoryBackend::new("hv"); let lt = FailingDeleteBackend(InMemoryBackend::new("lt")); let storage = TieredStorage::new(Box::new(hv.clone()), Box::new(lt)); @@ -704,15 +830,21 @@ mod tests { .await .unwrap(); + // Delete succeeds even though GCS cleanup fails (it is best-effort). 
let result = storage.delete_object(&id).await; - assert!(result.is_err()); + assert!( + result.is_ok(), + "delete should succeed despite GCS cleanup failure" + ); - hv.get(&id).expect_tombstone(); + // The tombstone in HV is gone (CAS-deleted first, before GCS cleanup). + hv.get(&id).expect_not_found(); - // The object should still be reachable through the service - let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); - let body = stream::read_to_vec(s).await.unwrap(); - assert_eq!(body, payload); + // The orphaned GCS blob remains but the object is unreachable through the service. + assert!( + storage.get_object(&id).await.unwrap().is_none(), + "object should be unreachable after tombstone is deleted" + ); } // --- Redirect target tests --- @@ -733,15 +865,13 @@ mod tests { lt.put_object(&lt_id, &Default::default(), stream::single(payload.clone())) .await .unwrap(); - hv.create_tombstone( - &hv_id, - crate::backend::common::Tombstone { - target: lt_id.clone(), - expiration_policy: objectstore_types::metadata::ExpirationPolicy::Manual, - }, - ) - .await - .unwrap(); + let tombstone = crate::backend::common::Tombstone { + target: lt_id.clone(), + expiration_policy: objectstore_types::metadata::ExpirationPolicy::Manual, + }; + hv.cas_put(&hv_id, None, CasMutation::WriteTombstone(tombstone)) + .await + .unwrap(); // get_object must follow the tombstone and find the object via the lt_id target. let (_, s) = storage.get_object(&hv_id).await.unwrap().unwrap(); @@ -758,7 +888,7 @@ #[tokio::test] async fn multi_chunk_large_object_chains_buffered_and_remaining() { - let (storage, _hv, lt) = make_tiered_storage(); + let (storage, hv, lt) = make_tiered_storage(); let id = ObjectId::new(make_context(), "multi-chunk".into()); // Deliver a 2 MiB payload across multiple chunks that individually @@ -775,8 +905,9 @@ .await .unwrap(); - // Should have been routed to long-term (over 1 MiB).
- let (_, lt_bytes) = lt.get(&id).expect_object(); + // Should have been routed to long-term (over 1 MiB) at the revision key. + let lt_id = hv.get(&id).expect_tombstone().target; + let (_, lt_bytes) = lt.get(&lt_id).expect_object(); assert_eq!(lt_bytes.len(), chunk_size * chunk_count); // Verify data integrity — each chunk's fill byte should appear in order. diff --git a/objectstore-service/src/service.rs b/objectstore-service/src/service.rs index 2d34de78..b480a624 100644 --- a/objectstore-service/src/service.rs +++ b/objectstore-service/src/service.rs @@ -235,7 +235,9 @@ mod tests { use super::*; use crate::backend::bigtable::{BigTableBackend, BigTableConfig}; - use crate::backend::common::{HighVolumeBackend, TieredGet, TieredMetadata, Tombstone}; + use crate::backend::common::{ + CasMutation, HighVolumeBackend, TieredGet, TieredMetadata, Tombstone, + }; use crate::backend::gcs::{GcsBackend, GcsConfig}; use crate::backend::in_memory::InMemoryBackend; use crate::backend::tiered::TieredStorage; @@ -529,10 +531,21 @@ mod tests { self.inner.delete_non_tombstone(id).await } - async fn create_tombstone(&self, id: &ObjectId, tombstone: Tombstone) -> Result<()> { - self.inner.create_tombstone(id, tombstone).await?; - self.on_put.notify_one(); - Ok(()) + async fn cas_put( + &self, + id: &ObjectId, + expected_redirect: Option<&ObjectId>, + mutation: CasMutation, + ) -> Result<bool> { + let notify = matches!( + mutation, + CasMutation::WriteTombstone(_) | CasMutation::WriteInline(_, _) + ); + let result = self.inner.cas_put(id, expected_redirect, mutation).await?; + if notify { + self.on_put.notify_one(); + } + Ok(result) } async fn get_tiered_object(&self, id: &ObjectId) -> Result<TieredGet> { @@ -579,9 +592,11 @@ mod tests { .expect("timed out waiting for tombstone write"); // Verify the object was fully written despite the caller being dropped. + // The tombstone in HV points to the revision key in LT.
let id = ObjectId::new(make_context(), "completion-test".into()); - assert!(lt.inner.contains(&id), "long-term object missing"); - hv.inner.get(&id).expect_tombstone(); + let tombstone = hv.inner.get(&id).expect_tombstone(); + let lt_id = tombstone.target; + assert!(lt.inner.contains(&lt_id), "long-term object missing"); } // --- Concurrency limit tests --- From 7e602c00163c276bc17db51037cccfc5e1a8a1ff Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Thu, 19 Mar 2026 21:13:15 +0100 Subject: [PATCH 2/8] ref(service): Rename CasMutation/cas_put and extract tiered helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rename `CasMutation` to `TieredWrite` and `cas_put` to `compare_and_write` for clarity — the names now match the trait method and the write variants (`WriteInline` → `Object`, `WriteTombstone` → `Tombstone`) align with the existing `TieredGet`/`TieredMetadata` vocabulary. Extract `put_high_volume`, `put_long_term`, and `counting_stream` from the `put_object` body in `tiered.rs`, and rename `revision_id` to `new_long_term_revision` to make its purpose explicit. No behavior change.
Co-Authored-By: Claude --- objectstore-service/src/backend/bigtable.rs | 225 +++++++------ objectstore-service/src/backend/common.rs | 128 ++++---- objectstore-service/src/backend/in_memory.rs | 69 ++-- objectstore-service/src/backend/tiered.rs | 314 ++++++++++--------- objectstore-service/src/service.rs | 31 +- 5 files changed, 392 insertions(+), 375 deletions(-) diff --git a/objectstore-service/src/backend/bigtable.rs b/objectstore-service/src/backend/bigtable.rs index e6d28f08..9f8398c4 100644 --- a/objectstore-service/src/backend/bigtable.rs +++ b/objectstore-service/src/backend/bigtable.rs @@ -43,8 +43,8 @@ use tonic::Code; use bytes::Bytes; use crate::backend::common::{ - Backend, CasMutation, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, - PutResponse, TieredGet, TieredMetadata, Tombstone, + Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse, + TieredGet, TieredMetadata, TieredWrite, Tombstone, }; use crate::error::{Error, Result}; use crate::gcp_auth::PrefetchingTokenProvider; @@ -845,6 +845,59 @@ impl HighVolumeBackend for BigTableBackend { Err(Error::generic("BigTable: race loop in put_non_tombstone")) } + #[tracing::instrument(level = "trace", fields(?id), skip_all)] + async fn get_tiered_object(&self, id: &ObjectId) -> Result<TieredGet> { + tracing::debug!("Reading from Bigtable backend"); + let path = id.as_storage_path().to_string().into_bytes(); + + let Some(row) = self.read_row(&path, None, "get_tiered_object").await? else { + return Ok(TieredGet::NotFound); + }; + + if row.needs_tti_bump() { + self.bump_tti(path.clone(), &row, true, id).await; + } + + Ok(match row { + RowData::Tombstone { meta, target, ..
} => TieredGet::Tombstone(Tombstone { + target: parse_redirect_target(&target, id)?, + expiration_policy: meta.expiration_policy, + }), + RowData::Object { metadata, payload } => { + let mut metadata = metadata; + metadata.size = Some(payload.len()); + TieredGet::Object(metadata, crate::stream::single(payload)) + } + }) + } + + #[tracing::instrument(level = "trace", fields(?id), skip_all)] + async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata> { + tracing::debug!("Reading metadata from Bigtable backend"); + let path = id.as_storage_path().to_string().into_bytes(); + + // Read metadata and tombstone columns — skip the (potentially large) payload. + // NB: `metadata.size` will not be populated since the payload is not fetched. + let row_opt = self + .read_row(&path, Some(metadata_filter()), "get_tiered_metadata") + .await?; + let Some(row) = row_opt else { + return Ok(TieredMetadata::NotFound); + }; + + if row.needs_tti_bump() { + self.bump_tti(path.clone(), &row, false, id).await; + } + + Ok(match row { + RowData::Tombstone { meta, target, .. } => TieredMetadata::Tombstone(Tombstone { + target: parse_redirect_target(&target, id)?, + expiration_policy: meta.expiration_policy, + }), + RowData::Object { metadata, ..
} => TieredMetadata::Object(metadata), + }) + } + #[tracing::instrument(level = "trace", fields(?id), skip_all)] async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>> { tracing::debug!("Conditional delete from Bigtable backend"); @@ -904,33 +957,33 @@ } #[tracing::instrument(level = "trace", fields(?id), skip_all)] - async fn cas_put( + async fn compare_and_write( &self, id: &ObjectId, - expected_redirect: Option<&ObjectId>, - mutation: CasMutation, + current: Option<&ObjectId>, + write: TieredWrite, ) -> Result<bool> { tracing::debug!("CAS put to Bigtable backend"); let path = id.as_storage_path().to_string().into_bytes(); let now = SystemTime::now(); - let write_mutations: Vec<v2::Mutation> = match mutation { - CasMutation::WriteTombstone(tombstone) => { + let write_mutations: Vec<v2::Mutation> = match write { + TieredWrite::Tombstone(tombstone) => { let mutations = build_tombstone_mutations(&tombstone, now)?; mutations .into_iter() .map(|m| v2::Mutation { mutation: Some(m) }) .collect() } - CasMutation::WriteInline(metadata, payload) => { + TieredWrite::Object(metadata, payload) => { let mutations = build_write_mutations(&metadata, payload.to_vec(), now)?; mutations .into_iter() .map(|m| v2::Mutation { mutation: Some(m) }) .collect() } - CasMutation::Delete => { + TieredWrite::Delete => { let delete = mutation::Mutation::DeleteFromRow(mutation::DeleteFromRow {}); vec![v2::Mutation { mutation: Some(delete), @@ -938,23 +991,18 @@ } }; - let (predicate_filter, true_mutations, false_mutations, success_on_match) = - match expected_redirect { - None => { - // Success = no tombstone. tombstone_predicate matches = tombstone found = fail. - // Write on false_mutations (no tombstone); noop on true_mutations (tombstone). - (tombstone_predicate(), vec![], write_mutations, false) - } - Some(target) => { - // Success = redirect matches target. Predicate match = CAS success.
- ( - redirect_target_filter(target, id), - write_mutations, - vec![], - true, - ) - } - }; + let (predicate_filter, true_mutations, false_mutations, success_on_match) = match current { + // Success = no tombstone. tombstone_predicate matches = tombstone found = fail. + // Write on false_mutations (no tombstone); noop on true_mutations (tombstone). + None => (tombstone_predicate(), vec![], write_mutations, false), + // Success = redirect matches target. Predicate match = CAS success. + Some(target) => ( + redirect_target_filter(target, id), + write_mutations, + vec![], + true, + ), + }; let request = v2::CheckAndMutateRowRequest { table_name: self.table_path.clone(), @@ -965,7 +1013,7 @@ ..Default::default() }; - let predicate_matched = retry("cas_put", || async { + let predicate_matched = retry("compare_and_write", || async { self.bigtable .client() .check_and_mutate_row(request.clone()) @@ -976,59 +1024,6 @@ Ok(predicate_matched == success_on_match) } - - #[tracing::instrument(level = "trace", fields(?id), skip_all)] - async fn get_tiered_object(&self, id: &ObjectId) -> Result<TieredGet> { - tracing::debug!("Reading from Bigtable backend"); - let path = id.as_storage_path().to_string().into_bytes(); - - let Some(row) = self.read_row(&path, None, "get_tiered_object").await? else { - return Ok(TieredGet::NotFound); - }; - - if row.needs_tti_bump() { - self.bump_tti(path.clone(), &row, true, id).await; - } - - Ok(match row { - RowData::Tombstone { meta, target, ..
} => TieredGet::Tombstone(Tombstone { - target: parse_redirect_target(&target, id)?, - expiration_policy: meta.expiration_policy, - }), - RowData::Object { metadata, payload } => { - let mut metadata = metadata; - metadata.size = Some(payload.len()); - TieredGet::Object(metadata, crate::stream::single(payload)) - } - }) - } - - #[tracing::instrument(level = "trace", fields(?id), skip_all)] - async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata> { - tracing::debug!("Reading metadata from Bigtable backend"); - let path = id.as_storage_path().to_string().into_bytes(); - - // Read metadata and tombstone columns — skip the (potentially large) payload. - // NB: `metadata.size` will not be populated since the payload is not fetched. - let row_opt = self - .read_row(&path, Some(metadata_filter()), "get_tiered_metadata") - .await?; - let Some(row) = row_opt else { - return Ok(TieredMetadata::NotFound); - }; - - if row.needs_tti_bump() { - self.bump_tti(path.clone(), &row, false, id).await; - } - - Ok(match row { - RowData::Tombstone { meta, target, .. } => TieredMetadata::Tombstone(Tombstone { - target: parse_redirect_target(&target, id)?, - expiration_policy: meta.expiration_policy, - }), - RowData::Object { metadata, .. } => TieredMetadata::Object(metadata), - }) - } } /// Converts the given TTL duration to a microsecond-precision unix timestamp.
@@ -1344,10 +1339,10 @@ mod tests { let id = make_id(); backend - .cas_put( + .compare_and_write( &id, None, - CasMutation::WriteTombstone(Tombstone { + TieredWrite::Tombstone(Tombstone { target: id.clone(), expiration_policy: ExpirationPolicy::Manual, }), @@ -1494,10 +1489,10 @@ mod tests { let id = make_id(); backend - .cas_put( + .compare_and_write( &id, None, - CasMutation::WriteTombstone(Tombstone { + TieredWrite::Tombstone(Tombstone { target: id.clone(), expiration_policy: ExpirationPolicy::Manual, }), @@ -1699,10 +1694,10 @@ mod tests { let id = make_id(); let expiration_policy = ExpirationPolicy::TimeToLive(Duration::from_secs(3600)); let committed = backend - .cas_put( + .compare_and_write( &id, None, - CasMutation::WriteTombstone(Tombstone { + TieredWrite::Tombstone(Tombstone { target: id.clone(), expiration_policy, }), @@ -1744,12 +1739,12 @@ mod tests { expiration_policy: ExpirationPolicy::Manual, }; let first = backend - .cas_put(&id, None, CasMutation::WriteTombstone(tombstone.clone())) + .compare_and_write(&id, None, TieredWrite::Tombstone(tombstone.clone())) .await?; assert!(first, "first write should succeed"); let second = backend - .cas_put(&id, None, CasMutation::WriteTombstone(tombstone)) + .compare_and_write(&id, None, TieredWrite::Tombstone(tombstone)) .await?; assert!( !second, @@ -1773,7 +1768,7 @@ mod tests { expiration_policy: ExpirationPolicy::Manual, }; backend - .cas_put(&hv_id, None, CasMutation::WriteTombstone(old_tombstone)) + .compare_and_write(&hv_id, None, TieredWrite::Tombstone(old_tombstone)) .await?; let new_tombstone = Tombstone { @@ -1781,10 +1776,10 @@ mod tests { expiration_policy: ExpirationPolicy::Manual, }; let swapped = backend - .cas_put( + .compare_and_write( &hv_id, Some(&old_lt_id), - CasMutation::WriteTombstone(new_tombstone), + TieredWrite::Tombstone(new_tombstone), ) .await?; assert!(swapped, "expected CAS success"); @@ -1808,10 +1803,10 @@ mod tests { let new_lt_id = ObjectId::random(hv_id.context().clone()); 
backend - .cas_put( + .compare_and_write( &hv_id, None, - CasMutation::WriteTombstone(Tombstone { + TieredWrite::Tombstone(Tombstone { target: actual_lt_id.clone(), expiration_policy: ExpirationPolicy::Manual, }), @@ -1819,10 +1814,10 @@ .await?; let swapped = backend - .cas_put( + .compare_and_write( &hv_id, Some(&wrong_lt_id), - CasMutation::WriteTombstone(Tombstone { + TieredWrite::Tombstone(Tombstone { target: new_lt_id, expiration_policy: ExpirationPolicy::Manual, }), @@ -1848,10 +1843,10 @@ let lt_id = ObjectId::random(id.context().clone()); backend - .cas_put( + .compare_and_write( &id, None, - CasMutation::WriteTombstone(Tombstone { + TieredWrite::Tombstone(Tombstone { target: lt_id.clone(), expiration_policy: ExpirationPolicy::Manual, }), @@ -1861,10 +1856,10 @@ .await?; let metadata = Metadata::default(); let payload = bytes::Bytes::from(b"hello inline".to_vec()); let swapped = backend - .cas_put( + .compare_and_write( &id, Some(&lt_id), - CasMutation::WriteInline(metadata, payload.clone()), + TieredWrite::Object(metadata, payload.clone()), ) .await?; assert!(swapped, "expected CAS success"); @@ -1889,10 +1884,10 @@ let wrong_id = ObjectId::random(id.context().clone()); backend - .cas_put( + .compare_and_write( &id, None, - CasMutation::WriteTombstone(Tombstone { + TieredWrite::Tombstone(Tombstone { target: lt_id.clone(), expiration_policy: ExpirationPolicy::Manual, }), @@ -1900,10 +1895,10 @@ .await?; let swapped = backend - .cas_put( + .compare_and_write( &id, Some(&wrong_id), - CasMutation::WriteInline(Metadata::default(), bytes::Bytes::new()), + TieredWrite::Object(Metadata::default(), bytes::Bytes::new()), ) .await?; assert!(!swapped, "expected CAS failure"); @@ -1926,10 +1921,10 @@ let lt_id = ObjectId::random(id.context().clone()); backend - .cas_put( + .compare_and_write( &id, None, - CasMutation::WriteTombstone(Tombstone { + TieredWrite::Tombstone(Tombstone { target: lt_id.clone(),
expiration_policy: ExpirationPolicy::Manual, }), @@ -1937,7 +1932,7 @@ .await?; let deleted = backend - .cas_put(&id, Some(&lt_id), CasMutation::Delete) + .compare_and_write(&id, Some(&lt_id), TieredWrite::Delete) .await?; assert!(deleted, "expected CAS delete success"); @@ -1959,10 +1954,10 @@ let wrong_id = ObjectId::random(id.context().clone()); backend - .cas_put( + .compare_and_write( &id, None, - CasMutation::WriteTombstone(Tombstone { + TieredWrite::Tombstone(Tombstone { target: lt_id.clone(), expiration_policy: ExpirationPolicy::Manual, }), @@ -1970,7 +1965,7 @@ .await?; let deleted = backend - .cas_put(&id, Some(&wrong_id), CasMutation::Delete) + .compare_and_write(&id, Some(&wrong_id), TieredWrite::Delete) .await?; assert!(!deleted, "expected CAS failure"); @@ -1996,7 +1991,7 @@ .await?; let deleted = backend - .cas_put(&id, Some(&fake_lt_id), CasMutation::Delete) + .compare_and_write(&id, Some(&fake_lt_id), TieredWrite::Delete) .await?; assert!(!deleted, "expected false: row is not a tombstone"); @@ -2039,7 +2034,9 @@ .await?; // Expected target = id (the legacy fallback resolves to hv_id). - let deleted = backend.cas_put(&id, Some(&id), CasMutation::Delete).await?; + let deleted = backend + .compare_and_write(&id, Some(&id), TieredWrite::Delete) + .await?; assert!( deleted, "cas_put should match legacy empty-redirect tombstone" @@ -2062,7 +2059,9 @@ write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?; // Legacy tombstones resolve to hv_id, so expected target = id.
- let deleted = backend.cas_put(&id, Some(&id), CasMutation::Delete).await?; + let deleted = backend + .compare_and_write(&id, Some(&id), TieredWrite::Delete) + .await?; assert!(deleted, "cas_put should match legacy-metadata tombstone"); assert!(matches!( @@ -2088,7 +2087,7 @@ mod tests { }; backend - .cas_put(&hv_id, None, CasMutation::WriteTombstone(tombstone)) + .compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone)) .await?; match backend.get_tiered_metadata(&hv_id).await? { diff --git a/objectstore-service/src/backend/common.rs b/objectstore-service/src/backend/common.rs index 2fc1feff..e7253418 100644 --- a/objectstore-service/src/backend/common.rs +++ b/objectstore-service/src/backend/common.rs @@ -10,53 +10,6 @@ use crate::error::Result; use crate::id::ObjectId; use crate::stream::{ClientStream, PayloadStream}; -/// Information about a redirect tombstone in the high-volume backend. -#[derive(Clone, Debug, PartialEq)] -pub struct Tombstone { - /// The [`ObjectId`] of the object in the long-term backend. - /// - /// For legacy tombstones with an empty `r` column, the HV backend resolves - /// this to the HV `ObjectId` itself before surfacing the tombstone to callers. - pub target: ObjectId, - - /// The expiration policy copied from the original object. - pub expiration_policy: ExpirationPolicy, -} - -/// Typed response from [`HighVolumeBackend::get_tiered_object`]. -pub enum TieredGet { - /// A real object was found. - Object(Metadata, PayloadStream), - /// A redirect tombstone was found; the real object lives in the long-term backend. - Tombstone(Tombstone), - /// No entry exists at this key. 
- NotFound, -} - -impl fmt::Debug for TieredGet { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - TieredGet::Object(metadata, _stream) => f - .debug_tuple("Object") - .field(metadata) - .finish_non_exhaustive(), - TieredGet::Tombstone(info) => f.debug_tuple("Tombstone").field(info).finish(), - TieredGet::NotFound => write!(f, "NotFound"), - } - } -} - -/// Typed metadata-only response from [`HighVolumeBackend::get_tiered_metadata`]. -#[derive(Debug)] -pub enum TieredMetadata { - /// Metadata for a real object was found. - Object(Metadata), - /// A redirect tombstone was found; the real object lives in the long-term backend. - Tombstone(Tombstone), - /// No entry exists at this key. - NotFound, -} - /// User agent string used for outgoing requests. /// /// This intentionally has a "sentry" prefix so that it can easily be traced back to us. @@ -100,17 +53,6 @@ pub trait Backend: fmt::Debug + Send + Sync + 'static { async fn delete_object(&self, id: &ObjectId) -> Result; } -/// What to write when a [`cas_put`](HighVolumeBackend::cas_put) condition is met. -#[derive(Debug)] -pub enum CasMutation { - /// Write a redirect tombstone. - WriteTombstone(Tombstone), - /// Write inline object data. - WriteInline(Metadata, Bytes), - /// Delete the row entirely. - Delete, -} - /// Trait for backends that support tombstone-conditional operations. /// /// Only backends suitable for the high-volume tier of @@ -158,21 +100,79 @@ pub trait HighVolumeBackend: Backend { /// Atomically mutates the row if the current redirect state matches. /// - /// `expected_redirect` determines the precondition: + /// `current` determines the precondition: /// - `None`: succeeds only if no tombstone exists (row absent or inline). /// - `Some(target)`: succeeds only if a tombstone exists whose redirect - /// resolves to `target` (handles modern, empty-sentinel, and legacy formats). + /// resolves to `target`. /// - /// On match, applies `mutation`. 
Returns `true` on success, `false` if the + /// precondition was not met (row state changed concurrently). - async fn cas_put( + async fn compare_and_write( &self, id: &ObjectId, - expected_redirect: Option<&ObjectId>, - mutation: CasMutation, + current: Option<&ObjectId>, + write: TieredWrite, ) -> Result<bool>; } +/// Information about a redirect tombstone in the high-volume backend. +#[derive(Clone, Debug, PartialEq)] +pub struct Tombstone { + /// The [`ObjectId`] of the object in the long-term backend. + /// + /// For legacy tombstones with an empty `r` column, the HV backend resolves + /// this to the HV `ObjectId` itself before surfacing the tombstone to callers. + pub target: ObjectId, + + /// The expiration policy copied from the original object. + pub expiration_policy: ExpirationPolicy, +} + +/// Typed response from [`HighVolumeBackend::get_tiered_object`]. +pub enum TieredGet { + /// A real object was found. + Object(Metadata, PayloadStream), + /// A redirect tombstone was found; the real object lives in the long-term backend. + Tombstone(Tombstone), + /// No entry exists at this key. + NotFound, +} + +impl fmt::Debug for TieredGet { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + TieredGet::Object(metadata, _stream) => f + .debug_tuple("Object") + .field(metadata) + .finish_non_exhaustive(), + TieredGet::Tombstone(info) => f.debug_tuple("Tombstone").field(info).finish(), + TieredGet::NotFound => write!(f, "NotFound"), + } + } +} + +/// Typed metadata-only response from [`HighVolumeBackend::get_tiered_metadata`]. +#[derive(Debug)] +pub enum TieredMetadata { + /// Metadata for a real object was found. + Object(Metadata), + /// A redirect tombstone was found; the real object lives in the long-term backend. + Tombstone(Tombstone), + /// No entry exists at this key. + NotFound, +} + +/// The write operation performed by [`HighVolumeBackend::compare_and_write`].
+#[derive(Debug)] +pub enum TieredWrite { + /// Write a redirect tombstone. + Tombstone(Tombstone), + /// Write inline object data. + Object(Metadata, Bytes), + /// Delete the row entirely. + Delete, +} + /// Creates a reqwest client with required defaults. /// /// Automatic decompression is disabled because backends store pre-compressed diff --git a/objectstore-service/src/backend/in_memory.rs b/objectstore-service/src/backend/in_memory.rs index 3a7fd08b..46dfcd07 100644 --- a/objectstore-service/src/backend/in_memory.rs +++ b/objectstore-service/src/backend/in_memory.rs @@ -13,7 +13,8 @@ use futures_util::TryStreamExt; use objectstore_types::metadata::Metadata; use super::common::{ - CasMutation, DeleteResponse, GetResponse, PutResponse, TieredGet, TieredMetadata, Tombstone, + DeleteResponse, GetResponse, HighVolumeBackend, PutResponse, TieredGet, TieredMetadata, + TieredWrite, Tombstone, }; use crate::error::{Error, Result}; use crate::id::ObjectId; @@ -115,7 +116,7 @@ impl super::common::Backend for InMemoryBackend { } #[async_trait::async_trait] -impl super::common::HighVolumeBackend for InMemoryBackend { +impl HighVolumeBackend for InMemoryBackend { async fn put_non_tombstone( &self, id: &ObjectId, @@ -133,6 +134,27 @@ impl super::common::HighVolumeBackend for InMemoryBackend { Ok(None) } + async fn get_tiered_object(&self, id: &ObjectId) -> Result<TieredGet> { + let entry = self.store.lock().unwrap().get(id).cloned(); + Ok(match entry { + None => TieredGet::NotFound, + Some(StoreEntry::Tombstone(tombstone)) => TieredGet::Tombstone(tombstone), + Some(StoreEntry::Object(mut metadata, bytes)) => { + metadata.size = Some(bytes.len()); + TieredGet::Object(metadata, crate::stream::single(bytes)) + } + }) + } + + async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata> { + let entry = self.store.lock().unwrap().get(id).cloned(); + Ok(match entry { + None => TieredMetadata::NotFound, + Some(StoreEntry::Tombstone(tombstone)) => TieredMetadata::Tombstone(tombstone), +
Some(StoreEntry::Object(metadata, _bytes)) => TieredMetadata::Object(metadata), + }) + } + async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>> { let mut store = self.store.lock().unwrap(); if let Some(StoreEntry::Tombstone(tombstone)) = store.get(id).cloned() { @@ -143,32 +165,32 @@ Ok(None) } - async fn cas_put( + async fn compare_and_write( &self, id: &ObjectId, - expected_redirect: Option<&ObjectId>, - mutation: CasMutation, + current: Option<&ObjectId>, + write: TieredWrite, ) -> Result<bool> { let mut store = self.store.lock().unwrap(); - let current = store.get(id); + let actual = store.get(id); - let matches = match expected_redirect { - None => !matches!(current, Some(StoreEntry::Tombstone(_))), + let matches = match current { + None => !matches!(actual, Some(StoreEntry::Tombstone(_))), Some(target) => matches!( - current, + actual, Some(StoreEntry::Tombstone(t)) if t.target == *target ), }; if matches { - match mutation { - CasMutation::WriteTombstone(tombstone) => { + match write { + TieredWrite::Tombstone(tombstone) => { store.insert(id.clone(), StoreEntry::Tombstone(tombstone)); } - CasMutation::WriteInline(metadata, payload) => { + TieredWrite::Object(metadata, payload) => { store.insert(id.clone(), StoreEntry::Object(metadata, payload)); } - CasMutation::Delete => { + TieredWrite::Delete => { store.remove(id); } } @@ -176,27 +198,6 @@ Ok(matches) } - - async fn get_tiered_object(&self, id: &ObjectId) -> Result<TieredGet> { - let entry = self.store.lock().unwrap().get(id).cloned(); - Ok(match entry { - None => TieredGet::NotFound, - Some(StoreEntry::Tombstone(tombstone)) => TieredGet::Tombstone(tombstone), - Some(StoreEntry::Object(mut metadata, bytes)) => { - metadata.size = Some(bytes.len()); - TieredGet::Object(metadata, crate::stream::single(bytes)) - } - }) - } - - async fn get_tiered_metadata(&self, id: &ObjectId) -> Result<TieredMetadata> { - let
entry = self.store.lock().unwrap().get(id).cloned(); - Ok(match entry { - None => TieredMetadata::NotFound, - Some(StoreEntry::Tombstone(tombstone)) => TieredMetadata::Tombstone(tombstone), - Some(StoreEntry::Object(metadata, _bytes)) => TieredMetadata::Object(metadata), - }) - } } /// Type returned by [`InMemoryBackend::get`] for direct inspection of stored entries. diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index c5760921..1818594d 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -8,13 +8,14 @@ use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Instant; -use futures_util::StreamExt; +use bytes::Bytes; +use futures_util::{Stream, StreamExt}; use objectstore_types::metadata::Metadata; use serde::{Deserialize, Serialize}; use crate::backend::common::{ - Backend, CasMutation, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, - PutResponse, TieredGet, TieredMetadata, Tombstone, + Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse, + TieredGet, TieredMetadata, TieredWrite, Tombstone, }; use crate::backend::{HighVolumeStorageConfig, StorageConfig}; use crate::error::Result; @@ -29,7 +30,7 @@ const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB /// The new key has the format `{original_key}/{uuid_v7}`, producing a distinct /// storage path for each large-object write. [`ObjectId::from_storage_path`] parses /// the result back correctly because the key portion may contain `/`. 
-fn revision_id(id: &ObjectId) -> ObjectId { +fn new_long_term_revision(id: &ObjectId) -> ObjectId { ObjectId { context: id.context.clone(), key: format!("{}/{}", id.key, uuid::Uuid::now_v7()), @@ -67,27 +68,6 @@ pub struct TieredStorageConfig { pub long_term: Box, } -#[derive(Debug)] -enum BackendChoice { - HighVolume, - LongTerm, -} - -impl BackendChoice { - fn as_str(&self) -> &'static str { - match self { - BackendChoice::HighVolume => "high-volume", - BackendChoice::LongTerm => "long-term", - } - } -} - -impl std::fmt::Display for BackendChoice { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(self.as_str()) - } -} - /// Two-tier storage backend that routes objects by size. /// /// `TieredStorage` implements [`Backend`] and is intended to be used inside a @@ -157,6 +137,94 @@ impl TieredStorage { BackendChoice::LongTerm => self.long_term.name(), } } + + /// Puts an object into the high-volume backend. + /// + /// If a tombstone already exists, attempts to swap it for the new object and delete the old + /// long-term object. + async fn put_high_volume( + &self, + id: &ObjectId, + metadata: &Metadata, + payload: Bytes, + ) -> Result<()> { + let tombstone_opt = self + .high_volume + .put_non_tombstone(id, metadata, payload.clone()) + .await?; + + let Some(Tombstone { target, .. }) = tombstone_opt else { + // No tombstone exists - write succeeded + return Ok(()); + }; + + // Tombstone exists — Swap it for inline data + let write = TieredWrite::Object(metadata.clone(), payload.clone()); + let written = self + .high_volume + .compare_and_write(id, Some(&target), write) + .await?; + + // TODO: Schedule cleanups into background to ensure eventual cleanup + if written { + let _ = self.long_term.delete_object(&target).await; + } + + Ok(()) + } + + /// Puts an object into the long-term backend with a redirect tombstone in front. + /// + /// Deletes the previous long-term object if overwriting an existing tombstone. 
If the tombstone + /// write fails, the new long-term object is cleaned up. + async fn put_long_term( + &self, + id: &ObjectId, + metadata: &Metadata, + stream: ClientStream, + ) -> Result<()> { + // 1. Read current HV revision to establish the write precondition + let current = match self.high_volume.get_tiered_metadata(id).await? { + TieredMetadata::Tombstone(t) => Some(t.target), + _ => None, + }; + + // 2. Write payload to long-term at a unique revision key. + let new = new_long_term_revision(id); + self.long_term.put_object(&new, metadata, stream).await?; + + // 3. CAS commit: write tombstone only if HV state matches what we saw. + let tombstone = Tombstone { + target: new.clone(), + expiration_policy: metadata.expiration_policy, + }; + let written = self + .high_volume + .compare_and_write(id, current.as_ref(), TieredWrite::Tombstone(tombstone)) + .await; + + // TODO: Schedule cleanups into background to ensure eventual cleanup + match written { + Ok(true) => { + // Tombstone committed. Clean up old GCS blob if overwriting. + if let Some(current) = current { + let _ = self.long_term.delete_object(&current).await; + } + } + Ok(false) => { + // Someone else won the race. Clean up our GCS blob. + let _ = self.long_term.delete_object(&new).await; + // Return OK — from the caller's perspective, a write happened. + } + Err(e) => { + // CAS error. Clean up our GCS blob before propagating.
+ let _ = self.long_term.delete_object(&new).await; + return Err(e); + } + } + + Ok(()) + } } #[async_trait::async_trait] @@ -171,124 +239,39 @@ impl Backend for TieredStorage { metadata: &Metadata, stream: ClientStream, ) -> Result { + let start = Instant::now(); if metadata.origin.is_none() { objectstore_metrics::count!("put.origin_missing", usecase = id.usecase().to_owned()); } - let start = Instant::now(); - let peeked = SizedPeek::new(stream, BACKEND_SIZE_THRESHOLD).await?; - let backend_choice = if peeked.is_exhausted() { - BackendChoice::HighVolume - } else { - BackendChoice::LongTerm - }; - objectstore_metrics::record!( "put.first_chunk.latency" = start.elapsed(), usecase = id.usecase().to_owned(), - backend_choice = backend_choice.as_str(), + complete = if peeked.is_exhausted() { "yes" } else { "no" }, ); - let (final_choice, stored_size) = match backend_choice { - BackendChoice::HighVolume => { - let payload = peeked.into_bytes().await?; - let stored_size = payload.len() as u64; - - let tombstone_opt = self - .high_volume - .put_non_tombstone(id, metadata, payload.clone()) - .await?; - - if let Some(tombstone) = tombstone_opt { - // Tombstone exists — CAS-swap it for inline data so the object - // ends up in HV with consistent expiry. - let cas_mutation = CasMutation::WriteInline(metadata.clone(), payload.clone()); - let swapped = self - .high_volume - .cas_put(id, Some(&tombstone.target), cas_mutation) - .await?; - - if swapped { - // Commit succeeded. Best-effort cleanup of old GCS blob. - let _ = self.long_term.delete_object(&tombstone.target).await; - } - // If CAS failed, someone else won the race — their write is committed. 
- } - - (BackendChoice::HighVolume, stored_size) - } - BackendChoice::LongTerm => { - let stored_size = Arc::new(AtomicU64::new(0)); - let stream = peeked - .into_stream() - .inspect({ - let stored_size = Arc::clone(&stored_size); - move |res| { - if let Ok(chunk) = res { - stored_size.fetch_add(chunk.len() as u64, Ordering::Relaxed); - } - } - }) - .boxed(); - - // 1. Read current HV state to establish the CAS precondition. - let expected_old = match self.high_volume.get_tiered_metadata(id).await? { - TieredMetadata::Tombstone(t) => Some(t), - _ => None, - }; - - // 2. Write payload to GCS at a unique revision key. - let lt_id = revision_id(id); - self.long_term.put_object(<_id, metadata, stream).await?; - - // 3. CAS commit: write tombstone only if HV state matches what we saw. - let expected_target = expected_old.as_ref().map(|t| &t.target); - let new_tombstone = Tombstone { - target: lt_id.clone(), - expiration_policy: metadata.expiration_policy, - }; - let cas_mutation = CasMutation::WriteTombstone(new_tombstone); - let commit_result = self - .high_volume - .cas_put(id, expected_target, cas_mutation) - .await; - - match commit_result { - Ok(true) => { - // Tombstone committed. Clean up old GCS blob if overwriting. - if let Some(old) = expected_old { - let _ = self.long_term.delete_object(&old.target).await; - } - } - Ok(false) => { - // Someone else won the race. Clean up our GCS blob. - let _ = self.long_term.delete_object(<_id).await; - // Return OK — from the caller's perspective, a write happened. - } - Err(e) => { - // CAS error. Clean up our GCS blob before propagating. 
- let _ = self.long_term.delete_object(<_id).await; - return Err(e); - } - } - - (BackendChoice::LongTerm, stored_size.load(Ordering::Acquire)) - } + let (backend_choice, stored_size) = if peeked.is_exhausted() { + let payload = peeked.into_bytes().await?; + self.put_high_volume(id, metadata, payload.clone()).await?; + (BackendChoice::HighVolume, payload.len() as u64) + } else { + let (stored_size, stream) = counting_stream(peeked.into_stream()); + self.put_long_term(id, metadata, stream.boxed()).await?; + (BackendChoice::LongTerm, stored_size.load(Ordering::Acquire)) }; - let backend_ty = self.backend_type(&final_choice); - + let backend_ty = self.backend_type(&backend_choice); objectstore_metrics::record!( "put.latency" = start.elapsed(), usecase = id.usecase().to_owned(), - backend_choice = final_choice.as_str(), + backend_choice = backend_choice.as_str(), backend_type = backend_ty, ); objectstore_metrics::record!( "put.size" = stored_size, usecase = id.usecase().to_owned(), - backend_choice = final_choice.as_str(), + backend_choice = backend_choice.as_str(), backend_type = backend_ty, ); @@ -365,20 +348,16 @@ impl Backend for TieredStorage { if let Some(tombstone) = self.high_volume.delete_non_tombstone(id).await? { backend_choice = BackendChoice::LongTerm; - // CAS-delete the tombstone first (commit point), then clean up GCS. - // This is the inverse of the old ordering: if GCS cleanup fails, - // an orphan blob remains (accepted) but the tombstone is gone. - let cas_mutation = CasMutation::Delete; + // Delete the tombstone first, then clean up GCS. let deleted = self .high_volume - .cas_put(id, Some(&tombstone.target), cas_mutation) + .compare_and_write(id, Some(&tombstone.target), TieredWrite::Delete) .await?; + // TODO: Schedule cleanups into background to ensure eventual cleanup if deleted { - // Tombstone removed. Best-effort GCS cleanup. 
let _ = self.long_term.delete_object(&tombstone.target).await; } - // If CAS failed, someone else changed the tombstone — nothing to do. } objectstore_metrics::record!( @@ -392,6 +371,47 @@ impl Backend for TieredStorage { } } +#[derive(Debug)] +enum BackendChoice { + HighVolume, + LongTerm, +} + +impl BackendChoice { + fn as_str(&self) -> &'static str { + match self { + BackendChoice::HighVolume => "high-volume", + BackendChoice::LongTerm => "long-term", + } + } +} + +impl std::fmt::Display for BackendChoice { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + +/// Wraps a stream to count the total bytes yielded by successful chunks. +/// +/// Returns the shared counter and the wrapped stream. The counter is incremented +/// as the stream is consumed, so read it only after the stream is exhausted. +fn counting_stream(stream: S) -> (Arc, impl Stream>) +where + S: Stream>, +{ + let counter = Arc::new(AtomicU64::new(0)); + + ( + counter.clone(), + stream.inspect(move |res| { + if let Ok(chunk) = res { + counter.fetch_add(chunk.len() as u64, Ordering::Relaxed); + } + }), + ) +} + #[cfg(test)] mod tests { use std::time::Duration; @@ -419,7 +439,7 @@ mod tests { key: "my-key".to_string(), }; - let revised = revision_id(&id); + let revised = new_long_term_revision(&id); assert_eq!(revised.context, id.context); assert!( revised.key.starts_with("my-key/"), @@ -439,7 +459,7 @@ mod tests { key: "original".to_string(), }; - let revised = revision_id(&id); + let revised = new_long_term_revision(&id); let path = revised.as_storage_path().to_string(); let parsed = ObjectId::from_storage_path(&path) .unwrap_or_else(|| panic!("failed to parse '{path}'")); @@ -456,8 +476,8 @@ mod tests { key: "base-key".to_string(), }; - let a = revision_id(&id); - let b = revision_id(&id); + let a = new_long_term_revision(&id); + let b = new_long_term_revision(&id); assert_ne!(a.key, b.key, "two calls should produce different keys"); } @@ 
-687,29 +707,29 @@ mod tests { self.0.put_non_tombstone(id, metadata, payload).await } + async fn get_tiered_object(&self, id: &ObjectId) -> Result { + self.0.get_tiered_object(id).await + } + + async fn get_tiered_metadata(&self, id: &ObjectId) -> Result { + self.0.get_tiered_metadata(id).await + } + async fn delete_non_tombstone(&self, id: &ObjectId) -> Result> { self.0.delete_non_tombstone(id).await } - async fn cas_put( + async fn compare_and_write( &self, _id: &ObjectId, - _expected_redirect: Option<&ObjectId>, - _mutation: CasMutation, + _current: Option<&ObjectId>, + _write: TieredWrite, ) -> Result { Err(Error::Io(std::io::Error::new( std::io::ErrorKind::ConnectionRefused, "simulated tombstone write failure", ))) } - - async fn get_tiered_object(&self, id: &ObjectId) -> Result { - self.0.get_tiered_object(id).await - } - - async fn get_tiered_metadata(&self, id: &ObjectId) -> Result { - self.0.get_tiered_metadata(id).await - } } /// If the tombstone write to the high-volume backend fails after the long-term @@ -865,11 +885,11 @@ mod tests { lt.put_object(<_id, &Default::default(), stream::single(payload.clone())) .await .unwrap(); - let tombstone = crate::backend::common::Tombstone { + let tombstone = Tombstone { target: lt_id.clone(), expiration_policy: objectstore_types::metadata::ExpirationPolicy::Manual, }; - hv.cas_put(&hv_id, None, CasMutation::WriteTombstone(tombstone)) + hv.compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone)) .await .unwrap(); diff --git a/objectstore-service/src/service.rs b/objectstore-service/src/service.rs index b480a624..c4c0be9d 100644 --- a/objectstore-service/src/service.rs +++ b/objectstore-service/src/service.rs @@ -236,7 +236,7 @@ mod tests { use super::*; use crate::backend::bigtable::{BigTableBackend, BigTableConfig}; use crate::backend::common::{ - CasMutation, HighVolumeBackend, TieredGet, TieredMetadata, Tombstone, + HighVolumeBackend, TieredGet, TieredMetadata, TieredWrite, Tombstone, }; use 
crate::backend::gcs::{GcsBackend, GcsConfig}; use crate::backend::in_memory::InMemoryBackend; @@ -527,34 +527,31 @@ mod tests { self.inner.put_non_tombstone(id, metadata, payload).await } + async fn get_tiered_object(&self, id: &ObjectId) -> Result { + self.inner.get_tiered_object(id).await + } + + async fn get_tiered_metadata(&self, id: &ObjectId) -> Result { + self.inner.get_tiered_metadata(id).await + } + async fn delete_non_tombstone(&self, id: &ObjectId) -> Result> { self.inner.delete_non_tombstone(id).await } - async fn cas_put( + async fn compare_and_write( &self, id: &ObjectId, - expected_redirect: Option<&ObjectId>, - mutation: CasMutation, + current: Option<&ObjectId>, + write: TieredWrite, ) -> Result { - let notify = matches!( - mutation, - CasMutation::WriteTombstone(_) | CasMutation::WriteInline(_, _) - ); - let result = self.inner.cas_put(id, expected_redirect, mutation).await?; + let notify = matches!(write, TieredWrite::Tombstone(_) | TieredWrite::Object(_, _)); + let result = self.inner.compare_and_write(id, current, write).await?; if notify { self.on_put.notify_one(); } Ok(result) } - - async fn get_tiered_object(&self, id: &ObjectId) -> Result { - self.inner.get_tiered_object(id).await - } - - async fn get_tiered_metadata(&self, id: &ObjectId) -> Result { - self.inner.get_tiered_metadata(id).await - } } #[tokio::test] From 9e22ef7d6bdded6289d00eab751fa2180db1b8ee Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Thu, 19 Mar 2026 21:54:47 +0100 Subject: [PATCH 3/8] ref(service): Extract legacy_tombstone_filter and improve filter comments Factor the shared legacy-metadata branch out of both `tombstone_predicate` and `redirect_target_filter` into a `legacy_tombstone_filter` helper. Improve the `redirect_target_filter` doc comment to show the Chain/Interleave structure as bullet points. 
Co-Authored-By: Claude --- objectstore-service/src/backend/bigtable.rs | 414 ++++++++------------ 1 file changed, 162 insertions(+), 252 deletions(-) diff --git a/objectstore-service/src/backend/bigtable.rs b/objectstore-service/src/backend/bigtable.rs index 9f8398c4..1c33d09d 100644 --- a/objectstore-service/src/backend/bigtable.rs +++ b/objectstore-service/src/backend/bigtable.rs @@ -35,13 +35,12 @@ use std::time::{Duration, SystemTime}; use bigtable_rs::bigtable::{BigTableConnection, Error as BigTableError, RowCell}; use bigtable_rs::google::bigtable::v2::{self, mutation}; +use bytes::Bytes; use futures_util::TryStreamExt; use objectstore_types::metadata::{ExpirationPolicy, Metadata}; use serde::{Deserialize, Serialize}; use tonic::Code; -use bytes::Bytes; - use crate::backend::common::{ Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse, TieredGet, TieredMetadata, TieredWrite, Tombstone, @@ -192,6 +191,25 @@ fn column_filter(column: &[u8]) -> v2::RowFilter { } } +/// Creates a row filter matching the legacy tombstone format: `m` column JSON starts with +/// `{"is_redirect_tombstone":true`. +/// +/// After legacy tombstones expire naturally this filter becomes dead code in both callers. +fn legacy_tombstone_filter() -> v2::RowFilter { + v2::RowFilter { + filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain { + filters: vec![ + column_filter(COLUMN_METADATA), + v2::RowFilter { + filter: Some(v2::row_filter::Filter::ValueRegexFilter( + b"^\\{\"is_redirect_tombstone\":true[,}].*".to_vec(), + )), + }, + ], + })), + } +} + /// Creates a row filter that matches any tombstone row, new- or legacy-format. /// /// New format: presence of the `r` column. @@ -207,33 +225,27 @@ fn tombstone_predicate() -> v2::RowFilter { filters: vec![ // Current: redirect column is present. column_filter(COLUMN_REDIRECT), - // Legacy: Metadata starts with `is_redirect_tombstone``. 
- v2::RowFilter { - filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain { - filters: vec![ - column_filter(COLUMN_METADATA), - v2::RowFilter { - filter: Some(v2::row_filter::Filter::ValueRegexFilter( - b"^\\{\"is_redirect_tombstone\":true[,}].*".to_vec(), - )), - }, - ], - })), - }, + // Legacy: metadata column JSON format. + legacy_tombstone_filter(), ], }, )), } } -/// Creates a row filter that matches tombstones whose redirect resolves to `expected_target`. +/// Creates a row filter that matches tombstones whose redirect resolves to `target`. /// -/// Combines an exact value match on the `r` column with additional fallbacks when -/// `expected_target == hv_id` (i.e., the caller expects the legacy identity target): -/// - `r == b""` (empty-sentinel format written before the redirect column stored the path) -/// - legacy `m` column format (`is_redirect_tombstone: true`) -fn redirect_target_filter(expected_target: &ObjectId, hv_id: &ObjectId) -> v2::RowFilter { - let target_path = expected_target.as_storage_path().to_string().into_bytes(); +/// Always includes an exact match on the `r` (redirect) column: +/// - Chain: `r` column present AND value == `target` storage path +/// +/// When `target == own_id` (the caller expects a legacy identity redirect), the +/// exact match is wrapped in an Interleave with two additional fallbacks: +/// - Chain: `r` column present AND value == `b""` (empty-sentinel written before the redirect +/// column stored the path) +/// - Chain: `m` column present AND value matches `{"is_redirect_tombstone":true...}` regex +/// (legacy metadata format predating the dedicated `r` column) +fn redirect_target_filter(target: &ObjectId, own_id: &ObjectId) -> v2::RowFilter { + let target_path = target.as_storage_path().to_string().into_bytes(); let exact_match = v2::RowFilter { filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain { @@ -251,48 +263,33 @@ fn redirect_target_filter(expected_target: &ObjectId, hv_id: &ObjectId) 
-> v2::R })), }; - if expected_target == hv_id { - // Also match legacy tombstones that resolve to the HV id: - // - empty `r` value (written before the redirect column stored the path) - // - legacy `m` column format (`is_redirect_tombstone: true`) - let empty_redirect_match = v2::RowFilter { - filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain { - filters: vec![ - column_filter(COLUMN_REDIRECT), - v2::RowFilter { - filter: Some(v2::row_filter::Filter::ValueRangeFilter(v2::ValueRange { - start_value: Some(v2::value_range::StartValue::StartValueClosed( - vec![], - )), - end_value: Some(v2::value_range::EndValue::EndValueClosed(vec![])), - })), - }, - ], - })), - }; - - let legacy_meta_match = v2::RowFilter { - filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain { - filters: vec![ - column_filter(COLUMN_METADATA), - v2::RowFilter { - filter: Some(v2::row_filter::Filter::ValueRegexFilter( - b"^\\{\"is_redirect_tombstone\":true[,}].*".to_vec(), - )), - }, - ], - })), - }; + if target != own_id { + return exact_match; + } - v2::RowFilter { - filter: Some(v2::row_filter::Filter::Interleave( - v2::row_filter::Interleave { - filters: vec![exact_match, empty_redirect_match, legacy_meta_match], + let empty_redirect_match = v2::RowFilter { + filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain { + filters: vec![ + column_filter(COLUMN_REDIRECT), + v2::RowFilter { + filter: Some(v2::row_filter::Filter::ValueRangeFilter(v2::ValueRange { + start_value: Some(v2::value_range::StartValue::StartValueClosed(vec![])), + end_value: Some(v2::value_range::EndValue::EndValueClosed(vec![])), + })), }, - )), - } - } else { - exact_match + ], + })), + }; + + // Also match legacy tombstones that resolve to the HV id: + // - empty `r` value (written before the redirect column stored the path) + // - legacy `m` column format (`is_redirect_tombstone: true`) + v2::RowFilter { + filter: Some(v2::row_filter::Filter::Interleave( + v2::row_filter::Interleave { 
+ filters: vec![exact_match, empty_redirect_match, legacy_tombstone_filter()], + }, + )), } } @@ -1338,16 +1335,11 @@ mod tests { let backend = create_test_backend().await?; let id = make_id(); - backend - .compare_and_write( - &id, - None, - TieredWrite::Tombstone(Tombstone { - target: id.clone(), - expiration_policy: ExpirationPolicy::Manual, - }), - ) - .await?; + let write = TieredWrite::Tombstone(Tombstone { + target: id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }); + backend.compare_and_write(&id, None, write).await?; let result = backend.get_metadata(&id).await; assert!( @@ -1488,16 +1480,11 @@ mod tests { let backend = create_test_backend().await?; let id = make_id(); - backend - .compare_and_write( - &id, - None, - TieredWrite::Tombstone(Tombstone { - target: id.clone(), - expiration_policy: ExpirationPolicy::Manual, - }), - ) - .await?; + let write = TieredWrite::Tombstone(Tombstone { + target: id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }); + backend.compare_and_write(&id, None, write).await?; let result = backend.delete_non_tombstone(&id).await?; let tombstone = result.expect("Some(tombstone)"); @@ -1583,6 +1570,33 @@ mod tests { Ok(()) } + /// Write a new-format tombstone row with an empty `r` value directly, + /// simulating rows written by code before this change. 
+ async fn write_empty_redirect_tombstone( + backend: &BigTableBackend, + id: &ObjectId, + ) -> Result<()> { + let path = id.as_storage_path().to_string().into_bytes(); + let mutations = [ + mutation::Mutation::SetCell(mutation::SetCell { + family_name: FAMILY_MANUAL.to_owned(), + column_qualifier: COLUMN_REDIRECT.to_owned(), + timestamp_micros: -1, + value: b"".to_vec(), // empty — legacy format + }), + mutation::Mutation::SetCell(mutation::SetCell { + family_name: FAMILY_MANUAL.to_owned(), + column_qualifier: COLUMN_TOMBSTONE_META.to_owned(), + timestamp_micros: -1, + value: b"{}".to_vec(), + }), + ]; + + backend.mutate(path, mutations, "test-setup").await?; + + Ok(()) + } + /// /// Uses `Manual` expiration so `timestamp_micros = -1` (server-assigned ≈ write time) does /// not trigger immediate expiry. @@ -1604,17 +1618,15 @@ mod tests { // Recreate a fresh tombstone to test the other conditional operations. write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?; - let t_opt = backend - .put_non_tombstone(&id, &Metadata::default(), bytes::Bytes::new()) + .put_non_tombstone(&id, &Metadata::default(), Bytes::new()) .await?; // Legacy tombstones resolve to hv_id; target should match id. 
- assert_eq!(t_opt.as_ref().map(|t| &t.target), Some(&id)); + assert_eq!(t_opt.map(|t| t.target).as_ref(), Some(&id)); write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?; - let t_opt = backend.delete_non_tombstone(&id).await?; - assert_eq!(t_opt.as_ref().map(|t| &t.target), Some(&id)); + assert_eq!(t_opt.map(|t| t.target), Some(id)); Ok(()) } @@ -1688,21 +1700,16 @@ mod tests { } #[tokio::test] - async fn test_cas_put_create_tombstone() -> Result<()> { + async fn test_swap_create_tombstone() -> Result<()> { let backend = create_test_backend().await?; let id = make_id(); let expiration_policy = ExpirationPolicy::TimeToLive(Duration::from_secs(3600)); - let committed = backend - .compare_and_write( - &id, - None, - TieredWrite::Tombstone(Tombstone { - target: id.clone(), - expiration_policy, - }), - ) - .await?; + let write = TieredWrite::Tombstone(Tombstone { + target: id.clone(), + expiration_policy, + }); + let committed = backend.compare_and_write(&id, None, write).await?; assert!(committed, "expected CAS success on empty row"); // Both hv methods must surface the tombstone with the correct expiration_policy. @@ -1730,7 +1737,7 @@ mod tests { /// Attempting to create a tombstone when one already exists returns false. #[tokio::test] - async fn test_cas_put_create_tombstone_conflict() -> Result<()> { + async fn test_swap_create_tombstone_conflict() -> Result<()> { let backend = create_test_backend().await?; let id = make_id(); @@ -1756,31 +1763,25 @@ mod tests { /// CAS-swapping an existing tombstone for a new one succeeds when the expected target matches. 
#[tokio::test] - async fn test_cas_put_swap_tombstone() -> Result<()> { + async fn test_swap_tombstone() -> Result<()> { let backend = create_test_backend().await?; let hv_id = make_id(); let old_lt_id = ObjectId::random(hv_id.context().clone()); let new_lt_id = ObjectId::random(hv_id.context().clone()); - let old_tombstone = Tombstone { + let old_write = TieredWrite::Tombstone(Tombstone { target: old_lt_id.clone(), expiration_policy: ExpirationPolicy::Manual, - }; - backend - .compare_and_write(&hv_id, None, TieredWrite::Tombstone(old_tombstone)) - .await?; + }); + backend.compare_and_write(&hv_id, None, old_write).await?; - let new_tombstone = Tombstone { + let new_write = TieredWrite::Tombstone(Tombstone { target: new_lt_id.clone(), expiration_policy: ExpirationPolicy::Manual, - }; + }); let swapped = backend - .compare_and_write( - &hv_id, - Some(&old_lt_id), - TieredWrite::Tombstone(new_tombstone), - ) + .compare_and_write(&hv_id, Some(&old_lt_id), new_write) .await?; assert!(swapped, "expected CAS success"); @@ -1794,7 +1795,7 @@ mod tests { /// CAS-swapping fails when the expected target does not match the current tombstone. 
#[tokio::test] - async fn test_cas_put_swap_tombstone_mismatch() -> Result<()> { + async fn test_swap_tombstone_mismatch() -> Result<()> { let backend = create_test_backend().await?; let hv_id = make_id(); @@ -1802,26 +1803,18 @@ mod tests { let wrong_lt_id = ObjectId::random(hv_id.context().clone()); let new_lt_id = ObjectId::random(hv_id.context().clone()); - backend - .compare_and_write( - &hv_id, - None, - TieredWrite::Tombstone(Tombstone { - target: actual_lt_id.clone(), - expiration_policy: ExpirationPolicy::Manual, - }), - ) - .await?; + let old_write = TieredWrite::Tombstone(Tombstone { + target: actual_lt_id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }); + backend.compare_and_write(&hv_id, None, old_write).await?; + let new_write = TieredWrite::Tombstone(Tombstone { + target: new_lt_id, + expiration_policy: ExpirationPolicy::Manual, + }); let swapped = backend - .compare_and_write( - &hv_id, - Some(&wrong_lt_id), - TieredWrite::Tombstone(Tombstone { - target: new_lt_id, - expiration_policy: ExpirationPolicy::Manual, - }), - ) + .compare_and_write(&hv_id, Some(&wrong_lt_id), new_write) .await?; assert!(!swapped, "expected CAS failure due to wrong target"); @@ -1836,31 +1829,22 @@ mod tests { /// CAS-swapping a tombstone for inline data succeeds when the target matches. 
#[tokio::test] - async fn test_cas_put_swap_inline() -> Result<()> { + async fn test_swap_inline() -> Result<()> { let backend = create_test_backend().await?; let id = make_id(); let lt_id = ObjectId::random(id.context().clone()); - backend - .compare_and_write( - &id, - None, - TieredWrite::Tombstone(Tombstone { - target: lt_id.clone(), - expiration_policy: ExpirationPolicy::Manual, - }), - ) - .await?; + let old_write = TieredWrite::Tombstone(Tombstone { + target: lt_id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }); + backend.compare_and_write(&id, None, old_write).await?; - let metadata = Metadata::default(); - let payload = bytes::Bytes::from(b"hello inline".to_vec()); + let payload = Bytes::from_static(b"hello inline"); + let new_write = TieredWrite::Object(Metadata::default(), payload.clone()); let swapped = backend - .compare_and_write( - &id, - Some(<_id), - TieredWrite::Object(metadata, payload.clone()), - ) + .compare_and_write(&id, Some(<_id), new_write) .await?; assert!(swapped, "expected CAS success"); @@ -1876,30 +1860,22 @@ mod tests { /// Inline-swap fails when the expected target does not match. 
#[tokio::test] - async fn test_cas_put_swap_inline_mismatch() -> Result<()> { + async fn test_swap_inline_mismatch() -> Result<()> { let backend = create_test_backend().await?; let id = make_id(); let lt_id = ObjectId::random(id.context().clone()); let wrong_id = ObjectId::random(id.context().clone()); - backend - .compare_and_write( - &id, - None, - TieredWrite::Tombstone(Tombstone { - target: lt_id.clone(), - expiration_policy: ExpirationPolicy::Manual, - }), - ) - .await?; + let old_write = TieredWrite::Tombstone(Tombstone { + target: lt_id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }); + backend.compare_and_write(&id, None, old_write).await?; + let new_write = TieredWrite::Object(Metadata::default(), Bytes::new()); let swapped = backend - .compare_and_write( - &id, - Some(&wrong_id), - TieredWrite::Object(Metadata::default(), bytes::Bytes::new()), - ) + .compare_and_write(&id, Some(&wrong_id), new_write) .await?; assert!(!swapped, "expected CAS failure"); @@ -1914,22 +1890,17 @@ mod tests { /// CAS-delete succeeds when the expected target matches. #[tokio::test] - async fn test_cas_put_delete() -> Result<()> { + async fn test_swap_delete() -> Result<()> { let backend = create_test_backend().await?; let id = make_id(); let lt_id = ObjectId::random(id.context().clone()); - backend - .compare_and_write( - &id, - None, - TieredWrite::Tombstone(Tombstone { - target: lt_id.clone(), - expiration_policy: ExpirationPolicy::Manual, - }), - ) - .await?; + let write = TieredWrite::Tombstone(Tombstone { + target: lt_id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }); + backend.compare_and_write(&id, None, write).await?; let deleted = backend .compare_and_write(&id, Some(<_id), TieredWrite::Delete) @@ -1946,23 +1917,18 @@ mod tests { /// CAS-delete fails when the expected target does not match. 
#[tokio::test] - async fn test_cas_put_delete_mismatch() -> Result<()> { + async fn test_swap_delete_mismatch() -> Result<()> { let backend = create_test_backend().await?; let id = make_id(); let lt_id = ObjectId::random(id.context().clone()); let wrong_id = ObjectId::random(id.context().clone()); - backend - .compare_and_write( - &id, - None, - TieredWrite::Tombstone(Tombstone { - target: lt_id.clone(), - expiration_policy: ExpirationPolicy::Manual, - }), - ) - .await?; + let write = TieredWrite::Tombstone(Tombstone { + target: lt_id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }); + backend.compare_and_write(&id, None, write).await?; let deleted = backend .compare_and_write(&id, Some(&wrong_id), TieredWrite::Delete) @@ -1980,7 +1946,7 @@ mod tests { /// CAS-delete with Some(target) against a regular object returns false. #[tokio::test] - async fn test_cas_put_delete_regular_object() -> Result<()> { + async fn test_swap_delete_regular_object() -> Result<()> { let backend = create_test_backend().await?; let id = make_id(); @@ -2001,46 +1967,19 @@ mod tests { Ok(()) } - /// Legacy empty-redirect tombstone (`r=b""`) is matched by `cas_put` when `expected=Some(id)`. + /// Legacy empty-redirect tombstone (`r=b""`) is matched when `expected=Some(id)`. #[tokio::test] - async fn test_cas_put_legacy_empty_redirect() -> Result<()> { + async fn test_swap_legacy_empty_redirect() -> Result<()> { let backend = create_test_backend().await?; let id = make_id(); - let path = id.as_storage_path().to_string().into_bytes(); - // Write a tombstone with an empty `r` value (legacy format). 
- backend - .mutate( - path, - [ - mutation::Mutation::SetCell(mutation::SetCell { - family_name: FAMILY_MANUAL.to_owned(), - column_qualifier: COLUMN_REDIRECT.to_owned(), - timestamp_micros: -1, - value: b"".to_vec(), - }), - mutation::Mutation::SetCell(mutation::SetCell { - family_name: FAMILY_MANUAL.to_owned(), - column_qualifier: COLUMN_TOMBSTONE_META.to_owned(), - timestamp_micros: -1, - value: serde_json::to_vec(&TombstoneMeta { - expiration_policy: ExpirationPolicy::Manual, - }) - .unwrap(), - }), - ], - "test-setup", - ) - .await?; + write_empty_redirect_tombstone(&backend, &id).await?; // Expected target = id (the legacy fallback resolves to hv_id). let deleted = backend .compare_and_write(&id, Some(&id), TieredWrite::Delete) .await?; - assert!( - deleted, - "cas_put should match legacy empty-redirect tombstone" - ); + assert!(deleted, "should match legacy empty-redirect tombstone"); assert!(matches!( backend.get_tiered_metadata(&id).await?, @@ -2050,9 +1989,9 @@ mod tests { Ok(()) } - /// Legacy metadata-format tombstone is matched by `cas_put` when `expected=Some(id)`. + /// Legacy metadata-format tombstone is matched when `expected=Some(id)`. #[tokio::test] - async fn test_cas_put_legacy_metadata_format() -> Result<()> { + async fn test_swap_legacy_metadata_format() -> Result<()> { let backend = create_test_backend().await?; let id = make_id(); @@ -2062,7 +2001,7 @@ mod tests { let deleted = backend .compare_and_write(&id, Some(&id), TieredWrite::Delete) .await?; - assert!(deleted, "cas_put should match legacy-metadata tombstone"); + assert!(deleted, "should match legacy-metadata tombstone"); assert!(matches!( backend.get_tiered_metadata(&id).await?, @@ -2102,41 +2041,12 @@ mod tests { Ok(()) } - /// A tombstone row with an empty `r` value (written by old code before this change) - /// returns `target = hv_id` via the fallback, and emits `bigtable.empty_redirect_read`. 
#[tokio::test] async fn test_empty_redirect_falls_back_to_hv_id() -> Result<()> { let backend = create_test_backend().await?; let id = make_id(); - let path = id.as_storage_path().to_string().into_bytes(); - - // Write a new-format tombstone row with an empty `r` value directly, - // simulating rows written by code before this change. - let tombstone_meta = TombstoneMeta { - expiration_policy: ExpirationPolicy::Manual, - }; - let meta_bytes = serde_json::to_vec(&tombstone_meta).unwrap(); - backend - .mutate( - path, - [ - mutation::Mutation::SetCell(mutation::SetCell { - family_name: FAMILY_MANUAL.to_owned(), - column_qualifier: COLUMN_REDIRECT.to_owned(), - timestamp_micros: -1, - value: b"".to_vec(), // empty — legacy format - }), - mutation::Mutation::SetCell(mutation::SetCell { - family_name: FAMILY_MANUAL.to_owned(), - column_qualifier: COLUMN_TOMBSTONE_META.to_owned(), - timestamp_micros: -1, - value: meta_bytes, - }), - ], - "test-setup", - ) - .await?; + write_empty_redirect_tombstone(&backend, &id).await?; match backend.get_tiered_metadata(&id).await? { TieredMetadata::Tombstone(t) => assert_eq!(t.target, id, "must use id"), other => panic!("expected tombstone, got {other:?}"), From 3e77dcfca8e2dc8fea8b48a4fbd4e69f1646321e Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Thu, 19 Mar 2026 21:55:47 +0100 Subject: [PATCH 4/8] fix(service): Fix broken intra-doc link on TieredWrite Update doc comment to reference `compare_and_write` instead of the old `tiered_write` method name. Co-Authored-By: Claude --- objectstore-service/src/backend/common.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/objectstore-service/src/backend/common.rs b/objectstore-service/src/backend/common.rs index e7253418..a950e47b 100644 --- a/objectstore-service/src/backend/common.rs +++ b/objectstore-service/src/backend/common.rs @@ -162,7 +162,7 @@ pub enum TieredMetadata { NotFound, } -/// The write operation performed by [`HighVolumeBackend::tiered_write`]. 
+/// The write operation performed by [`HighVolumeBackend::compare_and_write`]. #[derive(Debug)] pub enum TieredWrite { /// Write a redirect tombstone. From 5292bdbc4ae10f5e810f9ad83bb66562b217074d Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Thu, 19 Mar 2026 22:18:49 +0100 Subject: [PATCH 5/8] docs(service): Document cross-tier consistency and compare-and-swap model The tiered storage documentation still described the pre-atomic write model. Update all three documentation layers to reflect the current compare-and-swap algorithm: - architecture.md: motivates why HighVolumeBackend needs atomic operations and points to the tiered module for details - tiered module doc: describes revision keys and per-operation sequences (large write, small write, delete) with conflict handling - TieredStorage struct doc: concise consistency summary with pointer to the module-level diagrams Also fixes the delete ordering (was backwards) and corrects field name references (high_volume_backend -> high_volume). --- objectstore-service/docs/architecture.md | 37 ++++++--- objectstore-service/src/backend/tiered.rs | 98 ++++++++++++++++++----- 2 files changed, 104 insertions(+), 31 deletions(-) diff --git a/objectstore-service/docs/architecture.md b/objectstore-service/docs/architecture.md index f6497552..41c56abb 100644 --- a/objectstore-service/docs/architecture.md +++ b/objectstore-service/docs/architecture.md @@ -55,18 +55,16 @@ panic-isolated — a failure in one request does not bring down the service. [`TieredStorage`](backend::tiered::TieredStorage) is the [`Backend`](backend::common::Backend) implementation that provides the two-tier -system. Passing it to [`StorageService::new`] enables the two-tier -configuration. This split exists because no single storage system optimally -handles both small, frequently-accessed objects and large, infrequently-accessed -ones: +system. 
It is the typical backend passed to [`StorageService::new`], though any +`Backend` implementation can be used. The two-tier split exists because no +single storage system optimally handles both small, frequently-accessed objects +and large, infrequently-accessed ones: - **High-volume backend** (typically [BigTable](backend::StorageConfig::BigTable)): optimized for low-latency reads - and writes of small objects. Must implement - [`HighVolumeBackend`](backend::common::HighVolumeBackend), which adds atomic - operations used by `TieredStorage` to maintain consistency of redirects. - Objects in practice are small (metadata blobs, event attachments, etc.), so - this path handles the majority of traffic by volume. + and writes of small objects. Objects in practice are small (metadata blobs, + event attachments, etc.), so this path handles the majority of traffic by + volume. - **Long-term backend** (typically [GCS](backend::StorageConfig::Gcs)): optimized for large objects and long retention periods where per-byte storage cost matters more than access latency. @@ -80,16 +78,29 @@ See [`backend::StorageConfig`] for available backend implementations. ## Redirect Tombstones For large objects, `TieredStorage` stores a **redirect tombstone** in the -high-volume backend — a marker that signals the real payload lives on the -long-term backend. This allows reads to check only the high-volume backend and -follow the tombstone to long-term storage, without scanning both backends on -every read. +high-volume backend — a marker that carries the target `ObjectId` where the real +payload lives in the long-term backend. Reads check only the high-volume +backend: they either find the object directly (small) or follow the tombstone's +target to long-term storage (large), without probing both backends. How tombstones are physically stored is determined by the [`HighVolumeBackend`](crate::backend::common::HighVolumeBackend) implementation. 
Refer to the backend's own documentation for storage format details. +## Cross-Tier Consistency + +Because a single logical object may span both backends (tombstone in HV, payload +in LT), mutations must keep them in sync without distributed locks. The +high-volume backend must implement +[`HighVolumeBackend`](backend::common::HighVolumeBackend), which provides +compare-and-swap operations that `TieredStorage` uses to atomically commit +cross-tier state changes — rolling back on conflict so that concurrent writers +never corrupt each other's data. + +See the [`backend::tiered`] module documentation for the per-operation +sequences. + # Metadata and Payload Every object consists of structured **metadata** and a binary **payload**. diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 1818594d..ed1fff09 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -2,7 +2,68 @@ //! //! [`TieredStorage`] routes objects to a high-volume or long-term backend based //! on size and maintains redirect tombstones so that reads never need to probe -//! both backends. See the [crate-level documentation](crate) for details. +//! both backends. See the [crate-level documentation](crate) for the high-level +//! motivation, and the [`TieredStorage`] struct docs for routing and tombstone +//! semantics. +//! +//! # Cross-Tier Consistency +//! +//! A single logical object may span both backends: a tombstone in HV pointing +//! to a payload in LT. Mutations keep the two in sync through compare-and-swap +//! on the high-volume backend (see [`HighVolumeBackend::compare_and_write`]). +//! Each operation reads the current HV revision, performs its work, then +//! atomically swaps the HV entry only if the revision is still current — +//! rolling back on conflict. +//! +//! ## Revision Keys +//! +//! Every large-object write stores its payload at a **revision key** in the +//! 
long-term backend: `{original_key}/{uuid}`. The UUID suffix is random (no +//! monotonicity is guaranteed), so each write targets a distinct LT path +//! regardless of whether another write to the same logical key is in progress. +//! The tombstone in HV then points to this specific revision. Because each +//! writer owns its own LT blob, the compare-and-swap on the tombstone becomes +//! an atomic pointer swap: the winner's revision is committed and the loser +//! can safely delete its own blob without affecting the winner. +//! +//! See `new_long_term_revision` for the key construction. +//! +//! ## Large-Object Write (> 1 MiB) +//! +//! 1. **Read HV** to capture the current revision (existing tombstone target, +//! or absent). +//! 2. **Write payload to LT** at a unique revision key. +//! 3. **Compare-and-swap in HV**: write a tombstone pointing to the new +//! revision, only if the current revision still matches step 1. +//! - **OK** — delete the old LT blob, if any (best-effort). +//! - **Conflict** — another writer won the race; delete our new LT blob. +//! - **Error** — delete our new LT blob, then propagate the error. +//! +//! ## Small-Object Write (≤ 1 MiB) +//! +//! 1. **Write inline to HV**, skipping the write if a tombstone is present. +//! - **OK** — done; the object is stored entirely in HV. +//! - **Tombstone present** — a large object already occupies this key; +//! continue: +//! 2. **Compare-and-swap in HV**: replace the tombstone with inline data, only +//! if the tombstone's revision still matches. +//! - **OK** — delete the old LT blob (best-effort). +//! - **Conflict** — another writer won the race; they will clean up the +//! LT blob and we have no new LT blob to clean up. +//! +//! ## Delete +//! +//! 1. **Delete from HV** if the entry is not a tombstone. +//! - **OK** — done; there is no LT data to clean up. +//! - **Tombstone present** — a large object is stored here; continue: +//! 2. 
**Compare-and-swap in HV**: remove the tombstone, only if its revision +//! still matches. +//! - **OK** — delete the LT blob (best-effort). +//! - **Conflict** — another writer won the race; they will clean up. +//! +//! Tombstone removal is the commit point for deletes. If the subsequent LT +//! cleanup fails, an orphan blob remains but the object is already unreachable +//! through the normal read path. use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; @@ -78,35 +139,36 @@ pub struct TieredStorageConfig { /// /// Objects are routed at write time based on their size relative to a **1 MiB threshold**: /// -/// - Objects **≤ 1 MiB** go to the `high_volume_backend` — optimized for low-latency reads +/// - Objects **≤ 1 MiB** go to the `high_volume` backend — optimized for low-latency reads /// and writes of small objects (e.g. BigTable). -/// - Objects **> 1 MiB** go to the `long_term_backend` — optimized for cost-efficient +/// - Objects **> 1 MiB** go to the `long_term` backend — optimized for cost-efficient /// storage of large objects (e.g. GCS). /// /// # Redirect Tombstones /// -/// Because the [`ObjectId`] is backend-independent, reads must be able to find an object without -/// knowing which backend stores it. A naive approach would check the long-term backend on every -/// read miss in the high-volume backend — but that is slow and expensive. +/// Because the [`ObjectId`] is backend-independent, reads must be able to find an object +/// without knowing which backend stores it. A naive approach would check the long-term +/// backend on every read miss in the high-volume backend — but that is slow and expensive. /// /// Instead, when an object is stored in the long-term backend, a **redirect tombstone** is /// written in the high-volume backend. It acts as a signpost: "the real data lives in the -/// other backend." 
How tombstones are physically stored is determined by the -/// [`HighVolumeBackend`] implementation — refer to the backend's own documentation for -/// storage format details. +/// other backend at this target." On reads, a single high-volume lookup either returns the +/// object directly or follows the tombstone to long-term storage, without probing both +/// backends. /// -/// # Consistency Without Locks +/// How tombstones are physically stored is determined by the [`HighVolumeBackend`] +/// implementation — refer to the backend's own documentation for storage format details. /// -/// The tombstone system maintains consistency through operation ordering rather than -/// distributed locks. The invariant is: a redirect tombstone is always the **last thing -/// written** and the **last thing removed**. +/// # Consistency /// -/// - On **write**, the real object is persisted before the tombstone. If the tombstone write -/// fails, the real object is rolled back. -/// - On **delete**, the real object is removed before the tombstone. If the long-term delete -/// fails, the tombstone remains and the data stays reachable. +/// Consistency across the two backends is maintained through compare-and-swap +/// operations on the high-volume backend (see +/// [`HighVolumeBackend::compare_and_write`]), not distributed locks. Each +/// mutating operation reads the current high-volume revision, performs its +/// work, and then atomically swaps the high-volume entry only if the revision +/// is still current — rolling back on conflict. /// -/// See the individual methods for per-operation tombstone behavior. +/// See the [module-level documentation](self) for per-operation diagrams. 
/// /// # Usage /// From 670a283970f4955245581077d5c10e0645e00776 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Thu, 19 Mar 2026 22:48:15 +0100 Subject: [PATCH 6/8] test(service): Restructure tiered.rs tests for coverage and clarity Fold redundant tests, fill gaps in CAS conflict paths, and reorganize into named groups matching the operation they exercise. --- objectstore-service/src/backend/tiered.rs | 508 +++++++++++++--------- 1 file changed, 300 insertions(+), 208 deletions(-) diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index ed1fff09..6de619da 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -478,8 +478,6 @@ where mod tests { use std::time::Duration; - use bytes::BytesMut; - use futures_util::TryStreamExt; use objectstore_types::metadata::ExpirationPolicy; use objectstore_types::scope::{Scope, Scopes}; @@ -487,20 +485,31 @@ mod tests { use crate::backend::in_memory::InMemoryBackend; use crate::error::Error; use crate::id::ObjectContext; - use crate::stream::{self}; + use crate::stream; - // --- revision_id tests --- + fn make_context() -> ObjectContext { + ObjectContext { + usecase: "testing".into(), + scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]), + } + } + + fn make_id(key: &str) -> ObjectId { + ObjectId::new(make_context(), key.into()) + } + + fn make_tiered_storage() -> (TieredStorage, InMemoryBackend, InMemoryBackend) { + let hv = InMemoryBackend::new("in-memory-hv"); + let lt = InMemoryBackend::new("in-memory-lt"); + let storage = TieredStorage::new(Box::new(hv.clone()), Box::new(lt.clone())); + (storage, hv, lt) + } + + // --- new_long_term_revision tests --- #[test] fn revision_id_preserves_context() { - let id = ObjectId { - context: ObjectContext { - usecase: "testing".to_string(), - scopes: Scopes::from_iter([Scope::create("org", "17").unwrap()]), - }, - key: "my-key".to_string(), - }; - + let id = 
make_id("my-key"); let revised = new_long_term_revision(&id); assert_eq!(revised.context, id.context); assert!( @@ -512,15 +521,7 @@ mod tests { #[test] fn revision_id_roundtrips_storage_path() { - use crate::id::ObjectId; - let id = ObjectId { - context: ObjectContext { - usecase: "attachments".to_string(), - scopes: Scopes::from_iter([Scope::create("org", "42").unwrap()]), - }, - key: "original".to_string(), - }; - + let id = make_id("original"); let revised = new_long_term_revision(&id); let path = revised.as_storage_path().to_string(); let parsed = ObjectId::from_storage_path(&path) @@ -530,39 +531,18 @@ mod tests { #[test] fn revision_id_is_unique() { - let id = ObjectId { - context: ObjectContext { - usecase: "testing".to_string(), - scopes: Scopes::empty(), - }, - key: "base-key".to_string(), - }; - + let id = make_id("base-key"); let a = new_long_term_revision(&id); let b = new_long_term_revision(&id); assert_ne!(a.key, b.key, "two calls should produce different keys"); } - fn make_context() -> ObjectContext { - ObjectContext { - usecase: "testing".into(), - scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]), - } - } - - fn make_tiered_storage() -> (TieredStorage, InMemoryBackend, InMemoryBackend) { - let hv = InMemoryBackend::new("in-memory-hv"); - let lt = InMemoryBackend::new("in-memory-lt"); - let storage = TieredStorage::new(Box::new(hv.clone()), Box::new(lt.clone())); - (storage, hv, lt) - } - // --- Basic behavior --- #[tokio::test] async fn get_nonexistent_returns_none() { let (storage, _hv, _lt) = make_tiered_storage(); - let id = ObjectId::new(make_context(), "does-not-exist".into()); + let id = make_id("does-not-exist"); assert!(storage.get_object(&id).await.unwrap().is_none()); assert!(storage.get_metadata(&id).await.unwrap().is_none()); @@ -571,72 +551,85 @@ mod tests { #[tokio::test] async fn delete_nonexistent_succeeds() { let (storage, _hv, _lt) = make_tiered_storage(); - let id = ObjectId::new(make_context(), 
"does-not-exist".into()); + let id = make_id("does-not-exist"); storage.delete_object(&id).await.unwrap(); } - #[tokio::test] - async fn put_and_get_roundtrip() { - let (storage, _hv, _lt) = make_tiered_storage(); - let id = ObjectId::random(make_context()); - - storage - .put_object(&id, &Default::default(), stream::single("auto-keyed")) - .await - .unwrap(); - - let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); - let body: BytesMut = s.try_collect().await.unwrap(); - assert_eq!(body.as_ref(), b"auto-keyed"); - } - - // --- Size-based routing tests --- + // --- Put routing --- #[tokio::test] - async fn small_object_goes_to_high_volume() { + async fn put_small_object_stores_inline() { let (storage, hv, lt) = make_tiered_storage(); - let payload = vec![0u8; 100]; // 100 bytes, well under 1 MiB - let id = ObjectId::new(make_context(), "small".into()); + let id = make_id("small"); + let payload = b"small payload".to_vec(); storage - .put_object(&id, &Default::default(), stream::single(payload)) + .put_object(&id, &Default::default(), stream::single(payload.clone())) .await .unwrap(); assert!(hv.contains(&id), "expected in high-volume"); assert!(!lt.contains(&id), "leaked to long-term"); + + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); + + assert!( + storage.get_metadata(&id).await.unwrap().is_some(), + "get_metadata should return metadata for inline objects" + ); } #[tokio::test] - async fn large_object_goes_to_long_term_with_tombstone() { + async fn put_large_object_creates_tombstone() { let (storage, hv, lt) = make_tiered_storage(); - let payload_len = 2 * 1024 * 1024; // 2 MiB, over threshold - let payload = vec![0xABu8; payload_len]; - let id = ObjectId::new(make_context(), "large".into()); + let id = make_id("large"); + let payload = vec![0xCDu8; 2 * 1024 * 1024]; // 2 MiB, over threshold + let metadata_in = Metadata { + content_type: "image/png".into(), 
+            expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(3600)), + origin: Some("10.0.0.1".into()), + ..Default::default() + }; storage - .put_object(&id, &Default::default(), stream::single(payload)) + .put_object(&id, &metadata_in, stream::single(payload.clone())) .await .unwrap(); - // A redirect tombstone should exist in high-volume pointing to a revision key. + // Tombstone in HV: correct expiration_policy, target is a revision key. let tombstone = hv.get(&id).expect_tombstone(); + assert_eq!(tombstone.expiration_policy, metadata_in.expiration_policy); let lt_id = tombstone.target; assert!( lt_id.key().starts_with(id.key()), - "tombstone target key should be a revision of the HV key" + "tombstone target key should be a revision of the HV key, got: {}", + lt_id.key() ); - // Real payload should be in long-term at the revision key. - let (_, lt_bytes) = lt.get(&lt_id).expect_object(); - assert_eq!(lt_bytes.len(), payload_len); + // LT object at revision key with correct metadata. + let (lt_meta, _) = lt.get(&lt_id).expect_object(); + assert_eq!(lt_meta.content_type, "image/png"); + assert_eq!(lt_meta.expiration_policy, metadata_in.expiration_policy); + + // get_object follows the tombstone and returns the correct payload. + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); + + // get_metadata follows the tombstone and returns the correct content_type. 
+ let metadata = storage.get_metadata(&id).await.unwrap().unwrap(); + assert_eq!(metadata.content_type, "image/png"); } + // --- Put overwrites --- + #[tokio::test] async fn reinsert_small_over_large_swaps_to_inline() { let (storage, hv, lt) = make_tiered_storage(); - let id = ObjectId::new(make_context(), "reinsert-key".into()); + let id = make_id("reinsert-key"); // First: insert a large object → creates tombstone in hv, payload in lt at lt_id let large_payload = vec![0xABu8; 2 * 1024 * 1024]; @@ -663,81 +656,149 @@ mod tests { } #[tokio::test] - async fn tombstone_inherits_expiration_policy() { + async fn overwrite_large_with_large_replaces_revision() { let (storage, hv, lt) = make_tiered_storage(); + let id = make_id("overwrite-large"); - let metadata_in = Metadata { - content_type: "image/png".into(), - expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(3600)), - origin: Some("10.0.0.1".into()), - ..Default::default() - }; - let payload = vec![0u8; 2 * 1024 * 1024]; // force long-term - let id = ObjectId::new(make_context(), "expiry-test".into()); + let payload1 = vec![0xAAu8; 2 * 1024 * 1024]; + storage + .put_object(&id, &Default::default(), stream::single(payload1)) + .await + .unwrap(); + let lt_id_1 = hv.get(&id).expect_tombstone().target; + let payload2 = vec![0xBBu8; 2 * 1024 * 1024]; storage - .put_object(&id, &metadata_in, stream::single(payload)) + .put_object(&id, &Default::default(), stream::single(payload2.clone())) .await .unwrap(); + let lt_id_2 = hv.get(&id).expect_tombstone().target; - // The tombstone in hv should have expiration_policy inherited. - // The target is a unique revision key, not `id` itself. 
- let tombstone = hv.get(&id).expect_tombstone(); - assert_eq!(tombstone.expiration_policy, metadata_in.expiration_policy); - let lt_id = tombstone.target.clone(); assert_ne!( - lt_id, id, - "tombstone target must be a unique revision, not the HV id" - ); - assert!( - lt_id.key().starts_with(id.key()), - "revision key should have original key as prefix" + lt_id_1, lt_id_2, + "second write should create a new revision" ); + lt.get(&lt_id_1).expect_not_found(); + lt.get(&lt_id_2).expect_object(); - // The long-term object should be at the revision key with the full metadata. - let (lt_meta, _) = lt.get(&lt_id).expect_object(); - assert_eq!(lt_meta.content_type, "image/png"); - assert_eq!(lt_meta.expiration_policy, metadata_in.expiration_policy); + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload2); } - // --- Tombstone redirect tests --- + // --- Delete --- #[tokio::test] - async fn reads_follow_tombstone_redirect() { - let (storage, _hv, _lt) = make_tiered_storage(); - let payload = vec![0xCDu8; 2 * 1024 * 1024]; // 2 MiB, over threshold + async fn delete_small_object() { + let (storage, hv, _lt) = make_tiered_storage(); + let id = make_id("delete-small"); - let metadata_in = Metadata { - content_type: "image/png".into(), - ..Default::default() - }; - let id = ObjectId::new(make_context(), "redirect-read".into()); + storage + .put_object(&id, &Default::default(), stream::single("tiny")) + .await + .unwrap(); + + storage.delete_object(&id).await.unwrap(); + + hv.get(&id).expect_not_found(); + assert!(storage.get_object(&id).await.unwrap().is_none()); + } + + #[tokio::test] + async fn delete_large_object_cleans_up_both_backends() { + let (storage, hv, lt) = make_tiered_storage(); + let id = make_id("delete-both"); + let payload = vec![0u8; 2 * 1024 * 1024]; // 2 MiB storage - .put_object(&id, &metadata_in, stream::single(payload.clone())) + .put_object(&id, &Default::default(), 
stream::single(payload)) .await .unwrap(); - // get_object should transparently follow the tombstone - let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); - let body = stream::read_to_vec(s).await.unwrap(); - assert_eq!(body, payload); + // Capture lt_id before deleting (it lives at the revision key, not at id). + let lt_id = hv.get(&id).expect_tombstone().target; - // get_metadata should also follow the tombstone - let metadata = storage.get_metadata(&id).await.unwrap().unwrap(); - assert_eq!(metadata.content_type, "image/png"); + storage.delete_object(&id).await.unwrap(); + + assert!(!hv.contains(&id), "tombstone not cleaned up"); + assert!(!lt.contains(&lt_id), "long-term object not cleaned up"); + } + + /// A backend wrapper that delegates everything except `delete_object`, which always fails. + #[derive(Debug)] + struct FailingDeleteBackend(InMemoryBackend); + + #[async_trait::async_trait] + impl Backend for FailingDeleteBackend { + fn name(&self) -> &'static str { + "failing-delete" + } + + async fn put_object( + &self, + id: &ObjectId, + metadata: &Metadata, + stream: ClientStream, + ) -> Result { + self.0.put_object(id, metadata, stream).await + } + + async fn get_object(&self, id: &ObjectId) -> Result { + self.0.get_object(id).await + } + + async fn delete_object(&self, _id: &ObjectId) -> Result { + Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::ConnectionRefused, + "simulated long-term delete failure", + ))) + } + } + + /// When the long-term GCS cleanup fails after the tombstone is deleted, the + /// delete still succeeds (GCS cleanup is best-effort). An orphan blob may + /// remain in LT storage, which is accepted. 
+ #[tokio::test] + async fn delete_succeeds_when_gcs_cleanup_fails() { + let hv = InMemoryBackend::new("hv"); + let lt = FailingDeleteBackend(InMemoryBackend::new("lt")); + let storage = TieredStorage::new(Box::new(hv.clone()), Box::new(lt)); + + let id = make_id("fail-delete"); + let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB -> goes to long-term + storage + .put_object(&id, &Default::default(), stream::single(payload)) + .await + .unwrap(); + + // Delete succeeds even though GCS cleanup fails (it is best-effort). + let result = storage.delete_object(&id).await; + assert!( + result.is_ok(), + "delete should succeed despite GCS cleanup failure" + ); + + // The tombstone in HV is gone (CAS-deleted first, before GCS cleanup). + hv.get(&id).expect_not_found(); + + // The orphaned GCS blob remains but the object is unreachable through the service. + assert!( + storage.get_object(&id).await.unwrap().is_none(), + "object should be unreachable after tombstone is deleted" + ); } - // --- Tombstone inconsistency tests --- + // --- CAS conflicts --- - /// A backend where `cas_put` always fails, but all other operations work normally. + /// A backend wrapper that delegates everything except `compare_and_write`, which always + /// returns `Ok(false)` to simulate a lost CAS race. 
#[derive(Debug)] - struct FailingTombstoneBackend(InMemoryBackend); + struct ConflictingCasBackend(InMemoryBackend); #[async_trait::async_trait] - impl Backend for FailingTombstoneBackend { + impl Backend for ConflictingCasBackend { fn name(&self) -> &'static str { - "failing-tombstone" + "conflicting-cas" } async fn put_object( @@ -759,7 +820,7 @@ mod tests { } #[async_trait::async_trait] - impl HighVolumeBackend for FailingTombstoneBackend { + impl HighVolumeBackend for ConflictingCasBackend { async fn put_non_tombstone( &self, id: &ObjectId, @@ -787,92 +848,74 @@ mod tests { _current: Option<&ObjectId>, _write: TieredWrite, ) -> Result { - Err(Error::Io(std::io::Error::new( - std::io::ErrorKind::ConnectionRefused, - "simulated tombstone write failure", - ))) + Ok(false) // always conflict } } - /// If the tombstone write to the high-volume backend fails after the long-term - /// write succeeds, the long-term object must be cleaned up so we never leave - /// an unreachable orphan in long-term storage. + /// After a large-object write loses the CAS race, the new LT blob must be + /// cleaned up. The put still returns `Ok(())` — from the caller's view, a + /// concurrent write won. 
#[tokio::test] - async fn no_orphan_when_tombstone_write_fails() { + async fn put_large_cas_conflict_cleans_up_new_blob() { + let hv = ConflictingCasBackend(InMemoryBackend::new("hv")); let lt = InMemoryBackend::new("lt"); - let hv = FailingTombstoneBackend(InMemoryBackend::new("hv")); let storage = TieredStorage::new(Box::new(hv), Box::new(lt.clone())); - let id = ObjectId::new(make_context(), "orphan-test".into()); + let id = make_id("cas-conflict-large"); let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB -> long-term path - let result = storage - .put_object(&id, &Default::default(), stream::single(payload)) - .await; - - assert!(result.is_err()); - assert!(lt.is_empty(), "long-term object not cleaned up"); - } - - /// If a tombstone exists in high-volume but the corresponding object is - /// missing from long-term storage (e.g. due to a race condition or partial - /// cleanup), reads should gracefully return None rather than error. - #[tokio::test] - async fn orphan_tombstone_returns_none() { - let (storage, hv, lt) = make_tiered_storage(); - let id = ObjectId::new(make_context(), "orphan-tombstone".into()); - let payload = vec![0xCDu8; 2 * 1024 * 1024]; // 2 MiB storage .put_object(&id, &Default::default(), stream::single(payload)) .await .unwrap(); - // The object is at the revision key in LT, not at id. - let lt_id = hv.get(&id).expect_tombstone().target; - - // Remove the long-term object, leaving an orphan tombstone in hv - lt.remove(&lt_id); - assert!( - storage.get_object(&id).await.unwrap().is_none(), - "orphan tombstone should resolve to None on get_object" - ); - assert!( - storage.get_metadata(&id).await.unwrap().is_none(), - "orphan tombstone should resolve to None on get_metadata" + lt.is_empty(), + "LT blob should be cleaned up after CAS conflict" ); } - // --- Delete tests --- + /// When swapping a tombstone for inline data, a CAS conflict means another + /// writer won. 
The put still returns `Ok(())` — no LT blob was written, so + /// there is nothing to clean up. #[tokio::test] - async fn delete_cleans_up_both_backends() { - let (storage, hv, lt) = make_tiered_storage(); - let id = ObjectId::new(make_context(), "delete-both".into()); - let payload = vec![0u8; 2 * 1024 * 1024]; // 2 MiB + async fn put_small_over_tombstone_cas_conflict_succeeds() { + let inner = InMemoryBackend::new("hv"); + let id = make_id("cas-conflict-small"); - storage - .put_object(&id, &Default::default(), stream::single(payload)) + // Pre-seed a tombstone directly in the inner backend so put_non_tombstone + // returns it instead of writing inline. + let tombstone = Tombstone { + target: make_id("lt-object"), + expiration_policy: ExpirationPolicy::Manual, + }; + inner + .compare_and_write(&id, None, TieredWrite::Tombstone(tombstone)) .await .unwrap(); - // Capture lt_id before deleting (it lives at the revision key, not at id). - let lt_id = hv.get(&id).expect_tombstone().target; - - storage.delete_object(&id).await.unwrap(); + let lt = InMemoryBackend::new("lt"); + let hv = ConflictingCasBackend(inner); + let storage = TieredStorage::new(Box::new(hv), Box::new(lt)); - assert!(!hv.contains(&id), "tombstone not cleaned up"); - assert!(!lt.contains(&lt_id), "long-term object not cleaned up"); + // Writing a small object over a tombstone should succeed even when CAS + // conflicts — the other writer's write is accepted. + storage + .put_object(&id, &Default::default(), stream::single("tiny")) + .await + .unwrap(); } - /// A backend wrapper that delegates everything except `delete_object`, which always fails. + // --- Failure / inconsistency --- + + /// A backend where `compare_and_write` always errors, but all other operations work normally. 
#[derive(Debug)] - struct FailingDeleteBackend(InMemoryBackend); + struct FailingCasBackend(InMemoryBackend); #[async_trait::async_trait] - impl Backend for FailingDeleteBackend { + impl Backend for FailingCasBackend { fn name(&self) -> &'static str { - "failing-delete" + "failing-cas" } async fn put_object( @@ -888,48 +931,97 @@ mod tests { self.0.get_object(id).await } - async fn delete_object(&self, _id: &ObjectId) -> Result { + async fn delete_object(&self, id: &ObjectId) -> Result { + self.0.delete_object(id).await + } + } + + #[async_trait::async_trait] + impl HighVolumeBackend for FailingCasBackend { + async fn put_non_tombstone( + &self, + id: &ObjectId, + metadata: &Metadata, + payload: bytes::Bytes, + ) -> Result> { + self.0.put_non_tombstone(id, metadata, payload).await + } + + async fn get_tiered_object(&self, id: &ObjectId) -> Result { + self.0.get_tiered_object(id).await + } + + async fn get_tiered_metadata(&self, id: &ObjectId) -> Result { + self.0.get_tiered_metadata(id).await + } + + async fn delete_non_tombstone(&self, id: &ObjectId) -> Result> { + self.0.delete_non_tombstone(id).await + } + + async fn compare_and_write( + &self, + _id: &ObjectId, + _current: Option<&ObjectId>, + _write: TieredWrite, + ) -> Result { Err(Error::Io(std::io::Error::new( std::io::ErrorKind::ConnectionRefused, - "simulated long-term delete failure", + "simulated compare_and_write failure", ))) } } - /// When the long-term GCS cleanup fails after the tombstone is deleted, the - /// delete still succeeds (GCS cleanup is best-effort). An orphan blob may - /// remain in LT storage, which is accepted. + /// If the tombstone write to the high-volume backend fails after the long-term + /// write succeeds, the long-term object must be cleaned up so we never leave + /// an unreachable orphan in long-term storage. 
#[tokio::test] - async fn delete_succeeds_when_gcs_cleanup_fails() { - let hv = InMemoryBackend::new("hv"); - let lt = FailingDeleteBackend(InMemoryBackend::new("lt")); - let storage = TieredStorage::new(Box::new(hv.clone()), Box::new(lt)); + async fn no_orphan_when_tombstone_write_fails() { + let lt = InMemoryBackend::new("lt"); + let hv = FailingCasBackend(InMemoryBackend::new("hv")); + let storage = TieredStorage::new(Box::new(hv), Box::new(lt.clone())); + + let id = make_id("orphan-test"); + let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB -> long-term path + let result = storage + .put_object(&id, &Default::default(), stream::single(payload)) + .await; + + assert!(result.is_err()); + assert!(lt.is_empty(), "long-term object not cleaned up"); + } + + /// If a tombstone exists in high-volume but the corresponding object is + /// missing from long-term storage (e.g. due to a race condition or partial + /// cleanup), reads should gracefully return None rather than error. + #[tokio::test] + async fn orphan_tombstone_returns_none() { + let (storage, hv, lt) = make_tiered_storage(); + let id = make_id("orphan-tombstone"); + let payload = vec![0xCDu8; 2 * 1024 * 1024]; // 2 MiB - let id = ObjectId::new(make_context(), "fail-delete".into()); - let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB -> goes to long-term storage - .put_object(&id, &Default::default(), stream::single(payload.clone())) + .put_object(&id, &Default::default(), stream::single(payload)) .await .unwrap(); - // Delete succeeds even though GCS cleanup fails (it is best-effort). - let result = storage.delete_object(&id).await; - assert!( - result.is_ok(), - "delete should succeed despite GCS cleanup failure" - ); + // The object is at the revision key in LT, not at id. + let lt_id = hv.get(&id).expect_tombstone().target; - // The tombstone in HV is gone (CAS-deleted first, before GCS cleanup). 
- hv.get(&id).expect_not_found(); + // Remove the long-term object, leaving an orphan tombstone in hv + lt.remove(&lt_id); - // The orphaned GCS blob remains but the object is unreachable through the service. assert!( storage.get_object(&id).await.unwrap().is_none(), - "object should be unreachable after tombstone is deleted" + "orphan tombstone should resolve to None on get_object" + ); + assert!( + storage.get_metadata(&id).await.unwrap().is_none(), + "orphan tombstone should resolve to None on get_metadata" ); } - // --- Redirect target tests --- + // --- Redirect target --- /// A tombstone carrying an explicit `target` is followed correctly on reads and deletes, /// including when the target ObjectId differs from the HV ObjectId. @@ -939,8 +1031,8 @@ let lt = InMemoryBackend::new("lt"); let storage = TieredStorage::new(Box::new(hv.clone()), Box::new(lt.clone())); - let hv_id = ObjectId::new(make_context(), "hv-key".into()); - let lt_id = ObjectId::new(make_context(), "lt-key".into()); + let hv_id = make_id("hv-key"); + let lt_id = make_id("lt-key"); let payload = vec![0xABu8; 100]; // Write the object under the LT id and a tombstone pointing to it from HV. 
@@ -949,7 +1041,7 @@ mod tests {
             .unwrap();
         let tombstone = Tombstone {
             target: lt_id.clone(),
-            expiration_policy: objectstore_types::metadata::ExpirationPolicy::Manual,
+            expiration_policy: ExpirationPolicy::Manual,
         };
         hv.compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone))
             .await
@@ -966,12 +1058,12 @@ mod tests {
         assert!(!lt.contains(&lt_id), "lt object should be removed");
     }
 
-    // --- Multi-chunk streaming tests ---
+    // --- Multi-chunk ---
 
     #[tokio::test]
     async fn multi_chunk_large_object_chains_buffered_and_remaining() {
         let (storage, hv, lt) = make_tiered_storage();
-        let id = ObjectId::new(make_context(), "multi-chunk".into());
+        let id = make_id("multi-chunk");
 
         // Deliver a 2 MiB payload across multiple chunks that individually
         // fit under the threshold but collectively exceed it.

From 9bbf7b259970c79532a050a2f7fd31750604106d Mon Sep 17 00:00:00 2001
From: Jan Michael Auer
Date: Fri, 20 Mar 2026 08:24:52 +0100
Subject: [PATCH 7/8] meta: Document last-writer-wins semantics

---
 objectstore-service/src/backend/tiered.rs | 21 +++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)

diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs
index 6de619da..cd087188 100644
--- a/objectstore-service/src/backend/tiered.rs
+++ b/objectstore-service/src/backend/tiered.rs
@@ -64,6 +64,17 @@
 //! Tombstone removal is the commit point for deletes. If the subsequent LT
 //! cleanup fails, an orphan blob remains but the object is already unreachable
 //! through the normal read path.
+//!
+//! ## Last-Writer-Wins
+//!
+//! Concurrent mutations on the same key are inherently a race. Even a write
+//! that returns `Ok` may be immediately overwritten by another caller — there
+//! is no ordering guarantee and objectstore cannot provide a read-your-writes
+//! promise.
+//!
+//! CAS conflicts are therefore **not errors**: the losing writer's data is
+//!
cleaned up and `Ok` is returned, because the result is indistinguishable +//! from having succeeded a moment earlier and then been overwritten. use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; @@ -232,6 +243,9 @@ impl TieredStorage { let _ = self.long_term.delete_object(&target).await; } + // Else: Another writer won the race. This is not an error - + // see "Last-Writer-Wins" in module docs. + Ok(()) } @@ -274,9 +288,9 @@ impl TieredStorage { } } Ok(false) => { - // Someone else won the race. Clean up our GCS blob. + // Another writer won the race. Clean up our GCS blob. + // This is not an error - see "Last-Writer-Wins" in module docs. let _ = self.long_term.delete_object(&new).await; - // Return OK — from the caller's perspective, a write happened. } Err(e) => { // CAS error. Clean up our GCS blob before propagating. @@ -420,6 +434,9 @@ impl Backend for TieredStorage { if deleted { let _ = self.long_term.delete_object(&tombstone.target).await; } + + // Else: Another writer won the race. This is not an error - + // see "Last-Writer-Wins" in module docs. } objectstore_metrics::record!( From 13e67f64487f8df9fc5922eaa15acc5957cab6f9 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Fri, 20 Mar 2026 08:36:59 +0100 Subject: [PATCH 8/8] meta: Further improve --- objectstore-service/src/backend/tiered.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index cd087188..94432e44 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -28,7 +28,13 @@ //! //! See `new_long_term_revision` for the key construction. //! -//! ## Large-Object Write (> 1 MiB) +//! ## Compare-and-Swap +//! +//! All mutating operations follow a common pattern of reading the current +//! revision, performing the upload, atomically swapping the revision (commit +//! point), and finally cleaning up old objects: +//! 
+//! ### Large-Object Write (> 1 MiB) //! //! 1. **Read HV** to capture the current revision (existing tombstone target, //! or absent). @@ -39,7 +45,7 @@ //! - **Conflict** — another writer won the race; delete our new LT blob. //! - **Error** — delete our new LT blob, then propagate the error. //! -//! ## Small-Object Write (≤ 1 MiB) +//! ### Small-Object Write (≤ 1 MiB) //! //! 1. **Write inline to HV**, skipping the write if a tombstone is present. //! - **OK** — done; the object is stored entirely in HV. @@ -51,7 +57,7 @@ //! - **Conflict** — another writer won the race; they will clean up the //! LT blob and we have no new LT blob to clean up. //! -//! ## Delete +//! ### Delete //! //! 1. **Delete from HV** if the entry is not a tombstone. //! - **OK** — done; there is no LT data to clean up.