diff --git a/objectstore-service/docs/architecture.md b/objectstore-service/docs/architecture.md index f6497552..41c56abb 100644 --- a/objectstore-service/docs/architecture.md +++ b/objectstore-service/docs/architecture.md @@ -55,18 +55,16 @@ panic-isolated — a failure in one request does not bring down the service. [`TieredStorage`](backend::tiered::TieredStorage) is the [`Backend`](backend::common::Backend) implementation that provides the two-tier -system. Passing it to [`StorageService::new`] enables the two-tier -configuration. This split exists because no single storage system optimally -handles both small, frequently-accessed objects and large, infrequently-accessed -ones: +system. It is the typical backend passed to [`StorageService::new`], though any +`Backend` implementation can be used. The two-tier split exists because no +single storage system optimally handles both small, frequently-accessed objects +and large, infrequently-accessed ones: - **High-volume backend** (typically [BigTable](backend::StorageConfig::BigTable)): optimized for low-latency reads - and writes of small objects. Must implement - [`HighVolumeBackend`](backend::common::HighVolumeBackend), which adds atomic - operations used by `TieredStorage` to maintain consistency of redirects. - Objects in practice are small (metadata blobs, event attachments, etc.), so - this path handles the majority of traffic by volume. + and writes of small objects. Objects in practice are small (metadata blobs, + event attachments, etc.), so this path handles the majority of traffic by + volume. - **Long-term backend** (typically [GCS](backend::StorageConfig::Gcs)): optimized for large objects and long retention periods where per-byte storage cost matters more than access latency. @@ -80,16 +78,29 @@ See [`backend::StorageConfig`] for available backend implementations. 
## Redirect Tombstones For large objects, `TieredStorage` stores a **redirect tombstone** in the -high-volume backend — a marker that signals the real payload lives on the -long-term backend. This allows reads to check only the high-volume backend and -follow the tombstone to long-term storage, without scanning both backends on -every read. +high-volume backend — a marker that carries the target `ObjectId` where the real +payload lives in the long-term backend. Reads check only the high-volume +backend: they either find the object directly (small) or follow the tombstone's +target to long-term storage (large), without probing both backends. How tombstones are physically stored is determined by the [`HighVolumeBackend`](crate::backend::common::HighVolumeBackend) implementation. Refer to the backend's own documentation for storage format details. +## Cross-Tier Consistency + +Because a single logical object may span both backends (tombstone in HV, payload +in LT), mutations must keep them in sync without distributed locks. The +high-volume backend must implement +[`HighVolumeBackend`](backend::common::HighVolumeBackend), which provides +compare-and-swap operations that `TieredStorage` uses to atomically commit +cross-tier state changes — rolling back on conflict so that concurrent writers +never corrupt each other's data. + +See the [`backend::tiered`] module documentation for the per-operation +sequences. + # Metadata and Payload Every object consists of structured **metadata** and a binary **payload**. 
diff --git a/objectstore-service/src/backend/bigtable.rs b/objectstore-service/src/backend/bigtable.rs index ad3cac9b..1c33d09d 100644 --- a/objectstore-service/src/backend/bigtable.rs +++ b/objectstore-service/src/backend/bigtable.rs @@ -35,16 +35,15 @@ use std::time::{Duration, SystemTime}; use bigtable_rs::bigtable::{BigTableConnection, Error as BigTableError, RowCell}; use bigtable_rs::google::bigtable::v2::{self, mutation}; +use bytes::Bytes; use futures_util::TryStreamExt; use objectstore_types::metadata::{ExpirationPolicy, Metadata}; use serde::{Deserialize, Serialize}; use tonic::Code; -use bytes::Bytes; - use crate::backend::common::{ Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse, - TieredGet, TieredMetadata, Tombstone, + TieredGet, TieredMetadata, TieredWrite, Tombstone, }; use crate::error::{Error, Result}; use crate::gcp_auth::PrefetchingTokenProvider; @@ -192,6 +191,25 @@ fn column_filter(column: &[u8]) -> v2::RowFilter { } } +/// Creates a row filter matching the legacy tombstone format: `m` column JSON starts with +/// `{"is_redirect_tombstone":true`. +/// +/// After legacy tombstones expire naturally this filter becomes dead code in both callers. +fn legacy_tombstone_filter() -> v2::RowFilter { + v2::RowFilter { + filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain { + filters: vec![ + column_filter(COLUMN_METADATA), + v2::RowFilter { + filter: Some(v2::row_filter::Filter::ValueRegexFilter( + b"^\\{\"is_redirect_tombstone\":true[,}].*".to_vec(), + )), + }, + ], + })), + } +} + /// Creates a row filter that matches any tombstone row, new- or legacy-format. /// /// New format: presence of the `r` column. @@ -207,25 +225,74 @@ fn tombstone_predicate() -> v2::RowFilter { filters: vec![ // Current: redirect column is present. column_filter(COLUMN_REDIRECT), - // Legacy: Metadata starts with `is_redirect_tombstone``. 
- v2::RowFilter { - filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain { - filters: vec![ - column_filter(COLUMN_METADATA), - v2::RowFilter { - filter: Some(v2::row_filter::Filter::ValueRegexFilter( - b"^\\{\"is_redirect_tombstone\":true[,}].*".to_vec(), - )), - }, - ], - })), - }, + // Legacy: metadata column JSON format. + legacy_tombstone_filter(), ], }, )), } } +/// Creates a row filter that matches tombstones whose redirect resolves to `target`. +/// +/// Always includes an exact match on the `r` (redirect) column: +/// - Chain: `r` column present AND value == `target` storage path +/// +/// When `target == own_id` (the caller expects a legacy identity redirect), the +/// exact match is wrapped in an Interleave with two additional fallbacks: +/// - Chain: `r` column present AND value == `b""` (empty-sentinel written before the redirect +/// column stored the path) +/// - Chain: `m` column present AND value matches `{"is_redirect_tombstone":true...}` regex +/// (legacy metadata format predating the dedicated `r` column) +fn redirect_target_filter(target: &ObjectId, own_id: &ObjectId) -> v2::RowFilter { + let target_path = target.as_storage_path().to_string().into_bytes(); + + let exact_match = v2::RowFilter { + filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain { + filters: vec![ + column_filter(COLUMN_REDIRECT), + v2::RowFilter { + filter: Some(v2::row_filter::Filter::ValueRangeFilter(v2::ValueRange { + start_value: Some(v2::value_range::StartValue::StartValueClosed( + target_path.clone(), + )), + end_value: Some(v2::value_range::EndValue::EndValueClosed(target_path)), + })), + }, + ], + })), + }; + + if target != own_id { + return exact_match; + } + + let empty_redirect_match = v2::RowFilter { + filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain { + filters: vec![ + column_filter(COLUMN_REDIRECT), + v2::RowFilter { + filter: Some(v2::row_filter::Filter::ValueRangeFilter(v2::ValueRange { + start_value: 
Some(v2::value_range::StartValue::StartValueClosed(vec![])), + end_value: Some(v2::value_range::EndValue::EndValueClosed(vec![])), + })), + }, + ], + })), + }; + + // Also match legacy tombstones that resolve to the HV id: + // - empty `r` value (written before the redirect column stored the path) + // - legacy `m` column format (`is_redirect_tombstone: true`) + v2::RowFilter { + filter: Some(v2::row_filter::Filter::Interleave( + v2::row_filter::Interleave { + filters: vec![exact_match, empty_redirect_match, legacy_tombstone_filter()], + }, + )), + } +} + /// Creates a row filter that reads all non-payload columns (`m`, `r`, `t`). /// /// Used by metadata-only reads to avoid fetching the (potentially large) payload column @@ -775,6 +842,59 @@ impl HighVolumeBackend for BigTableBackend { Err(Error::generic("BigTable: race loop in put_non_tombstone")) } + #[tracing::instrument(level = "trace", fields(?id), skip_all)] + async fn get_tiered_object(&self, id: &ObjectId) -> Result { + tracing::debug!("Reading from Bigtable backend"); + let path = id.as_storage_path().to_string().into_bytes(); + + let Some(row) = self.read_row(&path, None, "get_tiered_object").await? else { + return Ok(TieredGet::NotFound); + }; + + if row.needs_tti_bump() { + self.bump_tti(path.clone(), &row, true, id).await; + } + + Ok(match row { + RowData::Tombstone { meta, target, .. 
} => TieredGet::Tombstone(Tombstone { + target: parse_redirect_target(&target, id)?, + expiration_policy: meta.expiration_policy, + }), + RowData::Object { metadata, payload } => { + let mut metadata = metadata; + metadata.size = Some(payload.len()); + TieredGet::Object(metadata, crate::stream::single(payload)) + } + }) + } + + #[tracing::instrument(level = "trace", fields(?id), skip_all)] + async fn get_tiered_metadata(&self, id: &ObjectId) -> Result { + tracing::debug!("Reading metadata from Bigtable backend"); + let path = id.as_storage_path().to_string().into_bytes(); + + // Read metadata and tombstone columns — skip the (potentially large) payload. + // NB: `metadata.size` will not be populated since the payload is not fetched. + let row_opt = self + .read_row(&path, Some(metadata_filter()), "get_tiered_metadata") + .await?; + let Some(row) = row_opt else { + return Ok(TieredMetadata::NotFound); + }; + + if row.needs_tti_bump() { + self.bump_tti(path.clone(), &row, false, id).await; + } + + Ok(match row { + RowData::Tombstone { meta, target, .. } => TieredMetadata::Tombstone(Tombstone { + target: parse_redirect_target(&target, id)?, + expiration_policy: meta.expiration_policy, + }), + RowData::Object { metadata, .. 
} => TieredMetadata::Object(metadata), + }) + } + #[tracing::instrument(level = "trace", fields(?id), skip_all)] async fn delete_non_tombstone(&self, id: &ObjectId) -> Result> { tracing::debug!("Conditional delete from Bigtable backend"); @@ -834,65 +954,72 @@ impl HighVolumeBackend for BigTableBackend { } #[tracing::instrument(level = "trace", fields(?id), skip_all)] - async fn create_tombstone(&self, id: &ObjectId, tombstone: Tombstone) -> Result<()> { - tracing::debug!("Writing tombstone to Bigtable backend"); - let path = id.as_storage_path().to_string().into_bytes(); - self.put_tombstone_row(path, &tombstone, "create_tombstone") - .await?; - Ok(()) - } + async fn compare_and_write( + &self, + id: &ObjectId, + current: Option<&ObjectId>, + write: TieredWrite, + ) -> Result { + tracing::debug!("CAS put to Bigtable backend"); - #[tracing::instrument(level = "trace", fields(?id), skip_all)] - async fn get_tiered_object(&self, id: &ObjectId) -> Result { - tracing::debug!("Reading from Bigtable backend"); let path = id.as_storage_path().to_string().into_bytes(); - - let Some(row) = self.read_row(&path, None, "get_tiered_object").await? else { - return Ok(TieredGet::NotFound); - }; - - if row.needs_tti_bump() { - self.bump_tti(path.clone(), &row, true, id).await; - } - - Ok(match row { - RowData::Tombstone { meta, target, .. 
} => TieredGet::Tombstone(Tombstone { - target: parse_redirect_target(&target, id)?, - expiration_policy: meta.expiration_policy, - }), - RowData::Object { metadata, payload } => { - let mut metadata = metadata; - metadata.size = Some(payload.len()); - TieredGet::Object(metadata, crate::stream::single(payload)) + let now = SystemTime::now(); + + let write_mutations: Vec = match write { + TieredWrite::Tombstone(tombstone) => { + let mutations = build_tombstone_mutations(&tombstone, now)?; + mutations + .into_iter() + .map(|m| v2::Mutation { mutation: Some(m) }) + .collect() } - }) - } - - #[tracing::instrument(level = "trace", fields(?id), skip_all)] - async fn get_tiered_metadata(&self, id: &ObjectId) -> Result { - tracing::debug!("Reading metadata from Bigtable backend"); - let path = id.as_storage_path().to_string().into_bytes(); + TieredWrite::Object(metadata, payload) => { + let mutations = build_write_mutations(&metadata, payload.to_vec(), now)?; + mutations + .into_iter() + .map(|m| v2::Mutation { mutation: Some(m) }) + .collect() + } + TieredWrite::Delete => { + let delete = mutation::Mutation::DeleteFromRow(mutation::DeleteFromRow {}); + vec![v2::Mutation { + mutation: Some(delete), + }] + } + }; - // Read metadata and tombstone columns — skip the (potentially large) payload. - // NB: `metadata.size` will not be populated since the payload is not fetched. - let row_opt = self - .read_row(&path, Some(metadata_filter()), "get_tiered_metadata") - .await?; - let Some(row) = row_opt else { - return Ok(TieredMetadata::NotFound); + let (predicate_filter, true_mutations, false_mutations, success_on_match) = match current { + // Success = no tombstone. tombstone_predicate matches = tombstone found = fail. + // Write on false_mutations (no tombstone); noop on true_mutations (tombstone). + None => (tombstone_predicate(), vec![], write_mutations, false), + // Success = redirect matches target. Predicate match = CAS success. 
+ Some(target) => ( + redirect_target_filter(target, id), + write_mutations, + vec![], + true, + ), }; - if row.needs_tti_bump() { - self.bump_tti(path.clone(), &row, false, id).await; - } + let request = v2::CheckAndMutateRowRequest { + table_name: self.table_path.clone(), + row_key: path, + predicate_filter: Some(predicate_filter), + true_mutations, + false_mutations, + ..Default::default() + }; - Ok(match row { - RowData::Tombstone { meta, target, .. } => TieredMetadata::Tombstone(Tombstone { - target: parse_redirect_target(&target, id)?, - expiration_policy: meta.expiration_policy, - }), - RowData::Object { metadata, .. } => TieredMetadata::Object(metadata), + let predicate_matched = retry("compare_and_write", || async { + self.bigtable + .client() + .check_and_mutate_row(request.clone()) + .await }) + .await? + .predicate_matched; + + Ok(predicate_matched == success_on_match) } } @@ -1208,15 +1335,11 @@ mod tests { let backend = create_test_backend().await?; let id = make_id(); - backend - .create_tombstone( - &id, - Tombstone { - target: id.clone(), - expiration_policy: ExpirationPolicy::Manual, - }, - ) - .await?; + let write = TieredWrite::Tombstone(Tombstone { + target: id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }); + backend.compare_and_write(&id, None, write).await?; let result = backend.get_metadata(&id).await; assert!( @@ -1357,12 +1480,11 @@ mod tests { let backend = create_test_backend().await?; let id = make_id(); - let tombstone = Tombstone { + let write = TieredWrite::Tombstone(Tombstone { target: id.clone(), expiration_policy: ExpirationPolicy::Manual, - }; - - backend.create_tombstone(&id, tombstone).await?; + }); + backend.compare_and_write(&id, None, write).await?; let result = backend.delete_non_tombstone(&id).await?; let tombstone = result.expect("Some(tombstone)"); @@ -1448,6 +1570,33 @@ mod tests { Ok(()) } + /// Write a new-format tombstone row with an empty `r` value directly, + /// simulating rows written by code 
before this change. + async fn write_empty_redirect_tombstone( + backend: &BigTableBackend, + id: &ObjectId, + ) -> Result<()> { + let path = id.as_storage_path().to_string().into_bytes(); + let mutations = [ + mutation::Mutation::SetCell(mutation::SetCell { + family_name: FAMILY_MANUAL.to_owned(), + column_qualifier: COLUMN_REDIRECT.to_owned(), + timestamp_micros: -1, + value: b"".to_vec(), // empty — legacy format + }), + mutation::Mutation::SetCell(mutation::SetCell { + family_name: FAMILY_MANUAL.to_owned(), + column_qualifier: COLUMN_TOMBSTONE_META.to_owned(), + timestamp_micros: -1, + value: b"{}".to_vec(), + }), + ]; + + backend.mutate(path, mutations, "test-setup").await?; + + Ok(()) + } + /// /// Uses `Manual` expiration so `timestamp_micros = -1` (server-assigned ≈ write time) does /// not trigger immediate expiry. @@ -1467,13 +1616,17 @@ mod tests { TieredGet::Tombstone(_) )); + // Recreate a fresh tombstone to test the other conditional operations. + write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?; let t_opt = backend - .put_non_tombstone(&id, &Metadata::default(), bytes::Bytes::new()) + .put_non_tombstone(&id, &Metadata::default(), Bytes::new()) .await?; - assert_eq!(t_opt.as_ref(), Some(&t)); + // Legacy tombstones resolve to hv_id; target should match id. 
+ assert_eq!(t_opt.map(|t| t.target).as_ref(), Some(&id)); + write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?; let t_opt = backend.delete_non_tombstone(&id).await?; - assert_eq!(t_opt, Some(t)); + assert_eq!(t_opt.map(|t| t.target), Some(id)); Ok(()) } @@ -1547,16 +1700,17 @@ mod tests { } #[tokio::test] - async fn test_create_tombstone_round_trip() -> Result<()> { + async fn test_swap_create_tombstone() -> Result<()> { let backend = create_test_backend().await?; let id = make_id(); let expiration_policy = ExpirationPolicy::TimeToLive(Duration::from_secs(3600)); - let tombstone = Tombstone { + let write = TieredWrite::Tombstone(Tombstone { target: id.clone(), expiration_policy, - }; - backend.create_tombstone(&id, tombstone).await?; + }); + let committed = backend.compare_and_write(&id, None, write).await?; + assert!(committed, "expected CAS success on empty row"); // Both hv methods must surface the tombstone with the correct expiration_policy. let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&id).await? else { @@ -1581,6 +1735,282 @@ mod tests { Ok(()) } + /// Attempting to create a tombstone when one already exists returns false. + #[tokio::test] + async fn test_swap_create_tombstone_conflict() -> Result<()> { + let backend = create_test_backend().await?; + + let id = make_id(); + let tombstone = Tombstone { + target: id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }; + let first = backend + .compare_and_write(&id, None, TieredWrite::Tombstone(tombstone.clone())) + .await?; + assert!(first, "first write should succeed"); + + let second = backend + .compare_and_write(&id, None, TieredWrite::Tombstone(tombstone)) + .await?; + assert!( + !second, + "second write should fail: tombstone already exists" + ); + + Ok(()) + } + + /// CAS-swapping an existing tombstone for a new one succeeds when the expected target matches. 
+ #[tokio::test] + async fn test_swap_tombstone() -> Result<()> { + let backend = create_test_backend().await?; + + let hv_id = make_id(); + let old_lt_id = ObjectId::random(hv_id.context().clone()); + let new_lt_id = ObjectId::random(hv_id.context().clone()); + + let old_write = TieredWrite::Tombstone(Tombstone { + target: old_lt_id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }); + backend.compare_and_write(&hv_id, None, old_write).await?; + + let new_write = TieredWrite::Tombstone(Tombstone { + target: new_lt_id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }); + let swapped = backend + .compare_and_write(&hv_id, Some(&old_lt_id), new_write) + .await?; + assert!(swapped, "expected CAS success"); + + let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&hv_id).await? else { + panic!("expected tombstone"); + }; + assert_eq!(t.target, new_lt_id, "tombstone target must be updated"); + + Ok(()) + } + + /// CAS-swapping fails when the expected target does not match the current tombstone. + #[tokio::test] + async fn test_swap_tombstone_mismatch() -> Result<()> { + let backend = create_test_backend().await?; + + let hv_id = make_id(); + let actual_lt_id = ObjectId::random(hv_id.context().clone()); + let wrong_lt_id = ObjectId::random(hv_id.context().clone()); + let new_lt_id = ObjectId::random(hv_id.context().clone()); + + let old_write = TieredWrite::Tombstone(Tombstone { + target: actual_lt_id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }); + backend.compare_and_write(&hv_id, None, old_write).await?; + + let new_write = TieredWrite::Tombstone(Tombstone { + target: new_lt_id, + expiration_policy: ExpirationPolicy::Manual, + }); + let swapped = backend + .compare_and_write(&hv_id, Some(&wrong_lt_id), new_write) + .await?; + assert!(!swapped, "expected CAS failure due to wrong target"); + + // Row unchanged. + let TieredMetadata::Tombstone(t) = backend.get_tiered_metadata(&hv_id).await? 
else { + panic!("expected tombstone"); + }; + assert_eq!(t.target, actual_lt_id, "tombstone target must be unchanged"); + + Ok(()) + } + + /// CAS-swapping a tombstone for inline data succeeds when the target matches. + #[tokio::test] + async fn test_swap_inline() -> Result<()> { + let backend = create_test_backend().await?; + + let id = make_id(); + let lt_id = ObjectId::random(id.context().clone()); + + let old_write = TieredWrite::Tombstone(Tombstone { + target: lt_id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }); + backend.compare_and_write(&id, None, old_write).await?; + + let payload = Bytes::from_static(b"hello inline"); + let new_write = TieredWrite::Object(Metadata::default(), payload.clone()); + let swapped = backend + .compare_and_write(&id, Some(<_id), new_write) + .await?; + assert!(swapped, "expected CAS success"); + + // Row is now an inline object. + let TieredGet::Object(_, stream) = backend.get_tiered_object(&id).await? else { + panic!("expected inline object after swap"); + }; + let body = crate::stream::read_to_vec(stream).await?; + assert_eq!(body, payload.as_ref()); + + Ok(()) + } + + /// Inline-swap fails when the expected target does not match. + #[tokio::test] + async fn test_swap_inline_mismatch() -> Result<()> { + let backend = create_test_backend().await?; + + let id = make_id(); + let lt_id = ObjectId::random(id.context().clone()); + let wrong_id = ObjectId::random(id.context().clone()); + + let old_write = TieredWrite::Tombstone(Tombstone { + target: lt_id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }); + backend.compare_and_write(&id, None, old_write).await?; + + let new_write = TieredWrite::Object(Metadata::default(), Bytes::new()); + let swapped = backend + .compare_and_write(&id, Some(&wrong_id), new_write) + .await?; + assert!(!swapped, "expected CAS failure"); + + // Tombstone still present. 
+ assert!(matches!( + backend.get_tiered_metadata(&id).await?, + TieredMetadata::Tombstone(_) + )); + + Ok(()) + } + + /// CAS-delete succeeds when the expected target matches. + #[tokio::test] + async fn test_swap_delete() -> Result<()> { + let backend = create_test_backend().await?; + + let id = make_id(); + let lt_id = ObjectId::random(id.context().clone()); + + let write = TieredWrite::Tombstone(Tombstone { + target: lt_id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }); + backend.compare_and_write(&id, None, write).await?; + + let deleted = backend + .compare_and_write(&id, Some(<_id), TieredWrite::Delete) + .await?; + assert!(deleted, "expected CAS delete success"); + + assert!(matches!( + backend.get_tiered_metadata(&id).await?, + TieredMetadata::NotFound + )); + + Ok(()) + } + + /// CAS-delete fails when the expected target does not match. + #[tokio::test] + async fn test_swap_delete_mismatch() -> Result<()> { + let backend = create_test_backend().await?; + + let id = make_id(); + let lt_id = ObjectId::random(id.context().clone()); + let wrong_id = ObjectId::random(id.context().clone()); + + let write = TieredWrite::Tombstone(Tombstone { + target: lt_id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }); + backend.compare_and_write(&id, None, write).await?; + + let deleted = backend + .compare_and_write(&id, Some(&wrong_id), TieredWrite::Delete) + .await?; + assert!(!deleted, "expected CAS failure"); + + // Row preserved. + assert!(matches!( + backend.get_tiered_metadata(&id).await?, + TieredMetadata::Tombstone(_) + )); + + Ok(()) + } + + /// CAS-delete with Some(target) against a regular object returns false. 
+ #[tokio::test] + async fn test_swap_delete_regular_object() -> Result<()> { + let backend = create_test_backend().await?; + + let id = make_id(); + let fake_lt_id = ObjectId::random(id.context().clone()); + + backend + .put_object(&id, &Metadata::default(), crate::stream::single("data")) + .await?; + + let deleted = backend + .compare_and_write(&id, Some(&fake_lt_id), TieredWrite::Delete) + .await?; + assert!(!deleted, "expected false: row is not a tombstone"); + + // Object preserved. + assert!(backend.get_object(&id).await?.is_some()); + + Ok(()) + } + + /// Legacy empty-redirect tombstone (`r=b""`) is matched when `expected=Some(id)`. + #[tokio::test] + async fn test_swap_legacy_empty_redirect() -> Result<()> { + let backend = create_test_backend().await?; + let id = make_id(); + + write_empty_redirect_tombstone(&backend, &id).await?; + + // Expected target = id (the legacy fallback resolves to hv_id). + let deleted = backend + .compare_and_write(&id, Some(&id), TieredWrite::Delete) + .await?; + assert!(deleted, "should match legacy empty-redirect tombstone"); + + assert!(matches!( + backend.get_tiered_metadata(&id).await?, + TieredMetadata::NotFound + )); + + Ok(()) + } + + /// Legacy metadata-format tombstone is matched when `expected=Some(id)`. + #[tokio::test] + async fn test_swap_legacy_metadata_format() -> Result<()> { + let backend = create_test_backend().await?; + let id = make_id(); + + write_legacy_tombstone(&backend, &id, ExpirationPolicy::Manual, None).await?; + + // Legacy tombstones resolve to hv_id, so expected target = id. 
+ let deleted = backend + .compare_and_write(&id, Some(&id), TieredWrite::Delete) + .await?; + assert!(deleted, "should match legacy-metadata tombstone"); + + assert!(matches!( + backend.get_tiered_metadata(&id).await?, + TieredMetadata::NotFound + )); + + Ok(()) + } + /// The redirect pointer stored in the `r` column survives a Bigtable write and read: /// `target` on write equals `target` on read, and the parsed `ObjectId` matches. #[tokio::test] @@ -1595,7 +2025,9 @@ mod tests { expiration_policy: ExpirationPolicy::Manual, }; - backend.create_tombstone(&hv_id, tombstone).await?; + backend + .compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone)) + .await?; match backend.get_tiered_metadata(&hv_id).await? { TieredMetadata::Tombstone(t) => assert_eq!(t.target, lt_id, "target must match"), @@ -1609,41 +2041,12 @@ mod tests { Ok(()) } - /// A tombstone row with an empty `r` value (written by old code before this change) - /// returns `target = hv_id` via the fallback, and emits `bigtable.empty_redirect_read`. #[tokio::test] async fn test_empty_redirect_falls_back_to_hv_id() -> Result<()> { let backend = create_test_backend().await?; let id = make_id(); - let path = id.as_storage_path().to_string().into_bytes(); - - // Write a new-format tombstone row with an empty `r` value directly, - // simulating rows written by code before this change. 
- let tombstone_meta = TombstoneMeta { - expiration_policy: ExpirationPolicy::Manual, - }; - let meta_bytes = serde_json::to_vec(&tombstone_meta).unwrap(); - backend - .mutate( - path, - [ - mutation::Mutation::SetCell(mutation::SetCell { - family_name: FAMILY_MANUAL.to_owned(), - column_qualifier: COLUMN_REDIRECT.to_owned(), - timestamp_micros: -1, - value: b"".to_vec(), // empty — legacy format - }), - mutation::Mutation::SetCell(mutation::SetCell { - family_name: FAMILY_MANUAL.to_owned(), - column_qualifier: COLUMN_TOMBSTONE_META.to_owned(), - timestamp_micros: -1, - value: meta_bytes, - }), - ], - "test-setup", - ) - .await?; + write_empty_redirect_tombstone(&backend, &id).await?; match backend.get_tiered_metadata(&id).await? { TieredMetadata::Tombstone(t) => assert_eq!(t.target, id, "must use id"), other => panic!("expected tombstone, got {other:?}"), diff --git a/objectstore-service/src/backend/common.rs b/objectstore-service/src/backend/common.rs index 96991f14..a950e47b 100644 --- a/objectstore-service/src/backend/common.rs +++ b/objectstore-service/src/backend/common.rs @@ -10,53 +10,6 @@ use crate::error::Result; use crate::id::ObjectId; use crate::stream::{ClientStream, PayloadStream}; -/// Information about a redirect tombstone in the high-volume backend. -#[derive(Clone, Debug, PartialEq)] -pub struct Tombstone { - /// The [`ObjectId`] of the object in the long-term backend. - /// - /// For legacy tombstones with an empty `r` column, the HV backend resolves - /// this to the HV `ObjectId` itself before surfacing the tombstone to callers. - pub target: ObjectId, - - /// The expiration policy copied from the original object. - pub expiration_policy: ExpirationPolicy, -} - -/// Typed response from [`HighVolumeBackend::get_tiered_object`]. -pub enum TieredGet { - /// A real object was found. - Object(Metadata, PayloadStream), - /// A redirect tombstone was found; the real object lives in the long-term backend. 
- Tombstone(Tombstone), - /// No entry exists at this key. - NotFound, -} - -impl fmt::Debug for TieredGet { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - TieredGet::Object(metadata, _stream) => f - .debug_tuple("Object") - .field(metadata) - .finish_non_exhaustive(), - TieredGet::Tombstone(info) => f.debug_tuple("Tombstone").field(info).finish(), - TieredGet::NotFound => write!(f, "NotFound"), - } - } -} - -/// Typed metadata-only response from [`HighVolumeBackend::get_tiered_metadata`]. -#[derive(Debug)] -pub enum TieredMetadata { - /// Metadata for a real object was found. - Object(Metadata), - /// A redirect tombstone was found; the real object lives in the long-term backend. - Tombstone(Tombstone), - /// No entry exists at this key. - NotFound, -} - /// User agent string used for outgoing requests. /// /// This intentionally has a "sentry" prefix so that it can easily be traced back to us. @@ -145,11 +98,79 @@ pub trait HighVolumeBackend: Backend { /// without a second round trip. async fn delete_non_tombstone(&self, id: &ObjectId) -> Result>; - /// Writes a redirect tombstone for the given object. + /// Atomically mutates the row if the current redirect state matches. + /// + /// `current` determines the precondition: + /// - `None`: succeeds only if no tombstone exists (row absent or inline). + /// - `Some(target)`: succeeds only if a tombstone exists whose redirect + /// resolves to `target`. /// - /// A tombstone signals that the real object lives in the long-term backend - /// identified by the tombstone's [`target`](Tombstone::target) ID. - async fn create_tombstone(&self, id: &ObjectId, tombstone: Tombstone) -> Result<()>; + /// On match, applies `write`. Returns `true` on success, `false` if the + /// precondition was not met (row state changed concurrently). 
+ async fn compare_and_write( + &self, + id: &ObjectId, + current: Option<&ObjectId>, + write: TieredWrite, + ) -> Result; +} + +/// Information about a redirect tombstone in the high-volume backend. +#[derive(Clone, Debug, PartialEq)] +pub struct Tombstone { + /// The [`ObjectId`] of the object in the long-term backend. + /// + /// For legacy tombstones with an empty `r` column, the HV backend resolves + /// this to the HV `ObjectId` itself before surfacing the tombstone to callers. + pub target: ObjectId, + + /// The expiration policy copied from the original object. + pub expiration_policy: ExpirationPolicy, +} + +/// Typed response from [`HighVolumeBackend::get_tiered_object`]. +pub enum TieredGet { + /// A real object was found. + Object(Metadata, PayloadStream), + /// A redirect tombstone was found; the real object lives in the long-term backend. + Tombstone(Tombstone), + /// No entry exists at this key. + NotFound, +} + +impl fmt::Debug for TieredGet { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + TieredGet::Object(metadata, _stream) => f + .debug_tuple("Object") + .field(metadata) + .finish_non_exhaustive(), + TieredGet::Tombstone(info) => f.debug_tuple("Tombstone").field(info).finish(), + TieredGet::NotFound => write!(f, "NotFound"), + } + } +} + +/// Typed metadata-only response from [`HighVolumeBackend::get_tiered_metadata`]. +#[derive(Debug)] +pub enum TieredMetadata { + /// Metadata for a real object was found. + Object(Metadata), + /// A redirect tombstone was found; the real object lives in the long-term backend. + Tombstone(Tombstone), + /// No entry exists at this key. + NotFound, +} + +/// The write operation performed by [`HighVolumeBackend::compare_and_write`]. +#[derive(Debug)] +pub enum TieredWrite { + /// Write a redirect tombstone. + Tombstone(Tombstone), + /// Write inline object data. + Object(Metadata, Bytes), + /// Delete the row entirely. + Delete, } /// Creates a reqwest client with required defaults. 
diff --git a/objectstore-service/src/backend/in_memory.rs b/objectstore-service/src/backend/in_memory.rs index 0597cf14..46dfcd07 100644 --- a/objectstore-service/src/backend/in_memory.rs +++ b/objectstore-service/src/backend/in_memory.rs @@ -13,7 +13,8 @@ use futures_util::TryStreamExt; use objectstore_types::metadata::Metadata; use super::common::{ - DeleteResponse, GetResponse, PutResponse, TieredGet, TieredMetadata, Tombstone, + DeleteResponse, GetResponse, HighVolumeBackend, PutResponse, TieredGet, TieredMetadata, + TieredWrite, Tombstone, }; use crate::error::{Error, Result}; use crate::id::ObjectId; @@ -115,7 +116,7 @@ impl super::common::Backend for InMemoryBackend { } #[async_trait::async_trait] -impl super::common::HighVolumeBackend for InMemoryBackend { +impl HighVolumeBackend for InMemoryBackend { async fn put_non_tombstone( &self, id: &ObjectId, @@ -133,24 +134,6 @@ impl super::common::HighVolumeBackend for InMemoryBackend { Ok(None) } - async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>> { - let mut store = self.store.lock().unwrap(); - if let Some(StoreEntry::Tombstone(tombstone)) = store.get(id).cloned() { - return Ok(Some(tombstone)); - } - - store.remove(id); - Ok(None) - } - - async fn create_tombstone(&self, id: &ObjectId, tombstone: Tombstone) -> Result<()> { - self.store - .lock() - .unwrap() - .insert(id.clone(), StoreEntry::Tombstone(tombstone)); - Ok(()) - } - async fn get_tiered_object(&self, id: &ObjectId) -> Result<TieredGet> { let entry = self.store.lock().unwrap().get(id).cloned(); Ok(match entry { @@ -171,6 +154,50 @@ impl super::common::HighVolumeBackend for InMemoryBackend { Some(StoreEntry::Object(metadata, _bytes)) => TieredMetadata::Object(metadata), }) } + + async fn delete_non_tombstone(&self, id: &ObjectId) -> Result<Option<Tombstone>> { + let mut store = self.store.lock().unwrap(); + if let Some(StoreEntry::Tombstone(tombstone)) = store.get(id).cloned() { + return Ok(Some(tombstone)); + } + + store.remove(id); + Ok(None) + } + + async fn
compare_and_write( + &self, + id: &ObjectId, + current: Option<&ObjectId>, + write: TieredWrite, + ) -> Result<bool> { + let mut store = self.store.lock().unwrap(); + let actual = store.get(id); + + let matches = match current { + None => !matches!(actual, Some(StoreEntry::Tombstone(_))), + Some(target) => matches!( + actual, + Some(StoreEntry::Tombstone(t)) if t.target == *target + ), + }; + + if matches { + match write { + TieredWrite::Tombstone(tombstone) => { + store.insert(id.clone(), StoreEntry::Tombstone(tombstone)); + } + TieredWrite::Object(metadata, payload) => { + store.insert(id.clone(), StoreEntry::Object(metadata, payload)); + } + TieredWrite::Delete => { + store.remove(id); + } + } + } + + Ok(matches) + } } /// Type returned by [`InMemoryBackend::get`] for direct inspection of stored entries. diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index ad30f3f1..94432e44 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -2,19 +2,98 @@ //! //! [`TieredStorage`] routes objects to a high-volume or long-term backend based //! on size and maintains redirect tombstones so that reads never need to probe -//! both backends. See the [crate-level documentation](crate) for details. +//! both backends. See the [crate-level documentation](crate) for the high-level +//! motivation, and the [`TieredStorage`] struct docs for routing and tombstone +//! semantics. +//! +//! # Cross-Tier Consistency +//! +//! A single logical object may span both backends: a tombstone in HV pointing +//! to a payload in LT. Mutations keep the two in sync through compare-and-swap +//! on the high-volume backend (see [`HighVolumeBackend::compare_and_write`]). +//! Each operation reads the current HV revision, performs its work, then +//! atomically swaps the HV entry only if the revision is still current — +//! rolling back on conflict. +//! +//! ## Revision Keys +//! +//!
Every large-object write stores its payload at a **revision key** in the +//! long-term backend: `{original_key}/{uuid}`. The UUID suffix is random (no +//! monotonicity is guaranteed), so each write targets a distinct LT path +//! regardless of whether another write to the same logical key is in progress. +//! The tombstone in HV then points to this specific revision. Because each +//! writer owns its own LT blob, the compare-and-swap on the tombstone becomes +//! an atomic pointer swap: the winner's revision is committed and the loser +//! can safely delete its own blob without affecting the winner. +//! +//! See `new_long_term_revision` for the key construction. +//! +//! ## Compare-and-Swap +//! +//! All mutating operations follow a common pattern of reading the current +//! revision, performing the upload, atomically swapping the revision (commit +//! point), and finally cleaning up old objects: +//! +//! ### Large-Object Write (> 1 MiB) +//! +//! 1. **Read HV** to capture the current revision (existing tombstone target, +//! or absent). +//! 2. **Write payload to LT** at a unique revision key. +//! 3. **Compare-and-swap in HV**: write a tombstone pointing to the new +//! revision, only if the current revision still matches step 1. +//! - **OK** — delete the old LT blob, if any (best-effort). +//! - **Conflict** — another writer won the race; delete our new LT blob. +//! - **Error** — delete our new LT blob, then propagate the error. +//! +//! ### Small-Object Write (≤ 1 MiB) +//! +//! 1. **Write inline to HV**, skipping the write if a tombstone is present. +//! - **OK** — done; the object is stored entirely in HV. +//! - **Tombstone present** — a large object already occupies this key; +//! continue: +//! 2. **Compare-and-swap in HV**: replace the tombstone with inline data, only +//! if the tombstone's revision still matches. +//! - **OK** — delete the old LT blob (best-effort). +//! - **Conflict** — another writer won the race; they will clean up the +//! 
LT blob and we have no new LT blob to clean up. +//! +//! ### Delete +//! +//! 1. **Delete from HV** if the entry is not a tombstone. +//! - **OK** — done; there is no LT data to clean up. +//! - **Tombstone present** — a large object is stored here; continue: +//! 2. **Compare-and-swap in HV**: remove the tombstone, only if its revision +//! still matches. +//! - **OK** — delete the LT blob (best-effort). +//! - **Conflict** — another writer won the race; they will clean up. +//! +//! Tombstone removal is the commit point for deletes. If the subsequent LT +//! cleanup fails, an orphan blob remains but the object is already unreachable +//! through the normal read path. +//! +//! ## Last-Writer-Wins +//! +//! Concurrent mutations on the same key are inherently a race. Even a write +//! that returns `Ok` may be immediately overwritten by another caller — there +//! is no ordering guarantee and objectstore cannot provide a read-your-writes +//! promise. +//! +//! CAS conflicts are therefore **not errors**: the losing writer's data is +//! cleaned up and `Ok` is returned, because the result is indistinguishable +//! from having succeeded a moment earlier and then been overwritten. use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Instant; -use futures_util::StreamExt; +use bytes::Bytes; +use futures_util::{Stream, StreamExt}; use objectstore_types::metadata::Metadata; use serde::{Deserialize, Serialize}; use crate::backend::common::{ Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse, - TieredGet, TieredMetadata, Tombstone, + TieredGet, TieredMetadata, TieredWrite, Tombstone, }; use crate::backend::{HighVolumeStorageConfig, StorageConfig}; use crate::error::Result; @@ -24,6 +103,18 @@ use crate::stream::{ClientStream, SizedPeek}; /// The threshold up until which we will go to the "high volume" backend. 
const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB +/// Creates a new [`ObjectId`] with the same context but a unique revision key. +/// +/// The new key has the format `{original_key}/{uuid_v7}`, producing a distinct +/// storage path for each large-object write. [`ObjectId::from_storage_path`] parses +/// the result back correctly because the key portion may contain `/`. +fn new_long_term_revision(id: &ObjectId) -> ObjectId { + ObjectId { + context: id.context.clone(), + key: format!("{}/{}", id.key, uuid::Uuid::now_v7()), + } +} + /// Configuration for [`TieredStorage`]. /// /// Composes two backends into a tiered routing setup: `high_volume` for small @@ -55,27 +146,6 @@ pub struct TieredStorageConfig { pub long_term: Box<StorageConfig>, } -#[derive(Debug)] -enum BackendChoice { - HighVolume, - LongTerm, -} - -impl BackendChoice { - fn as_str(&self) -> &'static str { - match self { - BackendChoice::HighVolume => "high-volume", - BackendChoice::LongTerm => "long-term", - } - } -} - -impl std::fmt::Display for BackendChoice { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(self.as_str()) - } -} - /// Two-tier storage backend that routes objects by size. /// /// `TieredStorage` implements [`Backend`] and is intended to be used inside a @@ -86,35 +156,36 @@ impl std::fmt::Display for BackendChoice { /// /// Objects are routed at write time based on their size relative to a **1 MiB threshold**: /// -/// - Objects **≤ 1 MiB** go to the `high_volume_backend` — optimized for low-latency reads +/// - Objects **≤ 1 MiB** go to the `high_volume` backend — optimized for low-latency reads /// and writes of small objects (e.g. BigTable). -/// - Objects **> 1 MiB** go to the `long_term_backend` — optimized for cost-efficient +/// - Objects **> 1 MiB** go to the `long_term` backend — optimized for cost-efficient /// storage of large objects (e.g. GCS).
/// /// # Redirect Tombstones /// -/// Because the [`ObjectId`] is backend-independent, reads must be able to find an object without -/// knowing which backend stores it. A naive approach would check the long-term backend on every -/// read miss in the high-volume backend — but that is slow and expensive. +/// Because the [`ObjectId`] is backend-independent, reads must be able to find an object +/// without knowing which backend stores it. A naive approach would check the long-term +/// backend on every read miss in the high-volume backend — but that is slow and expensive. /// /// Instead, when an object is stored in the long-term backend, a **redirect tombstone** is /// written in the high-volume backend. It acts as a signpost: "the real data lives in the -/// other backend." How tombstones are physically stored is determined by the -/// [`HighVolumeBackend`] implementation — refer to the backend's own documentation for -/// storage format details. +/// other backend at this target." On reads, a single high-volume lookup either returns the +/// object directly or follows the tombstone to long-term storage, without probing both +/// backends. /// -/// # Consistency Without Locks +/// How tombstones are physically stored is determined by the [`HighVolumeBackend`] +/// implementation — refer to the backend's own documentation for storage format details. /// -/// The tombstone system maintains consistency through operation ordering rather than -/// distributed locks. The invariant is: a redirect tombstone is always the **last thing -/// written** and the **last thing removed**. +/// # Consistency /// -/// - On **write**, the real object is persisted before the tombstone. If the tombstone write -/// fails, the real object is rolled back. -/// - On **delete**, the real object is removed before the tombstone. If the long-term delete -/// fails, the tombstone remains and the data stays reachable. 
+/// Consistency across the two backends is maintained through compare-and-swap +/// operations on the high-volume backend (see +/// [`HighVolumeBackend::compare_and_write`]), not distributed locks. Each +/// mutating operation reads the current high-volume revision, performs its +/// work, and then atomically swaps the high-volume entry only if the revision +/// is still current — rolling back on conflict. /// -/// See the individual methods for per-operation tombstone behavior. +/// See the [module-level documentation](self) for per-operation diagrams. /// /// # Usage /// @@ -145,6 +216,97 @@ impl TieredStorage { BackendChoice::LongTerm => self.long_term.name(), } } + + /// Puts an object into the high-volume backend. + /// + /// If a tombstone already exists, attempts to swap it for the new object and delete the old + /// long-term object. + async fn put_high_volume( + &self, + id: &ObjectId, + metadata: &Metadata, + payload: Bytes, + ) -> Result<()> { + let tombstone_opt = self + .high_volume + .put_non_tombstone(id, metadata, payload.clone()) + .await?; + + let Some(Tombstone { target, .. }) = tombstone_opt else { + // No tombstone exists - write succeeded + return Ok(()); + }; + + // Tombstone exists — Swap it for inline data + let write = TieredWrite::Object(metadata.clone(), payload.clone()); + let written = self + .high_volume + .compare_and_write(id, Some(&target), write) + .await?; + + // TODO: Schedule cleanups into background to ensure eventual cleanup + if written { + let _ = self.long_term.delete_object(&target).await; + } + + // Else: Another writer won the race. This is not an error - + // see "Last-Writer-Wins" in module docs. + + Ok(()) + } + + /// Puts an object into the long-term backend with a redirect tombstone in front. + /// + /// Deletes the previous long-term object if overwriting an existing tombstone. If the tombstone + /// write fails, the new long-term object is cleaned up. 
+ async fn put_long_term( + &self, + id: &ObjectId, + metadata: &Metadata, + stream: ClientStream, + ) -> Result<()> { + // 1. Read current HV revision to establish the write precondition + let current = match self.high_volume.get_tiered_metadata(id).await? { + TieredMetadata::Tombstone(t) => Some(t.target), + _ => None, + }; + + // 2. Write payload to long-term at a unique revision key. + let new = new_long_term_revision(id); + self.long_term.put_object(&new, metadata, stream).await?; + + // 3. CAS commit: write tombstone only if HV state matches what we saw. + let tombstone = Tombstone { + target: new.clone(), + expiration_policy: metadata.expiration_policy, + }; + let written = self + .high_volume + .compare_and_write(id, current.as_ref(), TieredWrite::Tombstone(tombstone)) + .await; + + // TODO: Schedule cleanups into background to ensure eventual cleanup + match written { + Ok(true) => { + // Tombstone committed. Clean up old GCS blob if overwriting. + if let Some(current) = current { + let _ = self.long_term.delete_object(&current).await; + } + } + Ok(false) => { + // Another writer won the race. Clean up our GCS blob. + // This is not an error - see "Last-Writer-Wins" in module docs. + let _ = self.long_term.delete_object(&new).await; + } + Err(e) => { + // CAS error. Clean up our GCS blob before propagating.
+ let _ = self.long_term.delete_object(&new).await; + return Err(e); + } + } + + Ok(()) + } } #[async_trait::async_trait] @@ -159,95 +321,39 @@ impl Backend for TieredStorage { metadata: &Metadata, stream: ClientStream, ) -> Result<PutResponse> { + let start = Instant::now(); if metadata.origin.is_none() { objectstore_metrics::count!("put.origin_missing", usecase = id.usecase().to_owned()); } - let start = Instant::now(); - let peeked = SizedPeek::new(stream, BACKEND_SIZE_THRESHOLD).await?; - let backend_choice = if peeked.is_exhausted() { - BackendChoice::HighVolume - } else { - BackendChoice::LongTerm - }; - objectstore_metrics::record!( "put.first_chunk.latency" = start.elapsed(), usecase = id.usecase().to_owned(), - backend_choice = backend_choice.as_str(), + complete = if peeked.is_exhausted() { "yes" } else { "no" }, ); - let (final_choice, stored_size) = match backend_choice { - BackendChoice::HighVolume => { - let payload = peeked.into_bytes().await?; - let stored_size = payload.len() as u64; - - let tombstone_opt = self - .high_volume - .put_non_tombstone(id, metadata, payload.clone()) - .await?; - - if let Some(Tombstone { target, .. }) = tombstone_opt { - // Tombstone already exists in HV — write to long-term instead. - // TODO: The new object's expiry may differ from the tombstone's, - // leaving them inconsistent. This is a known gap and will be fixed - // in a follow-up. - let stream = crate::stream::single(payload).boxed(); - self.long_term.put_object(&target, metadata, stream).await?; - (BackendChoice::LongTerm, stored_size) - } else { - (BackendChoice::HighVolume, stored_size) - } - } - BackendChoice::LongTerm => { - let stored_size = Arc::new(AtomicU64::new(0)); - let stream = peeked - .into_stream() - .inspect({ - let stored_size = Arc::clone(&stored_size); - move |res| { - if let Ok(chunk) = res { - stored_size.fetch_add(chunk.len() as u64, Ordering::Relaxed); - } - } - }) - .boxed(); - - // First write the object to long-term.
- let long_term_id = id.clone(); - self.long_term - .put_object(&long_term_id, metadata, stream) - .await?; - - // Then write the redirect tombstone to high-volume. - let tombstone = Tombstone { - target: long_term_id.clone(), - expiration_policy: metadata.expiration_policy, - }; - - if let Err(e) = self.high_volume.create_tombstone(id, tombstone).await { - // Clean up on any kind of error. - self.long_term.delete_object(&long_term_id).await?; - return Err(e); - } - - (BackendChoice::LongTerm, stored_size.load(Ordering::Acquire)) - } + let (backend_choice, stored_size) = if peeked.is_exhausted() { + let payload = peeked.into_bytes().await?; + self.put_high_volume(id, metadata, payload.clone()).await?; + (BackendChoice::HighVolume, payload.len() as u64) + } else { + let (stored_size, stream) = counting_stream(peeked.into_stream()); + self.put_long_term(id, metadata, stream.boxed()).await?; + (BackendChoice::LongTerm, stored_size.load(Ordering::Acquire)) }; - let backend_ty = self.backend_type(&final_choice); - + let backend_ty = self.backend_type(&backend_choice); objectstore_metrics::record!( "put.latency" = start.elapsed(), usecase = id.usecase().to_owned(), - backend_choice = final_choice.as_str(), + backend_choice = backend_choice.as_str(), backend_type = backend_ty, ); objectstore_metrics::record!( "put.size" = stored_size, usecase = id.usecase().to_owned(), - backend_choice = final_choice.as_str(), + backend_choice = backend_choice.as_str(), backend_type = backend_ty, ); @@ -324,11 +430,19 @@ impl Backend for TieredStorage { if let Some(tombstone) = self.high_volume.delete_non_tombstone(id).await? { backend_choice = BackendChoice::LongTerm; - // Delete the long-term object first, then clean up the tombstone. - // This ordering ensures that if the long-term delete fails, the - // tombstone remains and the data is still reachable (not orphaned). 
- self.long_term.delete_object(&tombstone.target).await?; - self.high_volume.delete_object(id).await?; + // Delete the tombstone first, then clean up GCS. + let deleted = self + .high_volume + .compare_and_write(id, Some(&tombstone.target), TieredWrite::Delete) + .await?; + + // TODO: Schedule cleanups into background to ensure eventual cleanup + if deleted { + let _ = self.long_term.delete_object(&tombstone.target).await; + } + + // Else: Another writer won the race. This is not an error - + // see "Last-Writer-Wins" in module docs. } objectstore_metrics::record!( @@ -342,12 +456,51 @@ impl Backend for TieredStorage { } } +#[derive(Debug)] +enum BackendChoice { + HighVolume, + LongTerm, +} + +impl BackendChoice { + fn as_str(&self) -> &'static str { + match self { + BackendChoice::HighVolume => "high-volume", + BackendChoice::LongTerm => "long-term", + } + } +} + +impl std::fmt::Display for BackendChoice { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + +/// Wraps a stream to count the total bytes yielded by successful chunks. +/// +/// Returns the shared counter and the wrapped stream. The counter is incremented +/// as the stream is consumed, so read it only after the stream is exhausted. 
+fn counting_stream<S>(stream: S) -> (Arc<AtomicU64>, impl Stream<Item = Result<Bytes>>) +where + S: Stream<Item = Result<Bytes>>, +{ + let counter = Arc::new(AtomicU64::new(0)); + + ( + counter.clone(), + stream.inspect(move |res| { + if let Ok(chunk) = res { + counter.fetch_add(chunk.len() as u64, Ordering::Relaxed); + } + }), + ) +} + #[cfg(test)] mod tests { use std::time::Duration; - use bytes::BytesMut; - use futures_util::TryStreamExt; use objectstore_types::metadata::ExpirationPolicy; use objectstore_types::scope::{Scope, Scopes}; @@ -355,7 +508,7 @@ mod tests { use crate::backend::in_memory::InMemoryBackend; use crate::error::Error; use crate::id::ObjectContext; - use crate::stream::{self}; + use crate::stream; fn make_context() -> ObjectContext { ObjectContext { @@ -364,6 +517,10 @@ mod tests { } } + fn make_id(key: &str) -> ObjectId { + ObjectId::new(make_context(), key.into()) + } + fn make_tiered_storage() -> (TieredStorage, InMemoryBackend, InMemoryBackend) { let hv = InMemoryBackend::new("in-memory-hv"); let lt = InMemoryBackend::new("in-memory-lt"); @@ -371,12 +528,44 @@ (storage, hv, lt) } + // --- new_long_term_revision tests --- + + #[test] + fn revision_id_preserves_context() { + let id = make_id("my-key"); + let revised = new_long_term_revision(&id); + assert_eq!(revised.context, id.context); + assert!( + revised.key.starts_with("my-key/"), + "revised key should have / suffix, got: {}", + revised.key + ); + } + + #[test] + fn revision_id_roundtrips_storage_path() { + let id = make_id("original"); + let revised = new_long_term_revision(&id); + let path = revised.as_storage_path().to_string(); + let parsed = ObjectId::from_storage_path(&path) + .unwrap_or_else(|| panic!("failed to parse '{path}'")); + assert_eq!(parsed, revised); + } + + #[test] + fn revision_id_is_unique() { + let id = make_id("base-key"); + let a = new_long_term_revision(&id); + let b = new_long_term_revision(&id); + assert_ne!(a.key, b.key, "two calls should produce different keys"); + } + // --- Basic behavior ---
#[tokio::test] async fn get_nonexistent_returns_none() { let (storage, _hv, _lt) = make_tiered_storage(); - let id = ObjectId::new(make_context(), "does-not-exist".into()); + let id = make_id("does-not-exist"); assert!(storage.get_object(&id).await.unwrap().is_none()); assert!(storage.get_metadata(&id).await.unwrap().is_none()); @@ -385,163 +574,254 @@ mod tests { #[tokio::test] async fn delete_nonexistent_succeeds() { let (storage, _hv, _lt) = make_tiered_storage(); - let id = ObjectId::new(make_context(), "does-not-exist".into()); + let id = make_id("does-not-exist"); storage.delete_object(&id).await.unwrap(); } - #[tokio::test] - async fn put_and_get_roundtrip() { - let (storage, _hv, _lt) = make_tiered_storage(); - let id = ObjectId::random(make_context()); - - storage - .put_object(&id, &Default::default(), stream::single("auto-keyed")) - .await - .unwrap(); - - let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); - let body: BytesMut = s.try_collect().await.unwrap(); - assert_eq!(body.as_ref(), b"auto-keyed"); - } - - // --- Size-based routing tests --- + // --- Put routing --- #[tokio::test] - async fn small_object_goes_to_high_volume() { + async fn put_small_object_stores_inline() { let (storage, hv, lt) = make_tiered_storage(); - let payload = vec![0u8; 100]; // 100 bytes, well under 1 MiB - let id = ObjectId::new(make_context(), "small".into()); + let id = make_id("small"); + let payload = b"small payload".to_vec(); storage - .put_object(&id, &Default::default(), stream::single(payload)) + .put_object(&id, &Default::default(), stream::single(payload.clone())) .await .unwrap(); assert!(hv.contains(&id), "expected in high-volume"); assert!(!lt.contains(&id), "leaked to long-term"); + + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); + + assert!( + storage.get_metadata(&id).await.unwrap().is_some(), + "get_metadata should return metadata for inline 
objects" + ); + } #[tokio::test] - async fn large_object_goes_to_long_term_with_tombstone() { + async fn put_large_object_creates_tombstone() { let (storage, hv, lt) = make_tiered_storage(); - let payload_len = 2 * 1024 * 1024; // 2 MiB, over threshold - let payload = vec![0xABu8; payload_len]; - let id = ObjectId::new(make_context(), "large".into()); + let id = make_id("large"); + let payload = vec![0xCDu8; 2 * 1024 * 1024]; // 2 MiB, over threshold + let metadata_in = Metadata { + content_type: "image/png".into(), + expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(3600)), + origin: Some("10.0.0.1".into()), + ..Default::default() + }; storage - .put_object(&id, &Default::default(), stream::single(payload)) + .put_object(&id, &metadata_in, stream::single(payload.clone())) .await .unwrap(); - // Real payload should be in long-term - let (_, lt_bytes) = lt.get(&id).expect_object(); - assert_eq!(lt_bytes.len(), payload_len); + // Tombstone in HV: correct expiration_policy, target is a revision key. + let tombstone = hv.get(&id).expect_tombstone(); + assert_eq!(tombstone.expiration_policy, metadata_in.expiration_policy); + let lt_id = tombstone.target; + assert!( + lt_id.key().starts_with(id.key()), + "tombstone target key should be a revision of the HV key, got: {}", + lt_id.key() + ); + + // LT object at revision key with correct metadata. + let (lt_meta, _) = lt.get(&lt_id).expect_object(); + assert_eq!(lt_meta.content_type, "image/png"); + assert_eq!(lt_meta.expiration_policy, metadata_in.expiration_policy); - // A redirect tombstone should exist in high-volume - hv.get(&id).expect_tombstone(); + // get_object follows the tombstone and returns the correct payload. + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload); + + // get_metadata follows the tombstone and returns the correct content_type.
+ let metadata = storage.get_metadata(&id).await.unwrap().unwrap(); + assert_eq!(metadata.content_type, "image/png"); } + // --- Put overwrites --- + #[tokio::test] - async fn reinsert_with_existing_tombstone_routes_to_long_term() { + async fn reinsert_small_over_large_swaps_to_inline() { let (storage, hv, lt) = make_tiered_storage(); - let id = ObjectId::new(make_context(), "reinsert-key".into()); + let id = make_id("reinsert-key"); - // First: insert a large object → creates tombstone in hv, payload in lt + // First: insert a large object → creates tombstone in hv, payload in lt at lt_id let large_payload = vec![0xABu8; 2 * 1024 * 1024]; storage .put_object(&id, &Default::default(), stream::single(large_payload)) .await .unwrap(); - hv.get(&id).expect_tombstone(); + let lt_id = hv.get(&id).expect_tombstone().target; - // Now re-insert a SMALL payload with the same key. The service should - // detect the existing tombstone and route to long-term anyway. + // Re-insert a SMALL payload with the same key. + // The CAS-swap puts the small object inline in HV and cleans up the old GCS blob. let small_payload = vec![0xCDu8; 100]; // well under 1 MiB threshold storage .put_object(&id, &Default::default(), stream::single(small_payload)) .await .unwrap(); - // The small object should be in long-term (not high-volume) - let (_, lt_bytes) = lt.get(&id).expect_object(); - assert_eq!(lt_bytes.len(), 100); + // The small object is now inline in high-volume. + hv.get(&id).expect_object(); - // The tombstone in hv should still be present - hv.get(&id).expect_tombstone(); + // The old long-term blob was cleaned up. 
+ lt.get(&lt_id).expect_not_found(); } #[tokio::test] - async fn tombstone_inherits_expiration_policy() { + async fn overwrite_large_with_large_replaces_revision() { let (storage, hv, lt) = make_tiered_storage(); + let id = make_id("overwrite-large"); - let metadata_in = Metadata { - content_type: "image/png".into(), - expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(3600)), - origin: Some("10.0.0.1".into()), - ..Default::default() - }; - let payload = vec![0u8; 2 * 1024 * 1024]; // force long-term - let id = ObjectId::new(make_context(), "expiry-test".into()); + let payload1 = vec![0xAAu8; 2 * 1024 * 1024]; + storage + .put_object(&id, &Default::default(), stream::single(payload1)) + .await + .unwrap(); + let lt_id_1 = hv.get(&id).expect_tombstone().target; + let payload2 = vec![0xBBu8; 2 * 1024 * 1024]; storage - .put_object(&id, &metadata_in, stream::single(payload)) + .put_object(&id, &Default::default(), stream::single(payload2.clone())) .await .unwrap(); + let lt_id_2 = hv.get(&id).expect_tombstone().target; - // The tombstone in hv should have expiration_policy inherited and target set to id.
- let tombstone = hv.get(&id).expect_tombstone(); - assert_eq!(tombstone.expiration_policy, metadata_in.expiration_policy); - assert_eq!( - tombstone.target, id, - "tombstone target must point to the LT ObjectId (identical to HV id for now)" + assert_ne!( + lt_id_1, lt_id_2, + "second write should create a new revision" ); + lt.get(&lt_id_1).expect_not_found(); + lt.get(&lt_id_2).expect_object(); - // The long-term object should have the full metadata - let (lt_meta, _) = lt.get(&id).expect_object(); - assert_eq!(lt_meta.content_type, "image/png"); - assert_eq!(lt_meta.expiration_policy, metadata_in.expiration_policy); + let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); + let body = stream::read_to_vec(s).await.unwrap(); + assert_eq!(body, payload2); } - // --- Tombstone redirect tests --- + // --- Delete --- #[tokio::test] - async fn reads_follow_tombstone_redirect() { - let (storage, _hv, _lt) = make_tiered_storage(); - let payload = vec![0xCDu8; 2 * 1024 * 1024]; // 2 MiB, over threshold + async fn delete_small_object() { + let (storage, hv, _lt) = make_tiered_storage(); + let id = make_id("delete-small"); - let metadata_in = Metadata { - content_type: "image/png".into(), - ..Default::default() - }; - let id = ObjectId::new(make_context(), "redirect-read".into()); + storage + .put_object(&id, &Default::default(), stream::single("tiny")) + .await + .unwrap(); + + storage.delete_object(&id).await.unwrap(); + + hv.get(&id).expect_not_found(); + assert!(storage.get_object(&id).await.unwrap().is_none()); + } + + #[tokio::test] + async fn delete_large_object_cleans_up_both_backends() { + let (storage, hv, lt) = make_tiered_storage(); + let id = make_id("delete-both"); + let payload = vec![0u8; 2 * 1024 * 1024]; // 2 MiB storage - .put_object(&id, &metadata_in, stream::single(payload.clone())) + .put_object(&id, &Default::default(), stream::single(payload)) .await .unwrap(); - // get_object should transparently follow the tombstone - let (_, s) =
storage.get_object(&id).await.unwrap().unwrap(); - let body = stream::read_to_vec(s).await.unwrap(); - assert_eq!(body, payload); + // Capture lt_id before deleting (it lives at the revision key, not at id). + let lt_id = hv.get(&id).expect_tombstone().target; - // get_metadata should also follow the tombstone - let metadata = storage.get_metadata(&id).await.unwrap().unwrap(); - assert_eq!(metadata.content_type, "image/png"); + storage.delete_object(&id).await.unwrap(); + + assert!(!hv.contains(&id), "tombstone not cleaned up"); + assert!(!lt.contains(&lt_id), "long-term object not cleaned up"); } - // --- Tombstone inconsistency tests --- + /// A backend wrapper that delegates everything except `delete_object`, which always fails. + #[derive(Debug)] + struct FailingDeleteBackend(InMemoryBackend); + + #[async_trait::async_trait] + impl Backend for FailingDeleteBackend { + fn name(&self) -> &'static str { + "failing-delete" + } - /// A backend where `create_tombstone` always fails, but all other operations work normally. + async fn put_object( + &self, + id: &ObjectId, + metadata: &Metadata, + stream: ClientStream, + ) -> Result<PutResponse> { + self.0.put_object(id, metadata, stream).await + } + + async fn get_object(&self, id: &ObjectId) -> Result<GetResponse> { + self.0.get_object(id).await + } + + async fn delete_object(&self, _id: &ObjectId) -> Result<DeleteResponse> { + Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::ConnectionRefused, + "simulated long-term delete failure", + ))) + } + } + + /// When the long-term GCS cleanup fails after the tombstone is deleted, the + /// delete still succeeds (GCS cleanup is best-effort). An orphan blob may + /// remain in LT storage, which is accepted.
+ #[tokio::test] + async fn delete_succeeds_when_gcs_cleanup_fails() { + let hv = InMemoryBackend::new("hv"); + let lt = FailingDeleteBackend(InMemoryBackend::new("lt")); + let storage = TieredStorage::new(Box::new(hv.clone()), Box::new(lt)); + + let id = make_id("fail-delete"); + let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB -> goes to long-term + storage + .put_object(&id, &Default::default(), stream::single(payload)) + .await + .unwrap(); + + // Delete succeeds even though GCS cleanup fails (it is best-effort). + let result = storage.delete_object(&id).await; + assert!( + result.is_ok(), + "delete should succeed despite GCS cleanup failure" + ); + + // The tombstone in HV is gone (CAS-deleted first, before GCS cleanup). + hv.get(&id).expect_not_found(); + + // The orphaned GCS blob remains but the object is unreachable through the service. + assert!( + storage.get_object(&id).await.unwrap().is_none(), + "object should be unreachable after tombstone is deleted" + ); + } + + // --- CAS conflicts --- + + /// A backend wrapper that delegates everything except `compare_and_write`, which always + /// returns `Ok(false)` to simulate a lost CAS race. 
#[derive(Debug)] - struct FailingTombstoneBackend(InMemoryBackend); + struct ConflictingCasBackend(InMemoryBackend); #[async_trait::async_trait] - impl Backend for FailingTombstoneBackend { + impl Backend for ConflictingCasBackend { fn name(&self) -> &'static str { - "failing-tombstone" + "conflicting-cas" } async fn put_object( @@ -563,7 +843,7 @@ mod tests { } #[async_trait::async_trait] - impl HighVolumeBackend for FailingTombstoneBackend { + impl HighVolumeBackend for ConflictingCasBackend { async fn put_non_tombstone( &self, id: &ObjectId, @@ -573,17 +853,6 @@ mod tests { self.0.put_non_tombstone(id, metadata, payload).await } - async fn delete_non_tombstone(&self, id: &ObjectId) -> Result> { - self.0.delete_non_tombstone(id).await - } - - async fn create_tombstone(&self, _id: &ObjectId, _tombstone: Tombstone) -> Result<()> { - Err(Error::Io(std::io::Error::new( - std::io::ErrorKind::ConnectionRefused, - "simulated tombstone write failure", - ))) - } - async fn get_tiered_object(&self, id: &ObjectId) -> Result { self.0.get_tiered_object(id).await } @@ -591,81 +860,85 @@ mod tests { async fn get_tiered_metadata(&self, id: &ObjectId) -> Result { self.0.get_tiered_metadata(id).await } + + async fn delete_non_tombstone(&self, id: &ObjectId) -> Result> { + self.0.delete_non_tombstone(id).await + } + + async fn compare_and_write( + &self, + _id: &ObjectId, + _current: Option<&ObjectId>, + _write: TieredWrite, + ) -> Result { + Ok(false) // always conflict + } } - /// If the tombstone write to the high-volume backend fails after the long-term - /// write succeeds, the long-term object must be cleaned up so we never leave - /// an unreachable orphan in long-term storage. + /// After a large-object write loses the CAS race, the new LT blob must be + /// cleaned up. The put still returns `Ok(())` — from the caller's view, a + /// concurrent write won. 
#[tokio::test] - async fn no_orphan_when_tombstone_write_fails() { + async fn put_large_cas_conflict_cleans_up_new_blob() { + let hv = ConflictingCasBackend(InMemoryBackend::new("hv")); let lt = InMemoryBackend::new("lt"); - let hv = FailingTombstoneBackend(InMemoryBackend::new("hv")); let storage = TieredStorage::new(Box::new(hv), Box::new(lt.clone())); - let id = ObjectId::new(make_context(), "orphan-test".into()); + let id = make_id("cas-conflict-large"); let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB -> long-term path - let result = storage - .put_object(&id, &Default::default(), stream::single(payload)) - .await; - - assert!(result.is_err()); - assert!(lt.is_empty(), "long-term object not cleaned up"); - } - - /// If a tombstone exists in high-volume but the corresponding object is - /// missing from long-term storage (e.g. due to a race condition or partial - /// cleanup), reads should gracefully return None rather than error. - #[tokio::test] - async fn orphan_tombstone_returns_none() { - let (storage, _hv, lt) = make_tiered_storage(); - let id = ObjectId::new(make_context(), "orphan-tombstone".into()); - let payload = vec![0xCDu8; 2 * 1024 * 1024]; // 2 MiB storage .put_object(&id, &Default::default(), stream::single(payload)) .await .unwrap(); - // Remove the long-term object, leaving an orphan tombstone in hv - lt.remove(&id); - - assert!( - storage.get_object(&id).await.unwrap().is_none(), - "orphan tombstone should resolve to None on get_object" - ); assert!( - storage.get_metadata(&id).await.unwrap().is_none(), - "orphan tombstone should resolve to None on get_metadata" + lt.is_empty(), + "LT blob should be cleaned up after CAS conflict" ); } - // --- Delete tests --- - + /// When swapping a tombstone for inline data, a CAS conflict means another + /// writer won. The put still returns `Ok(())` — no LT blob was written, so + /// there is nothing to clean up. 
#[tokio::test] - async fn delete_cleans_up_both_backends() { - let (storage, hv, lt) = make_tiered_storage(); - let id = ObjectId::new(make_context(), "delete-both".into()); - let payload = vec![0u8; 2 * 1024 * 1024]; // 2 MiB - - storage - .put_object(&id, &Default::default(), stream::single(payload)) + async fn put_small_over_tombstone_cas_conflict_succeeds() { + let inner = InMemoryBackend::new("hv"); + let id = make_id("cas-conflict-small"); + + // Pre-seed a tombstone directly in the inner backend so put_non_tombstone + // returns it instead of writing inline. + let tombstone = Tombstone { + target: make_id("lt-object"), + expiration_policy: ExpirationPolicy::Manual, + }; + inner + .compare_and_write(&id, None, TieredWrite::Tombstone(tombstone)) .await .unwrap(); - storage.delete_object(&id).await.unwrap(); + let lt = InMemoryBackend::new("lt"); + let hv = ConflictingCasBackend(inner); + let storage = TieredStorage::new(Box::new(hv), Box::new(lt)); - assert!(!hv.contains(&id), "tombstone not cleaned up"); - assert!(!lt.contains(&id), "object not cleaned up"); + // Writing a small object over a tombstone should succeed even when CAS + // conflicts — the other writer's write is accepted. + storage + .put_object(&id, &Default::default(), stream::single("tiny")) + .await + .unwrap(); } - /// A backend wrapper that delegates everything except `delete_object`, which always fails. + // --- Failure / inconsistency --- + + /// A backend where `compare_and_write` always errors, but all other operations work normally. 
#[derive(Debug)] - struct FailingDeleteBackend(InMemoryBackend); + struct FailingCasBackend(InMemoryBackend); #[async_trait::async_trait] - impl Backend for FailingDeleteBackend { + impl Backend for FailingCasBackend { fn name(&self) -> &'static str { - "failing-delete" + "failing-cas" } async fn put_object( @@ -681,41 +954,97 @@ mod tests { self.0.get_object(id).await } - async fn delete_object(&self, _id: &ObjectId) -> Result { + async fn delete_object(&self, id: &ObjectId) -> Result { + self.0.delete_object(id).await + } + } + + #[async_trait::async_trait] + impl HighVolumeBackend for FailingCasBackend { + async fn put_non_tombstone( + &self, + id: &ObjectId, + metadata: &Metadata, + payload: bytes::Bytes, + ) -> Result> { + self.0.put_non_tombstone(id, metadata, payload).await + } + + async fn get_tiered_object(&self, id: &ObjectId) -> Result { + self.0.get_tiered_object(id).await + } + + async fn get_tiered_metadata(&self, id: &ObjectId) -> Result { + self.0.get_tiered_metadata(id).await + } + + async fn delete_non_tombstone(&self, id: &ObjectId) -> Result> { + self.0.delete_non_tombstone(id).await + } + + async fn compare_and_write( + &self, + _id: &ObjectId, + _current: Option<&ObjectId>, + _write: TieredWrite, + ) -> Result { Err(Error::Io(std::io::Error::new( std::io::ErrorKind::ConnectionRefused, - "simulated long-term delete failure", + "simulated compare_and_write failure", ))) } } - /// When the long-term delete fails, the tombstone must be preserved so the - /// object remains reachable and no data is orphaned. + /// If the tombstone write to the high-volume backend fails after the long-term + /// write succeeds, the long-term object must be cleaned up so we never leave + /// an unreachable orphan in long-term storage. 
#[tokio::test] - async fn tombstone_preserved_when_long_term_delete_fails() { - let hv = InMemoryBackend::new("hv"); - let lt = FailingDeleteBackend(InMemoryBackend::new("lt")); - let storage = TieredStorage::new(Box::new(hv.clone()), Box::new(lt)); + async fn no_orphan_when_tombstone_write_fails() { + let lt = InMemoryBackend::new("lt"); + let hv = FailingCasBackend(InMemoryBackend::new("hv")); + let storage = TieredStorage::new(Box::new(hv), Box::new(lt.clone())); + + let id = make_id("orphan-test"); + let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB -> long-term path + let result = storage + .put_object(&id, &Default::default(), stream::single(payload)) + .await; + + assert!(result.is_err()); + assert!(lt.is_empty(), "long-term object not cleaned up"); + } + + /// If a tombstone exists in high-volume but the corresponding object is + /// missing from long-term storage (e.g. due to a race condition or partial + /// cleanup), reads should gracefully return None rather than error. + #[tokio::test] + async fn orphan_tombstone_returns_none() { + let (storage, hv, lt) = make_tiered_storage(); + let id = make_id("orphan-tombstone"); + let payload = vec![0xCDu8; 2 * 1024 * 1024]; // 2 MiB - let id = ObjectId::new(make_context(), "fail-delete".into()); - let payload = vec![0xABu8; 2 * 1024 * 1024]; // 2 MiB -> goes to long-term storage - .put_object(&id, &Default::default(), stream::single(payload.clone())) + .put_object(&id, &Default::default(), stream::single(payload)) .await .unwrap(); - let result = storage.delete_object(&id).await; - assert!(result.is_err()); + // The object is at the revision key in LT, not at id. 
+ let lt_id = hv.get(&id).expect_tombstone().target; - hv.get(&id).expect_tombstone(); + // Remove the long-term object, leaving an orphan tombstone in hv + lt.remove(&lt_id); - // The object should still be reachable through the service - let (_, s) = storage.get_object(&id).await.unwrap().unwrap(); - let body = stream::read_to_vec(s).await.unwrap(); - assert_eq!(body, payload); + assert!( + storage.get_object(&id).await.unwrap().is_none(), + "orphan tombstone should resolve to None on get_object" + ); + assert!( + storage.get_metadata(&id).await.unwrap().is_none(), + "orphan tombstone should resolve to None on get_metadata" + ); } - // --- Redirect target tests --- + // --- Redirect target --- /// A tombstone carrying an explicit `target` is followed correctly on reads and deletes, /// including when the target ObjectId differs from the HV ObjectId. @@ -725,23 +1054,21 @@ mod tests { let lt = InMemoryBackend::new("lt"); let storage = TieredStorage::new(Box::new(hv.clone()), Box::new(lt.clone())); - let hv_id = ObjectId::new(make_context(), "hv-key".into()); - let lt_id = ObjectId::new(make_context(), "lt-key".into()); + let hv_id = make_id("hv-key"); + let lt_id = make_id("lt-key"); let payload = vec![0xABu8; 100]; // Write the object under the LT id and a tombstone pointing to it from HV. lt.put_object(&lt_id, &Default::default(), stream::single(payload.clone())) .await .unwrap(); - hv.create_tombstone( - &hv_id, - crate::backend::common::Tombstone { - target: lt_id.clone(), - expiration_policy: objectstore_types::metadata::ExpirationPolicy::Manual, - }, - ) - .await - .unwrap(); + let tombstone = Tombstone { + target: lt_id.clone(), + expiration_policy: ExpirationPolicy::Manual, + }; + hv.compare_and_write(&hv_id, None, TieredWrite::Tombstone(tombstone)) + .await + .unwrap(); // get_object must follow the tombstone and find the object via the lt_id target.
let (_, s) = storage.get_object(&hv_id).await.unwrap().unwrap(); @@ -754,12 +1081,12 @@ mod tests { assert!(!lt.contains(&lt_id), "lt object should be removed"); } - // --- Multi-chunk streaming tests --- + // --- Multi-chunk --- #[tokio::test] async fn multi_chunk_large_object_chains_buffered_and_remaining() { - let (storage, _hv, lt) = make_tiered_storage(); - let id = ObjectId::new(make_context(), "multi-chunk".into()); + let (storage, hv, lt) = make_tiered_storage(); + let id = make_id("multi-chunk"); // Deliver a 2 MiB payload across multiple chunks that individually // fit under the threshold but collectively exceed it. @@ -775,8 +1102,9 @@ mod tests { .await .unwrap(); - // Should have been routed to long-term (over 1 MiB). - let (_, lt_bytes) = lt.get(&id).expect_object(); + // Should have been routed to long-term (over 1 MiB) at the revision key. + let lt_id = hv.get(&id).expect_tombstone().target; + let (_, lt_bytes) = lt.get(&lt_id).expect_object(); assert_eq!(lt_bytes.len(), chunk_size * chunk_count); // Verify data integrity — each chunk's fill byte should appear in order.
diff --git a/objectstore-service/src/service.rs b/objectstore-service/src/service.rs index 2d34de78..c4c0be9d 100644 --- a/objectstore-service/src/service.rs +++ b/objectstore-service/src/service.rs @@ -235,7 +235,9 @@ mod tests { use super::*; use crate::backend::bigtable::{BigTableBackend, BigTableConfig}; - use crate::backend::common::{HighVolumeBackend, TieredGet, TieredMetadata, Tombstone}; + use crate::backend::common::{ + HighVolumeBackend, TieredGet, TieredMetadata, TieredWrite, Tombstone, + }; use crate::backend::gcs::{GcsBackend, GcsConfig}; use crate::backend::in_memory::InMemoryBackend; use crate::backend::tiered::TieredStorage; @@ -525,16 +527,6 @@ mod tests { self.inner.put_non_tombstone(id, metadata, payload).await } - async fn delete_non_tombstone(&self, id: &ObjectId) -> Result> { - self.inner.delete_non_tombstone(id).await - } - - async fn create_tombstone(&self, id: &ObjectId, tombstone: Tombstone) -> Result<()> { - self.inner.create_tombstone(id, tombstone).await?; - self.on_put.notify_one(); - Ok(()) - } - async fn get_tiered_object(&self, id: &ObjectId) -> Result { self.inner.get_tiered_object(id).await } @@ -542,6 +534,24 @@ mod tests { async fn get_tiered_metadata(&self, id: &ObjectId) -> Result { self.inner.get_tiered_metadata(id).await } + + async fn delete_non_tombstone(&self, id: &ObjectId) -> Result> { + self.inner.delete_non_tombstone(id).await + } + + async fn compare_and_write( + &self, + id: &ObjectId, + current: Option<&ObjectId>, + write: TieredWrite, + ) -> Result { + let notify = matches!(write, TieredWrite::Tombstone(_) | TieredWrite::Object(_, _)); + let result = self.inner.compare_and_write(id, current, write).await?; + if notify { + self.on_put.notify_one(); + } + Ok(result) + } } #[tokio::test] @@ -579,9 +589,11 @@ mod tests { .expect("timed out waiting for tombstone write"); // Verify the object was fully written despite the caller being dropped. + // The tombstone in HV points to the revision key in LT. 
let id = ObjectId::new(make_context(), "completion-test".into()); - assert!(lt.inner.contains(&id), "long-term object missing"); - hv.inner.get(&id).expect_tombstone(); + let tombstone = hv.inner.get(&id).expect_tombstone(); + let lt_id = tombstone.target; + assert!(lt.inner.contains(&lt_id), "long-term object missing"); } // --- Concurrency limit tests ---