Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/core/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod table_info;
use alloc::{rc::Rc, vec::Vec};
pub use common::{ColumnFilter, SchemaTable};
use powersync_sqlite_nostd::{self as sqlite, Connection, Context, Value, args};
pub use raw_table::InferredSchemaCache;
use serde::Deserialize;
use sqlite::ResultCode;
pub use table_info::{
Expand Down
181 changes: 163 additions & 18 deletions crates/core/src/schema/raw_table.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
use core::fmt::{self, Formatter, Write, from_fn};
use core::{
cell::RefCell,
fmt::{self, Formatter, Write, from_fn},
};

use alloc::{
collections::btree_map::BTreeMap,
format,
rc::Rc,
string::{String, ToString},
vec,
vec::Vec,
};
use powersync_sqlite_nostd::{Connection, Destructor, ResultCode};
use powersync_sqlite_nostd::{self as sqlite, Connection, Destructor, ResultCode};

use crate::{
error::PowerSyncError,
schema::{ColumnFilter, RawTable, SchemaTable},
schema::{ColumnFilter, PendingStatement, PendingStatementValue, RawTable, SchemaTable},
utils::{InsertIntoCrud, SqlBuffer, WriteType},
views::table_columns_to_json_object,
};

/// The locally-resolved structure of a raw table, read from the database via
/// `pragma_table_info` (see `read_from_database`).
pub struct InferredTableStructure {
/// Name of the local SQLite table backing the raw table.
pub name: String,
/// Column names other than `id` (which is tracked separately); ignored local-only columns
/// are filtered out during inference.
pub columns: Vec<String>,
}

Expand All @@ -24,7 +30,7 @@ impl InferredTableStructure {
table_name: &str,
db: impl Connection,
ignored_local_columns: &ColumnFilter,
) -> Result<Option<Self>, PowerSyncError> {
) -> Result<Self, PowerSyncError> {
let stmt = db.prepare_v2("select name from pragma_table_info(?)")?;
stmt.bind_text(1, table_name, Destructor::STATIC)?;

Expand All @@ -41,15 +47,163 @@ impl InferredTableStructure {
}

if !has_id_column && columns.is_empty() {
Ok(None)
Err(PowerSyncError::argument_error(format!(
"Could not find {table_name} in local schema."
)))
} else if !has_id_column {
Err(PowerSyncError::argument_error(format!(
"Table {table_name} has no id column."
)))
} else {
Ok(Some(Self { columns }))
Ok(Self {
name: table_name.to_string(),
columns,
})
}
}

/// Generates a statement of the form `INSERT OR REPLACE INTO $tbl ($cols) VALUES (?, ...)` for
/// the sync client.
pub fn infer_put_stmt(&self) -> PendingStatement {
let mut buffer = SqlBuffer::new();
let mut params = vec![];

buffer.push_str("INSERT OR REPLACE INTO ");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we perhaps change this to use upsert statements instead?

This is specifically required when using local-only columns - see https://docs.powersync.com/client-sdks/advanced/raw-tables#use-upsert-instead-of-insert-or-replace

However, even without local-only columns it could be better, especially if the developer users custom triggers (REPLACE triggers a delete + insert if the record exists, while upsert just triggers an update).

It may even be useful to change or JSON tables to use upserts, but that may need a separate investigation on the implications.

As far as I know, the only advantage to using a REPLACE statement is to simplify the query (less duplication of column names), which is not relevant when we're generating it.

let _ = buffer.identifier().write_str(&self.name);
buffer.push_str(" (id");
for column in &self.columns {
buffer.comma();
let _ = buffer.identifier().write_str(column);
}
buffer.push_str(") VALUES (?");
params.push(PendingStatementValue::Id);
for column in &self.columns {
buffer.comma();
buffer.push_str("?");
params.push(PendingStatementValue::Column(column.clone()));
}
buffer.push_str(")");

PendingStatement {
sql: buffer.sql,
params,
named_parameters_index: None,
}
}

/// Generates a statement of the form `DELETE FROM $tbl WHERE id = ?` for the sync client.
pub fn infer_delete_stmt(&self) -> PendingStatement {
    let mut sql = SqlBuffer::new();
    sql.push_str("DELETE FROM ");
    // Write the table name through `identifier()` so it is escaped consistently with the
    // other generated statements.
    let _ = sql.identifier().write_str(&self.name);
    sql.push_str(" WHERE id = ?");

    // The single positional parameter is bound to the id of the row being deleted.
    PendingStatement {
        sql: sql.sql,
        params: vec![PendingStatementValue::Id],
        named_parameters_index: None,
    }
}
}

