diff --git a/crates/core/src/schema/mod.rs b/crates/core/src/schema/mod.rs
index 1aa3c6e..50e8e5c 100644
--- a/crates/core/src/schema/mod.rs
+++ b/crates/core/src/schema/mod.rs
@@ -7,6 +7,7 @@ mod table_info;
 use alloc::{rc::Rc, vec::Vec};
 pub use common::{ColumnFilter, SchemaTable};
 use powersync_sqlite_nostd::{self as sqlite, Connection, Context, Value, args};
+pub use raw_table::InferredSchemaCache;
 use serde::Deserialize;
 use sqlite::ResultCode;
 pub use table_info::{
diff --git a/crates/core/src/schema/raw_table.rs b/crates/core/src/schema/raw_table.rs
index bd9c9f2..6369eff 100644
--- a/crates/core/src/schema/raw_table.rs
+++ b/crates/core/src/schema/raw_table.rs
@@ -1,21 +1,27 @@
-use core::fmt::{self, Formatter, Write, from_fn};
+use core::{
+    cell::RefCell,
+    fmt::{self, Formatter, Write, from_fn},
+};
 
 use alloc::{
+    collections::btree_map::BTreeMap,
     format,
+    rc::Rc,
     string::{String, ToString},
     vec,
     vec::Vec,
 };
-use powersync_sqlite_nostd::{Connection, Destructor, ResultCode};
+use powersync_sqlite_nostd::{self as sqlite, Connection, Destructor, ResultCode};
 
 use crate::{
     error::PowerSyncError,
-    schema::{ColumnFilter, RawTable, SchemaTable},
+    schema::{ColumnFilter, PendingStatement, PendingStatementValue, RawTable, SchemaTable},
    utils::{InsertIntoCrud, SqlBuffer, WriteType},
    views::table_columns_to_json_object,
 };
 
 pub struct InferredTableStructure {
+    pub name: String,
     pub columns: Vec<String>,
 }
 
@@ -24,7 +30,7 @@ impl InferredTableStructure {
         table_name: &str,
         db: impl Connection,
         ignored_local_columns: &ColumnFilter,
-    ) -> Result<Option<Self>, PowerSyncError> {
+    ) -> Result<Self, PowerSyncError> {
         let stmt = db.prepare_v2("select name from pragma_table_info(?)")?;
         stmt.bind_text(1, table_name, Destructor::STATIC)?;
 
@@ -41,15 +47,163 @@ impl InferredTableStructure {
         }
 
         if !has_id_column && columns.is_empty() {
-            Ok(None)
+            Err(PowerSyncError::argument_error(format!(
+                "Could not find {table_name} in local schema."
+            )))
         } else if !has_id_column {
             Err(PowerSyncError::argument_error(format!(
                 "Table {table_name} has no id column."
             )))
         } else {
-            Ok(Some(Self { columns }))
+            Ok(Self {
+                name: table_name.to_string(),
+                columns,
+            })
+        }
+    }
+
+    /// Generates a statement of the form `INSERT OR REPLACE INTO $tbl ($cols) VALUES (?, ...)` for
+    /// the sync client.
+    pub fn infer_put_stmt(&self) -> PendingStatement {
+        let mut buffer = SqlBuffer::new();
+        let mut params = vec![];
+
+        buffer.push_str("INSERT OR REPLACE INTO ");
+        let _ = buffer.identifier().write_str(&self.name);
+        buffer.push_str(" (id");
+        for column in &self.columns {
+            buffer.comma();
+            let _ = buffer.identifier().write_str(column);
+        }
+        buffer.push_str(") VALUES (?");
+        params.push(PendingStatementValue::Id);
+        for column in &self.columns {
+            buffer.comma();
+            buffer.push_str("?");
+            params.push(PendingStatementValue::Column(column.clone()));
+        }
+        buffer.push_str(")");
+
+        PendingStatement {
+            sql: buffer.sql,
+            params,
+            named_parameters_index: None,
         }
     }
+
+    /// Generates a statement of the form `DELETE FROM $tbl WHERE id = ?` for the sync client.
+    pub fn infer_delete_stmt(&self) -> PendingStatement {
+        let mut buffer = SqlBuffer::new();
+        buffer.push_str("DELETE FROM ");
+        let _ = buffer.identifier().write_str(&self.name);
+        buffer.push_str(" WHERE id = ?");
+
+        PendingStatement {
+            sql: buffer.sql,
+            params: vec![PendingStatementValue::Id],
+            named_parameters_index: None,
+        }
+    }
+}
+
+/// A cache of inferred raw table schemas and associated put and delete statements for `sync_local`.
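+/// Entries are keyed by raw table name and re-inferred when `PRAGMA schema_version` changes.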
+///
+/// This cache avoids having to re-generate statements on every (partial) checkpoint in the sync
+/// client.
+#[derive(Default)]
+pub struct InferredSchemaCache {
+    entries: RefCell<BTreeMap<String, SchemaCacheEntry>>,
+}
+
+impl InferredSchemaCache {
+    pub fn current_schema_version(db: *mut sqlite::sqlite3) -> Result<usize, PowerSyncError> {
+        let version = db.prepare_v2("PRAGMA schema_version")?;
+        version.step()?;
+        let version = version.column_int64(0) as usize;
+        Ok(version)
+    }
+
+    pub fn infer_put_statement(
+        &self,
+        db: *mut sqlite::sqlite3,
+        schema_version: usize,
+        tbl: &RawTable,
+    ) -> Result<Rc<PendingStatement>, PowerSyncError> {
+        self.with_entry(db, schema_version, tbl, SchemaCacheEntry::put)
+    }
+
+    pub fn infer_delete_statement(
+        &self,
+        db: *mut sqlite::sqlite3,
+        schema_version: usize,
+        tbl: &RawTable,
+    ) -> Result<Rc<PendingStatement>, PowerSyncError> {
+        self.with_entry(db, schema_version, tbl, SchemaCacheEntry::delete)
+    }
+
+    fn with_entry(
+        &self,
+        db: *mut sqlite::sqlite3,
+        schema_version: usize,
+        tbl: &RawTable,
+        f: impl FnOnce(&mut SchemaCacheEntry) -> Rc<PendingStatement>,
+    ) -> Result<Rc<PendingStatement>, PowerSyncError> {
+        let mut entries = self.entries.borrow_mut();
+        if let Some(value) = entries.get_mut(&tbl.name) {
+            if value.schema_version != schema_version {
+                // Values are outdated, refresh.
+                *value = SchemaCacheEntry::infer(db, schema_version, tbl)?;
+            }
+
+            Ok(f(value))
+        } else {
+            let mut entry = SchemaCacheEntry::infer(db, schema_version, tbl)?;
+            let stmt = f(&mut entry);
+            entries.insert(tbl.name.clone(), entry);
+            Ok(stmt)
+        }
+    }
+}
+
+pub struct SchemaCacheEntry {
+    schema_version: usize,
+    structure: InferredTableStructure,
+    put_stmt: Option<Rc<PendingStatement>>,
+    delete_stmt: Option<Rc<PendingStatement>>,
+}
+
+impl SchemaCacheEntry {
+    fn infer(
+        db: *mut sqlite::sqlite3,
+        schema_version: usize,
+        table: &RawTable,
+    ) -> Result<Self, PowerSyncError> {
+        let local_table_name = table.require_table_name()?;
+        let structure = InferredTableStructure::read_from_database(
+            local_table_name,
+            db,
+            &table.schema.local_only_columns,
+        )?;
+
+        Ok(Self {
+            schema_version,
+            structure,
+            put_stmt: None,
+            delete_stmt: None,
+        })
+    }
+
+    fn put(&mut self) -> Rc<PendingStatement> {
+        self.put_stmt
+            .get_or_insert_with(|| Rc::new(self.structure.infer_put_stmt()))
+            .clone()
+    }
+
+    fn delete(&mut self) -> Rc<PendingStatement> {
+        self.delete_stmt
+            .get_or_insert_with(|| Rc::new(self.structure.infer_delete_stmt()))
+            .clone()
+    }
 }
 
 /// Generates a `CREATE TRIGGER` statement to capture writes on raw tables and to forward them to
@@ -60,19 +214,10 @@ pub fn generate_raw_table_trigger(
     trigger_name: &str,
     write: WriteType,
 ) -> Result<String, PowerSyncError> {
-    let Some(local_table_name) = table.schema.table_name.as_ref() else {
-        return Err(PowerSyncError::argument_error("Table has no local name"));
-    };
-
+    let local_table_name = table.require_table_name()?;
     let local_only_columns = &table.schema.local_only_columns;
-    let Some(resolved_table) =
-        InferredTableStructure::read_from_database(local_table_name, db, local_only_columns)?
-    else {
-        return Err(PowerSyncError::argument_error(format!(
-            "Could not find {} in local schema",
-            local_table_name
-        )));
-    };
+    let resolved_table =
+        InferredTableStructure::read_from_database(local_table_name, db, local_only_columns)?;
 
     let as_schema_table = SchemaTable::Raw {
         definition: table,
diff --git a/crates/core/src/schema/table_info.rs b/crates/core/src/schema/table_info.rs
index 20ff72a..23baf53 100644
--- a/crates/core/src/schema/table_info.rs
+++ b/crates/core/src/schema/table_info.rs
@@ -1,8 +1,10 @@
+use alloc::rc::Rc;
 use alloc::string::ToString;
 use alloc::vec;
 use alloc::{collections::btree_set::BTreeSet, format, string::String, vec::Vec};
 use serde::{Deserialize, de::Visitor};
 
+use crate::error::PowerSyncError;
 use crate::schema::ColumnFilter;
 
 #[derive(Deserialize)]
@@ -52,8 +54,8 @@ pub struct RawTable {
     pub name: String,
     #[serde(flatten, default)]
     pub schema: RawTableSchema,
-    pub put: PendingStatement,
-    pub delete: PendingStatement,
+    pub put: Option<Rc<PendingStatement>>,
+    pub delete: Option<Rc<PendingStatement>>,
     #[serde(default)]
     pub clear: Option<String>,
 }
@@ -78,6 +80,18 @@ impl Table {
     }
 }
 
+impl RawTable {
+    pub fn require_table_name(&self) -> Result<&str, PowerSyncError> {
+        let Some(local_table_name) = self.schema.table_name.as_ref() else {
+            return Err(PowerSyncError::argument_error(format!(
+                "Raw table {} has no local name",
+                self.name,
+            )));
+        };
+        Ok(local_table_name)
+    }
+}
+
 #[derive(Deserialize)]
 pub struct Column {
     pub name: String,
diff --git a/crates/core/src/state.rs b/crates/core/src/state.rs
index 1239215..9024133 100644
--- a/crates/core/src/state.rs
+++ b/crates/core/src/state.rs
@@ -11,7 +11,10 @@ use alloc::{
 use powersync_sqlite_nostd::{self as sqlite, Context};
 use sqlite::{Connection, ResultCode};
 
-use crate::{schema::Schema, sync::SyncClient};
+use crate::{
+    schema::{InferredSchemaCache, Schema},
+    sync::SyncClient,
+};
 
 /// State that is shared for a SQLite database connection after the core extension has been
 /// registered on it.
@@ -25,6 +28,9 @@ pub struct DatabaseState {
     pending_updates: RefCell>,
     commited_updates: RefCell>,
     pub sync_client: RefCell>,
+    /// Cached put and delete statements for raw tables, used by the `sync_local` step of the sync
+    /// client.
+    pub inferred_schema_cache: InferredSchemaCache,
 }
 
 impl DatabaseState {
diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs
index 69a5572..ea031cd 100644
--- a/crates/core/src/sync_local.rs
+++ b/crates/core/src/sync_local.rs
@@ -1,5 +1,6 @@
 use alloc::collections::btree_map::BTreeMap;
 use alloc::format;
+use alloc::rc::Rc;
 use alloc::string::{String, ToString};
 use alloc::vec::Vec;
 use serde::ser::SerializeMap;
@@ -7,7 +8,9 @@ use serde::{Deserialize, Serialize};
 
 use crate::error::{PSResult, PowerSyncError};
 use crate::schema::inspection::ExistingTable;
-use crate::schema::{PendingStatement, PendingStatementValue, RawTable, Schema};
+use crate::schema::{
+    InferredSchemaCache, PendingStatement, PendingStatementValue, RawTable, Schema,
+};
 use crate::state::DatabaseState;
 use crate::sync::BucketPriority;
 use crate::utils::SqlBuffer;
@@ -129,6 +132,9 @@ impl<'a> SyncOperation<'a> {
         self.collect_tables()?;
         let statement = self.collect_full_operations()?;
 
+        // We're in a transaction, so the schema can't change while we're applying changes.
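+        // SQLite bumps `PRAGMA schema_version` on any schema change, so reading it once up front
+        // is enough to tell whether previously inferred statements are still valid.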
+        let schema_version = InferredSchemaCache::current_schema_version(self.db)?;
+        let schema_cache = &self.state.inferred_schema_cache;
 
         // We cache the last insert and delete statements for each row
         struct CachedStatement {
@@ -151,7 +157,7 @@ impl<'a> SyncOperation<'a> {
                 if let Some(raw) = &mut known.raw {
                     match data {
                         Ok(data) => {
-                            let stmt = raw.put_statement(self.db)?;
+                            let stmt = raw.put_statement(self.db, schema_version, schema_cache)?;
                             let parsed: serde_json::Value = serde_json::from_str(data)
                                 .map_err(PowerSyncError::json_local_error)?;
                             let json_object = parsed.as_object().ok_or_else(|| {
@@ -165,7 +171,8 @@ impl<'a> SyncOperation<'a> {
                             stmt.exec(self.db, type_name, id, Some(&parsed))?;
                         }
                         Err(_) => {
-                            let stmt = raw.delete_statement(self.db)?;
+                            let stmt =
+                                raw.delete_statement(self.db, schema_version, schema_cache)?;
                             stmt.bind_for_delete(id)?;
                             stmt.exec(self.db, type_name, id, None)?;
                         }
@@ -464,19 +471,16 @@ struct ParsedSchemaTable<'a> {
 
 struct RawTableWithCachedStatements<'a> {
     definition: &'a RawTable,
-    cached_put: Option<PreparedPendingStatement<'a>>,
-    cached_delete: Option<PreparedPendingStatement<'a>>,
+    cached_put: Option<PreparedPendingStatement>,
+    cached_delete: Option<PreparedPendingStatement>,
 }
 
 impl<'a> RawTableWithCachedStatements<'a> {
-    fn prepare_lazily<'b>(
+    fn prepare_lazily(
         db: *mut sqlite::sqlite3,
-        slot: &'b mut Option<PreparedPendingStatement<'a>>,
-        def: &'a PendingStatement,
-    ) -> Result<&'b PreparedPendingStatement<'a>, PowerSyncError>
-    where
-        'a: 'b,
-    {
+        slot: &mut Option<PreparedPendingStatement>,
+        def: Rc<PendingStatement>,
+    ) -> Result<&PreparedPendingStatement, PowerSyncError> {
         Ok(match slot {
             Some(stmt) => stmt,
             None => {
@@ -489,15 +493,33 @@ impl<'a> RawTableWithCachedStatements<'a> {
     fn put_statement(
         &'_ mut self,
         db: *mut sqlite::sqlite3,
-    ) -> Result<&'_ PreparedPendingStatement<'_>, PowerSyncError> {
-        Self::prepare_lazily(db, &mut self.cached_put, &self.definition.put)
+        schema_version: usize,
+        cache: &InferredSchemaCache,
+    ) -> Result<&'_ PreparedPendingStatement, PowerSyncError> {
+        Self::prepare_lazily(
+            db,
+            &mut self.cached_put,
+            match self.definition.put {
+                Some(ref stmt) => stmt.clone(),
+                None => cache.infer_put_statement(db, schema_version, &self.definition)?,
+            },
+        )
     }
 
     fn delete_statement(
         &'_ mut self,
         db: *mut sqlite::sqlite3,
-    ) -> Result<&'_ PreparedPendingStatement<'_>, PowerSyncError> {
-        Self::prepare_lazily(db, &mut self.cached_delete, &self.definition.delete)
+        schema_version: usize,
+        cache: &InferredSchemaCache,
+    ) -> Result<&'_ PreparedPendingStatement, PowerSyncError> {
+        Self::prepare_lazily(
+            db,
+            &mut self.cached_delete,
+            match self.definition.delete {
+                Some(ref stmt) => stmt.clone(),
+                None => cache.infer_delete_statement(db, schema_version, &self.definition)?,
+            },
+        )
     }
 }
 
@@ -517,15 +539,15 @@ impl<'a> ParsedSchemaTable<'a> {
     }
 }
 
-struct PreparedPendingStatement<'a> {
+struct PreparedPendingStatement {
     stmt: ManagedStmt,
-    definition: &'a PendingStatement,
+    definition: Rc<PendingStatement>,
 }
 
-impl<'a> PreparedPendingStatement<'a> {
+impl PreparedPendingStatement {
     pub fn prepare(
         db: *mut sqlite::sqlite3,
-        pending: &'a PendingStatement,
+        pending: Rc<PendingStatement>,
     ) -> Result<Self, PowerSyncError> {
         let stmt = db.prepare_v2(&pending.sql).into_db_result(db)?;
         if stmt.bind_parameter_count() as usize != pending.params.len() {
diff --git a/dart/test/crud_test.dart b/dart/test/crud_test.dart
index 2f19a9b..faea8a7 100644
--- a/dart/test/crud_test.dart
+++ b/dart/test/crud_test.dart
@@ -818,7 +818,8 @@ void main() {
       db.execute('CREATE TABLE users (name TEXT);');
       expect(
        () => createRawTableTriggers(rawTableDescription({})),
-        throwsA(isSqliteException(3091, contains('Table has no local name'))),
+        throwsA(isSqliteException(
+            3091, contains('Raw table row_type has no local name'))),
       );
     });
 
diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart
index 5be01fd..5672aad 100644
--- a/dart/test/sync_test.dart
+++ b/dart/test/sync_test.dart
@@ -1170,27 +1170,88 @@ CREATE TRIGGER users_delete
     expect(db.select('SELECT * FROM users'), isEmpty);
   });
 
-  syncTest('default trigger smoke test', (_) {
+  syncTest('inferred schema smoke test', (_) {
     db.execute(
-        'CREATE TABLE local_users (id TEXT NOT NULL PRIMARY KEY, name TEXT NOT NULL) STRICT;');
+        'CREATE TABLE local_users (id TEXT NOT NULL PRIMARY KEY, name TEXT NOT NULL, local TEXT) STRICT;');
     final table = {
       'name': 'users',
       'table_name': 'local_users',
+      'local_only_columns': ['local'],
       // This also tests that the trigger preventing updates and deletes on
       // insert-only tables is inert during sync_local.
       'insert_only': true,
-      'put': {
-        'sql': 'INSERT OR REPLACE INTO local_users (id, name) VALUES (?, ?);',
-        'params': [
-          'Id',
-          {'Column': 'name'}
-        ],
+    };
+
+    invokeControl(
+      'start',
+      json.encode({
+        'schema': {
+          'raw_tables': [table],
+          'tables': [],
+        }
+      }));
+
+    // Insert
+    pushCheckpoint(buckets: [bucketDescription('a')]);
+    pushSyncData(
+      'a',
+      '1',
+      'my_user',
+      'PUT',
+      {
+        'name': 'First user',
+        'local': 'ignored because the column is local-only'
       },
-      'delete': {
-        'sql': 'DELETE FROM local_users WHERE id = ?',
-        'params': ['Id'],
+      objectType: 'users',
+    );
+    pushCheckpointComplete();
+
+    var users = db.select('SELECT * FROM local_users;');
+    expect(users, [
+      {
+        'id': 'my_user',
+        'name': 'First user',
+        'local': null,
+      }
+    ]);
+
+    // Now, alter the underlying table to add a new column
+    db.execute('ALTER TABLE local_users ADD COLUMN email TEXT');
+    pushCheckpoint(buckets: [bucketDescription('a')]);
+    pushSyncData(
+      'a',
+      '2',
+      'my_user',
+      'PUT',
+      {
+        'name': 'First user',
+        'email': 'email@example.org',
+        'local': 'still ignored'
       },
-      'clear': 'DELETE FROM local_users;',
+      objectType: 'users',
+    );
+    pushCheckpointComplete();
+
+    users = db.select('SELECT * FROM local_users;');
+    expect(users, [
+      {
+        'id': 'my_user',
+        'name': 'First user',
+        'email': 'email@example.org',
+        'local': null,
+      }
+    ]);
+  });
+
+  test('inferred schema with changes', () {
+    db.execute(
+        'CREATE TABLE local_users (id TEXT NOT NULL PRIMARY KEY, name TEXT NOT NULL) STRICT;');
+    final table = {
+      'name': 'users',
+      'table_name': 'local_users',
+      // This also tests that the trigger preventing updates and deletes on
+      // insert-only tables is inert during sync_local.
+      'insert_only': true,
     };
     db.execute('''
 SELECT