From d760c64a47d4decfeb6c351d73bf0b9365490563 Mon Sep 17 00:00:00 2001 From: Marc MacLeod Date: Fri, 27 Mar 2026 13:37:18 -0500 Subject: [PATCH 1/6] feat: add $pendingOperation virtual prop to track optimistic mutation type Add a new virtual property $pendingOperation ('insert' | 'update' | 'delete' | null) to every collection row. This tells consumers what type of optimistic mutation is pending, enabling draft/review UIs that show git-style change indicators. --- .../tests/persisted.test.ts | 1 + packages/db/src/collection/state.ts | 55 +- packages/db/src/index.ts | 1 + packages/db/src/query/compiler/group-by.ts | 28 + packages/db/src/virtual-props.ts | 32 + .../collection-subscribe-changes.test.ts | 1 + packages/db/tests/pending-operation.test.ts | 928 ++++++++++++++++++ packages/db/tests/utils.ts | 7 +- 8 files changed, 1050 insertions(+), 3 deletions(-) create mode 100644 packages/db/tests/pending-operation.test.ts diff --git a/packages/db-sqlite-persistence-core/tests/persisted.test.ts b/packages/db-sqlite-persistence-core/tests/persisted.test.ts index 63af1f5d4..f556044e3 100644 --- a/packages/db-sqlite-persistence-core/tests/persisted.test.ts +++ b/packages/db-sqlite-persistence-core/tests/persisted.test.ts @@ -241,6 +241,7 @@ const stripVirtualProps = | undefined>( $origin: _origin, $key: _key, $collectionId: _collectionId, + $pendingOperation: _pendingOperation, ...rest } = value as Record return rest as T diff --git a/packages/db/src/collection/state.ts b/packages/db/src/collection/state.ts index af65cb801..7d7ac6d37 100644 --- a/packages/db/src/collection/state.ts +++ b/packages/db/src/collection/state.ts @@ -3,6 +3,7 @@ import { SortedMap } from '../SortedMap' import { enrichRowWithVirtualProps } from '../virtual-props.js' import { DIRECT_TRANSACTION_METADATA_KEY } from './transaction-metadata.js' import type { + PendingOperationType, VirtualOrigin, VirtualRowProps, WithVirtualProps, @@ -110,6 +111,7 @@ export class CollectionStateManager< origin: VirtualOrigin key: TKey collectionId: string + pendingOperation: PendingOperationType enriched: WithVirtualProps } >() @@ -164,6 +166,32 @@ export class CollectionStateManager< return !this.optimisticUpserts.has(key) && !this.optimisticDeletes.has(key) } + /** + * Gets the pending operation type for a row. + * Returns the type of optimistic mutation pending for this key, or null if none. + * Used to compute the $pendingOperation virtual property. + */ + public getPendingOperation(key: TKey): PendingOperationType { + if (this.isLocalOnly) { + return null + } + // Check active optimistic state + if (this.optimisticDeletes.has(key)) { + return 'delete' + } + if (this.optimisticUpserts.has(key)) { + return this.syncedData.has(key) ? 'update' : 'insert' + } + // Check completed-but-awaiting-sync state to avoid flicker + if (this.pendingOptimisticDeletes.has(key)) { + return 'delete' + } + if (this.pendingOptimisticUpserts.has(key)) { + return this.syncedData.has(key) ? 'update' : 'insert' + } + return null + } + /** * Gets the origin of the last confirmed change to a row. * Returns 'local' if the row has optimistic mutations (optimistic changes are local). @@ -190,6 +218,10 @@ export class CollectionStateManager< $origin: overrides?.$origin ?? this.getRowOrigin(key), $key: overrides?.$key ?? key, $collectionId: overrides?.$collectionId ?? this.collection.id, + $pendingOperation: + overrides?.$pendingOperation !== undefined + ? overrides.$pendingOperation + : this.getPendingOperation(key), } } @@ -206,6 +238,7 @@ export class CollectionStateManager< return this.createVirtualPropsSnapshot(key, { $synced: true, $origin: 'local', + $pendingOperation: null, }) } @@ -218,11 +251,20 @@ export class CollectionStateManager< optimisticDeletes.has(key) || options?.completedOptimisticKeys?.has(key) === true + // Compute $pendingOperation from the provided or current state + let pendingOperation: PendingOperationType = null + if (optimisticDeletes.has(key)) { + pendingOperation = 'delete' + } else if (optimisticUpserts.has(key)) { + pendingOperation = this.syncedData.has(key) ? 'update' : 'insert' + } + return this.createVirtualPropsSnapshot(key, { $synced: !hasOptimisticChange, $origin: hasOptimisticChange ? 'local' : ((options?.rowOrigins ?? this.rowOrigins).get(key) ?? 'remote'), + $pendingOperation: pendingOperation, }) } @@ -235,6 +277,10 @@ export class CollectionStateManager< const origin = existingRow.$origin ?? virtualProps.$origin const resolvedKey = existingRow.$key ?? virtualProps.$key const collectionId = existingRow.$collectionId ?? virtualProps.$collectionId + const pendingOperation = + existingRow.$pendingOperation !== undefined + ? existingRow.$pendingOperation + : virtualProps.$pendingOperation const cached = this.virtualPropsCache.get(row as object) if ( @@ -242,7 +288,8 @@ export class CollectionStateManager< cached.synced === synced && cached.origin === origin && cached.key === resolvedKey && - cached.collectionId === collectionId + cached.collectionId === collectionId && + cached.pendingOperation === pendingOperation ) { return cached.enriched } @@ -253,6 +300,7 @@ export class CollectionStateManager< $origin: origin, $key: resolvedKey, $collectionId: collectionId, + $pendingOperation: pendingOperation, } as WithVirtualProps this.virtualPropsCache.set(row as object, { @@ -260,6 +308,7 @@ export class CollectionStateManager< origin, key: resolvedKey, collectionId, + pendingOperation, enriched, }) @@ -1173,7 +1222,9 @@ export class CollectionStateManager< const nextVirtualProps = this.getVirtualPropsSnapshotForState(key) const virtualChanged = previousVirtualProps.$synced !== nextVirtualProps.$synced || - previousVirtualProps.$origin !== nextVirtualProps.$origin + previousVirtualProps.$origin !== nextVirtualProps.$origin || + previousVirtualProps.$pendingOperation !== + nextVirtualProps.$pendingOperation const previousValueWithVirtual = previousVisibleValue !== undefined ? enrichRowWithVirtualProps( diff --git a/packages/db/src/index.ts b/packages/db/src/index.ts index ec1e22966..39f64fd6b 100644 --- a/packages/db/src/index.ts +++ b/packages/db/src/index.ts @@ -21,6 +21,7 @@ export * from './strategies/index.js' export { type VirtualRowProps, type VirtualOrigin, + type PendingOperationType, type WithVirtualProps, type WithoutVirtualProps, hasVirtualProps, diff --git a/packages/db/src/query/compiler/group-by.ts b/packages/db/src/query/compiler/group-by.ts index 48661ba96..ce010c605 100644 --- a/packages/db/src/query/compiler/group-by.ts +++ b/packages/db/src/query/compiler/group-by.ts @@ -25,16 +25,19 @@ import type { VirtualOrigin } from '../../virtual-props.js' const VIRTUAL_SYNCED_KEY = `__virtual_synced__` const VIRTUAL_HAS_LOCAL_KEY = `__virtual_has_local__` +const VIRTUAL_PENDING_OP_KEY = `__virtual_pending_op__` type RowVirtualMetadata = { synced: boolean hasLocal: boolean + pendingOperation: string | null } function getRowVirtualMetadata(row: NamespacedRow): RowVirtualMetadata { let found = false let allSynced = true let hasLocal = false + let pendingOperation: string | null = null for (const [alias, value] of Object.entries(row)) { if (alias === `$selected`) continue @@ -51,11 +54,19 @@ function getRowVirtualMetadata(row: NamespacedRow): RowVirtualMetadata { if (asRecord.$origin === `local`) { hasLocal = true } + if ( + pendingOperation === null && + `$pendingOperation` in asRecord && + asRecord.$pendingOperation != null + ) { + pendingOperation = asRecord.$pendingOperation as string + } } return { synced: found ? allSynced : true, hasLocal, + pendingOperation, } } @@ -145,6 +156,19 @@ export function processGroupBy( return false }, }, + [VIRTUAL_PENDING_OP_KEY]: { + preMap: ([, row]: [string, NamespacedRow]) => + getRowVirtualMetadata(row).pendingOperation, + reduce: (values: Array<[string | null, number]>) => { + // Return the first non-null pending operation, or null if all are null + for (const [op, multiplicity] of values) { + if (op != null && multiplicity > 0) { + return op + } + } + return null + }, + }, } // Handle empty GROUP BY (single-group aggregation) @@ -232,10 +256,14 @@ export function processGroupBy( const groupHasLocal = (aggregatedRow as Record)[ VIRTUAL_HAS_LOCAL_KEY ] + const groupPendingOp = (aggregatedRow as Record)[ + VIRTUAL_PENDING_OP_KEY + ] resultRow.$synced = groupSynced ?? true resultRow.$origin = ( groupHasLocal ? `local` : `remote` ) satisfies VirtualOrigin + resultRow.$pendingOperation = groupPendingOp ?? null resultRow.$key = resultKey resultRow.$collectionId = aggregateCollectionId ?? resultRow.$collectionId diff --git a/packages/db/src/virtual-props.ts b/packages/db/src/virtual-props.ts index 3205d31c2..a1af4b45f 100644 --- a/packages/db/src/virtual-props.ts +++ b/packages/db/src/virtual-props.ts @@ -21,6 +21,16 @@ */ export type VirtualOrigin = 'local' | 'remote' +/** + * The type of pending optimistic operation for a row. + * + * - `'insert'`: Row was inserted in a pending transaction + * - `'update'`: Row was updated in a pending transaction + * - `'delete'`: Row was deleted in a pending transaction + * - `null`: No pending optimistic operation + */ +export type PendingOperationType = 'insert' | 'update' | 'delete' | null + /** * Virtual properties available on every row in TanStack DB collections. * @@ -94,6 +104,19 @@ export interface VirtualRowProps< * For live query collections, this is the ID of the upstream collection. */ readonly $collectionId: string + + /** + * The type of pending optimistic operation for this row. + * + * - `'insert'`: Row was inserted in a pending transaction + * - `'update'`: Row was updated in a pending transaction + * - `'delete'`: Row was deleted in a pending transaction + * - `null`: No pending optimistic operation + * + * For local-only collections, this is always `null`. + * For live query collections, this is passed through from the source collection. + */ + readonly $pendingOperation: PendingOperationType } /** @@ -171,12 +194,14 @@ export function createVirtualProps( collectionId: string, isSynced: boolean, origin: VirtualOrigin, + pendingOperation: PendingOperationType = null, ): VirtualRowProps { return { $synced: isSynced, $origin: origin, $key: key, $collectionId: collectionId, + $pendingOperation: pendingOperation, } } @@ -218,6 +243,7 @@ export function enrichRowWithVirtualProps< $origin: existingRow.$origin ?? computeOrigin(), $key: existingRow.$key ?? key, $collectionId: existingRow.$collectionId ?? collectionId, + $pendingOperation: existingRow.$pendingOperation ?? null, } as WithVirtualProps } @@ -246,11 +272,16 @@ export function computeAggregateVirtualProps( // $origin = 'local' if ANY row is local (consistent with "local influence" semantics) const hasLocal = rows.some((row) => row.$origin === 'local') + // $pendingOperation = null if all rows are null, otherwise the first non-null value + const firstPendingOp = + rows.find((row) => row.$pendingOperation != null)?.$pendingOperation ?? null + return { $synced: allSynced, $origin: hasLocal ? 'local' : 'remote', $key: groupKey, $collectionId: collectionId, + $pendingOperation: firstPendingOp, } } @@ -263,6 +294,7 @@ export const VIRTUAL_PROP_NAMES = [ '$origin', '$key', '$collectionId', + '$pendingOperation', ] as const /** diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index 4f851f08a..dca8e94e1 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -2680,6 +2680,7 @@ describe(`Virtual properties`, () => { $origin: `remote`, $key: `row-1`, $collectionId: `collection-1`, + $pendingOperation: null, }), ).toBe(true) }) diff --git a/packages/db/tests/pending-operation.test.ts b/packages/db/tests/pending-operation.test.ts new file mode 100644 index 000000000..a9ae82e13 --- /dev/null +++ b/packages/db/tests/pending-operation.test.ts @@ -0,0 +1,928 @@ +import { describe, expect, it, vi } from 'vitest' +import { createCollection } from '../src/collection/index.js' +import { createLiveQueryCollection } from '../src/query/live-query-collection.js' +import { localOnlyCollectionOptions } from '../src/local-only.js' +import { createTransaction } from '../src/transactions' +import { mockSyncCollectionOptions, stripVirtualProps } from './utils' +import type { ChangeMessage } from '../src/types' + +const waitForChanges = () => new Promise((resolve) => setTimeout(resolve, 10)) + +type Item = { id: string; title: string } + +describe(`$pendingOperation virtual property`, () => { + // ── B1: Basic virtual prop behavior ───────────────────────────────── + + describe(`basic behavior`, () => { + it(`should be null for synced rows`, async () => { + const collection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-synced`, + getKey: (item: Item) => item.id, + initialData: [{ id: `1`, title: `Synced item` }], + }), + ) + + await collection.preload() + + const item = collection.get(`1`) + expect(item).toBeDefined() + expect(item!.$pendingOperation).toBe(null) + }) + + it(`should be 'insert' for optimistic insert in pending transaction`, async () => { + const collection = createCollection({ + id: `pending-op-insert`, + getKey: (item) => item.id, + sync: { + sync: ({ markReady }) => { + markReady() + }, + }, + onInsert: async () => { + // Never resolves during test — keeps transaction pending + await new Promise(() => {}) + }, + }) + + collection.subscribeChanges(() => {}, { includeInitialState: false }) + await waitForChanges() + + collection.insert({ id: `new-1`, title: `New item` }) + await waitForChanges() + + const item = collection.get(`new-1`) + expect(item).toBeDefined() + expect(item!.$pendingOperation).toBe(`insert`) + }) + + it(`should be 'update' for optimistic update in pending transaction`, async () => { + const collection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-update`, + getKey: (item: Item) => item.id, + initialData: [{ id: `1`, title: `Original` }], + }), + ) + + await collection.preload() + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + collection.update(`1`, (item) => { + item.title = `Updated` + }) + }) + + const item = collection.get(`1`) + expect(item).toBeDefined() + expect(item!.$pendingOperation).toBe(`update`) + }) + + it(`should be 'delete' for optimistic delete in pending transaction`, async () => { + const collection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-delete`, + getKey: (item: Item) => item.id, + initialData: [{ id: `1`, title: `To delete` }], + }), + ) + + await collection.preload() + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + collection.delete(`1`) + }) + + // Note: get() returns undefined for deleted items (public API unchanged) + // but the internal state tracks pendingOperation as 'delete' + expect(collection.get(`1`)).toBeUndefined() + expect(collection._state.getPendingOperation(`1`)).toBe(`delete`) + }) + + it(`should return to null after transaction commit + sync confirmation`, async () => { + let syncFns: { + begin: () => void + write: (msg: any) => void + commit: () => void + } + + const collection = createCollection({ + id: `pending-op-confirm`, + getKey: (item) => item.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + syncFns = { begin, write, commit } + markReady() + }, + }, + onInsert: async () => {}, + }) + + collection.subscribeChanges(() => {}, { includeInitialState: false }) + await waitForChanges() + + collection.insert({ id: `1`, title: `New` }) + await waitForChanges() + + expect(collection.get(`1`)!.$pendingOperation).toBe(`insert`) + + // Simulate sync confirmation + syncFns!.begin() + syncFns!.write({ type: `insert`, value: { id: `1`, title: `New` } }) + syncFns!.commit() + await waitForChanges() + + expect(collection.get(`1`)!.$pendingOperation).toBe(null) + }) + + it(`should return to null after transaction rollback`, async () => { + const collection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-rollback`, + getKey: (item: Item) => item.id, + initialData: [{ id: `1`, title: `Original` }], + }), + ) + + await collection.preload() + + const mutationFn = vi + .fn() + .mockRejectedValue(new Error(`Rollback test`)) + + const tx = createTransaction({ + autoCommit: false, + mutationFn, + }) + + tx.mutate(() => { + collection.update(`1`, (item) => { + item.title = `Updated` + }) + }) + + expect(collection.get(`1`)!.$pendingOperation).toBe(`update`) + + // Commit should fail and trigger rollback + try { + await tx.commit() + } catch { + // Expected + } + await waitForChanges() + + expect(collection.get(`1`)!.$pendingOperation).toBe(null) + }) + + it(`should always be null for local-only collections`, () => { + const collection = createCollection( + localOnlyCollectionOptions({ + id: `pending-op-local-only`, + getKey: (item: any) => item.id, + }), + ) + + collection.insert({ id: `1`, title: `Local item` }) + + const item = collection.get(`1`) + expect(item).toBeDefined() + expect(item!.$pendingOperation).toBe(null) + }) + + it(`should be null for non-optimistic mutations`, async () => { + const collection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-non-optimistic`, + getKey: (item: Item) => item.id, + initialData: [{ id: `1`, title: `Original` }], + }), + ) + + await collection.preload() + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + collection.update(`1`, { optimistic: false }, (item) => { + item.title = `Non-optimistic update` + }) + }) + + // Non-optimistic mutations should not show pending operation + expect(collection.get(`1`)!.$pendingOperation).toBe(null) + }) + }) + + // ── B2: Transition and merging tests ──────────────────────────────── + + describe(`mutation merging`, () => { + it(`insert + update in same transaction shows $pendingOperation: 'insert'`, async () => { + const collection = createCollection({ + id: `pending-op-merge-insert-update`, + getKey: (item) => item.id, + sync: { + sync: ({ markReady }) => { + markReady() + }, + }, + onInsert: async () => { + await new Promise(() => {}) + }, + }) + + collection.subscribeChanges(() => {}, { includeInitialState: false }) + await waitForChanges() + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + collection.insert({ id: `1`, title: `New` }) + }) + tx.mutate(() => { + collection.update(`1`, (item) => { + item.title = `Updated new` + }) + }) + + // insert + update = insert (merged) + const item = collection.get(`1`) + expect(item).toBeDefined() + expect(item!.$pendingOperation).toBe(`insert`) + expect(stripVirtualProps(item)).toEqual({ + id: `1`, + title: `Updated new`, + }) + }) + + it(`insert + delete in same transaction cancels out — item gone entirely`, async () => { + const collection = createCollection({ + id: `pending-op-merge-insert-delete`, + getKey: (item) => item.id, + sync: { + sync: ({ markReady }) => { + markReady() + }, + }, + onInsert: async () => { + await new Promise(() => {}) + }, + }) + + collection.subscribeChanges(() => {}, { includeInitialState: false }) + await waitForChanges() + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + collection.insert({ id: `1`, title: `Temp` }) + }) + tx.mutate(() => { + collection.delete(`1`) + }) + + // insert + delete = cancelled + expect(collection.get(`1`)).toBeUndefined() + expect(collection._state.getPendingOperation(`1`)).toBe(null) + }) + + it(`update + delete in same transaction shows $pendingOperation: 'delete'`, async () => { + const collection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-merge-update-delete`, + getKey: (item: Item) => item.id, + initialData: [{ id: `1`, title: `Original` }], + }), + ) + + await collection.preload() + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + collection.update(`1`, (item) => { + item.title = `Updated` + }) + }) + tx.mutate(() => { + collection.delete(`1`) + }) + + // update + delete = delete + expect(collection.get(`1`)).toBeUndefined() + expect(collection._state.getPendingOperation(`1`)).toBe(`delete`) + }) + + it(`pending delete superseded by update in new transaction on same key`, async () => { + const collection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-supersede`, + getKey: (item: Item) => item.id, + initialData: [{ id: `1`, title: `Original` }], + }), + ) + + await collection.preload() + + const tx1 = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx1.mutate(() => { + collection.delete(`1`) + }) + + expect(collection._state.getPendingOperation(`1`)).toBe(`delete`) + + // Second transaction inserts the item back (or updates it) + const tx2 = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx2.mutate(() => { + collection.insert({ id: `1`, title: `Reinserted` }) + }) + + // tx2's insert supersedes tx1's delete. Since the item exists in syncedData, + // the net effect is an upsert of an existing server item — so it's 'update', not 'insert'. + const item = collection.get(`1`) + expect(item).toBeDefined() + expect(item!.$pendingOperation).toBe(`update`) + }) + + it(`subscribeChanges emits events when $pendingOperation transitions`, async () => { + const changes: Array> = [] + + const collection = createCollection({ + id: `pending-op-transitions`, + getKey: (item) => item.id, + sync: { + sync: ({ markReady }) => { + markReady() + }, + }, + onInsert: async () => { + await new Promise(() => {}) + }, + }) + + const subscription = collection.subscribeChanges( + (events) => changes.push(...events), + { includeInitialState: false }, + ) + await waitForChanges() + + collection.insert({ id: `1`, title: `New` }) + await waitForChanges() + + // Should have an insert event with $pendingOperation: 'insert' + const insertChange = changes.find( + (c) => c.key === `1` && c.type === `insert`, + ) + expect(insertChange).toBeDefined() + expect(insertChange!.value.$pendingOperation).toBe(`insert`) + + subscription.unsubscribe() + }) + + it(`subscribeChanges still emits type: 'delete' for optimistic deletes (backward compat)`, async () => { + const changes: Array> = [] + + const collection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-delete-event`, + getKey: (item: Item) => item.id, + initialData: [{ id: `1`, title: `Item` }], + }), + ) + + await collection.preload() + + const subscription = collection.subscribeChanges( + (events) => changes.push(...events), + { includeInitialState: false }, + ) + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + collection.delete(`1`) + }) + await waitForChanges() + + // subscribeChanges should still emit a delete event (collection layer unchanged) + const deleteChange = changes.find( + (c) => c.key === `1` && c.type === `delete`, + ) + expect(deleteChange).toBeDefined() + + subscription.unsubscribe() + }) + }) + + // ── B3: Edge case tests ───────────────────────────────────────────── + + describe(`edge cases`, () => { + it(`$pendingOperation change invalidates virtual props cache when $synced and $origin are unchanged`, async () => { + const collection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-cache`, + getKey: (item: Item) => item.id, + initialData: [{ id: `1`, title: `Original` }], + }), + ) + + await collection.preload() + + // First, do an update — item is $synced: false, $origin: 'local', $pendingOperation: 'update' + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + collection.update(`1`, (item) => { + item.title = `Updated` + }) + }) + + const updatedItem = collection.get(`1`) + expect(updatedItem!.$synced).toBe(false) + expect(updatedItem!.$origin).toBe(`local`) + expect(updatedItem!.$pendingOperation).toBe(`update`) + + // Now delete in the same transaction — $synced stays false, $origin stays local + // but $pendingOperation should change to 'delete' + tx.mutate(() => { + collection.delete(`1`) + }) + + // The item is deleted so get() returns undefined, but we can check internal state + expect(collection._state.getPendingOperation(`1`)).toBe(`delete`) + }) + + it(`item in completed-but-awaiting-sync window still shows non-null $pendingOperation`, async () => { + let resolveSync: (() => void) | undefined + + const collection = createCollection({ + id: `pending-op-awaiting-sync`, + getKey: (item) => item.id, + sync: { + sync: ({ markReady }) => { + markReady() + }, + }, + onInsert: async () => { + await new Promise((resolve) => { + resolveSync = resolve + }) + }, + }) + + collection.subscribeChanges(() => {}, { includeInitialState: false }) + await waitForChanges() + + collection.insert({ id: `1`, title: `New` }) + await waitForChanges() + + // Item should have $pendingOperation: 'insert' before commit resolves + expect(collection.get(`1`)!.$pendingOperation).toBe(`insert`) + + // The onInsert creates a transaction that auto-commits. + // The mutationFn (onInsert) is still running — transaction is in 'persisting' state. + // $pendingOperation should still be non-null during this window. + expect(collection._state.getPendingOperation(`1`)).not.toBe(null) + + // Resolve the sync to clean up + resolveSync?.() + await waitForChanges() + }) + + it(`$pendingOperation transition triggers virtualChanged event during sync commit`, async () => { + const changes: Array> = [] + let syncFns: { + begin: () => void + write: (msg: any) => void + commit: () => void + } + + const collection = createCollection({ + id: `pending-op-virtual-changed`, + getKey: (item) => item.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + syncFns = { begin, write, commit } + markReady() + }, + }, + onInsert: async () => {}, + }) + + const subscription = collection.subscribeChanges( + (events) => changes.push(...events), + { includeInitialState: false }, + ) + await waitForChanges() + + collection.insert({ id: `1`, title: `New` }) + await waitForChanges() + + const insertEvent = changes.find((c) => c.key === `1`) + expect(insertEvent).toBeDefined() + expect(insertEvent!.value.$pendingOperation).toBe(`insert`) + + changes.length = 0 + + // Sync confirms the insert + syncFns!.begin() + syncFns!.write({ type: `insert`, value: { id: `1`, title: `New` } }) + syncFns!.commit() + await waitForChanges() + + // Should emit an update event for the virtual prop transition + // $pendingOperation goes from 'insert' to null + const updateEvent = changes.find( + (c) => c.key === `1` && c.type === `update`, + ) + expect(updateEvent).toBeDefined() + if (updateEvent) { + expect(updateEvent.value.$pendingOperation).toBe(null) + } + + subscription.unsubscribe() + }) + }) + + // ── B4: Delete visibility tests (should FAIL until Phase C) ───────── + + describe(`delete visibility`, () => { + it(`deleted items are hidden from default query results (backward compat)`, async () => { + const sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-query-default`, + getKey: (item: Item) => item.id, + initialData: [ + { id: `1`, title: `Keep` }, + { id: `2`, title: `Delete me` }, + ], + }), + ) + + await sourceCollection.preload() + + const liveQuery = createLiveQueryCollection({ + query: (q) => q.from({ item: sourceCollection }), + getKey: (item) => item.id, + }) + + await liveQuery.preload() + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + sourceCollection.delete(`2`) + }) + await waitForChanges() + + // Default query should NOT show deleted items + const results = Array.from(liveQuery.values()) + expect(results.map((r) => stripVirtualProps(r))).toEqual([ + { id: `1`, title: `Keep` }, + ]) + }) + + it.fails( + `deleted items are visible in query when where clause references $pendingOperation`, + async () => { + const sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-query-optin`, + getKey: (item: Item) => item.id, + initialData: [ + { id: `1`, title: `Keep` }, + { id: `2`, title: `Delete me` }, + ], + }), + ) + + await sourceCollection.preload() + + // Query that references $pendingOperation — should show pending deletes + const liveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ item: sourceCollection }) + .where(({ item }) => + (item as any).$pendingOperation.neq(`no-match`), + ), + getKey: (item: any) => item.id, + }) + + await liveQuery.preload() + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + sourceCollection.delete(`2`) + }) + await waitForChanges() + + // With $pendingOperation in where, deleted items should be visible + const results = Array.from(liveQuery.values()) + const item2 = results.find( + (r) => (stripVirtualProps(r) as unknown as Item).id === `2`, + ) + expect(item2).toBeDefined() + expect((item2 as any).$pendingOperation).toBe(`delete`) + }, + ) + + it.fails( + `pending-delete items appear in initial query snapshot when opted in`, + async () => { + const sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-snapshot`, + getKey: (item: Item) => item.id, + initialData: [ + { id: `1`, title: `Keep` }, + { id: `2`, title: `Delete me` }, + ], + }), + ) + + await sourceCollection.preload() + + // Delete before creating the query + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + sourceCollection.delete(`2`) + }) + await waitForChanges() + + // Create a query AFTER the delete — initial snapshot should include the pending delete + const liveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ item: sourceCollection }) + .where(({ item }) => + (item as any).$pendingOperation.neq(`no-match`), + ), + getKey: (item: any) => item.id, + }) + + await liveQuery.preload() + await waitForChanges() + + const results = Array.from(liveQuery.values()) + const item2 = results.find( + (r) => (stripVirtualProps(r) as unknown as Item).id === `2`, + ) + expect(item2).toBeDefined() + }, + ) + + it.fails( + `sync-confirmed delete removes item from opted-in query results`, + async () => { + let syncFns: { + begin: () => void + write: (msg: any) => void + commit: () => void + } + + const sourceCollection = createCollection({ + id: `pending-op-sync-delete`, + getKey: (item: any) => item.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + syncFns = { begin, write, commit } + begin() + write({ + type: `insert`, + value: { id: `1`, title: `Item 1` }, + }) + write({ + type: `insert`, + value: { id: `2`, title: `Item 2` }, + }) + commit() + markReady() + }, + }, + onDelete: async () => {}, + }) + + // Query with $pendingOperation reference + const liveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ item: sourceCollection }) + .where(({ item }) => + (item as any).$pendingOperation.neq(`no-match`), + ), + getKey: (item: any) => item.id, + }) + + await liveQuery.preload() + await waitForChanges() + + // Optimistically delete item 2 + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => {}, + }) + + tx.mutate(() => { + sourceCollection.delete(`2`) + }) + await waitForChanges() + + // Item 2 should be visible with $pendingOperation: 'delete' + let results = Array.from(liveQuery.values()) + let item2 = results.find( + (r) => (stripVirtualProps(r) as unknown as Item).id === `2`, + ) + expect(item2).toBeDefined() + expect((item2 as any).$pendingOperation).toBe(`delete`) + + // Commit the transaction + await tx.commit() + await waitForChanges() + + // Sync confirms the delete + syncFns!.begin() + syncFns!.write({ type: `delete`, key: `2` }) + syncFns!.commit() + await waitForChanges() + + // Item 2 should now be GONE (sync confirmed the delete) + results = Array.from(liveQuery.values()) + item2 = results.find( + (r) => (stripVirtualProps(r) as unknown as Item).id === `2`, + ) + expect(item2).toBeUndefined() + }, + ) + }) + + // ── B5: Live query integration ────────────────────────────────────── + + describe(`live query integration`, () => { + it(`live query default behavior unchanged — deletes vanish from results`, async () => { + const sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-live-default`, + getKey: (item: Item) => item.id, + initialData: [ + { id: `1`, title: `Keep` }, + { id: `2`, title: `Delete me` }, + ], + }), + ) + + await sourceCollection.preload() + + const liveQuery = createLiveQueryCollection({ + query: (q) => q.from({ item: sourceCollection }), + getKey: (item) => item.id, + }) + + await liveQuery.preload() + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + sourceCollection.delete(`2`) + }) + await waitForChanges() + + const results = Array.from(liveQuery.values()) + const ids = results.map((r) => (stripVirtualProps(r) as unknown as Item).id) + expect(ids).toEqual([`1`]) + }) + + it.fails( + `live query with $pendingOperation where clause shows deleted items inline`, + async () => { + const sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-live-optin`, + getKey: (item: Item) => item.id, + initialData: [ + { id: `1`, title: `Keep` }, + { id: `2`, title: `Delete me` }, + { id: `3`, title: `Also keep` }, + ], + }), + ) + + await sourceCollection.preload() + + const liveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ item: sourceCollection }) + .where(({ item }) => + (item as any).$pendingOperation.neq(`no-match`), + ), + getKey: (item: any) => item.id, + }) + + await liveQuery.preload() + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + sourceCollection.delete(`2`) + }) + await waitForChanges() + + const results = Array.from(liveQuery.values()) + const ids = results.map((r) => (stripVirtualProps(r) as unknown as Item).id) + + // All 3 items should be present — item 2 with $pendingOperation: 'delete' + expect(ids).toContain(`1`) + expect(ids).toContain(`2`) + expect(ids).toContain(`3`) + + const item2 = results.find( + (r) => (stripVirtualProps(r) as unknown as Item).id === `2`, + ) + expect((item2 as any).$pendingOperation).toBe(`delete`) + }, + ) + }) +}) diff --git a/packages/db/tests/utils.ts b/packages/db/tests/utils.ts index 2b5c5d0c8..d750ca3b9 100644 --- a/packages/db/tests/utils.ts +++ b/packages/db/tests/utils.ts @@ -23,6 +23,7 @@ export const stripVirtualProps = | undefined>( $origin: _origin, $key: _key, $collectionId: _collectionId, + $pendingOperation: _pendingOperation, ...rest } = value as Record return rest as T @@ -30,12 +31,16 @@ export const stripVirtualProps = | undefined>( export const omitVirtualProps = >( value: T, -): Omit => { +): Omit< + T, + '$synced' | '$origin' | '$key' | '$collectionId' | '$pendingOperation' +> => { const { $synced: _synced, $origin: _origin, $key: _key, $collectionId: _collectionId, + $pendingOperation: _pendingOperation, ...rest } = value as Record return rest as any From f95e28b2c07d07e903fd429496f39a498aad6dff Mon Sep 17 00:00:00 2001 From: Marc MacLeod Date: Fri, 27 Mar 2026 15:21:51 -0500 Subject: [PATCH 2/6] feat: delete visibility in queries + GROUP BY fix + type tightening - Subscription-layer delete-to-update conversion for opted-in queries - Auto-detection of $pendingOperation in where clauses - Initial snapshot includes pending-delete items - Fix missing $pendingOperation in multi-group GROUP BY path - Tighten string|null to PendingOperationType in group-by compiler - Fix ?? vs !== undefined inconsistency for nullable $pendingOperation - Add GROUP BY tests for $pendingOperation aggregation --- packages/db/src/collection/subscription.ts | 82 ++++++++++- packages/db/src/indexes/auto-index.ts | 2 +- packages/db/src/query/compiler/group-by.ts | 14 +- .../src/query/live/collection-subscriber.ts | 30 ++++ packages/db/src/types.ts | 6 + packages/db/src/virtual-props.ts | 5 +- packages/db/tests/collection-events.test.ts | 2 +- packages/db/tests/pending-operation.test.ts | 131 ++++++++++++++++-- 8 files changed, 255 insertions(+), 17 deletions(-) diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index 2d48add4b..0a3151986 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -51,6 +51,9 @@ type CollectionSubscriptionOptions = { whereExpression?: BasicExpression /** Callback to call when the subscription is unsubscribed */ onUnsubscribe?: (event: SubscriptionUnsubscribedEvent) => void + /** When true, optimistic delete events are converted to updates with $pendingOperation: 'delete' + * instead of being passed through as deletes. This keeps deleted items visible in query results. */ + includePendingDeletes?: boolean } export class CollectionSubscription @@ -319,7 +322,15 @@ export class CollectionSubscription } emitEvents(changes: Array>) { - const newChanges = this.filterAndFlipChanges(changes) + // When includePendingDeletes is enabled, convert optimistic delete events + // to updates BEFORE filterAndFlipChanges so that sentKeys correctly tracks + // the key as still present. This ensures the later sync-confirmed delete + // can properly remove the item. + const processedChanges = this.options.includePendingDeletes + ? this.convertPendingDeletes(changes) + : changes + + const newChanges = this.filterAndFlipChanges(processedChanges) if (this.isBufferingForTruncate) { // Buffer the changes instead of emitting immediately @@ -332,6 +343,50 @@ export class CollectionSubscription } } + /** + * Converts optimistic delete events to update events with $pendingOperation: 'delete'. + * Only converts deletes for keys that are still in optimisticDeletes or pendingOptimisticDeletes + * (i.e., pending deletes, not sync-confirmed ones). + */ + private convertPendingDeletes( + changes: Array>, + ): Array> { + const state = this.collection._state + const result: Array> = [] + + for (const change of changes) { + if (change.type === `delete`) { + // Check if this is an optimistic delete (key still pending) vs a sync-confirmed delete + const isPendingDelete = + state.optimisticDeletes.has(change.key) || + state.pendingOptimisticDeletes.has(change.key) + + if (isPendingDelete) { + // Convert to update — the delete event's value contains the pre-delete row data + // already enriched with virtual props. We need to add $pendingOperation: 'delete'. + const enrichedValue = { + ...change.value, + $pendingOperation: `delete` as const, + } + const previousValue = change.previousValue + ? { ...change.previousValue, $pendingOperation: null } + : { ...change.value, $pendingOperation: null } + + result.push({ + type: `update`, + key: change.key, + value: enrichedValue, + previousValue, + }) + continue + } + } + result.push(change) + } + + return result + } + /** * Sends the snapshot to the callback. * Returns a boolean indicating if it succeeded. @@ -397,6 +452,31 @@ export class CollectionSubscription return false } + // When includePendingDeletes is enabled, also include items that are + // optimistically deleted — they won't appear in entries() but their + // data is still in syncedData. + if (this.options.includePendingDeletes) { + const state = this.collection._state + const pendingDeleteKeys = new Set([ + ...state.optimisticDeletes, + ...state.pendingOptimisticDeletes, + ]) + for (const key of pendingDeleteKeys) { + const syncedValue = state.syncedData.get(key) + if (syncedValue !== undefined) { + const enrichedValue = state.enrichWithVirtualProps(syncedValue, key) + snapshot.push({ + type: `insert` as const, + key, + value: { + ...enrichedValue, + $pendingOperation: `delete` as const, + }, + }) + } + } + } + // Only send changes that have not been sent yet const filteredSnapshot = snapshot.filter( (change) => !this.sentKeys.has(change.key), diff --git a/packages/db/src/indexes/auto-index.ts b/packages/db/src/indexes/auto-index.ts index 303267cec..350b469a4 100644 --- a/packages/db/src/indexes/auto-index.ts +++ b/packages/db/src/indexes/auto-index.ts @@ -1,6 +1,6 @@ import { DEFAULT_COMPARE_OPTIONS } from '../utils' -import { checkCollectionSizeForIndex, isDevModeEnabled } from './index-registry' import { hasVirtualPropPath } from '../virtual-props' +import { checkCollectionSizeForIndex, isDevModeEnabled } from './index-registry' import type { CompareOptions } from '../query/builder/types' import type { BasicExpression } from '../query/ir' import type { CollectionImpl } from '../collection/index.js' diff --git a/packages/db/src/query/compiler/group-by.ts b/packages/db/src/query/compiler/group-by.ts index ce010c605..90fc0c5ed 100644 --- a/packages/db/src/query/compiler/group-by.ts +++ b/packages/db/src/query/compiler/group-by.ts @@ -21,7 +21,7 @@ import type { Select, } from '../ir.js' import type { NamespacedAndKeyedStream, NamespacedRow } from '../../types.js' -import type { VirtualOrigin } from '../../virtual-props.js' +import type { PendingOperationType, VirtualOrigin } from '../../virtual-props.js' const VIRTUAL_SYNCED_KEY = `__virtual_synced__` const VIRTUAL_HAS_LOCAL_KEY = `__virtual_has_local__` @@ -30,14 +30,14 @@ const VIRTUAL_PENDING_OP_KEY = `__virtual_pending_op__` type RowVirtualMetadata = { synced: boolean hasLocal: boolean - pendingOperation: string | null + pendingOperation: PendingOperationType } function getRowVirtualMetadata(row: NamespacedRow): RowVirtualMetadata { let found = false let allSynced = true let hasLocal = false - let pendingOperation: string | null = null + let pendingOperation: PendingOperationType = null for (const [alias, value] of Object.entries(row)) { if (alias === `$selected`) continue @@ -59,7 +59,7 @@ function getRowVirtualMetadata(row: NamespacedRow): RowVirtualMetadata { `$pendingOperation` in asRecord && asRecord.$pendingOperation != null ) { - pendingOperation = asRecord.$pendingOperation as string + pendingOperation = asRecord.$pendingOperation as PendingOperationType } } @@ -159,7 +159,7 @@ export function processGroupBy( [VIRTUAL_PENDING_OP_KEY]: { preMap: ([, row]: [string, NamespacedRow]) => getRowVirtualMetadata(row).pendingOperation, - reduce: (values: Array<[string | null, number]>) => { + reduce: (values: Array<[PendingOperationType, number]>) => { // Return the first non-null pending operation, or null if all are null for (const [op, multiplicity] of values) { if (op != null && multiplicity > 0) { @@ -436,10 +436,14 @@ export function processGroupBy( const groupHasLocal = (aggregatedRow as Record)[ VIRTUAL_HAS_LOCAL_KEY ] + const groupPendingOp = (aggregatedRow as Record)[ + VIRTUAL_PENDING_OP_KEY + ] resultRow.$synced = groupSynced ?? true resultRow.$origin = ( groupHasLocal ? `local` : `remote` ) satisfies VirtualOrigin + resultRow.$pendingOperation = groupPendingOp ?? null resultRow.$key = finalKey resultRow.$collectionId = aggregateCollectionId ?? resultRow.$collectionId if (mainSource && correlationKey !== undefined) { diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index 8eda5cc88..977dee45e 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -212,6 +212,9 @@ export class CollectionSubscriber< } : undefined + const includePendingDeletes = + expressionReferencesPendingOperation(whereExpression) + const subscription = this.collection.subscribeChanges(sendChanges, { ...(includeInitialState && { includeInitialState }), whereExpression, @@ -219,6 +222,7 @@ export class CollectionSubscriber< orderBy: hints.orderBy, limit: hints.limit, onLoadSubsetResult, + ...(includePendingDeletes && { includePendingDeletes }), }) return subscription @@ -269,9 +273,13 @@ export class CollectionSubscriber< // Subscribe to changes with onStatusChange - listener is registered before any snapshot // values bigger than what we've sent don't need to be sent because they can't affect the topK + const includePendingDeletes = + expressionReferencesPendingOperation(whereExpression) + const subscription = this.collection.subscribeChanges(sendChangesInRange, { whereExpression, onStatusChange, + ...(includePendingDeletes && { includePendingDeletes }), }) subscriptionHolder.current = subscription @@ -468,3 +476,25 @@ export class CollectionSubscriber< ) } } + +/** + * Checks if a BasicExpression tree references $pendingOperation. + * Used to auto-detect whether a query opts into seeing pending-delete items. + */ +function expressionReferencesPendingOperation( + expr: BasicExpression | undefined, +): boolean { + if (!expr) return false + + if (expr.type === `ref`) { + return expr.path.some((segment) => segment === `$pendingOperation`) + } + + if (expr.type === `func`) { + return expr.args.some((arg) => + expressionReferencesPendingOperation(arg), + ) + } + + return false +} diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index 6087e234e..41fe78be1 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -864,6 +864,12 @@ export interface SubscribeChangesOptions< * @internal */ onLoadSubsetResult?: (result: Promise | true) => void + /** + * When true, optimistic delete events are converted to updates with $pendingOperation: 'delete' + * instead of being passed through as deletes. This keeps deleted items visible in query results. + * @internal + */ + includePendingDeletes?: boolean } export interface SubscribeChangesSnapshotOptions< diff --git a/packages/db/src/virtual-props.ts b/packages/db/src/virtual-props.ts index a1af4b45f..65ad3e3e5 100644 --- a/packages/db/src/virtual-props.ts +++ b/packages/db/src/virtual-props.ts @@ -243,7 +243,10 @@ export function enrichRowWithVirtualProps< $origin: existingRow.$origin ?? computeOrigin(), $key: existingRow.$key ?? key, $collectionId: existingRow.$collectionId ?? collectionId, - $pendingOperation: existingRow.$pendingOperation ?? null, + $pendingOperation: + existingRow.$pendingOperation !== undefined + ? existingRow.$pendingOperation + : null, } as WithVirtualProps } diff --git a/packages/db/tests/collection-events.test.ts b/packages/db/tests/collection-events.test.ts index 97c6538ff..3e3221b43 100644 --- a/packages/db/tests/collection-events.test.ts +++ b/packages/db/tests/collection-events.test.ts @@ -1,7 +1,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { createCollection } from '../src/collection/index.js' -import type { Collection } from '../src/collection/index.js' import { BTreeIndex } from '../src/indexes/btree-index.js' +import type { Collection } from '../src/collection/index.js' describe(`Collection Events System`, () => { let collection: Collection diff --git a/packages/db/tests/pending-operation.test.ts b/packages/db/tests/pending-operation.test.ts index a9ae82e13..ede8181d0 100644 --- a/packages/db/tests/pending-operation.test.ts +++ b/packages/db/tests/pending-operation.test.ts @@ -3,6 +3,7 @@ import { createCollection } from '../src/collection/index.js' import { createLiveQueryCollection } from '../src/query/live-query-collection.js' import { localOnlyCollectionOptions } from '../src/local-only.js' import { createTransaction } from '../src/transactions' +import { count, isNull, not, or } from '../src/query/builder/functions' import { mockSyncCollectionOptions, stripVirtualProps } from './utils' import type { ChangeMessage } from '../src/types' @@ -642,7 +643,7 @@ describe(`$pendingOperation virtual property`, () => { ]) }) - it.fails( + it( `deleted items are visible in query when where clause references $pendingOperation`, async () => { const sourceCollection = createCollection( @@ -664,7 +665,10 @@ describe(`$pendingOperation virtual property`, () => { q .from({ item: sourceCollection }) .where(({ item }) => - (item as any).$pendingOperation.neq(`no-match`), + or( + isNull(item.$pendingOperation), + not(isNull(item.$pendingOperation)), + ), ), getKey: (item: any) => item.id, }) @@ -693,7 +697,7 @@ describe(`$pendingOperation virtual property`, () => { }, ) - it.fails( + it( `pending-delete items appear in initial query snapshot when opted in`, async () => { const sourceCollection = createCollection( @@ -728,7 +732,10 @@ describe(`$pendingOperation virtual property`, () => { q .from({ item: sourceCollection }) .where(({ item }) => - (item as any).$pendingOperation.neq(`no-match`), + or( + isNull(item.$pendingOperation), + not(isNull(item.$pendingOperation)), + ), ), getKey: (item: any) => item.id, }) @@ -744,7 +751,7 @@ describe(`$pendingOperation virtual property`, () => { }, ) - it.fails( + it( `sync-confirmed delete removes item from opted-in query results`, async () => { let syncFns: { @@ -781,7 +788,10 @@ describe(`$pendingOperation virtual property`, () => { q .from({ item: sourceCollection }) .where(({ item }) => - (item as any).$pendingOperation.neq(`no-match`), + or( + isNull(item.$pendingOperation), + not(isNull(item.$pendingOperation)), + ), ), getKey: (item: any) => item.id, }) @@ -869,7 +879,7 @@ describe(`$pendingOperation virtual property`, () => { expect(ids).toEqual([`1`]) }) - it.fails( + it( `live query with $pendingOperation where clause shows deleted items inline`, async () => { const sourceCollection = createCollection( @@ -891,7 +901,10 @@ describe(`$pendingOperation virtual property`, () => { q .from({ item: sourceCollection }) .where(({ item }) => - (item as any).$pendingOperation.neq(`no-match`), + or( + isNull(item.$pendingOperation), + not(isNull(item.$pendingOperation)), + ), ), getKey: (item: any) => item.id, }) @@ -925,4 +938,106 @@ describe(`$pendingOperation virtual property`, () => { }, ) }) + + // ── GROUP BY ──────────────────────────────────────────────────────── + + describe(`GROUP BY`, () => { + type Task = { id: string; category: string; title: string } + + it(`$pendingOperation is null when all rows in group are null`, async () => { + const sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-groupby-null`, + getKey: (item: Task) => item.id, + initialData: [ + { id: `1`, category: `work`, title: `Task 1` }, + { id: `2`, category: `work`, title: `Task 2` }, + { id: `3`, category: `personal`, title: `Task 3` }, + ], + }), + ) + + await sourceCollection.preload() + + const liveQuery = createLiveQueryCollection({ + query: (q: any) => + q + .from({ task: sourceCollection }) + .groupBy(({ task }: any) => task.category) + .select(({ task }: any) => ({ + category: task.category, + count: count(task.id), + })), + getKey: (item: any) => item.category, + }) + + await liveQuery.preload() + + const results = Array.from(liveQuery.values()) + for (const row of results) { + expect((row).$pendingOperation).toBe(null) + } + }) + + it(`$pendingOperation is non-null when any row in group has pending operation`, async () => { + const sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-groupby-nonnull`, + getKey: (item: Task) => item.id, + initialData: [ + { id: `1`, category: `work`, title: `Task 1` }, + { id: `2`, category: `work`, title: `Task 2` }, + { id: `3`, category: `personal`, title: `Task 3` }, + ], + }), + ) + + await sourceCollection.preload() + + const liveQuery = createLiveQueryCollection({ + query: (q: any) => + q + .from({ task: sourceCollection }) + .groupBy(({ task }: any) => task.category) + .select(({ task }: any) => ({ + category: task.category, + count: count(task.id), + })), + getKey: (item: any) => item.category, + }) + + await liveQuery.preload() + + // Update one item in the 'work' group + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + sourceCollection.update(`1`, (item) => { + item.title = `Updated` + }) + }) + await waitForChanges() + + const results = Array.from(liveQuery.values()) + const workGroup = results.find( + (r) => (r).category === `work`, + ) + const personalGroup = results.find( + (r) => (r).category === `personal`, + ) + + // Work group has one updated item — $pendingOperation should be non-null + expect(workGroup).toBeDefined() + expect((workGroup).$pendingOperation).not.toBe(null) + + // Personal group has no pending changes — $pendingOperation should be null + expect(personalGroup).toBeDefined() + expect((personalGroup).$pendingOperation).toBe(null) + }) + }) }) From 81ef193fbb6cd9574ad184485f30db0fd84e54cb Mon Sep 17 00:00:00 2001 From: Marc MacLeod Date: Sun, 29 Mar 2026 18:59:19 -0500 Subject: [PATCH 3/6] fix: rollback handling, isRowSynced consistency, lazy source support, memory cleanup - Fix stale $pendingOperation after rollback of optimistic delete by tracking converted delete values and converting rollback inserts to updates - Fix isRowSynced to check pendingOptimistic* maps for consistency with getPendingOperation (prevents contradictory $synced: true + $pendingOperation: 'delete') - Fix lazy source (join/subquery) pending deletes by merging child sourceWhereClauses into parent during query compilation - Fix convertedDeleteValues memory leak: clean up on sync-confirmed delete, truncate, and unsubscribe - Add computePendingOperation callback to enrichRowWithVirtualProps --- packages/db/src/collection/state.ts | 9 +- packages/db/src/collection/subscription.ts | 93 ++++++--- packages/db/src/query/compiler/index.ts | 3 + packages/db/src/virtual-props.ts | 4 +- packages/db/tests/pending-operation.test.ts | 210 +++++++++++++++++++- 5 files changed, 293 insertions(+), 26 deletions(-) diff --git a/packages/db/src/collection/state.ts b/packages/db/src/collection/state.ts index 7d7ac6d37..c64bbdb89 100644 --- a/packages/db/src/collection/state.ts +++ b/packages/db/src/collection/state.ts @@ -163,7 +163,12 @@ export class CollectionStateManager< if (this.isLocalOnly) { return true } - return !this.optimisticUpserts.has(key) && !this.optimisticDeletes.has(key) + return ( + !this.optimisticUpserts.has(key) && + !this.optimisticDeletes.has(key) && + !this.pendingOptimisticUpserts.has(key) && + !this.pendingOptimisticDeletes.has(key) + ) } /** @@ -1233,6 +1238,7 @@ export class CollectionStateManager< this.collection.id, () => previousVirtualProps.$synced, () => previousVirtualProps.$origin, + () => previousVirtualProps.$pendingOperation, ) : undefined @@ -1280,6 +1286,7 @@ export class CollectionStateManager< this.collection.id, () => previousVirtualProps.$synced, () => previousVirtualProps.$origin, + () => previousVirtualProps.$pendingOperation, ) events.push({ type: `update`, diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index 0a3151986..efb7c3ebf 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -80,6 +80,12 @@ export class CollectionSubscription // Keep track of the keys we've sent (needed for join and orderBy optimizations) private sentKeys = new Set() + // Track keys where a delete was converted to an update by convertPendingDeletes, + // along with the enriched value that was sent to D2. When a rollback causes an insert + // for one of these keys, we convert it to an update with the correct previousValue + // so D2's multiplicity bookkeeping stays balanced. + private convertedDeleteValues = new Map() + // Track the count of rows sent via requestLimitedSnapshot for offset-based pagination private limitedSnapshotRowCount = 0 @@ -168,6 +174,7 @@ export class CollectionSubscription this.limitedSnapshotRowCount = 0 this.lastSentKey = undefined this.loadedSubsets = [] + this.convertedDeleteValues.clear() return } @@ -184,6 +191,7 @@ export class CollectionSubscription this.loadedInitialState = false this.limitedSnapshotRowCount = 0 this.lastSentKey = undefined + this.convertedDeleteValues.clear() // Clear the loadedSubsets array since we're re-requesting fresh this.loadedSubsets = [] @@ -361,16 +369,20 @@ export class CollectionSubscription state.optimisticDeletes.has(change.key) || state.pendingOptimisticDeletes.has(change.key) - if (isPendingDelete) { + if (!isPendingDelete) { + // Sync-confirmed delete — clean up any stale converted value + this.convertedDeleteValues.delete(change.key) + } else { // Convert to update — the delete event's value contains the pre-delete row data // already enriched with virtual props. We need to add $pendingOperation: 'delete'. const enrichedValue = { ...change.value, $pendingOperation: `delete` as const, } - const previousValue = change.previousValue - ? { ...change.previousValue, $pendingOperation: null } - : { ...change.value, $pendingOperation: null } + // Use the pre-delete value as previousValue. change.value on delete events + // carries the pre-delete virtual props (including the previous $pendingOperation, + // e.g., 'update' if the item was updated before being deleted). + const previousValue = change.previousValue ?? change.value result.push({ type: `update`, @@ -378,8 +390,24 @@ export class CollectionSubscription value: enrichedValue, previousValue, }) + this.convertedDeleteValues.set(change.key, enrichedValue) continue } + } else if ( + change.type === `insert` && + this.convertedDeleteValues.has(change.key) + ) { + // This insert is from a rollback of a previously-converted delete. + // Convert to update with the correct previousValue (the pending-delete value + // we sent to D2) so the multiplicity bookkeeping stays balanced. + const pendingDeleteValue = this.convertedDeleteValues.get(change.key) + this.convertedDeleteValues.delete(change.key) + result.push({ + ...change, + type: `update`, + previousValue: pendingDeleteValue, + }) + continue } result.push(change) } @@ -387,6 +415,36 @@ export class CollectionSubscription return result } + /** + * Appends pending-delete items to a changes array. + * These items are in optimisticDeletes or pendingOptimisticDeletes but still + * have data in syncedData. They are added as inserts with $pendingOperation: 'delete'. + * Skips keys already in sentKeys to avoid duplicates. + */ + private appendPendingDeleteItems( + changes: Array>, + ): void { + const state = this.collection._state + const pendingDeleteKeys = new Set([ + ...state.optimisticDeletes, + ...state.pendingOptimisticDeletes, + ]) + for (const key of pendingDeleteKeys) { + if (this.sentKeys.has(key)) continue + const syncedValue = state.syncedData.get(key) + if (syncedValue !== undefined) { + // enrichWithVirtualProps computes $pendingOperation via getPendingOperation(key), + // which returns 'delete' for keys in optimisticDeletes/pendingOptimisticDeletes. + const enrichedValue = state.enrichWithVirtualProps(syncedValue, key) + changes.push({ + type: `insert` as const, + key, + value: enrichedValue, + }) + } + } + } + /** * Sends the snapshot to the callback. * Returns a boolean indicating if it succeeded. @@ -456,25 +514,7 @@ export class CollectionSubscription // optimistically deleted — they won't appear in entries() but their // data is still in syncedData. if (this.options.includePendingDeletes) { - const state = this.collection._state - const pendingDeleteKeys = new Set([ - ...state.optimisticDeletes, - ...state.pendingOptimisticDeletes, - ]) - for (const key of pendingDeleteKeys) { - const syncedValue = state.syncedData.get(key) - if (syncedValue !== undefined) { - const enrichedValue = state.enrichWithVirtualProps(syncedValue, key) - snapshot.push({ - type: `insert` as const, - key, - value: { - ...enrichedValue, - $pendingOperation: `delete` as const, - }, - }) - } - } + this.appendPendingDeleteItems(snapshot) } // Only send changes that have not been sent yet @@ -623,6 +663,12 @@ export class CollectionSubscription keys = index.take(valuesNeeded(), biggestObservedValue!, filterFn) } + // When includePendingDeletes is enabled, also include pending-delete items + // that the index traversal missed (because collection.get() returns undefined for them). + if (this.options.includePendingDeletes) { + this.appendPendingDeleteItems(changes) + } + // Track row count for offset-based pagination (before sending to callback) // Use the current count as the offset for this load const currentOffset = this.limitedSnapshotRowCount @@ -816,6 +862,7 @@ export class CollectionSubscription this.collection._sync.unloadSubset(options) } this.loadedSubsets = [] + this.convertedDeleteValues.clear() this.emitInner(`unsubscribed`, { type: `unsubscribed`, diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index 70786ca8d..49882ba1d 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -410,6 +410,9 @@ export function compileQuery( // Merge child's alias metadata into parent's Object.assign(aliasToCollectionId, childResult.aliasToCollectionId) Object.assign(aliasRemapping, childResult.aliasRemapping) + for (const [alias, whereClause] of childResult.sourceWhereClauses) { + sourceWhereClauses.set(alias, whereClause) + } includesResults.push({ pipeline: childResult.pipeline, diff --git a/packages/db/src/virtual-props.ts b/packages/db/src/virtual-props.ts index 65ad3e3e5..525def288 100644 --- a/packages/db/src/virtual-props.ts +++ b/packages/db/src/virtual-props.ts @@ -219,6 +219,7 @@ export function createVirtualProps( * @param collectionId - The collection's ID * @param computeSynced - Function to compute $synced if missing * @param computeOrigin - Function to compute $origin if missing + * @param computePendingOperation - Function to compute $pendingOperation if missing (defaults to null) * @returns The row with virtual properties (possibly the same object if already present) * * @internal @@ -232,6 +233,7 @@ export function enrichRowWithVirtualProps< collectionId: string, computeSynced: () => boolean, computeOrigin: () => VirtualOrigin, + computePendingOperation?: () => PendingOperationType, ): WithVirtualProps { // Use nullish coalescing to preserve existing virtual properties (pass-through) // This is the "add-if-missing" pattern described in the RFC @@ -246,7 +248,7 @@ export function enrichRowWithVirtualProps< $pendingOperation: existingRow.$pendingOperation !== undefined ? existingRow.$pendingOperation - : null, + : (computePendingOperation?.() ?? null), } as WithVirtualProps } diff --git a/packages/db/tests/pending-operation.test.ts b/packages/db/tests/pending-operation.test.ts index ede8181d0..389a66aba 100644 --- a/packages/db/tests/pending-operation.test.ts +++ b/packages/db/tests/pending-operation.test.ts @@ -3,7 +3,7 @@ import { createCollection } from '../src/collection/index.js' import { createLiveQueryCollection } from '../src/query/live-query-collection.js' import { localOnlyCollectionOptions } from '../src/local-only.js' import { createTransaction } from '../src/transactions' -import { count, isNull, not, or } from '../src/query/builder/functions' +import { and, count, eq, isNull, not, or } from '../src/query/builder/functions' import { mockSyncCollectionOptions, stripVirtualProps } from './utils' import type { ChangeMessage } from '../src/types' @@ -1040,4 +1040,212 @@ describe(`$pendingOperation virtual property`, () => { expect((personalGroup).$pendingOperation).toBe(null) }) }) + + it(`$synced should be false when $pendingOperation is non-null (pendingOptimistic state)`, async () => { + let resolveDelete: (() => void) | undefined + + const collection = createCollection({ + id: `pending-op-synced-consistency`, + getKey: (item) => item.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + write({ type: `insert`, value: { id: `1`, title: `Item 1` } }) + commit() + markReady() + }, + }, + onDelete: async () => { + await new Promise((resolve) => { + resolveDelete = resolve + }) + }, + }) + + collection.subscribeChanges(() => {}, { includeInitialState: true }) + await waitForChanges() + + // Delete item — auto-commits via onDelete, holds in persisting state + collection.delete(`1`) + await waitForChanges() + + // Resolve to move to 'completed' state (pending sync confirmation) + resolveDelete?.() + await waitForChanges() + + // $synced and $pendingOperation should be consistent: + // if there's a pending operation, $synced should be false + const pendingOp = collection._state.getPendingOperation(`1`) + const synced = collection._state.isRowSynced(`1`) + + expect(pendingOp).toBe(`delete`) + expect(synced).toBe(false) + }) + + it(`rollback of optimistic delete restores item in query with $pendingOperation`, async () => { + let syncFns: { + begin: () => void + write: (msg: any) => void + commit: () => void + } + + const collection = createCollection({ + id: `rollback-delete`, + getKey: (item) => item.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + syncFns = { begin, write, commit } + markReady() + }, + }, + }) + + // Live query with $pendingOperation tautology (enables includePendingDeletes) + const liveQuery = createLiveQueryCollection({ + query: (q: any) => + q + .from({ item: collection }) + .where(({ item }: any) => + or( + isNull(item.$pendingOperation), + not(isNull(item.$pendingOperation)), + ), + ), + getKey: (item: any) => item.id, + }) + + await liveQuery.preload() + await waitForChanges() + + // Sync an item into the collection + syncFns!.begin() + syncFns!.write({ + type: `insert`, + value: { id: `1`, title: `Existing` }, + }) + syncFns!.commit() + await waitForChanges() + + // Item should be visible with $pendingOperation: null + let results = Array.from(liveQuery.values()) + expect(results).toHaveLength(1) + expect((results[0]).$pendingOperation).toBe(null) + + // Delete via a pending transaction + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + tx.mutate(() => { + collection.delete(`1`) + }) + await waitForChanges() + + // Item should still be visible with $pendingOperation: 'delete' + results = Array.from(liveQuery.values()) + expect(results).toHaveLength(1) + expect((results[0]).$pendingOperation).toBe(`delete`) + + // Rollback the delete + tx.rollback() + await waitForChanges() + + // Item should be restored with $pendingOperation: null + results = Array.from(liveQuery.values()) + expect(results).toHaveLength(1) + expect((results[0]).$pendingOperation).toBe(null) + }) + + // Lazy source (join) — test that pending-delete items in joined collections + // are visible via live events (not snapshot, since lazy sources skip initial state) + it(`pending-delete items in joined collection are visible via live events`, async () => { + type Parent = { id: string } + type Child = { id: string; parentId: string; name: string } + + let parentSyncFns: { + begin: () => void + write: (msg: any) => void + commit: () => void + } + let childSyncFns: { + begin: () => void + write: (msg: any) => void + commit: () => void + } + + const parents = createCollection({ + id: `lazy-parent`, + getKey: (item) => item.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + parentSyncFns = { begin, write, commit } + markReady() + }, + }, + }) + + const children = createCollection({ + id: `lazy-child`, + getKey: (item) => item.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + childSyncFns = { begin, write, commit } + markReady() + }, + }, + }) + + const liveQuery = createLiveQueryCollection({ + query: (q: any) => + q + .from({ p: parents }) + .select(({ p }: any) => ({ + ...p, + children: q + .from({ c: children }) + .where(({ c }: any) => + and( + eq(c.parentId, p.id), + or(isNull(c.$pendingOperation), not(isNull(c.$pendingOperation))), + ), + ), + })), + getKey: (item: any) => item.id, + }) + + await liveQuery.preload() + await waitForChanges() + + // Sync parent and child + parentSyncFns!.begin() + parentSyncFns!.write({ type: `insert`, value: { id: `p1` } }) + parentSyncFns!.commit() + + childSyncFns!.begin() + childSyncFns!.write({ type: `insert`, value: { id: `c1`, parentId: `p1`, name: `Child` } }) + childSyncFns!.commit() + await waitForChanges() + + // Delete the child via pending transaction + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + tx.mutate(() => { + children.delete(`c1`) + }) + await waitForChanges() + + // Child should be visible in the join with $pendingOperation: 'delete' + const results = Array.from(liveQuery.values()) + const parent = results.find((r: any) => r.id === `p1`) + expect(parent).toBeDefined() + const childResults = Array.from((parent).children.values()) + expect(childResults).toHaveLength(1) + expect((childResults[0] as any).$pendingOperation).toBe(`delete`) + }) }) From dc898c2c2b4cb43b1c2bce7ae04b042286fdd4cc Mon Sep 17 00:00:00 2001 From: Marc MacLeod Date: Sun, 29 Mar 2026 19:40:36 -0500 Subject: [PATCH 4/6] feat: add $pendingOperation support to createEffect Export expressionReferencesPendingOperation from collection-subscriber and use it in effect.ts buildSubscriptionOptions to auto-detect $pendingOperation references and set includePendingDeletes on the subscription. --- packages/db/src/query/effect.ts | 12 +++- .../src/query/live/collection-subscriber.ts | 2 +- packages/db/tests/pending-operation.test.ts | 69 +++++++++++++++++++ 3 files changed, 81 insertions(+), 2 deletions(-) diff --git a/packages/db/src/query/effect.ts b/packages/db/src/query/effect.ts index 10857d06e..42ca99d0b 100644 --- a/packages/db/src/query/effect.ts +++ b/packages/db/src/query/effect.ts @@ -7,6 +7,7 @@ import { normalizeOrderByPaths, } from './compiler/expressions.js' import { getCollectionBuilder } from './live/collection-registry.js' +import { expressionReferencesPendingOperation } from './live/collection-subscriber.js' import { buildQueryFromConfig, computeOrderedLoadCursor, @@ -805,13 +806,21 @@ class EffectPipelineRunner { ): { includeInitialState?: boolean whereExpression?: BasicExpression + includePendingDeletes?: boolean orderBy?: any limit?: number } { + const includePendingDeletes = + expressionReferencesPendingOperation(whereExpression) + // Ordered aliases explicitly disable initial state — data is loaded // via requestLimitedSnapshot/requestSnapshot after subscription setup. if (orderByInfo) { - return { includeInitialState: false, whereExpression } + return { + includeInitialState: false, + whereExpression, + ...(includePendingDeletes && { includePendingDeletes }), + } } const includeInitialState = !isLazy @@ -823,6 +832,7 @@ class EffectPipelineRunner { return { includeInitialState, whereExpression, + ...(includePendingDeletes && { includePendingDeletes }), ...(hints.orderBy ? { orderBy: hints.orderBy } : {}), ...(hints.limit !== undefined ? { limit: hints.limit } : {}), } diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index 977dee45e..4ae5b1135 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -481,7 +481,7 @@ export class CollectionSubscriber< * Checks if a BasicExpression tree references $pendingOperation. * Used to auto-detect whether a query opts into seeing pending-delete items. */ -function expressionReferencesPendingOperation( +export function expressionReferencesPendingOperation( expr: BasicExpression | undefined, ): boolean { if (!expr) return false diff --git a/packages/db/tests/pending-operation.test.ts b/packages/db/tests/pending-operation.test.ts index 389a66aba..bba2fce39 100644 --- a/packages/db/tests/pending-operation.test.ts +++ b/packages/db/tests/pending-operation.test.ts @@ -1,6 +1,7 @@ import { describe, expect, it, vi } from 'vitest' import { createCollection } from '../src/collection/index.js' import { createLiveQueryCollection } from '../src/query/live-query-collection.js' +import { createEffect } from '../src/index.js' import { localOnlyCollectionOptions } from '../src/local-only.js' import { createTransaction } from '../src/transactions' import { and, count, eq, isNull, not, or } from '../src/query/builder/functions' @@ -1248,4 +1249,72 @@ describe(`$pendingOperation virtual property`, () => { expect(childResults).toHaveLength(1) expect((childResults[0] as any).$pendingOperation).toBe(`delete`) }) + + // createEffect support + it(`createEffect onEnter fires for pending-delete items when $pendingOperation is in where`, async () => { + let syncFns: { + begin: () => void + write: (msg: any) => void + commit: () => void + } + + const collection = createCollection({ + id: `effect-pending-delete`, + getKey: (item) => item.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + syncFns = { begin, write, commit } + markReady() + }, + }, + }) + + // Activate sync by subscribing + collection.subscribeChanges(() => {}, { includeInitialState: false }) + await waitForChanges() + + // Sync an item + syncFns!.begin() + syncFns!.write({ type: `insert`, value: { id: `1`, title: `Hello` } }) + syncFns!.commit() + await waitForChanges() + + // Delete via pending transaction + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + tx.mutate(() => { + collection.delete(`1`) + }) + await waitForChanges() + + // createEffect with $pendingOperation in where — should find the deleted item + const entered: Array = [] + const effect = createEffect({ + query: (q: any) => + q + .from({ item: collection }) + .where(({ item }: any) => + or( + isNull(item.$pendingOperation), + not(isNull(item.$pendingOperation)), + ), + ), + onEnter: (event: any) => { + entered.push(event.value) + }, + }) + + await waitForChanges() + + // The deleted item should have triggered onEnter with $pendingOperation: 'delete' + const deletedItem = entered.find((v: any) => v.id === `1`) + expect(deletedItem).toBeDefined() + expect(deletedItem.$pendingOperation).toBe(`delete`) + + effect.dispose() + }) }) From 69edb878955f4387ca330226a263eb7a2ca822d5 Mon Sep 17 00:00:00 2001 From: Marc MacLeod Date: Sun, 29 Mar 2026 20:40:02 -0500 Subject: [PATCH 5/6] chore: `$pendingOperation` changeset Signed-off-by: Marc MacLeod --- .changeset/pending-operation-virtual-prop.md | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 .changeset/pending-operation-virtual-prop.md diff --git a/.changeset/pending-operation-virtual-prop.md b/.changeset/pending-operation-virtual-prop.md new file mode 100644 index 000000000..d0dcb23b9 --- /dev/null +++ b/.changeset/pending-operation-virtual-prop.md @@ -0,0 +1,9 @@ +--- +"@tanstack/db": minor +--- + +Add `$pendingOperation` virtual property to track optimistic mutation type + +- New virtual property `$pendingOperation` on every collection row: `'insert' | 'update' | 'delete' | null` +- Items deleted in pending transactions can stay visible in query results when `$pendingOperation` is referenced in a `.where()` clause +- Works with live queries, `createEffect`, joins/subqueries, GROUP BY, and ordered/paginated queries From afb1604a2aab61135962670e925f1b43d2431339 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Mon, 30 Mar 2026 01:55:17 +0000 Subject: [PATCH 6/6] ci: apply automated fixes --- .changeset/pending-operation-virtual-prop.md | 2 +- packages/db/src/query/compiler/group-by.ts | 5 +- .../src/query/live/collection-subscriber.ts | 4 +- packages/db/tests/pending-operation.test.ts | 504 +++++++++--------- 4 files changed, 253 insertions(+), 262 deletions(-) diff --git a/.changeset/pending-operation-virtual-prop.md b/.changeset/pending-operation-virtual-prop.md index d0dcb23b9..58b8af46f 100644 --- a/.changeset/pending-operation-virtual-prop.md +++ b/.changeset/pending-operation-virtual-prop.md @@ -1,5 +1,5 @@ --- -"@tanstack/db": minor +'@tanstack/db': minor --- Add `$pendingOperation` virtual property to track optimistic mutation type diff --git a/packages/db/src/query/compiler/group-by.ts b/packages/db/src/query/compiler/group-by.ts index 90fc0c5ed..e32f9260e 100644 --- a/packages/db/src/query/compiler/group-by.ts +++ b/packages/db/src/query/compiler/group-by.ts @@ -21,7 +21,10 @@ import type { Select, } from '../ir.js' import type { NamespacedAndKeyedStream, NamespacedRow } from '../../types.js' -import type { PendingOperationType, VirtualOrigin } from '../../virtual-props.js' +import type { + PendingOperationType, + VirtualOrigin, +} from '../../virtual-props.js' const VIRTUAL_SYNCED_KEY = `__virtual_synced__` const VIRTUAL_HAS_LOCAL_KEY = `__virtual_has_local__` diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index 4ae5b1135..213524f72 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -491,9 +491,7 @@ export function expressionReferencesPendingOperation( } if (expr.type === `func`) { - return expr.args.some((arg) => - expressionReferencesPendingOperation(arg), - ) + return expr.args.some((arg) => expressionReferencesPendingOperation(arg)) } return false diff --git a/packages/db/tests/pending-operation.test.ts b/packages/db/tests/pending-operation.test.ts index bba2fce39..fa3954be2 100644 --- a/packages/db/tests/pending-operation.test.ts +++ b/packages/db/tests/pending-operation.test.ts @@ -162,9 +162,7 @@ describe(`$pendingOperation virtual property`, () => { await collection.preload() - const mutationFn = vi - .fn() - .mockRejectedValue(new Error(`Rollback test`)) + const mutationFn = vi.fn().mockRejectedValue(new Error(`Rollback test`)) const tx = createTransaction({ autoCommit: false, @@ -644,199 +642,190 @@ describe(`$pendingOperation virtual property`, () => { ]) }) - it( - `deleted items are visible in query when where clause references $pendingOperation`, - async () => { - const sourceCollection = createCollection( - mockSyncCollectionOptions({ - id: `pending-op-query-optin`, - getKey: (item: Item) => item.id, - initialData: [ - { id: `1`, title: `Keep` }, - { id: `2`, title: `Delete me` }, - ], - }), - ) - - await sourceCollection.preload() - - // Query that references $pendingOperation — should show pending deletes - const liveQuery = createLiveQueryCollection({ - query: (q) => - q - .from({ item: sourceCollection }) - .where(({ item }) => - or( - isNull(item.$pendingOperation), - not(isNull(item.$pendingOperation)), - ), + it(`deleted items are visible in query when where clause references $pendingOperation`, async () => { + const sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-query-optin`, + getKey: (item: Item) => item.id, + initialData: [ + { id: `1`, title: `Keep` }, + { id: `2`, title: `Delete me` }, + ], + }), + ) + + await sourceCollection.preload() + + // Query that references $pendingOperation — should show pending deletes + const liveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ item: sourceCollection }) + .where(({ item }) => + or( + isNull(item.$pendingOperation), + not(isNull(item.$pendingOperation)), ), - getKey: (item: any) => item.id, - }) + ), + getKey: (item: any) => item.id, + }) - await liveQuery.preload() + await liveQuery.preload() - const tx = createTransaction({ - autoCommit: false, - mutationFn: async () => { - await new Promise(() => {}) - }, - }) + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) - tx.mutate(() => { - sourceCollection.delete(`2`) - }) - await waitForChanges() - - // With $pendingOperation in where, deleted items should be visible - const results = Array.from(liveQuery.values()) - const item2 = results.find( - (r) => (stripVirtualProps(r) as unknown as Item).id === `2`, - ) - expect(item2).toBeDefined() - expect((item2 as any).$pendingOperation).toBe(`delete`) - }, - ) - - it( - `pending-delete items appear in initial query snapshot when opted in`, - async () => { - const sourceCollection = createCollection( - mockSyncCollectionOptions({ - id: `pending-op-snapshot`, - getKey: (item: Item) => item.id, - initialData: [ - { id: `1`, title: `Keep` }, - { id: `2`, title: `Delete me` }, - ], - }), - ) - - await sourceCollection.preload() - - // Delete before creating the query - const tx = createTransaction({ - autoCommit: false, - mutationFn: async () => { - await new Promise(() => {}) - }, - }) + tx.mutate(() => { + sourceCollection.delete(`2`) + }) + await waitForChanges() - tx.mutate(() => { - sourceCollection.delete(`2`) - }) - await waitForChanges() - - // Create a query AFTER the delete — initial snapshot should include the pending delete - const liveQuery = createLiveQueryCollection({ - query: (q) => - q - .from({ item: sourceCollection }) - .where(({ item }) => - or( - isNull(item.$pendingOperation), - not(isNull(item.$pendingOperation)), - ), + // With $pendingOperation in where, deleted items should be visible + const results = Array.from(liveQuery.values()) + const item2 = results.find( + (r) => (stripVirtualProps(r) as unknown as Item).id === `2`, + ) + expect(item2).toBeDefined() + expect((item2 as any).$pendingOperation).toBe(`delete`) + }) + + it(`pending-delete items appear in initial query snapshot when opted in`, async () => { + const sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-snapshot`, + getKey: (item: Item) => item.id, + initialData: [ + { id: `1`, title: `Keep` }, + { id: `2`, title: `Delete me` }, + ], + }), + ) + + await sourceCollection.preload() + + // Delete before creating the query + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + sourceCollection.delete(`2`) + }) + await waitForChanges() + + // Create a query AFTER the delete — initial snapshot should include the pending delete + const liveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ item: sourceCollection }) + .where(({ item }) => + or( + isNull(item.$pendingOperation), + not(isNull(item.$pendingOperation)), ), - getKey: (item: any) => item.id, - }) + ), + getKey: (item: any) => item.id, + }) - await liveQuery.preload() - await waitForChanges() + await liveQuery.preload() + await waitForChanges() - const results = Array.from(liveQuery.values()) - const item2 = results.find( - (r) => (stripVirtualProps(r) as unknown as Item).id === `2`, - ) - expect(item2).toBeDefined() - }, - ) - - it( - `sync-confirmed delete removes item from opted-in query results`, - async () => { - let syncFns: { - begin: () => void - write: (msg: any) => void - commit: () => void - } - - const sourceCollection = createCollection({ - id: `pending-op-sync-delete`, - getKey: (item: any) => item.id, - sync: { - sync: ({ begin, write, commit, markReady }) => { - syncFns = { begin, write, commit } - begin() - write({ - type: `insert`, - value: { id: `1`, title: `Item 1` }, - }) - write({ - type: `insert`, - value: { id: `2`, title: `Item 2` }, - }) - commit() - markReady() - }, + const results = Array.from(liveQuery.values()) + const item2 = results.find( + (r) => (stripVirtualProps(r) as unknown as Item).id === `2`, + ) + expect(item2).toBeDefined() + }) + + it(`sync-confirmed delete removes item from opted-in query results`, async () => { + let syncFns: { + begin: () => void + write: (msg: any) => void + commit: () => void + } + + const sourceCollection = createCollection({ + id: `pending-op-sync-delete`, + getKey: (item: any) => item.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + syncFns = { begin, write, commit } + begin() + write({ + type: `insert`, + value: { id: `1`, title: `Item 1` }, + }) + write({ + type: `insert`, + value: { id: `2`, title: `Item 2` }, + }) + commit() + markReady() }, - onDelete: async () => {}, - }) + }, + onDelete: async () => {}, + }) - // Query with $pendingOperation reference - const liveQuery = createLiveQueryCollection({ - query: (q) => - q - .from({ item: sourceCollection }) - .where(({ item }) => - or( - isNull(item.$pendingOperation), - not(isNull(item.$pendingOperation)), - ), + // Query with $pendingOperation reference + const liveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ item: sourceCollection }) + .where(({ item }) => + or( + isNull(item.$pendingOperation), + not(isNull(item.$pendingOperation)), ), - getKey: (item: any) => item.id, - }) + ), + getKey: (item: any) => item.id, + }) - await liveQuery.preload() - await waitForChanges() + await liveQuery.preload() + await waitForChanges() - // Optimistically delete item 2 - const tx = createTransaction({ - autoCommit: false, - mutationFn: async () => {}, - }) + // Optimistically delete item 2 + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => {}, + }) - tx.mutate(() => { - sourceCollection.delete(`2`) - }) - await waitForChanges() + tx.mutate(() => { + sourceCollection.delete(`2`) + }) + await waitForChanges() - // Item 2 should be visible with $pendingOperation: 'delete' - let results = Array.from(liveQuery.values()) - let item2 = results.find( - (r) => (stripVirtualProps(r) as unknown as Item).id === `2`, - ) - expect(item2).toBeDefined() - expect((item2 as any).$pendingOperation).toBe(`delete`) + // Item 2 should be visible with $pendingOperation: 'delete' + let results = Array.from(liveQuery.values()) + let item2 = results.find( + (r) => (stripVirtualProps(r) as unknown as Item).id === `2`, + ) + expect(item2).toBeDefined() + expect((item2 as any).$pendingOperation).toBe(`delete`) - // Commit the transaction - await tx.commit() - await waitForChanges() - - // Sync confirms the delete - syncFns!.begin() - syncFns!.write({ type: `delete`, key: `2` }) - syncFns!.commit() - await waitForChanges() - - // Item 2 should now be GONE (sync confirmed the delete) - results = Array.from(liveQuery.values()) - item2 = results.find( - (r) => (stripVirtualProps(r) as unknown as Item).id === `2`, - ) - expect(item2).toBeUndefined() - }, - ) + // Commit the transaction + await tx.commit() + await waitForChanges() + + // Sync confirms the delete + syncFns!.begin() + syncFns!.write({ type: `delete`, key: `2` }) + syncFns!.commit() + await waitForChanges() + + // Item 2 should now be GONE (sync confirmed the delete) + results = Array.from(liveQuery.values()) + item2 = results.find( + (r) => (stripVirtualProps(r) as unknown as Item).id === `2`, + ) + expect(item2).toBeUndefined() + }) }) // ── B5: Live query integration ────────────────────────────────────── @@ -876,68 +865,69 @@ describe(`$pendingOperation virtual property`, () => { await waitForChanges() const results = Array.from(liveQuery.values()) - const ids = results.map((r) => (stripVirtualProps(r) as unknown as Item).id) + const ids = results.map( + (r) => (stripVirtualProps(r) as unknown as Item).id, + ) expect(ids).toEqual([`1`]) }) - it( - `live query with $pendingOperation where clause shows deleted items inline`, - async () => { - const sourceCollection = createCollection( - mockSyncCollectionOptions({ - id: `pending-op-live-optin`, - getKey: (item: Item) => item.id, - initialData: [ - { id: `1`, title: `Keep` }, - { id: `2`, title: `Delete me` }, - { id: `3`, title: `Also keep` }, - ], - }), - ) - - await sourceCollection.preload() - - const liveQuery = createLiveQueryCollection({ - query: (q) => - q - .from({ item: sourceCollection }) - .where(({ item }) => - or( - isNull(item.$pendingOperation), - not(isNull(item.$pendingOperation)), - ), + it(`live query with $pendingOperation where clause shows deleted items inline`, async () => { + const sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-live-optin`, + getKey: (item: Item) => item.id, + initialData: [ + { id: `1`, title: `Keep` }, + { id: `2`, title: `Delete me` }, + { id: `3`, title: `Also keep` }, + ], + }), + ) + + await sourceCollection.preload() + + const liveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ item: sourceCollection }) + .where(({ item }) => + or( + isNull(item.$pendingOperation), + not(isNull(item.$pendingOperation)), ), - getKey: (item: any) => item.id, - }) + ), + getKey: (item: any) => item.id, + }) - await liveQuery.preload() + await liveQuery.preload() - const tx = createTransaction({ - autoCommit: false, - mutationFn: async () => { - await new Promise(() => {}) - }, - }) + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) - tx.mutate(() => { - sourceCollection.delete(`2`) - }) - await waitForChanges() + tx.mutate(() => { + sourceCollection.delete(`2`) + }) + await waitForChanges() - const results = Array.from(liveQuery.values()) - const ids = results.map((r) => (stripVirtualProps(r) as unknown as Item).id) + const results = Array.from(liveQuery.values()) + const ids = results.map( + (r) => (stripVirtualProps(r) as unknown as Item).id, + ) - // All 3 items should be present — item 2 with $pendingOperation: 'delete' - expect(ids).toContain(`1`) - expect(ids).toContain(`2`) - expect(ids).toContain(`3`) + // All 3 items should be present — item 2 with $pendingOperation: 'delete' + expect(ids).toContain(`1`) + expect(ids).toContain(`2`) + expect(ids).toContain(`3`) - const item2 = results.find( - (r) => (stripVirtualProps(r) as unknown as Item).id === `2`, - ) - expect((item2 as any).$pendingOperation).toBe(`delete`) - }, - ) + const item2 = results.find( + (r) => (stripVirtualProps(r) as unknown as Item).id === `2`, + ) + expect((item2 as any).$pendingOperation).toBe(`delete`) + }) }) // ── GROUP BY ──────────────────────────────────────────────────────── @@ -976,7 +966,7 @@ describe(`$pendingOperation virtual property`, () => { const results = Array.from(liveQuery.values()) for (const row of results) { - expect((row).$pendingOperation).toBe(null) + expect(row.$pendingOperation).toBe(null) } }) @@ -1025,20 +1015,16 @@ describe(`$pendingOperation virtual property`, () => { await waitForChanges() const results = Array.from(liveQuery.values()) - const workGroup = results.find( - (r) => (r).category === `work`, - ) - const personalGroup = results.find( - (r) => (r).category === `personal`, - ) + const workGroup = results.find((r) => r.category === `work`) + const personalGroup = results.find((r) => r.category === `personal`) // Work group has one updated item — $pendingOperation should be non-null expect(workGroup).toBeDefined() - expect((workGroup).$pendingOperation).not.toBe(null) + expect(workGroup.$pendingOperation).not.toBe(null) // Personal group has no pending changes — $pendingOperation should be null expect(personalGroup).toBeDefined() - expect((personalGroup).$pendingOperation).toBe(null) + expect(personalGroup.$pendingOperation).toBe(null) }) }) @@ -1130,7 +1116,7 @@ describe(`$pendingOperation virtual property`, () => { // Item should be visible with $pendingOperation: null let results = Array.from(liveQuery.values()) expect(results).toHaveLength(1) - expect((results[0]).$pendingOperation).toBe(null) + expect(results[0].$pendingOperation).toBe(null) // Delete via a pending transaction const tx = createTransaction({ @@ -1147,7 +1133,7 @@ describe(`$pendingOperation virtual property`, () => { // Item should still be visible with $pendingOperation: 'delete' results = Array.from(liveQuery.values()) expect(results).toHaveLength(1) - expect((results[0]).$pendingOperation).toBe(`delete`) + expect(results[0].$pendingOperation).toBe(`delete`) // Rollback the delete tx.rollback() @@ -1156,7 +1142,7 @@ describe(`$pendingOperation virtual property`, () => { // Item should be restored with $pendingOperation: null results = Array.from(liveQuery.values()) expect(results).toHaveLength(1) - expect((results[0]).$pendingOperation).toBe(null) + expect(results[0].$pendingOperation).toBe(null) }) // Lazy source (join) — test that pending-delete items in joined collections @@ -1200,19 +1186,20 @@ describe(`$pendingOperation virtual property`, () => { const liveQuery = createLiveQueryCollection({ query: (q: any) => - q - .from({ p: parents }) - .select(({ p }: any) => ({ - ...p, - children: q - .from({ c: children }) - .where(({ c }: any) => - and( - eq(c.parentId, p.id), - or(isNull(c.$pendingOperation), not(isNull(c.$pendingOperation))), + q.from({ p: parents }).select(({ p }: any) => ({ + ...p, + children: q + .from({ c: children }) + .where(({ c }: any) => + and( + eq(c.parentId, p.id), + or( + isNull(c.$pendingOperation), + not(isNull(c.$pendingOperation)), ), ), - })), + ), + })), getKey: (item: any) => item.id, }) @@ -1225,7 +1212,10 @@ describe(`$pendingOperation virtual property`, () => { parentSyncFns!.commit() childSyncFns!.begin() - childSyncFns!.write({ type: `insert`, value: { id: `c1`, parentId: `p1`, name: `Child` } }) + childSyncFns!.write({ + type: `insert`, + value: { id: `c1`, parentId: `p1`, name: `Child` }, + }) childSyncFns!.commit() await waitForChanges() @@ -1245,7 +1235,7 @@ describe(`$pendingOperation virtual property`, () => { const results = Array.from(liveQuery.values()) const parent = results.find((r: any) => r.id === `p1`) expect(parent).toBeDefined() - const childResults = Array.from((parent).children.values()) + const childResults = Array.from(parent.children.values()) expect(childResults).toHaveLength(1) expect((childResults[0] as any).$pendingOperation).toBe(`delete`) })