/// A cache of inferred raw table schema and associated put and delete statements for `sync_local`.
///
/// This cache avoids having to re-generate statements on every (partial) checkpoint in the sync
/// client.
#[derive(Default)]
pub struct InferredSchemaCache {
// Entries keyed by the raw table's name. `RefCell` provides interior mutability so the cache
// can be updated behind a shared reference (it is stored on shared connection state).
entries: RefCell<BTreeMap<String, SchemaCacheEntry>>,
}

impl InferredSchemaCache {
    /// Reads the database's current `schema_version` pragma, used to detect when cached
    /// entries have become stale.
    pub fn current_schema_version(db: *mut sqlite::sqlite3) -> Result<usize, PowerSyncError> {
        let stmt = db.prepare_v2("PRAGMA schema_version")?;
        stmt.step()?;
        Ok(stmt.column_int64(0) as usize)
    }

    /// Returns the (possibly cached) put statement for the raw table `tbl`.
    pub fn infer_put_statement(
        &self,
        db: *mut sqlite::sqlite3,
        schema_version: usize,
        tbl: &RawTable,
    ) -> Result<Rc<PendingStatement>, PowerSyncError> {
        self.with_entry(db, schema_version, tbl, SchemaCacheEntry::put)
    }

    /// Returns the (possibly cached) delete statement for the raw table `tbl`.
    pub fn infer_delete_statement(
        &self,
        db: *mut sqlite::sqlite3,
        schema_version: usize,
        tbl: &RawTable,
    ) -> Result<Rc<PendingStatement>, PowerSyncError> {
        self.with_entry(db, schema_version, tbl, SchemaCacheEntry::delete)
    }

    /// Looks up (or creates) the cache entry for `tbl` and applies `f` to it. An existing entry
    /// is re-inferred first when its schema version no longer matches `schema_version`.
    fn with_entry(
        &self,
        db: *mut sqlite::sqlite3,
        schema_version: usize,
        tbl: &RawTable,
        f: impl FnOnce(&mut SchemaCacheEntry) -> Rc<PendingStatement>,
    ) -> Result<Rc<PendingStatement>, PowerSyncError> {
        let mut entries = self.entries.borrow_mut();
        match entries.get_mut(&tbl.name) {
            Some(cached) => {
                if cached.schema_version != schema_version {
                    // The local schema changed since this entry was built, so the inferred
                    // columns may be outdated - infer them again.
                    *cached = SchemaCacheEntry::infer(db, schema_version, tbl)?;
                }
                Ok(f(cached))
            }
            None => {
                let mut fresh = SchemaCacheEntry::infer(db, schema_version, tbl)?;
                let result = f(&mut fresh);
                entries.insert(tbl.name.clone(), fresh);
                Ok(result)
            }
        }
    }
}

/// A per-table cache entry: the inferred local structure of a raw table plus lazily created
/// put/delete statements derived from it.
pub struct SchemaCacheEntry {
// Database schema version at the time `structure` was inferred; compared against the current
// version to detect stale entries.
schema_version: usize,
structure: InferredTableStructure,
// Built on first use and then reused across checkpoints.
put_stmt: Option<Rc<PendingStatement>>,
delete_stmt: Option<Rc<PendingStatement>>,
}

impl SchemaCacheEntry {
    /// Builds a fresh entry by reading the raw table's local structure from the database.
    ///
    /// Fails when the raw table has no local table name configured, or when the local table
    /// cannot be resolved.
    fn infer(
        db: *mut sqlite::sqlite3,
        schema_version: usize,
        table: &RawTable,
    ) -> Result<Self, PowerSyncError> {
        let name = table.require_table_name()?;
        let filter = &table.schema.local_only_columns;
        let structure = InferredTableStructure::read_from_database(name, db, filter)?;

        Ok(Self {
            schema_version,
            structure,
            // Statements are derived lazily the first time they're requested.
            put_stmt: None,
            delete_stmt: None,
        })
    }

