From 5c576bb78c4e41e35513b22906649fb77e84df25 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Fri, 20 Mar 2026 09:48:45 +0100 Subject: [PATCH] ref(service): Simplify Bigtable mutation construction Push v2::Mutation wrapping into builders and a helper so call sites are clean. Builders now return fixed-size arrays; callers needing Vec use .into(). Co-Authored-By: Claude --- objectstore-service/src/backend/bigtable.rs | 120 ++++++++------------ 1 file changed, 48 insertions(+), 72 deletions(-) diff --git a/objectstore-service/src/backend/bigtable.rs b/objectstore-service/src/backend/bigtable.rs index 68c0b1e6..eba9701f 100644 --- a/objectstore-service/src/backend/bigtable.rs +++ b/objectstore-service/src/backend/bigtable.rs @@ -305,16 +305,29 @@ fn metadata_filter() -> v2::RowFilter { } } +fn mutation(mutation: mutation::Mutation) -> v2::Mutation { + v2::Mutation { + mutation: Some(mutation), + } +} + +/// Creates a `DeleteFromRow` mutation wrapped in the outer [`v2::Mutation`] envelope. +fn delete_row_mutation() -> v2::Mutation { + mutation(mutation::Mutation::DeleteFromRow( + mutation::DeleteFromRow {}, + )) +} + /// Builds the three mutations that write an object row: clear existing data, /// then set the payload and metadata cells. /// /// Used by both [`BigTableBackend::put_row`] (unconditional write) and /// [`BigTableBackend::put_non_tombstone`] (conditional write). -fn build_write_mutations( +fn object_mutations( metadata: &Metadata, payload: Vec, now: SystemTime, -) -> Result<[mutation::Mutation; 3]> { +) -> Result<[v2::Mutation; 3]> { let (family, timestamp_micros) = match metadata.expiration_policy { ExpirationPolicy::Manual => (FAMILY_MANUAL, -1), ExpirationPolicy::TimeToLive(ttl) => (FAMILY_GC, ttl_to_micros(ttl, now)?), @@ -326,19 +339,19 @@ fn build_write_mutations( Ok([ // NB: We explicitly delete the row to clear metadata on overwrite. - mutation::Mutation::DeleteFromRow(mutation::DeleteFromRow {}), - mutation::Mutation::SetCell(mutation::SetCell { + delete_row_mutation(), + mutation(mutation::Mutation::SetCell(mutation::SetCell { family_name: family.to_owned(), column_qualifier: COLUMN_PAYLOAD.to_owned(), timestamp_micros, value: payload, - }), - mutation::Mutation::SetCell(mutation::SetCell { + })), + mutation(mutation::Mutation::SetCell(mutation::SetCell { family_name: family.to_owned(), column_qualifier: COLUMN_METADATA.to_owned(), timestamp_micros, value: metadata_bytes, - }), + })), ]) } @@ -360,10 +373,7 @@ struct TombstoneMeta { /// /// Used by both [`BigTableBackend::put_tombstone_row`] (unconditional write) and the /// TTI bump path in tiered reads. -fn build_tombstone_mutations( - tombstone: &Tombstone, - now: SystemTime, -) -> Result<[mutation::Mutation; 3]> { +fn tombstone_mutations(tombstone: &Tombstone, now: SystemTime) -> Result<[v2::Mutation; 3]> { let (family, timestamp_micros) = match tombstone.expiration_policy { ExpirationPolicy::Manual => (FAMILY_MANUAL, -1), ExpirationPolicy::TimeToLive(ttl) => (FAMILY_GC, ttl_to_micros(ttl, now)?), @@ -375,20 +385,20 @@ fn build_tombstone_mutations( }; Ok([ - mutation::Mutation::DeleteFromRow(mutation::DeleteFromRow {}), - mutation::Mutation::SetCell(mutation::SetCell { + delete_row_mutation(), + mutation(mutation::Mutation::SetCell(mutation::SetCell { family_name: family.to_owned(), column_qualifier: COLUMN_REDIRECT.to_owned(), timestamp_micros, value: tombstone.target.as_storage_path().to_string().into_bytes(), - }), - mutation::Mutation::SetCell(mutation::SetCell { + })), + mutation(mutation::Mutation::SetCell(mutation::SetCell { family_name: family.to_owned(), column_qualifier: COLUMN_TOMBSTONE_META.to_owned(), timestamp_micros, value: serde_json::to_vec(&tombstone_meta) .map_err(|cause| Error::serde("failed to serialize tombstone", cause))?, - }), + })), ]) } @@ -630,23 +640,16 @@ impl BigTableBackend { }) } - async fn mutate( + async fn mutate( &self, path: Vec, - mutations: I, + mutations: impl Into>, action: &'static str, - ) -> Result - where - I: IntoIterator, - { - let mutations = mutations - .into_iter() - .map(|m| v2::Mutation { mutation: Some(m) }) - .collect(); + ) -> Result { let request = v2::MutateRowRequest { table_name: self.table_path.clone(), row_key: path, - mutations, + mutations: mutations.into(), ..Default::default() }; @@ -665,7 +668,7 @@ impl BigTableBackend { payload: Vec, action: &'static str, ) -> Result { - let mutations = build_write_mutations(metadata, payload, SystemTime::now())?; + let mutations = object_mutations(metadata, payload, SystemTime::now())?; self.mutate(path, mutations, action).await } @@ -675,7 +678,7 @@ impl BigTableBackend { tombstone: &Tombstone, action: &'static str, ) -> Result { - let mutations = build_tombstone_mutations(tombstone, SystemTime::now())?; + let mutations = tombstone_mutations(tombstone, SystemTime::now())?; self.mutate(path, mutations, action).await } @@ -771,10 +774,7 @@ impl Backend for BigTableBackend { tracing::debug!("Deleting from Bigtable backend"); let path = id.as_storage_path().to_string().into_bytes(); - let mutations = [mutation::Mutation::DeleteFromRow( - mutation::DeleteFromRow {}, - )]; - self.mutate(path, mutations, "delete").await?; + self.mutate(path, [delete_row_mutation()], "delete").await?; Ok(()) } @@ -792,10 +792,8 @@ impl HighVolumeBackend for BigTableBackend { tracing::debug!("Conditional put to Bigtable backend"); let path = id.as_storage_path().to_string().into_bytes(); - let false_mutations = build_write_mutations(metadata, payload.to_vec(), SystemTime::now())? - .into_iter() - .map(|m| v2::Mutation { mutation: Some(m) }) - .collect(); + let false_mutations = + object_mutations(metadata, payload.to_vec(), SystemTime::now())?.into(); let request = v2::CheckAndMutateRowRequest { table_name: self.table_path.clone(), @@ -900,18 +898,13 @@ impl HighVolumeBackend for BigTableBackend { tracing::debug!("Conditional delete from Bigtable backend"); let path = id.as_storage_path().to_string().into_bytes(); - let delete_mutation = v2::Mutation { - mutation: Some(mutation::Mutation::DeleteFromRow( - mutation::DeleteFromRow {}, - )), - }; let request = v2::CheckAndMutateRowRequest { table_name: self.table_path.clone(), row_key: path.clone(), predicate_filter: Some(tombstone_predicate()), true_mutations: vec![], // Tombstone matched → leave intact. - false_mutations: vec![delete_mutation], // No tombstone → delete. + false_mutations: vec![delete_row_mutation()], // No tombstone → delete. ..Default::default() }; @@ -965,27 +958,10 @@ impl HighVolumeBackend for BigTableBackend { let path = id.as_storage_path().to_string().into_bytes(); let now = SystemTime::now(); - let write_mutations: Vec = match write { - TieredWrite::Tombstone(tombstone) => { - let mutations = build_tombstone_mutations(&tombstone, now)?; - mutations - .into_iter() - .map(|m| v2::Mutation { mutation: Some(m) }) - .collect() - } - TieredWrite::Object(metadata, payload) => { - let mutations = build_write_mutations(&metadata, payload.to_vec(), now)?; - mutations - .into_iter() - .map(|m| v2::Mutation { mutation: Some(m) }) - .collect() - } - TieredWrite::Delete => { - let delete = mutation::Mutation::DeleteFromRow(mutation::DeleteFromRow {}); - vec![v2::Mutation { - mutation: Some(delete), - }] - } + let write_mutations = match write { + TieredWrite::Tombstone(tombstone) => tombstone_mutations(&tombstone, now)?.into(), + TieredWrite::Object(m, p) => object_mutations(&m, p.to_vec(), now)?.into(), + TieredWrite::Delete => vec![delete_row_mutation()], }; let (predicate_filter, true_mutations, false_mutations, success_on_match) = match current { @@ -1159,7 +1135,7 @@ mod tests { now: SystemTime, ) -> Result<()> { let path = id.as_storage_path().to_string().into_bytes(); - let mutations = build_write_mutations(metadata, payload.to_vec(), now)?; + let mutations = object_mutations(metadata, payload.to_vec(), now)?; backend.mutate(path, mutations, "test-setup").await?; Ok(()) } @@ -1171,7 +1147,7 @@ mod tests { now: SystemTime, ) -> Result<()> { let path = id.as_storage_path().to_string().into_bytes(); - let mutations = build_tombstone_mutations(tombstone, now)?; + let mutations = tombstone_mutations(tombstone, now)?; backend.mutate(path, mutations, "test-setup").await?; Ok(()) } @@ -1203,12 +1179,12 @@ mod tests { }; let path = id.as_storage_path().to_string().into_bytes(); - let mutations = [mutation::Mutation::SetCell(mutation::SetCell { + let mutations = [mutation(mutation::Mutation::SetCell(mutation::SetCell { family_name: family.to_owned(), column_qualifier: COLUMN_METADATA.to_owned(), timestamp_micros, value: meta.into_bytes(), - })]; + }))]; backend.mutate(path, mutations, "test-setup").await?; @@ -1223,18 +1199,18 @@ mod tests { ) -> Result<()> { let path = id.as_storage_path().to_string().into_bytes(); let mutations = [ - mutation::Mutation::SetCell(mutation::SetCell { + mutation(mutation::Mutation::SetCell(mutation::SetCell { family_name: FAMILY_MANUAL.to_owned(), column_qualifier: COLUMN_REDIRECT.to_owned(), timestamp_micros: -1, value: b"".to_vec(), // empty — legacy format - }), - mutation::Mutation::SetCell(mutation::SetCell { + })), + mutation(mutation::Mutation::SetCell(mutation::SetCell { family_name: FAMILY_MANUAL.to_owned(), column_qualifier: COLUMN_TOMBSTONE_META.to_owned(), timestamp_micros: -1, value: b"{}".to_vec(), - }), + })), ]; backend.mutate(path, mutations, "test-setup").await?;