Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 48 additions & 72 deletions objectstore-service/src/backend/bigtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,16 +305,29 @@ fn metadata_filter() -> v2::RowFilter {
}
}

/// Wraps an inner [`mutation::Mutation`] variant in the outer [`v2::Mutation`]
/// envelope that the Bigtable API expects in mutation lists.
fn mutation(mutation: mutation::Mutation) -> v2::Mutation {
    // The envelope's single field is an `Option`; shadow the parameter so the
    // struct literal can use field-shorthand.
    let mutation = Some(mutation);
    v2::Mutation { mutation }
}

/// Creates a `DeleteFromRow` mutation wrapped in the outer [`v2::Mutation`] envelope.
///
/// `DeleteFromRow` carries no fields: it unconditionally removes every cell in
/// the target row when applied.
fn delete_row_mutation() -> v2::Mutation {
    let delete_all = mutation::Mutation::DeleteFromRow(mutation::DeleteFromRow {});
    mutation(delete_all)
}

/// Builds the three mutations that write an object row: clear existing data,
/// then set the payload and metadata cells.
///
/// Used by both [`BigTableBackend::put_row`] (unconditional write) and
/// [`BigTableBackend::put_non_tombstone`] (conditional write).
fn build_write_mutations(
fn object_mutations(
metadata: &Metadata,
payload: Vec<u8>,
now: SystemTime,
) -> Result<[mutation::Mutation; 3]> {
) -> Result<[v2::Mutation; 3]> {
let (family, timestamp_micros) = match metadata.expiration_policy {
ExpirationPolicy::Manual => (FAMILY_MANUAL, -1),
ExpirationPolicy::TimeToLive(ttl) => (FAMILY_GC, ttl_to_micros(ttl, now)?),
Expand All @@ -326,19 +339,19 @@ fn build_write_mutations(

Ok([
// NB: We explicitly delete the row to clear metadata on overwrite.
mutation::Mutation::DeleteFromRow(mutation::DeleteFromRow {}),
mutation::Mutation::SetCell(mutation::SetCell {
delete_row_mutation(),
mutation(mutation::Mutation::SetCell(mutation::SetCell {
family_name: family.to_owned(),
column_qualifier: COLUMN_PAYLOAD.to_owned(),
timestamp_micros,
value: payload,
}),
mutation::Mutation::SetCell(mutation::SetCell {
})),
mutation(mutation::Mutation::SetCell(mutation::SetCell {
family_name: family.to_owned(),
column_qualifier: COLUMN_METADATA.to_owned(),
timestamp_micros,
value: metadata_bytes,
}),
})),
])
}

Expand All @@ -360,10 +373,7 @@ struct TombstoneMeta {
///
/// Used by both [`BigTableBackend::put_tombstone_row`] (unconditional write) and the
/// TTI bump path in tiered reads.
fn build_tombstone_mutations(
tombstone: &Tombstone,
now: SystemTime,
) -> Result<[mutation::Mutation; 3]> {
fn tombstone_mutations(tombstone: &Tombstone, now: SystemTime) -> Result<[v2::Mutation; 3]> {
let (family, timestamp_micros) = match tombstone.expiration_policy {
ExpirationPolicy::Manual => (FAMILY_MANUAL, -1),
ExpirationPolicy::TimeToLive(ttl) => (FAMILY_GC, ttl_to_micros(ttl, now)?),
Expand All @@ -375,20 +385,20 @@ fn build_tombstone_mutations(
};

Ok([
mutation::Mutation::DeleteFromRow(mutation::DeleteFromRow {}),
mutation::Mutation::SetCell(mutation::SetCell {
delete_row_mutation(),
mutation(mutation::Mutation::SetCell(mutation::SetCell {
family_name: family.to_owned(),
column_qualifier: COLUMN_REDIRECT.to_owned(),
timestamp_micros,
value: tombstone.target.as_storage_path().to_string().into_bytes(),
}),
mutation::Mutation::SetCell(mutation::SetCell {
})),
mutation(mutation::Mutation::SetCell(mutation::SetCell {
family_name: family.to_owned(),
column_qualifier: COLUMN_TOMBSTONE_META.to_owned(),
timestamp_micros,
value: serde_json::to_vec(&tombstone_meta)
.map_err(|cause| Error::serde("failed to serialize tombstone", cause))?,
}),
})),
])
}

Expand Down Expand Up @@ -630,23 +640,16 @@ impl BigTableBackend {
})
}

async fn mutate<I>(
async fn mutate(
&self,
path: Vec<u8>,
mutations: I,
mutations: impl Into<Vec<v2::Mutation>>,
action: &'static str,
) -> Result<v2::MutateRowResponse>
where
I: IntoIterator<Item = mutation::Mutation>,
{
let mutations = mutations
.into_iter()
.map(|m| v2::Mutation { mutation: Some(m) })
.collect();
) -> Result<v2::MutateRowResponse> {
let request = v2::MutateRowRequest {
table_name: self.table_path.clone(),
row_key: path,
mutations,
mutations: mutations.into(),
..Default::default()
};

Expand All @@ -665,7 +668,7 @@ impl BigTableBackend {
payload: Vec<u8>,
action: &'static str,
) -> Result<v2::MutateRowResponse> {
let mutations = build_write_mutations(metadata, payload, SystemTime::now())?;
let mutations = object_mutations(metadata, payload, SystemTime::now())?;
self.mutate(path, mutations, action).await
}

Expand All @@ -675,7 +678,7 @@ impl BigTableBackend {
tombstone: &Tombstone,
action: &'static str,
) -> Result<v2::MutateRowResponse> {
let mutations = build_tombstone_mutations(tombstone, SystemTime::now())?;
let mutations = tombstone_mutations(tombstone, SystemTime::now())?;
self.mutate(path, mutations, action).await
}

Expand Down Expand Up @@ -771,10 +774,7 @@ impl Backend for BigTableBackend {
tracing::debug!("Deleting from Bigtable backend");

let path = id.as_storage_path().to_string().into_bytes();
let mutations = [mutation::Mutation::DeleteFromRow(
mutation::DeleteFromRow {},
)];
self.mutate(path, mutations, "delete").await?;
self.mutate(path, [delete_row_mutation()], "delete").await?;

Ok(())
}
Expand All @@ -792,10 +792,8 @@ impl HighVolumeBackend for BigTableBackend {
tracing::debug!("Conditional put to Bigtable backend");

let path = id.as_storage_path().to_string().into_bytes();
let false_mutations = build_write_mutations(metadata, payload.to_vec(), SystemTime::now())?
.into_iter()
.map(|m| v2::Mutation { mutation: Some(m) })
.collect();
let false_mutations =
object_mutations(metadata, payload.to_vec(), SystemTime::now())?.into();

let request = v2::CheckAndMutateRowRequest {
table_name: self.table_path.clone(),
Expand Down Expand Up @@ -900,18 +898,13 @@ impl HighVolumeBackend for BigTableBackend {
tracing::debug!("Conditional delete from Bigtable backend");

let path = id.as_storage_path().to_string().into_bytes();
let delete_mutation = v2::Mutation {
mutation: Some(mutation::Mutation::DeleteFromRow(
mutation::DeleteFromRow {},
)),
};

let request = v2::CheckAndMutateRowRequest {
table_name: self.table_path.clone(),
row_key: path.clone(),
predicate_filter: Some(tombstone_predicate()),
true_mutations: vec![], // Tombstone matched → leave intact.
false_mutations: vec![delete_mutation], // No tombstone → delete.
false_mutations: vec![delete_row_mutation()], // No tombstone → delete.
..Default::default()
};

Expand Down Expand Up @@ -965,27 +958,10 @@ impl HighVolumeBackend for BigTableBackend {
let path = id.as_storage_path().to_string().into_bytes();
let now = SystemTime::now();

let write_mutations: Vec<v2::Mutation> = match write {
TieredWrite::Tombstone(tombstone) => {
let mutations = build_tombstone_mutations(&tombstone, now)?;
mutations
.into_iter()
.map(|m| v2::Mutation { mutation: Some(m) })
.collect()
}
TieredWrite::Object(metadata, payload) => {
let mutations = build_write_mutations(&metadata, payload.to_vec(), now)?;
mutations
.into_iter()
.map(|m| v2::Mutation { mutation: Some(m) })
.collect()
}
TieredWrite::Delete => {
let delete = mutation::Mutation::DeleteFromRow(mutation::DeleteFromRow {});
vec![v2::Mutation {
mutation: Some(delete),
}]
}
let write_mutations = match write {
TieredWrite::Tombstone(tombstone) => tombstone_mutations(&tombstone, now)?.into(),
TieredWrite::Object(m, p) => object_mutations(&m, p.to_vec(), now)?.into(),
TieredWrite::Delete => vec![delete_row_mutation()],
};

let (predicate_filter, true_mutations, false_mutations, success_on_match) = match current {
Expand Down Expand Up @@ -1159,7 +1135,7 @@ mod tests {
now: SystemTime,
) -> Result<()> {
let path = id.as_storage_path().to_string().into_bytes();
let mutations = build_write_mutations(metadata, payload.to_vec(), now)?;
let mutations = object_mutations(metadata, payload.to_vec(), now)?;
backend.mutate(path, mutations, "test-setup").await?;
Ok(())
}
Expand All @@ -1171,7 +1147,7 @@ mod tests {
now: SystemTime,
) -> Result<()> {
let path = id.as_storage_path().to_string().into_bytes();
let mutations = build_tombstone_mutations(tombstone, now)?;
let mutations = tombstone_mutations(tombstone, now)?;
backend.mutate(path, mutations, "test-setup").await?;
Ok(())
}
Expand Down Expand Up @@ -1203,12 +1179,12 @@ mod tests {
};

let path = id.as_storage_path().to_string().into_bytes();
let mutations = [mutation::Mutation::SetCell(mutation::SetCell {
let mutations = [mutation(mutation::Mutation::SetCell(mutation::SetCell {
family_name: family.to_owned(),
column_qualifier: COLUMN_METADATA.to_owned(),
timestamp_micros,
value: meta.into_bytes(),
})];
}))];

backend.mutate(path, mutations, "test-setup").await?;

Expand All @@ -1223,18 +1199,18 @@ mod tests {
) -> Result<()> {
let path = id.as_storage_path().to_string().into_bytes();
let mutations = [
mutation::Mutation::SetCell(mutation::SetCell {
mutation(mutation::Mutation::SetCell(mutation::SetCell {
family_name: FAMILY_MANUAL.to_owned(),
column_qualifier: COLUMN_REDIRECT.to_owned(),
timestamp_micros: -1,
value: b"".to_vec(), // empty — legacy format
}),
mutation::Mutation::SetCell(mutation::SetCell {
})),
mutation(mutation::Mutation::SetCell(mutation::SetCell {
family_name: FAMILY_MANUAL.to_owned(),
column_qualifier: COLUMN_TOMBSTONE_META.to_owned(),
timestamp_micros: -1,
value: b"{}".to_vec(),
}),
})),
];

backend.mutate(path, mutations, "test-setup").await?;
Expand Down
Loading