Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use spacetimedb_lib::{http as st_http, ConnectionId, Identity, Timestamp};
use spacetimedb_primitives::{ColId, ColList, IndexId, TableId};
use spacetimedb_sats::{
bsatn::{self, ToBsatn},
buffer::{CountWriter, TeeWriter},
buffer::CountWriter,
AlgebraicValue, ProductValue,
};
use spacetimedb_schema::identifier::Identifier;
Expand Down Expand Up @@ -343,16 +343,16 @@ impl InstanceEnv {
fn project_cols_bsatn(buffer: &mut [u8], cols: ColList, row_ref: RowRef<'_>) -> usize {
// We get back a col-list with the columns with generated values.
// Write those back to `buffer` and then the encoded length to `row_len`.
let counter = CountWriter::default();
let mut writer = TeeWriter::new(counter, buffer);
for col in cols.iter() {
// Read the column value to AV and then serialize.
let val = row_ref
.read_col::<AlgebraicValue>(col)
.expect("reading col as AV never panics");
bsatn::to_writer(&mut writer, &val).unwrap();
}
writer.w1.finish()
let (_, count) = CountWriter::run(buffer, |writer| {
for col in cols.iter() {
// Read the column value to AV and then serialize.
let val = row_ref
.read_col::<AlgebraicValue>(col)
.expect("reading col as AV never panics");
bsatn::to_writer(writer, &val).unwrap();
}
});
count
}

pub fn insert(&self, table_id: TableId, buffer: &mut [u8]) -> Result<usize, NodesError> {
Expand Down
3 changes: 2 additions & 1 deletion crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,8 @@ impl CommittedState {

let index = table.new_index(&algo, is_unique)?;
// SAFETY: `index` was derived from `table`.
unsafe { table.insert_index(blob_store, index_id, index) };
unsafe { table.insert_index(blob_store, index_id, index) }
.expect("rebuilding should not cause constraint violations");
index_id_map.insert(index_id, table_id);
}
Ok(())
Expand Down
8 changes: 8 additions & 0 deletions crates/sats/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,14 @@ pub struct CountWriter {
}

impl CountWriter {
/// Executes `work` against a [`TeeWriter`] built from `writer` and a fresh [`CountWriter`].
///
/// Returns whatever `work` produced, paired with the count reported
/// by the counter once `work` has completed.
pub fn run<W: BufWriter, R>(writer: W, work: impl FnOnce(&mut TeeWriter<W, CountWriter>) -> R) -> (R, usize) {
    // The counter is the second half of the tee, so it is read back through `w2`.
    let mut tee = TeeWriter::new(writer, Self::default());
    let output = work(&mut tee);
    let num_bytes = tee.w2.finish();
    (output, num_bytes)
}

/// Consumes the counter and returns the final count.
pub fn finish(self) -> usize {
self.num_bytes
Expand Down
8 changes: 8 additions & 0 deletions crates/sats/src/product_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ impl From<ColId> for InvalidFieldError {
}

impl ProductValue {
/// Pushes a single value to the product, returning the extended product.
///
/// Consumes `self`; `val` is converted into an [`AlgebraicValue`]
/// and appended after the existing elements.
pub fn push(self, val: impl Into<AlgebraicValue>) -> Self {
    let mut vals: Vec<_> = self.elements.into();
    // Request exactly one extra slot instead of an amortized growth step:
    // the vector is handed straight back to `Self::from` and never grows again,
    // so any additional capacity would only be thrown away.
    vals.reserve_exact(1);
    vals.push(val.into());
    Self::from(vals)
}

/// Borrow the value at field of `self` identified by `col_pos`.
///
/// The `name` is non-functional and is only used for error-messages.
Expand Down
2 changes: 1 addition & 1 deletion crates/table/benches/page_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ fn make_table_with_index<R: IndexedRow>(unique: bool) -> (Table, IndexId) {
let algo = BTreeAlgorithm { columns: cols }.into();
let idx = tbl.new_index(&algo, unique).unwrap();
// SAFETY: index was derived from the table.
unsafe { tbl.insert_index(&NullBlobStore, index_id, idx) };
unsafe { tbl.insert_index(&NullBlobStore, index_id, idx) }.unwrap();

(tbl, index_id)
}
Expand Down
1 change: 1 addition & 0 deletions crates/table/proptest-regressions/table.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ cc 1f295db61a02ac3378f5ffcceb084637d2391bcc1758af6fb2df8355a713e998 # shrinks to
cc 776d142680b35d7dad5b558fea7071b095f7e6a23c8549e9b32b452d5eebf92b # shrinks to (ty, val) = (ProductType { elements: [ProductTypeElement { name: None, algebraic_type: Builtin(String) }] }, ProductValue { elements: [String("\u{16af0}a®ਲ𒒀A 𑌅 ಎ꒐𑍇A A𐫫Aⷀ𑌵ૠ\u{b55} aㄱ \u{f99}a ")] })
cc 66d99531b8e513d0fd558f492f708d110e1e117dfc7f3f42188bcc57c23bb89e # shrinks to (ty, val) = (ProductType { elements: [ProductTypeElement { name: None, algebraic_type: Builtin(Map(MapType { key_ty: Builtin(U8), ty: Builtin(Map(MapType { key_ty: Builtin(I32), ty: Builtin(F32) })) })) }] }, ProductValue { elements: [Map({U8(0): Map({I32(-4): F32(Total(0.0)), I32(-3): F32(Total(0.0)), I32(-2): F32(Total(-0.0)), I32(-1): F32(Total(-0.0)), I32(0): F32(Total(0.0)), I32(1): F32(Total(0.0)), I32(2): F32(Total(0.0)), I32(3): F32(Total(0.0))}), U8(1): Map({I32(-5): F32(Total(0.0)), I32(-4): F32(Total(0.0)), I32(-3): F32(Total(0.0)), I32(-2): F32(Total(0.0)), I32(-1): F32(Total(-0.0)), I32(0): F32(Total(0.0)), I32(1): F32(Total(0.0)), I32(2): F32(Total(0.0)), I32(3): F32(Total(0.0)), I32(4): F32(Total(0.0)), I32(5): F32(Total(0.0)), I32(6): F32(Total(0.0)), I32(7): F32(Total(0.0))}), U8(2): Map({I32(-3): F32(Total(-0.0)), I32(-2): F32(Total(0.0)), I32(-1): F32(Total(0.0)), I32(0): F32(Total(-0.0)), I32(1): F32(Total(0.0)), I32(2): F32(Total(0.0)), I32(3): F32(Total(0.0))}), U8(3): Map({I32(-10): F32(Total(0.0)), I32(-9): F32(Total(0.0)), I32(-8): F32(Total(-0.0)), I32(-7): F32(Total(0.0)), I32(-6): F32(Total(0.0)), I32(-5): F32(Total(0.0)), I32(-4): F32(Total(0.0)), I32(-3): F32(Total(0.0)), I32(-2): F32(Total(0.0)), I32(-1): F32(Total(0.0)), I32(0): F32(Total(0.0)), I32(1): F32(Total(0.0)), I32(2): F32(Total(0.0))}), U8(4): Map({I32(-7): F32(Total(0.0)), I32(-6): F32(Total(0.0)), I32(-5): F32(Total(0.0)), I32(-4): F32(Total(0.0)), I32(-3): F32(Total(0.0)), I32(-2): F32(Total(0.0)), I32(-1): F32(Total(0.0)), I32(0): F32(Total(0.0)), I32(1): F32(Total(0.0)), I32(2): F32(Total(0.0)), I32(3): F32(Total(0.0))}), U8(5): Map({I32(-9): F32(Total(0.0)), I32(-8): F32(Total(0.0)), I32(-7): F32(Total(0.0)), I32(-6): F32(Total(0.0)), I32(-5): F32(Total(0.0)), I32(-4): F32(Total(0.0)), I32(-3): F32(Total(0.0)), I32(-2): F32(Total(0.0)), I32(-1): F32(Total(0.0)), I32(0): F32(Total(0.0)), 
I32(1): F32(Total(0.0)), I32(2): F32(Total(0.0)), I32(3): F32(Total(0.0)), I32(4): F32(Total(0.0)), I32(5): F32(Total(0.0))}), U8(6): Map({I32(0): F32(Total(0.0)), I32(1): F32(Total(0.0)), I32(2): F32(Total(0.0))}), U8(7): Map({I32(-4): F32(Total(0.0)), I32(-3): F32(Total(0.0)), I32(-2): F32(Total(0.0)), I32(-1): F32(Total(0.0)), I32(0): F32(Total(0.0)), I32(1): F32(Total(0.0)), I32(2): F32(Total(-0.0)), I32(3): F32(Total(0.0))}), U8(8): Map({I32(-7): F32(Total(0.0)), I32(-6): F32(Total(-0.0)), I32(-5): F32(Total(0.0)), I32(-4): F32(Total(0.0)), I32(-3): F32(Total(0.0)), I32(-2): F32(Total(0.0)), I32(-1): F32(Total(0.0)), I32(0): F32(Total(0.0)), I32(1): F32(Total(0.0)), I32(2): F32(Total(0.0)), I32(3): F32(Total(0.0)), I32(4): F32(Total(0.0)), I32(5): F32(Total(0.0)), I32(6): F32(Total(-0.0)), I32(7): F32(Total(0.0))}), U8(9): Map({I32(-1349171619): F32(Total(418648100.0)), I32(-665792478): F32(Total(-5.3081414e23)), I32(-1): F32(Total(0.0)), I32(0): F32(Total(0.0)), I32(1): F32(Total(0.0)), I32(2): F32(Total(0.0)), I32(3): F32(Total(0.0)), I32(5): F32(Total(-0.0)), I32(906732021): F32(Total(1.952517e16)), I32(1965197035): F32(Total(1020.84216))}), U8(11): Map({I32(-7): F32(Total(0.0)), I32(-6): F32(Total(0.0)), I32(-5): F32(Total(0.0)), I32(-4): F32(Total(0.0)), I32(-3): F32(Total(0.0)), I32(-2): F32(Total(0.0)), I32(-1): F32(Total(0.0)), I32(0): F32(Total(0.0)), I32(1): F32(Total(0.0)), I32(2): F32(Total(0.0)), I32(3): F32(Total(0.0)), I32(4): F32(Total(0.0)), I32(5): F32(Total(0.0)), I32(6): F32(Total(0.0))})})] })
cc 7f478c4dd0f24e715a74949c6d06af8ca2b4c8b82fae4f53c953a2b323cff851 # shrinks to (ty, val) = (ProductType { elements: [ProductTypeElement { name: None, algebraic_type: Builtin(Array(ArrayType { elem_ty: Builtin(Map(MapType { key_ty: Builtin(U64), ty: Builtin(Bool) })) })) }] }, ProductValue { elements: [Array([{U64(0): Bool(false), U64(1): Bool(false), U64(2): Bool(false), U64(3): Bool(false), U64(4): Bool(false), U64(5): Bool(false), U64(6): Bool(false), U64(7): Bool(false), U64(8): Bool(false), U64(9): Bool(false), U64(10): Bool(false), U64(11): Bool(false), U64(12): Bool(false), U64(13): Bool(false)}, {U64(0): Bool(false), U64(1): Bool(false), U64(2): Bool(false), U64(3): Bool(false), U64(4): Bool(false), U64(5): Bool(false), U64(6): Bool(false), U64(7): Bool(false), U64(8): Bool(false), U64(9): Bool(false)}, {U64(0): Bool(false), U64(1): Bool(false), U64(2): Bool(false), U64(3): Bool(false), U64(4): Bool(false), U64(5): Bool(false), U64(6): Bool(false), U64(7): Bool(false), U64(8): Bool(false), U64(9): Bool(false), U64(10): Bool(false), U64(11): Bool(false)}, {U64(0): Bool(false), U64(1): Bool(false), U64(2): Bool(false), U64(3): Bool(false), U64(4): Bool(false), U64(5): Bool(false), U64(6): Bool(false)}, {U64(0): Bool(false), U64(1): Bool(false), U64(2): Bool(false), U64(3): Bool(false), U64(4): Bool(false), U64(5): Bool(false), U64(6): Bool(false), U64(7): Bool(false), U64(8): Bool(false), U64(9): Bool(false), U64(10): Bool(false), U64(11): Bool(false), U64(12): Bool(false)}, {U64(0): Bool(false), U64(1): Bool(false), U64(2): Bool(false), U64(3): Bool(false), U64(4): Bool(false), U64(5): Bool(false), U64(6): Bool(false), U64(7): Bool(false), U64(8): Bool(false), U64(9): Bool(false)}, {U64(0): Bool(false), U64(1): Bool(false), U64(2): Bool(false), U64(3): Bool(false)}, {U64(0): Bool(false), U64(1): Bool(false), U64(2): Bool(false), U64(3): Bool(false), U64(4): Bool(false), U64(5): Bool(false), U64(6): Bool(false), U64(7): Bool(false), U64(8): Bool(false), 
U64(9): Bool(false), U64(10): Bool(false)}, {U64(0): Bool(false), U64(1): Bool(false), U64(2): Bool(false), U64(3): Bool(false), U64(4): Bool(false), U64(5): Bool(false), U64(6): Bool(false), U64(7): Bool(false), U64(8): Bool(false), U64(9): Bool(false), U64(10): Bool(false)}, {U64(0): Bool(false), U64(1): Bool(false), U64(2): Bool(false), U64(3): Bool(false), U64(4): Bool(false)}, {U64(0): Bool(false), U64(1): Bool(false), U64(2): Bool(false), U64(3): Bool(false), U64(4): Bool(false), U64(5): Bool(false)}, {U64(0): Bool(false), U64(1): Bool(false)}])] })
cc 01bfd4449bee7eaa0b61b60792baed8d52d3589f4a5bb313bf057194a6248a83
94 changes: 63 additions & 31 deletions crates/table/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1413,36 +1413,57 @@ impl Table {
/// # Safety
///
/// Caller must promise that `index` was constructed with the same row type/layout as this table.
pub unsafe fn insert_index(&mut self, blob_store: &dyn BlobStore, index_id: IndexId, mut index: TableIndex) {
pub unsafe fn insert_index(
&mut self,
blob_store: &dyn BlobStore,
index_id: IndexId,
mut index: TableIndex,
) -> Result<(), String> {
let rows = self.scan_rows(blob_store);
// SAFETY: Caller promised that table's row type/layout
// matches that which `index` was constructed with.
// It follows that this applies to any `rows`, as required.
let violation = unsafe { index.build_from_rows(rows) };
violation.unwrap_or_else(|ptr| {
let index_schema = &self.schema.indexes.iter().find(|index_schema| index_schema.index_id == index_id).expect("Index should exist");
let indexed_column = if let IndexAlgorithm::BTree(BTreeAlgorithm { columns }) = &index_schema.index_algorithm {
Some(columns)
} else { None };
let indexed_column = indexed_column.and_then(|columns| columns.as_singleton());
let indexed_column_info = indexed_column.and_then(|column| self.schema.get_column(column.idx()));
violation.map_err(|ptr| {
// SAFETY: `ptr` just came out of `self.scan_rows`, so it is present.
let row = unsafe { self.get_row_ref_unchecked(blob_store, ptr) }.to_product_value();
panic!(
"Adding index `{}` {:?} to table `{}` {:?} on column `{}` {:?} should cause no unique constraint violations.

Found violation at pointer {ptr:?} to row {:?}.",
index_schema.index_name,
index_schema.index_id,
self.schema.table_name,
self.schema.table_id,
indexed_column_info.map(|column| &column.col_name[..]).unwrap_or("unknown column"),
indexed_column,
row,
);
});

if let Some(index_schema) = self.schema.indexes.iter().find(|index_schema| index_schema.index_id == index_id) {
let indexed_column = if let IndexAlgorithm::BTree(BTreeAlgorithm { columns }) = &index_schema.index_algorithm {
Some(columns)
} else {
None
};
let indexed_column = indexed_column.and_then(|columns| columns.as_singleton());
let indexed_column_info = indexed_column.and_then(|column| self.schema.get_column(column.idx()));

format!(
"Adding index `{}` {:?} to table `{}` {:?} on column `{}` {:?} should cause no unique constraint violations.\
Found violation at pointer {ptr:?} to row {:?}.",
index_schema.index_name,
index_schema.index_id,
self.schema.table_name,
self.schema.table_id,
indexed_column_info.map(|column| &column.col_name[..]).unwrap_or("unknown column"),
indexed_column,
row,
)
} else {
format!(
"Adding index to table `{}` {:?} on columns `{:?}` with key type {:?} should cause no unique constraint violations.\
Found violation at pointer {ptr:?} to row {:?}.",
self.schema.table_name,
self.schema.table_id,
index.indexed_columns,
index.key_type,
row,
)
}
})?;

// SAFETY: Forward caller requirement.
unsafe { self.add_index(index_id, index) };
Ok(())
}

/// Adds an index to the table without populating.
Expand Down Expand Up @@ -2453,7 +2474,7 @@ pub(crate) mod test {

let index = table.new_index(&algo, true).unwrap();
// SAFETY: Index was derived from `table`.
unsafe { table.insert_index(&NullBlobStore, index_schema.index_id, index) };
unsafe { table.insert_index(&NullBlobStore, index_schema.index_id, index) }.unwrap();

// Reserve a page so that we can check the hash.
let pi = table.inner.pages.reserve_empty_page(&pool, table.row_size()).unwrap();
Expand Down Expand Up @@ -2553,6 +2574,8 @@ pub(crate) mod test {
ty: ProductType,
vals: Vec<ProductValue>,
indexed_columns: ColList,
index_kind: IndexKind,
is_unique: bool,
) -> Result<(), TestCaseError> {
let pool = PagePool::new_for_test();
let mut blob_store = HashMapBlobStore::default();
Expand All @@ -2565,13 +2588,13 @@ pub(crate) mod test {
// We haven't added any indexes yet, so there should be 0 rows in indexes.
prop_assert_eq!(table.num_rows_in_indexes(), 0);

let index_id = IndexId(0);
let index_id = IndexId::SENTINEL;

let index = TableIndex::new(&ty, indexed_columns.clone(), IndexKind::BTree, false).unwrap();
let index = TableIndex::new(&ty, indexed_columns.clone(), index_kind, is_unique).unwrap();
// Add an index on column 0.
// Safety:
// We're using `ty` as the row type for both `table` and the new index.
unsafe { table.insert_index(&blob_store, index_id, index) };
prop_assume!(unsafe { table.insert_index(&blob_store, index_id, index) }.is_ok());

// We have one index, which should be fully populated,
// so in total we should have the same number of rows in indexes as we have rows.
Expand All @@ -2595,14 +2618,15 @@ pub(crate) mod test {
let key_size_in_pvs = vals
.iter()
.map(|row| crate::table_index::KeySize::key_size_in_bytes(&row.project(&indexed_columns).unwrap()) as u64)
.sum();
.sum::<u64>();
prop_assert_eq!(index.num_key_bytes(), key_size_in_pvs);

let index = TableIndex::new(&ty, indexed_columns, IndexKind::BTree, false).unwrap();
// Add a duplicate of the same index, so we can check that all above quantities double.
// Safety:
// As above, we're using `ty` as the row type for both `table` and the new index.
unsafe { table.insert_index(&blob_store, IndexId(1), index) };
unsafe { table.insert_index(&blob_store, IndexId(1), index) }
.expect("already inserted this index, should not error");

prop_assert_eq!(table.num_rows_in_indexes(), table.num_rows() * 2);
prop_assert_eq!(table.bytes_used_by_index_keys(), key_size_in_pvs * 2);
Expand Down Expand Up @@ -2722,13 +2746,21 @@ pub(crate) mod test {
}

#[test]
fn index_size_reporting_matches_slow_implementations_single_column((ty, vals) in generate_typed_row_vec(1..SIZE, 128, 2048)) {
test_index_size_reporting(ty, vals, ColList::from(ColId(0)))?;
fn index_size_reporting_matches_slow_implementations_single_column(
(ty, vals) in generate_typed_row_vec(1..SIZE, 128, 2048),
index_kind: IndexKind,
is_unique: bool
) {
test_index_size_reporting(ty, vals, [0].into(), index_kind, is_unique)?;
}

#[test]
fn index_size_reporting_matches_slow_implementations_two_column((ty, vals) in generate_typed_row_vec(2..SIZE, 128, 2048)) {
test_index_size_reporting(ty, vals, ColList::from([ColId(0), ColId(1)]))?;
fn index_size_reporting_matches_slow_implementations_two_column(
(ty, vals) in generate_typed_row_vec(2..SIZE, 128, 2048),
index_kind: IndexKind,
is_unique: bool
) {
test_index_size_reporting(ty, vals, [0, 1].into(), index_kind, is_unique)?
}
}

Expand Down
Loading
Loading