    /// Returns the cached put statement, generating it on first use.
    fn put(&mut self) -> Rc<PendingStatement> {
        match &self.put_stmt {
            Some(existing) => Rc::clone(existing),
            None => {
                let created = Rc::new(self.structure.infer_put_stmt());
                self.put_stmt = Some(Rc::clone(&created));
                created
            }
        }
    }

    /// Returns the cached delete statement, generating it on first use.
    fn delete(&mut self) -> Rc<PendingStatement> {
        match &self.delete_stmt {
            Some(existing) => Rc::clone(existing),
            None => {
                let created = Rc::new(self.structure.infer_delete_stmt());
                self.delete_stmt = Some(Rc::clone(&created));
                created
            }
        }
    }
}

/// Generates a `CREATE TRIGGER` statement to capture writes on raw tables and to forward them to
Expand All @@ -60,19 +214,10 @@ pub fn generate_raw_table_trigger(
trigger_name: &str,
write: WriteType,
) -> Result<String, PowerSyncError> {
let Some(local_table_name) = table.schema.table_name.as_ref() else {
return Err(PowerSyncError::argument_error("Table has no local name"));
};

let local_table_name = table.require_table_name()?;
let local_only_columns = &table.schema.local_only_columns;
let Some(resolved_table) =
InferredTableStructure::read_from_database(local_table_name, db, local_only_columns)?
else {
return Err(PowerSyncError::argument_error(format!(
"Could not find {} in local schema",
local_table_name
)));
};
let resolved_table =
InferredTableStructure::read_from_database(local_table_name, db, local_only_columns)?;

let as_schema_table = SchemaTable::Raw {
definition: table,
Expand Down
18 changes: 16 additions & 2 deletions crates/core/src/schema/table_info.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use alloc::rc::Rc;
use alloc::string::ToString;
use alloc::vec;
use alloc::{collections::btree_set::BTreeSet, format, string::String, vec::Vec};
use serde::{Deserialize, de::Visitor};

use crate::error::PowerSyncError;
use crate::schema::ColumnFilter;

#[derive(Deserialize)]
Expand Down Expand Up @@ -52,8 +54,8 @@ pub struct RawTable {
pub name: String,
#[serde(flatten, default)]
pub schema: RawTableSchema,
pub put: PendingStatement,
pub delete: PendingStatement,
pub put: Option<Rc<PendingStatement>>,
pub delete: Option<Rc<PendingStatement>>,
#[serde(default)]
pub clear: Option<String>,
}
Expand All @@ -78,6 +80,18 @@ impl Table {
}
}

impl RawTable {
    /// Returns the name of the local SQLite table backing this raw table, or an argument error
    /// when the schema doesn't configure one.
    pub fn require_table_name(&self) -> Result<&str, PowerSyncError> {
        self.schema.table_name.as_deref().ok_or_else(|| {
            PowerSyncError::argument_error(format!("Raw table {} has no local name", self.name))
        })
    }
}

#[derive(Deserialize)]
pub struct Column {
pub name: String,
Expand Down
8 changes: 7 additions & 1 deletion crates/core/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use alloc::{
use powersync_sqlite_nostd::{self as sqlite, Context};
use sqlite::{Connection, ResultCode};

use crate::{schema::Schema, sync::SyncClient};
use crate::{
schema::{InferredSchemaCache, Schema},
sync::SyncClient,
};

/// State that is shared for a SQLite database connection after the core extension has been
/// registered on it.
Expand All @@ -25,6 +28,9 @@ pub struct DatabaseState {
pending_updates: RefCell<BTreeSet<String>>,
commited_updates: RefCell<BTreeSet<String>>,
pub sync_client: RefCell<Option<SyncClient>>,
/// Cached put and delete statements for raw tables, used by the `sync_local` step of the sync
/// client.
pub inferred_schema_cache: InferredSchemaCache,
}

impl DatabaseState {
Expand Down
Loading