diff --git a/.changeset/pending-operation-virtual-prop.md b/.changeset/pending-operation-virtual-prop.md new file mode 100644 index 000000000..58b8af46f --- /dev/null +++ b/.changeset/pending-operation-virtual-prop.md @@ -0,0 +1,9 @@ +--- +'@tanstack/db': minor +--- + +Add `$pendingOperation` virtual property to track optimistic mutation type + +- New virtual property `$pendingOperation` on every collection row: `'insert' | 'update' | 'delete' | null` +- Items deleted in pending transactions can stay visible in query results when `$pendingOperation` is referenced in a `.where()` clause +- Works with live queries, `createEffect`, joins/subqueries, GROUP BY, and ordered/paginated queries diff --git a/packages/db-sqlite-persistence-core/tests/persisted.test.ts b/packages/db-sqlite-persistence-core/tests/persisted.test.ts index 63af1f5d4..f556044e3 100644 --- a/packages/db-sqlite-persistence-core/tests/persisted.test.ts +++ b/packages/db-sqlite-persistence-core/tests/persisted.test.ts @@ -241,6 +241,7 @@ const stripVirtualProps = | undefined>( $origin: _origin, $key: _key, $collectionId: _collectionId, + $pendingOperation: _pendingOperation, ...rest } = value as Record return rest as T diff --git a/packages/db/src/collection/state.ts b/packages/db/src/collection/state.ts index af65cb801..c64bbdb89 100644 --- a/packages/db/src/collection/state.ts +++ b/packages/db/src/collection/state.ts @@ -3,6 +3,7 @@ import { SortedMap } from '../SortedMap' import { enrichRowWithVirtualProps } from '../virtual-props.js' import { DIRECT_TRANSACTION_METADATA_KEY } from './transaction-metadata.js' import type { + PendingOperationType, VirtualOrigin, VirtualRowProps, WithVirtualProps, @@ -110,6 +111,7 @@ export class CollectionStateManager< origin: VirtualOrigin key: TKey collectionId: string + pendingOperation: PendingOperationType enriched: WithVirtualProps } >() @@ -161,7 +163,38 @@ export class CollectionStateManager< if (this.isLocalOnly) { return true } - return !this.optimisticUpserts.has(key) && !this.optimisticDeletes.has(key) + return ( + !this.optimisticUpserts.has(key) && + !this.optimisticDeletes.has(key) && + !this.pendingOptimisticUpserts.has(key) && + !this.pendingOptimisticDeletes.has(key) + ) + } + + /** + * Gets the pending operation type for a row. + * Returns the type of optimistic mutation pending for this key, or null if none. + * Used to compute the $pendingOperation virtual property. + */ + public getPendingOperation(key: TKey): PendingOperationType { + if (this.isLocalOnly) { + return null + } + // Check active optimistic state + if (this.optimisticDeletes.has(key)) { + return 'delete' + } + if (this.optimisticUpserts.has(key)) { + return this.syncedData.has(key) ? 'update' : 'insert' + } + // Check completed-but-awaiting-sync state to avoid flicker + if (this.pendingOptimisticDeletes.has(key)) { + return 'delete' + } + if (this.pendingOptimisticUpserts.has(key)) { + return this.syncedData.has(key) ? 'update' : 'insert' + } + return null } /** @@ -190,6 +223,10 @@ export class CollectionStateManager< $origin: overrides?.$origin ?? this.getRowOrigin(key), $key: overrides?.$key ?? key, $collectionId: overrides?.$collectionId ?? this.collection.id, + $pendingOperation: + overrides?.$pendingOperation !== undefined + ? overrides.$pendingOperation + : this.getPendingOperation(key), } } @@ -206,6 +243,7 @@ export class CollectionStateManager< return this.createVirtualPropsSnapshot(key, { $synced: true, $origin: 'local', + $pendingOperation: null, }) } @@ -218,11 +256,20 @@ export class CollectionStateManager< optimisticDeletes.has(key) || options?.completedOptimisticKeys?.has(key) === true + // Compute $pendingOperation from the provided or current state + let pendingOperation: PendingOperationType = null + if (optimisticDeletes.has(key)) { + pendingOperation = 'delete' + } else if (optimisticUpserts.has(key)) { + pendingOperation = this.syncedData.has(key) ? 'update' : 'insert' + } + return this.createVirtualPropsSnapshot(key, { $synced: !hasOptimisticChange, $origin: hasOptimisticChange ? 'local' : ((options?.rowOrigins ?? this.rowOrigins).get(key) ?? 'remote'), + $pendingOperation: pendingOperation, }) } @@ -235,6 +282,10 @@ export class CollectionStateManager< const origin = existingRow.$origin ?? virtualProps.$origin const resolvedKey = existingRow.$key ?? virtualProps.$key const collectionId = existingRow.$collectionId ?? virtualProps.$collectionId + const pendingOperation = + existingRow.$pendingOperation !== undefined + ? existingRow.$pendingOperation + : virtualProps.$pendingOperation const cached = this.virtualPropsCache.get(row as object) if ( @@ -242,7 +293,8 @@ export class CollectionStateManager< cached.synced === synced && cached.origin === origin && cached.key === resolvedKey && - cached.collectionId === collectionId + cached.collectionId === collectionId && + cached.pendingOperation === pendingOperation ) { return cached.enriched } @@ -253,6 +305,7 @@ export class CollectionStateManager< $origin: origin, $key: resolvedKey, $collectionId: collectionId, + $pendingOperation: pendingOperation, } as WithVirtualProps this.virtualPropsCache.set(row as object, { @@ -260,6 +313,7 @@ export class CollectionStateManager< origin, key: resolvedKey, collectionId, + pendingOperation, enriched, }) @@ -1173,7 +1227,9 @@ export class CollectionStateManager< const nextVirtualProps = this.getVirtualPropsSnapshotForState(key) const virtualChanged = previousVirtualProps.$synced !== nextVirtualProps.$synced || - previousVirtualProps.$origin !== nextVirtualProps.$origin + previousVirtualProps.$origin !== nextVirtualProps.$origin || + previousVirtualProps.$pendingOperation !== + nextVirtualProps.$pendingOperation const previousValueWithVirtual = previousVisibleValue !== undefined ? enrichRowWithVirtualProps( @@ -1182,6 +1238,7 @@ export class CollectionStateManager< this.collection.id, () => previousVirtualProps.$synced, () => previousVirtualProps.$origin, + () => previousVirtualProps.$pendingOperation, ) : undefined @@ -1229,6 +1286,7 @@ export class CollectionStateManager< this.collection.id, () => previousVirtualProps.$synced, () => previousVirtualProps.$origin, + () => previousVirtualProps.$pendingOperation, ) events.push({ type: `update`, diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index 2d48add4b..efb7c3ebf 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -51,6 +51,9 @@ type CollectionSubscriptionOptions = { whereExpression?: BasicExpression /** Callback to call when the subscription is unsubscribed */ onUnsubscribe?: (event: SubscriptionUnsubscribedEvent) => void + /** When true, optimistic delete events are converted to updates with $pendingOperation: 'delete' + * instead of being passed through as deletes. This keeps deleted items visible in query results. */ + includePendingDeletes?: boolean } export class CollectionSubscription @@ -77,6 +80,12 @@ export class CollectionSubscription // Keep track of the keys we've sent (needed for join and orderBy optimizations) private sentKeys = new Set() + // Track keys where a delete was converted to an update by convertPendingDeletes, + // along with the enriched value that was sent to D2. When a rollback causes an insert + // for one of these keys, we convert it to an update with the correct previousValue + // so D2's multiplicity bookkeeping stays balanced. + private convertedDeleteValues = new Map() + // Track the count of rows sent via requestLimitedSnapshot for offset-based pagination private limitedSnapshotRowCount = 0 @@ -165,6 +174,7 @@ export class CollectionSubscription this.limitedSnapshotRowCount = 0 this.lastSentKey = undefined this.loadedSubsets = [] + this.convertedDeleteValues.clear() return } @@ -181,6 +191,7 @@ export class CollectionSubscription this.loadedInitialState = false this.limitedSnapshotRowCount = 0 this.lastSentKey = undefined + this.convertedDeleteValues.clear() // Clear the loadedSubsets array since we're re-requesting fresh this.loadedSubsets = [] @@ -319,7 +330,15 @@ export class CollectionSubscription } emitEvents(changes: Array>) { - const newChanges = this.filterAndFlipChanges(changes) + // When includePendingDeletes is enabled, convert optimistic delete events + // to updates BEFORE filterAndFlipChanges so that sentKeys correctly tracks + // the key as still present. This ensures the later sync-confirmed delete + // can properly remove the item. + const processedChanges = this.options.includePendingDeletes + ? this.convertPendingDeletes(changes) + : changes + + const newChanges = this.filterAndFlipChanges(processedChanges) if (this.isBufferingForTruncate) { // Buffer the changes instead of emitting immediately @@ -332,6 +351,100 @@ export class CollectionSubscription } } + /** + * Converts optimistic delete events to update events with $pendingOperation: 'delete'. + * Only converts deletes for keys that are still in optimisticDeletes or pendingOptimisticDeletes + * (i.e., pending deletes, not sync-confirmed ones). + */ + private convertPendingDeletes( + changes: Array>, + ): Array> { + const state = this.collection._state + const result: Array> = [] + + for (const change of changes) { + if (change.type === `delete`) { + // Check if this is an optimistic delete (key still pending) vs a sync-confirmed delete + const isPendingDelete = + state.optimisticDeletes.has(change.key) || + state.pendingOptimisticDeletes.has(change.key) + + if (!isPendingDelete) { + // Sync-confirmed delete — clean up any stale converted value + this.convertedDeleteValues.delete(change.key) + } else { + // Convert to update — the delete event's value contains the pre-delete row data + // already enriched with virtual props. We need to add $pendingOperation: 'delete'. + const enrichedValue = { + ...change.value, + $pendingOperation: `delete` as const, + } + // Use the pre-delete value as previousValue. change.value on delete events + // carries the pre-delete virtual props (including the previous $pendingOperation, + // e.g., 'update' if the item was updated before being deleted). + const previousValue = change.previousValue ?? change.value + + result.push({ + type: `update`, + key: change.key, + value: enrichedValue, + previousValue, + }) + this.convertedDeleteValues.set(change.key, enrichedValue) + continue + } + } else if ( + change.type === `insert` && + this.convertedDeleteValues.has(change.key) + ) { + // This insert is from a rollback of a previously-converted delete. + // Convert to update with the correct previousValue (the pending-delete value + // we sent to D2) so the multiplicity bookkeeping stays balanced. + const pendingDeleteValue = this.convertedDeleteValues.get(change.key) + this.convertedDeleteValues.delete(change.key) + result.push({ + ...change, + type: `update`, + previousValue: pendingDeleteValue, + }) + continue + } + result.push(change) + } + + return result + } + + /** + * Appends pending-delete items to a changes array. + * These items are in optimisticDeletes or pendingOptimisticDeletes but still + * have data in syncedData. They are added as inserts with $pendingOperation: 'delete'. + * Skips keys already in sentKeys to avoid duplicates. + */ + private appendPendingDeleteItems( + changes: Array>, + ): void { + const state = this.collection._state + const pendingDeleteKeys = new Set([ + ...state.optimisticDeletes, + ...state.pendingOptimisticDeletes, + ]) + for (const key of pendingDeleteKeys) { + if (this.sentKeys.has(key)) continue + const syncedValue = state.syncedData.get(key) + if (syncedValue !== undefined) { + // enrichWithVirtualProps computes $pendingOperation via getPendingOperation(key), + // which returns 'delete' for keys in optimisticDeletes/pendingOptimisticDeletes. + const enrichedValue = state.enrichWithVirtualProps(syncedValue, key) + changes.push({ + type: `insert` as const, + key, + value: enrichedValue, + }) + } + } + } + /** * Sends the snapshot to the callback. * Returns a boolean indicating if it succeeded. @@ -397,6 +510,13 @@ export class CollectionSubscription return false } + // When includePendingDeletes is enabled, also include items that are + // optimistically deleted — they won't appear in entries() but their + // data is still in syncedData. + if (this.options.includePendingDeletes) { + this.appendPendingDeleteItems(snapshot) + } + // Only send changes that have not been sent yet const filteredSnapshot = snapshot.filter( (change) => !this.sentKeys.has(change.key), @@ -543,6 +663,12 @@ export class CollectionSubscription keys = index.take(valuesNeeded(), biggestObservedValue!, filterFn) } + // When includePendingDeletes is enabled, also include pending-delete items + // that the index traversal missed (because collection.get() returns undefined for them). + if (this.options.includePendingDeletes) { + this.appendPendingDeleteItems(changes) + } + // Track row count for offset-based pagination (before sending to callback) // Use the current count as the offset for this load const currentOffset = this.limitedSnapshotRowCount @@ -736,6 +862,7 @@ export class CollectionSubscription this.collection._sync.unloadSubset(options) } this.loadedSubsets = [] + this.convertedDeleteValues.clear() this.emitInner(`unsubscribed`, { type: `unsubscribed`, diff --git a/packages/db/src/index.ts b/packages/db/src/index.ts index ec1e22966..39f64fd6b 100644 --- a/packages/db/src/index.ts +++ b/packages/db/src/index.ts @@ -21,6 +21,7 @@ export * from './strategies/index.js' export { type VirtualRowProps, type VirtualOrigin, + type PendingOperationType, type WithVirtualProps, type WithoutVirtualProps, hasVirtualProps, diff --git a/packages/db/src/indexes/auto-index.ts b/packages/db/src/indexes/auto-index.ts index 303267cec..350b469a4 100644 --- a/packages/db/src/indexes/auto-index.ts +++ b/packages/db/src/indexes/auto-index.ts @@ -1,6 +1,6 @@ import { DEFAULT_COMPARE_OPTIONS } from '../utils' -import { checkCollectionSizeForIndex, isDevModeEnabled } from './index-registry' import { hasVirtualPropPath } from '../virtual-props' +import { checkCollectionSizeForIndex, isDevModeEnabled } from './index-registry' import type { CompareOptions } from '../query/builder/types' import type { BasicExpression } from '../query/ir' import type { CollectionImpl } from '../collection/index.js' diff --git a/packages/db/src/query/compiler/group-by.ts b/packages/db/src/query/compiler/group-by.ts index 48661ba96..e32f9260e 100644 --- a/packages/db/src/query/compiler/group-by.ts +++ b/packages/db/src/query/compiler/group-by.ts @@ -21,20 +21,26 @@ import type { Select, } from '../ir.js' import type { NamespacedAndKeyedStream, NamespacedRow } from '../../types.js' -import type { VirtualOrigin } from '../../virtual-props.js' +import type { + PendingOperationType, + VirtualOrigin, +} from '../../virtual-props.js' const VIRTUAL_SYNCED_KEY = `__virtual_synced__` const VIRTUAL_HAS_LOCAL_KEY = `__virtual_has_local__` +const VIRTUAL_PENDING_OP_KEY = `__virtual_pending_op__` type RowVirtualMetadata = { synced: boolean hasLocal: boolean + pendingOperation: PendingOperationType } function getRowVirtualMetadata(row: NamespacedRow): RowVirtualMetadata { let found = false let allSynced = true let hasLocal = false + let pendingOperation: PendingOperationType = null for (const [alias, value] of Object.entries(row)) { if (alias === `$selected`) continue @@ -51,11 +57,19 @@ function getRowVirtualMetadata(row: NamespacedRow): RowVirtualMetadata { if (asRecord.$origin === `local`) { hasLocal = true } + if ( + pendingOperation === null && + `$pendingOperation` in asRecord && + asRecord.$pendingOperation != null + ) { + pendingOperation = asRecord.$pendingOperation as PendingOperationType + } } return { synced: found ? allSynced : true, hasLocal, + pendingOperation, } } @@ -145,6 +159,19 @@ export function processGroupBy( return false }, }, + [VIRTUAL_PENDING_OP_KEY]: { + preMap: ([, row]: [string, NamespacedRow]) => + getRowVirtualMetadata(row).pendingOperation, + reduce: (values: Array<[PendingOperationType, number]>) => { + // Return the first non-null pending operation, or null if all are null + for (const [op, multiplicity] of values) { + if (op != null && multiplicity > 0) { + return op + } + } + return null + }, + }, } // Handle empty GROUP BY (single-group aggregation) @@ -232,10 +259,14 @@ export function processGroupBy( const groupHasLocal = (aggregatedRow as Record)[ VIRTUAL_HAS_LOCAL_KEY ] + const groupPendingOp = (aggregatedRow as Record)[ + VIRTUAL_PENDING_OP_KEY + ] resultRow.$synced = groupSynced ?? true resultRow.$origin = ( groupHasLocal ? `local` : `remote` ) satisfies VirtualOrigin + resultRow.$pendingOperation = groupPendingOp ?? null resultRow.$key = resultKey resultRow.$collectionId = aggregateCollectionId ?? resultRow.$collectionId @@ -408,10 +439,14 @@ export function processGroupBy( const groupHasLocal = (aggregatedRow as Record)[ VIRTUAL_HAS_LOCAL_KEY ] + const groupPendingOp = (aggregatedRow as Record)[ + VIRTUAL_PENDING_OP_KEY + ] resultRow.$synced = groupSynced ?? true resultRow.$origin = ( groupHasLocal ? `local` : `remote` ) satisfies VirtualOrigin + resultRow.$pendingOperation = groupPendingOp ?? null resultRow.$key = finalKey resultRow.$collectionId = aggregateCollectionId ?? resultRow.$collectionId if (mainSource && correlationKey !== undefined) { diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index 70786ca8d..49882ba1d 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -410,6 +410,9 @@ export function compileQuery( // Merge child's alias metadata into parent's Object.assign(aliasToCollectionId, childResult.aliasToCollectionId) Object.assign(aliasRemapping, childResult.aliasRemapping) + for (const [alias, whereClause] of childResult.sourceWhereClauses) { + sourceWhereClauses.set(alias, whereClause) + } includesResults.push({ pipeline: childResult.pipeline, diff --git a/packages/db/src/query/effect.ts b/packages/db/src/query/effect.ts index 10857d06e..42ca99d0b 100644 --- a/packages/db/src/query/effect.ts +++ b/packages/db/src/query/effect.ts @@ -7,6 +7,7 @@ import { normalizeOrderByPaths, } from './compiler/expressions.js' import { getCollectionBuilder } from './live/collection-registry.js' +import { expressionReferencesPendingOperation } from './live/collection-subscriber.js' import { buildQueryFromConfig, computeOrderedLoadCursor, @@ -805,13 +806,21 @@ class EffectPipelineRunner { ): { includeInitialState?: boolean whereExpression?: BasicExpression + includePendingDeletes?: boolean orderBy?: any limit?: number } { + const includePendingDeletes = + expressionReferencesPendingOperation(whereExpression) + // Ordered aliases explicitly disable initial state — data is loaded // via requestLimitedSnapshot/requestSnapshot after subscription setup. if (orderByInfo) { - return { includeInitialState: false, whereExpression } + return { + includeInitialState: false, + whereExpression, + ...(includePendingDeletes && { includePendingDeletes }), + } } const includeInitialState = !isLazy @@ -823,6 +832,7 @@ class EffectPipelineRunner { return { includeInitialState, whereExpression, + ...(includePendingDeletes && { includePendingDeletes }), ...(hints.orderBy ? { orderBy: hints.orderBy } : {}), ...(hints.limit !== undefined ? { limit: hints.limit } : {}), } diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index 8eda5cc88..213524f72 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -212,6 +212,9 @@ export class CollectionSubscriber< } : undefined + const includePendingDeletes = + expressionReferencesPendingOperation(whereExpression) + const subscription = this.collection.subscribeChanges(sendChanges, { ...(includeInitialState && { includeInitialState }), whereExpression, @@ -219,6 +222,7 @@ export class CollectionSubscriber< orderBy: hints.orderBy, limit: hints.limit, onLoadSubsetResult, + ...(includePendingDeletes && { includePendingDeletes }), }) return subscription @@ -269,9 +273,13 @@ export class CollectionSubscriber< // Subscribe to changes with onStatusChange - listener is registered before any snapshot // values bigger than what we've sent don't need to be sent because they can't affect the topK + const includePendingDeletes = + expressionReferencesPendingOperation(whereExpression) + const subscription = this.collection.subscribeChanges(sendChangesInRange, { whereExpression, onStatusChange, + ...(includePendingDeletes && { includePendingDeletes }), }) subscriptionHolder.current = subscription @@ -468,3 +476,23 @@ export class CollectionSubscriber< ) } } + +/** + * Checks if a BasicExpression tree references $pendingOperation. + * Used to auto-detect whether a query opts into seeing pending-delete items. + */ +export function expressionReferencesPendingOperation( + expr: BasicExpression | undefined, +): boolean { + if (!expr) return false + + if (expr.type === `ref`) { + return expr.path.some((segment) => segment === `$pendingOperation`) + } + + if (expr.type === `func`) { + return expr.args.some((arg) => expressionReferencesPendingOperation(arg)) + } + + return false +} diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index 6087e234e..41fe78be1 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -864,6 +864,12 @@ export interface SubscribeChangesOptions< * @internal */ onLoadSubsetResult?: (result: Promise | true) => void + /** + * When true, optimistic delete events are converted to updates with $pendingOperation: 'delete' + * instead of being passed through as deletes. This keeps deleted items visible in query results. + * @internal + */ + includePendingDeletes?: boolean } export interface SubscribeChangesSnapshotOptions< diff --git a/packages/db/src/virtual-props.ts b/packages/db/src/virtual-props.ts index 3205d31c2..525def288 100644 --- a/packages/db/src/virtual-props.ts +++ b/packages/db/src/virtual-props.ts @@ -21,6 +21,16 @@ */ export type VirtualOrigin = 'local' | 'remote' +/** + * The type of pending optimistic operation for a row. + * + * - `'insert'`: Row was inserted in a pending transaction + * - `'update'`: Row was updated in a pending transaction + * - `'delete'`: Row was deleted in a pending transaction + * - `null`: No pending optimistic operation + */ +export type PendingOperationType = 'insert' | 'update' | 'delete' | null + /** * Virtual properties available on every row in TanStack DB collections. * @@ -94,6 +104,19 @@ export interface VirtualRowProps< * For live query collections, this is the ID of the upstream collection. */ readonly $collectionId: string + + /** + * The type of pending optimistic operation for this row. + * + * - `'insert'`: Row was inserted in a pending transaction + * - `'update'`: Row was updated in a pending transaction + * - `'delete'`: Row was deleted in a pending transaction + * - `null`: No pending optimistic operation + * + * For local-only collections, this is always `null`. + * For live query collections, this is passed through from the source collection. + */ + readonly $pendingOperation: PendingOperationType } /** @@ -171,12 +194,14 @@ export function createVirtualProps( collectionId: string, isSynced: boolean, origin: VirtualOrigin, + pendingOperation: PendingOperationType = null, ): VirtualRowProps { return { $synced: isSynced, $origin: origin, $key: key, $collectionId: collectionId, + $pendingOperation: pendingOperation, } } @@ -194,6 +219,7 @@ export function createVirtualProps( * @param collectionId - The collection's ID * @param computeSynced - Function to compute $synced if missing * @param computeOrigin - Function to compute $origin if missing + * @param computePendingOperation - Function to compute $pendingOperation if missing (defaults to null) * @returns The row with virtual properties (possibly the same object if already present) * * @internal @@ -207,6 +233,7 @@ export function enrichRowWithVirtualProps< collectionId: string, computeSynced: () => boolean, computeOrigin: () => VirtualOrigin, + computePendingOperation?: () => PendingOperationType, ): WithVirtualProps { // Use nullish coalescing to preserve existing virtual properties (pass-through) // This is the "add-if-missing" pattern described in the RFC @@ -218,6 +245,10 @@ export function enrichRowWithVirtualProps< $origin: existingRow.$origin ?? computeOrigin(), $key: existingRow.$key ?? key, $collectionId: existingRow.$collectionId ?? collectionId, + $pendingOperation: + existingRow.$pendingOperation !== undefined + ? existingRow.$pendingOperation + : (computePendingOperation?.() ?? null), } as WithVirtualProps } @@ -246,11 +277,16 @@ export function computeAggregateVirtualProps( // $origin = 'local' if ANY row is local (consistent with "local influence" semantics) const hasLocal = rows.some((row) => row.$origin === 'local') + // $pendingOperation = null if all rows are null, otherwise the first non-null value + const firstPendingOp = + rows.find((row) => row.$pendingOperation != null)?.$pendingOperation ?? null + return { $synced: allSynced, $origin: hasLocal ? 'local' : 'remote', $key: groupKey, $collectionId: collectionId, + $pendingOperation: firstPendingOp, } } @@ -263,6 +299,7 @@ export const VIRTUAL_PROP_NAMES = [ '$origin', '$key', '$collectionId', + '$pendingOperation', ] as const /** diff --git a/packages/db/tests/collection-events.test.ts b/packages/db/tests/collection-events.test.ts index 97c6538ff..3e3221b43 100644 --- a/packages/db/tests/collection-events.test.ts +++ b/packages/db/tests/collection-events.test.ts @@ -1,7 +1,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { createCollection } from '../src/collection/index.js' -import type { Collection } from '../src/collection/index.js' import { BTreeIndex } from '../src/indexes/btree-index.js' +import type { Collection } from '../src/collection/index.js' describe(`Collection Events System`, () => { let collection: Collection diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index 4f851f08a..dca8e94e1 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -2680,6 +2680,7 @@ describe(`Virtual properties`, () => { $origin: `remote`, $key: `row-1`, $collectionId: `collection-1`, + $pendingOperation: null, }), ).toBe(true) }) diff --git a/packages/db/tests/pending-operation.test.ts b/packages/db/tests/pending-operation.test.ts new file mode 100644 index 000000000..fa3954be2 --- /dev/null +++ b/packages/db/tests/pending-operation.test.ts @@ -0,0 +1,1310 @@ +import { describe, expect, it, vi } from 'vitest' +import { createCollection } from '../src/collection/index.js' +import { createLiveQueryCollection } from '../src/query/live-query-collection.js' +import { createEffect } from '../src/index.js' +import { localOnlyCollectionOptions } from '../src/local-only.js' +import { createTransaction } from '../src/transactions' +import { and, count, eq, isNull, not, or } from '../src/query/builder/functions' +import { mockSyncCollectionOptions, stripVirtualProps } from './utils' +import type { ChangeMessage } from '../src/types' + +const waitForChanges = () => new Promise((resolve) => setTimeout(resolve, 10)) + +type Item = { id: string; title: string } + +describe(`$pendingOperation virtual property`, () => { + // ── B1: Basic virtual prop behavior ───────────────────────────────── + + describe(`basic behavior`, () => { + it(`should be null for synced rows`, async () => { + const collection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-synced`, + getKey: (item: Item) => item.id, + initialData: [{ id: `1`, title: `Synced item` }], + }), + ) + + await collection.preload() + + const item = collection.get(`1`) + expect(item).toBeDefined() + expect(item!.$pendingOperation).toBe(null) + }) + + it(`should be 'insert' for optimistic insert in pending transaction`, async () => { + const collection = createCollection({ + id: `pending-op-insert`, + getKey: (item) => item.id, + sync: { + sync: ({ markReady }) => { + markReady() + }, + }, + onInsert: async () => { + // Never resolves during test — keeps transaction pending + await new Promise(() => {}) + }, + }) + + collection.subscribeChanges(() => {}, { includeInitialState: false }) + await waitForChanges() + + collection.insert({ id: `new-1`, title: `New item` }) + await waitForChanges() + + const item = collection.get(`new-1`) + expect(item).toBeDefined() + expect(item!.$pendingOperation).toBe(`insert`) + }) + + it(`should be 'update' for optimistic update in pending transaction`, async () => { + const collection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-update`, + getKey: (item: Item) => item.id, + initialData: [{ id: `1`, title: `Original` }], + }), + ) + + await collection.preload() + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + collection.update(`1`, (item) => { + item.title = `Updated` + }) + }) + + const item = collection.get(`1`) + expect(item).toBeDefined() + expect(item!.$pendingOperation).toBe(`update`) + }) + + it(`should be 'delete' for optimistic delete in pending transaction`, async () => { + const collection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-delete`, + getKey: (item: Item) => item.id, + initialData: [{ id: `1`, title: `To delete` }], + }), + ) + + await collection.preload() + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + collection.delete(`1`) + }) + + // Note: get() returns undefined for deleted items (public API unchanged) + // but the internal state tracks pendingOperation as 'delete' + expect(collection.get(`1`)).toBeUndefined() + expect(collection._state.getPendingOperation(`1`)).toBe(`delete`) + }) + + it(`should return to null after transaction commit + sync confirmation`, async () => { + let syncFns: { + begin: () => void + write: (msg: any) => void + commit: () => void + } + + const collection = createCollection({ + id: `pending-op-confirm`, + getKey: (item) => item.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + syncFns = { begin, write, commit } + markReady() + }, + }, + onInsert: async () => {}, + }) + + collection.subscribeChanges(() => {}, { includeInitialState: false }) + await waitForChanges() + + collection.insert({ id: `1`, title: `New` }) + await waitForChanges() + + expect(collection.get(`1`)!.$pendingOperation).toBe(`insert`) + + // Simulate sync confirmation + syncFns!.begin() + syncFns!.write({ type: `insert`, value: { id: `1`, title: `New` } }) + syncFns!.commit() + await waitForChanges() + + expect(collection.get(`1`)!.$pendingOperation).toBe(null) + }) + + it(`should return to null after transaction rollback`, async () => { + const collection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-rollback`, + getKey: (item: Item) => item.id, + initialData: [{ id: `1`, title: `Original` }], + }), + ) + + await collection.preload() + + const mutationFn = vi.fn().mockRejectedValue(new Error(`Rollback test`)) + + const tx = createTransaction({ + autoCommit: false, + mutationFn, + }) + + tx.mutate(() => { + collection.update(`1`, (item) => { + item.title = `Updated` + }) + }) + + expect(collection.get(`1`)!.$pendingOperation).toBe(`update`) + + // Commit should fail and trigger rollback + try { + await tx.commit() + } catch { + // Expected + } + await waitForChanges() + + expect(collection.get(`1`)!.$pendingOperation).toBe(null) + }) + + it(`should always be null for local-only collections`, () => { + const collection = createCollection( + localOnlyCollectionOptions({ + id: `pending-op-local-only`, + getKey: (item: any) => item.id, + }), + ) + + collection.insert({ id: `1`, title: `Local item` }) + + const item = collection.get(`1`) + expect(item).toBeDefined() + expect(item!.$pendingOperation).toBe(null) + }) + + it(`should be null for non-optimistic mutations`, async () => { + const collection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-non-optimistic`, + getKey: (item: Item) => item.id, + initialData: [{ id: `1`, title: `Original` }], + }), + ) + + await collection.preload() + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + collection.update(`1`, { optimistic: false }, (item) => { + item.title = `Non-optimistic update` + }) + }) + + // Non-optimistic mutations should not show pending operation + expect(collection.get(`1`)!.$pendingOperation).toBe(null) + }) + }) + + // ── B2: Transition and merging tests ──────────────────────────────── + + describe(`mutation merging`, () => { + it(`insert + update in same transaction shows $pendingOperation: 'insert'`, async () => { + const collection = createCollection({ + id: `pending-op-merge-insert-update`, + getKey: (item) => item.id, + sync: { + sync: ({ markReady }) => { + markReady() + }, + }, + onInsert: async () => { + await new Promise(() => {}) + }, + }) + + collection.subscribeChanges(() => {}, { includeInitialState: false }) + await waitForChanges() + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + collection.insert({ id: `1`, title: `New` }) + }) + tx.mutate(() => { + collection.update(`1`, (item) => { + item.title = `Updated new` + }) + }) + + // insert + update = insert (merged) + const item = collection.get(`1`) + expect(item).toBeDefined() + expect(item!.$pendingOperation).toBe(`insert`) + expect(stripVirtualProps(item)).toEqual({ + id: `1`, + title: `Updated new`, + }) + }) + + it(`insert + delete in same transaction cancels out — item gone entirely`, async () => { + const collection = createCollection({ + id: `pending-op-merge-insert-delete`, + getKey: (item) => item.id, + sync: { + sync: ({ markReady }) => { + markReady() + }, + }, + onInsert: async () => { + await new Promise(() => {}) + }, + }) + + collection.subscribeChanges(() => {}, { includeInitialState: false }) + await waitForChanges() + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + collection.insert({ id: `1`, title: `Temp` }) + }) + tx.mutate(() => { + collection.delete(`1`) + }) + + // insert + delete = cancelled + expect(collection.get(`1`)).toBeUndefined() + expect(collection._state.getPendingOperation(`1`)).toBe(null) + }) + + it(`update + delete in same transaction shows $pendingOperation: 'delete'`, async () => { + const collection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-merge-update-delete`, + getKey: (item: Item) => item.id, + initialData: [{ id: `1`, title: `Original` }], + }), + ) + + await collection.preload() + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + collection.update(`1`, (item) => { + item.title = `Updated` + }) + }) + tx.mutate(() => { + collection.delete(`1`) + }) + + // update + delete = delete + expect(collection.get(`1`)).toBeUndefined() + expect(collection._state.getPendingOperation(`1`)).toBe(`delete`) + }) + + it(`pending delete superseded by update in new transaction on same key`, async () => { + const collection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-supersede`, + getKey: (item: Item) => item.id, + initialData: [{ id: `1`, title: `Original` }], + }), + ) + + await collection.preload() + + const tx1 = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx1.mutate(() => { + collection.delete(`1`) + }) + + expect(collection._state.getPendingOperation(`1`)).toBe(`delete`) + + // Second transaction inserts the item back (or updates it) + const tx2 = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx2.mutate(() => { + collection.insert({ id: `1`, title: `Reinserted` }) + }) + + // tx2's insert supersedes tx1's delete. Since the item exists in syncedData, + // the net effect is an upsert of an existing server item — so it's 'update', not 'insert'. + const item = collection.get(`1`) + expect(item).toBeDefined() + expect(item!.$pendingOperation).toBe(`update`) + }) + + it(`subscribeChanges emits events when $pendingOperation transitions`, async () => { + const changes: Array> = [] + + const collection = createCollection({ + id: `pending-op-transitions`, + getKey: (item) => item.id, + sync: { + sync: ({ markReady }) => { + markReady() + }, + }, + onInsert: async () => { + await new Promise(() => {}) + }, + }) + + const subscription = collection.subscribeChanges( + (events) => changes.push(...events), + { includeInitialState: false }, + ) + await waitForChanges() + + collection.insert({ id: `1`, title: `New` }) + await waitForChanges() + + // Should have an insert event with $pendingOperation: 'insert' + const insertChange = changes.find( + (c) => c.key === `1` && c.type === `insert`, + ) + expect(insertChange).toBeDefined() + expect(insertChange!.value.$pendingOperation).toBe(`insert`) + + subscription.unsubscribe() + }) + + it(`subscribeChanges still emits type: 'delete' for optimistic deletes (backward compat)`, async () => { + const changes: Array> = [] + + const collection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-delete-event`, + getKey: (item: Item) => item.id, + initialData: [{ id: `1`, title: `Item` }], + }), + ) + + await collection.preload() + + const subscription = collection.subscribeChanges( + (events) => changes.push(...events), + { includeInitialState: false }, + ) + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + collection.delete(`1`) + }) + await waitForChanges() + + // subscribeChanges should still emit a delete event (collection layer unchanged) + const deleteChange = changes.find( + (c) => c.key === `1` && c.type === `delete`, + ) + expect(deleteChange).toBeDefined() + + subscription.unsubscribe() + }) + }) + + // ── B3: Edge case tests ───────────────────────────────────────────── + + describe(`edge cases`, () => { + it(`$pendingOperation change invalidates virtual props cache when $synced and $origin are unchanged`, async () => { + const collection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-cache`, + getKey: (item: Item) => item.id, + initialData: [{ id: `1`, title: `Original` }], + }), + ) + + await collection.preload() + + // First, do an update — item is $synced: false, $origin: 'local', $pendingOperation: 'update' + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + collection.update(`1`, (item) => { + item.title = `Updated` + }) + }) + + const updatedItem = collection.get(`1`) + expect(updatedItem!.$synced).toBe(false) + expect(updatedItem!.$origin).toBe(`local`) + expect(updatedItem!.$pendingOperation).toBe(`update`) + + // Now delete in the same transaction — $synced stays false, $origin stays local + // but $pendingOperation should change to 'delete' + tx.mutate(() => { + collection.delete(`1`) + }) + + // The item is deleted so get() returns undefined, but we can check internal state + expect(collection._state.getPendingOperation(`1`)).toBe(`delete`) + }) + + it(`item in completed-but-awaiting-sync window still shows non-null $pendingOperation`, async () => { + let resolveSync: (() => void) | undefined + + const collection = createCollection({ + id: `pending-op-awaiting-sync`, + getKey: (item) => item.id, + sync: { + sync: ({ markReady }) => { + markReady() + }, + }, + onInsert: async () => { + await new Promise((resolve) => { + resolveSync = resolve + }) + }, + }) + + collection.subscribeChanges(() => {}, { includeInitialState: false }) + await waitForChanges() + + collection.insert({ id: `1`, title: `New` }) + await waitForChanges() + + // Item should have $pendingOperation: 'insert' before commit resolves + expect(collection.get(`1`)!.$pendingOperation).toBe(`insert`) + + // The onInsert creates a transaction that auto-commits. + // The mutationFn (onInsert) is still running — transaction is in 'persisting' state. + // $pendingOperation should still be non-null during this window. + expect(collection._state.getPendingOperation(`1`)).not.toBe(null) + + // Resolve the sync to clean up + resolveSync?.() + await waitForChanges() + }) + + it(`$pendingOperation transition triggers virtualChanged event during sync commit`, async () => { + const changes: Array> = [] + let syncFns: { + begin: () => void + write: (msg: any) => void + commit: () => void + } + + const collection = createCollection({ + id: `pending-op-virtual-changed`, + getKey: (item) => item.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + syncFns = { begin, write, commit } + markReady() + }, + }, + onInsert: async () => {}, + }) + + const subscription = collection.subscribeChanges( + (events) => changes.push(...events), + { includeInitialState: false }, + ) + await waitForChanges() + + collection.insert({ id: `1`, title: `New` }) + await waitForChanges() + + const insertEvent = changes.find((c) => c.key === `1`) + expect(insertEvent).toBeDefined() + expect(insertEvent!.value.$pendingOperation).toBe(`insert`) + + changes.length = 0 + + // Sync confirms the insert + syncFns!.begin() + syncFns!.write({ type: `insert`, value: { id: `1`, title: `New` } }) + syncFns!.commit() + await waitForChanges() + + // Should emit an update event for the virtual prop transition + // $pendingOperation goes from 'insert' to null + const updateEvent = changes.find( + (c) => c.key === `1` && c.type === `update`, + ) + expect(updateEvent).toBeDefined() + if (updateEvent) { + expect(updateEvent.value.$pendingOperation).toBe(null) + } + + subscription.unsubscribe() + }) + }) + + // ── B4: Delete visibility tests (should FAIL until Phase C) ───────── + + describe(`delete visibility`, () => { + it(`deleted items are hidden from default query results (backward compat)`, async () => { + const sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-query-default`, + getKey: (item: Item) => item.id, + initialData: [ + { id: `1`, title: `Keep` }, + { id: `2`, title: `Delete me` }, + ], + }), + ) + + await sourceCollection.preload() + + const liveQuery = createLiveQueryCollection({ + query: (q) => q.from({ item: sourceCollection }), + getKey: (item) => item.id, + }) + + await liveQuery.preload() + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + sourceCollection.delete(`2`) + }) + await waitForChanges() + + // Default query should NOT show deleted items + const results = Array.from(liveQuery.values()) + expect(results.map((r) => stripVirtualProps(r))).toEqual([ + { id: `1`, title: `Keep` }, + ]) + }) + + it(`deleted items are visible in query when where clause references $pendingOperation`, async () => { + const sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-query-optin`, + getKey: (item: Item) => item.id, + initialData: [ + { id: `1`, title: `Keep` }, + { id: `2`, title: `Delete me` }, + ], + }), + ) + + await sourceCollection.preload() + + // Query that references $pendingOperation — should show pending deletes + const liveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ item: sourceCollection }) + .where(({ item }) => + or( + isNull(item.$pendingOperation), + not(isNull(item.$pendingOperation)), + ), + ), + getKey: (item: any) => item.id, + }) + + await liveQuery.preload() + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + sourceCollection.delete(`2`) + }) + await waitForChanges() + + // With $pendingOperation in where, deleted items should be visible + const results = Array.from(liveQuery.values()) + const item2 = results.find( + (r) => (stripVirtualProps(r) as unknown as Item).id === `2`, + ) + expect(item2).toBeDefined() + expect((item2 as any).$pendingOperation).toBe(`delete`) + }) + + it(`pending-delete items appear in initial query snapshot when opted in`, async () => { + const sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-snapshot`, + getKey: (item: Item) => item.id, + initialData: [ + { id: `1`, title: `Keep` }, + { id: `2`, title: `Delete me` }, + ], + }), + ) + + await sourceCollection.preload() + + // Delete before creating the query + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + sourceCollection.delete(`2`) + }) + await waitForChanges() + + // Create a query AFTER the delete — initial snapshot should include the pending delete + const liveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ item: sourceCollection }) + .where(({ item }) => + or( + isNull(item.$pendingOperation), + not(isNull(item.$pendingOperation)), + ), + ), + getKey: (item: any) => item.id, + }) + + await liveQuery.preload() + await waitForChanges() + + const results = Array.from(liveQuery.values()) + const item2 = results.find( + (r) => (stripVirtualProps(r) as unknown as Item).id === `2`, + ) + expect(item2).toBeDefined() + }) + + it(`sync-confirmed delete removes item from opted-in query results`, async () => { + let syncFns: { + begin: () => void + write: (msg: any) => void + commit: () => void + } + + const sourceCollection = createCollection({ + id: `pending-op-sync-delete`, + getKey: (item: any) => item.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + syncFns = { begin, write, commit } + begin() + write({ + type: `insert`, + value: { id: `1`, title: `Item 1` }, + }) + write({ + type: `insert`, + value: { id: `2`, title: `Item 2` }, + }) + commit() + markReady() + }, + }, + onDelete: async () => {}, + }) + + // Query with $pendingOperation reference + const liveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ item: sourceCollection }) + .where(({ item }) => + or( + isNull(item.$pendingOperation), + not(isNull(item.$pendingOperation)), + ), + ), + getKey: (item: any) => item.id, + }) + + await liveQuery.preload() + await waitForChanges() + + // Optimistically delete item 2 + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => {}, + }) + + tx.mutate(() => { + sourceCollection.delete(`2`) + }) + await waitForChanges() + + // Item 2 should be visible with $pendingOperation: 'delete' + let results = Array.from(liveQuery.values()) + let item2 = results.find( + (r) => (stripVirtualProps(r) as unknown as Item).id === `2`, + ) + expect(item2).toBeDefined() + expect((item2 as any).$pendingOperation).toBe(`delete`) + + // Commit the transaction + await tx.commit() + await waitForChanges() + + // Sync confirms the delete + syncFns!.begin() + syncFns!.write({ type: `delete`, key: `2` }) + syncFns!.commit() + await waitForChanges() + + // Item 2 should now be GONE (sync confirmed the delete) + results = Array.from(liveQuery.values()) + item2 = results.find( + (r) => (stripVirtualProps(r) as unknown as Item).id === `2`, + ) + expect(item2).toBeUndefined() + }) + }) + + // ── B5: Live query integration ────────────────────────────────────── + + describe(`live query integration`, () => { + it(`live query default behavior unchanged — deletes vanish from results`, async () => { + const sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-live-default`, + getKey: (item: Item) => item.id, + initialData: [ + { id: `1`, title: `Keep` }, + { id: `2`, title: `Delete me` }, + ], + }), + ) + + await sourceCollection.preload() + + const liveQuery = createLiveQueryCollection({ + query: (q) => q.from({ item: sourceCollection }), + getKey: (item) => item.id, + }) + + await liveQuery.preload() + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + sourceCollection.delete(`2`) + }) + await waitForChanges() + + const results = Array.from(liveQuery.values()) + const ids = results.map( + (r) => (stripVirtualProps(r) as unknown as Item).id, + ) + expect(ids).toEqual([`1`]) + }) + + it(`live query with $pendingOperation where clause shows deleted items inline`, async () => { + const sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-live-optin`, + getKey: (item: Item) => item.id, + initialData: [ + { id: `1`, title: `Keep` }, + { id: `2`, title: `Delete me` }, + { id: `3`, title: `Also keep` }, + ], + }), + ) + + await sourceCollection.preload() + + const liveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ item: sourceCollection }) + .where(({ item }) => + or( + isNull(item.$pendingOperation), + not(isNull(item.$pendingOperation)), + ), + ), + getKey: (item: any) => item.id, + }) + + await liveQuery.preload() + + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + sourceCollection.delete(`2`) + }) + await waitForChanges() + + const results = Array.from(liveQuery.values()) + const ids = results.map( + (r) => (stripVirtualProps(r) as unknown as Item).id, + ) + + // All 3 items should be present — item 2 with $pendingOperation: 'delete' + expect(ids).toContain(`1`) + expect(ids).toContain(`2`) + expect(ids).toContain(`3`) + + const item2 = results.find( + (r) => (stripVirtualProps(r) as unknown as Item).id === `2`, + ) + expect((item2 as any).$pendingOperation).toBe(`delete`) + }) + }) + + // ── GROUP BY ──────────────────────────────────────────────────────── + + describe(`GROUP BY`, () => { + type Task = { id: string; category: string; title: string } + + it(`$pendingOperation is null when all rows in group are null`, async () => { + const sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-groupby-null`, + getKey: (item: Task) => item.id, + initialData: [ + { id: `1`, category: `work`, title: `Task 1` }, + { id: `2`, category: `work`, title: `Task 2` }, + { id: `3`, category: `personal`, title: `Task 3` }, + ], + }), + ) + + await sourceCollection.preload() + + const liveQuery = createLiveQueryCollection({ + query: (q: any) => + q + .from({ task: sourceCollection }) + .groupBy(({ task }: any) => task.category) + .select(({ task }: any) => ({ + category: task.category, + count: count(task.id), + })), + getKey: (item: any) => item.category, + }) + + await liveQuery.preload() + + const results = Array.from(liveQuery.values()) + for (const row of results) { + expect(row.$pendingOperation).toBe(null) + } + }) + + it(`$pendingOperation is non-null when any row in group has pending operation`, async () => { + const sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `pending-op-groupby-nonnull`, + getKey: (item: Task) => item.id, + initialData: [ + { id: `1`, category: `work`, title: `Task 1` }, + { id: `2`, category: `work`, title: `Task 2` }, + { id: `3`, category: `personal`, title: `Task 3` }, + ], + }), + ) + + await sourceCollection.preload() + + const liveQuery = createLiveQueryCollection({ + query: (q: any) => + q + .from({ task: sourceCollection }) + .groupBy(({ task }: any) => task.category) + .select(({ task }: any) => ({ + category: task.category, + count: count(task.id), + })), + getKey: (item: any) => item.category, + }) + + await liveQuery.preload() + + // Update one item in the 'work' group + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + + tx.mutate(() => { + sourceCollection.update(`1`, (item) => { + item.title = `Updated` + }) + }) + await waitForChanges() + + const results = Array.from(liveQuery.values()) + const workGroup = results.find((r) => r.category === `work`) + const personalGroup = results.find((r) => r.category === `personal`) + + // Work group has one updated item — $pendingOperation should be non-null + expect(workGroup).toBeDefined() + expect(workGroup.$pendingOperation).not.toBe(null) + + // Personal group has no pending changes — $pendingOperation should be null + expect(personalGroup).toBeDefined() + expect(personalGroup.$pendingOperation).toBe(null) + }) + }) + + it(`$synced should be false when $pendingOperation is non-null (pendingOptimistic state)`, async () => { + let resolveDelete: (() => void) | undefined + + const collection = createCollection({ + id: `pending-op-synced-consistency`, + getKey: (item) => item.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + write({ type: `insert`, value: { id: `1`, title: `Item 1` } }) + commit() + markReady() + }, + }, + onDelete: async () => { + await new Promise((resolve) => { + resolveDelete = resolve + }) + }, + }) + + collection.subscribeChanges(() => {}, { includeInitialState: true }) + await waitForChanges() + + // Delete item — auto-commits via onDelete, holds in persisting state + collection.delete(`1`) + await waitForChanges() + + // Resolve to move to 'completed' state (pending sync confirmation) + resolveDelete?.() + await waitForChanges() + + // $synced and $pendingOperation should be consistent: + // if there's a pending operation, $synced should be false + const pendingOp = collection._state.getPendingOperation(`1`) + const synced = collection._state.isRowSynced(`1`) + + expect(pendingOp).toBe(`delete`) + expect(synced).toBe(false) + }) + + it(`rollback of optimistic delete restores item in query with $pendingOperation`, async () => { + let syncFns: { + begin: () => void + write: (msg: any) => void + commit: () => void + } + + const collection = createCollection({ + id: `rollback-delete`, + getKey: (item) => item.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + syncFns = { begin, write, commit } + markReady() + }, + }, + }) + + // Live query with $pendingOperation tautology (enables includePendingDeletes) + const liveQuery = createLiveQueryCollection({ + query: (q: any) => + q + .from({ item: collection }) + .where(({ item }: any) => + or( + isNull(item.$pendingOperation), + not(isNull(item.$pendingOperation)), + ), + ), + getKey: (item: any) => item.id, + }) + + await liveQuery.preload() + await waitForChanges() + + // Sync an item into the collection + syncFns!.begin() + syncFns!.write({ + type: `insert`, + value: { id: `1`, title: `Existing` }, + }) + syncFns!.commit() + await waitForChanges() + + // Item should be visible with $pendingOperation: null + let results = Array.from(liveQuery.values()) + expect(results).toHaveLength(1) + expect(results[0].$pendingOperation).toBe(null) + + // Delete via a pending transaction + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + tx.mutate(() => { + collection.delete(`1`) + }) + await waitForChanges() + + // Item should still be visible with $pendingOperation: 'delete' + results = Array.from(liveQuery.values()) + expect(results).toHaveLength(1) + expect(results[0].$pendingOperation).toBe(`delete`) + + // Rollback the delete + tx.rollback() + await waitForChanges() + + // Item should be restored with $pendingOperation: null + results = Array.from(liveQuery.values()) + expect(results).toHaveLength(1) + expect(results[0].$pendingOperation).toBe(null) + }) + + // Lazy source (join) — test that pending-delete items in joined collections + // are visible via live events (not snapshot, since lazy sources skip initial state) + it(`pending-delete items in joined collection are visible via live events`, async () => { + type Parent = { id: string } + type Child = { id: string; parentId: string; name: string } + + let parentSyncFns: { + begin: () => void + write: (msg: any) => void + commit: () => void + } + let childSyncFns: { + begin: () => void + write: (msg: any) => void + commit: () => void + } + + const parents = createCollection({ + id: `lazy-parent`, + getKey: (item) => item.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + parentSyncFns = { begin, write, commit } + markReady() + }, + }, + }) + + const children = createCollection({ + id: `lazy-child`, + getKey: (item) => item.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + childSyncFns = { begin, write, commit } + markReady() + }, + }, + }) + + const liveQuery = createLiveQueryCollection({ + query: (q: any) => + q.from({ p: parents }).select(({ p }: any) => ({ + ...p, + children: q + .from({ c: children }) + .where(({ c }: any) => + and( + eq(c.parentId, p.id), + or( + isNull(c.$pendingOperation), + not(isNull(c.$pendingOperation)), + ), + ), + ), + })), + getKey: (item: any) => item.id, + }) + + await liveQuery.preload() + await waitForChanges() + + // Sync parent and child + parentSyncFns!.begin() + parentSyncFns!.write({ type: `insert`, value: { id: `p1` } }) + parentSyncFns!.commit() + + childSyncFns!.begin() + childSyncFns!.write({ + type: `insert`, + value: { id: `c1`, parentId: `p1`, name: `Child` }, + }) + childSyncFns!.commit() + await waitForChanges() + + // Delete the child via pending transaction + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + tx.mutate(() => { + children.delete(`c1`) + }) + await waitForChanges() + + // Child should be visible in the join with $pendingOperation: 'delete' + const results = Array.from(liveQuery.values()) + const parent = results.find((r: any) => r.id === `p1`) + expect(parent).toBeDefined() + const childResults = Array.from(parent.children.values()) + expect(childResults).toHaveLength(1) + expect((childResults[0] as any).$pendingOperation).toBe(`delete`) + }) + + // createEffect support + it(`createEffect onEnter fires for pending-delete items when $pendingOperation is in where`, async () => { + let syncFns: { + begin: () => void + write: (msg: any) => void + commit: () => void + } + + const collection = createCollection({ + id: `effect-pending-delete`, + getKey: (item) => item.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + syncFns = { begin, write, commit } + markReady() + }, + }, + }) + + // Activate sync by subscribing + collection.subscribeChanges(() => {}, { includeInitialState: false }) + await waitForChanges() + + // Sync an item + syncFns!.begin() + syncFns!.write({ type: `insert`, value: { id: `1`, title: `Hello` } }) + syncFns!.commit() + await waitForChanges() + + // Delete via pending transaction + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise(() => {}) + }, + }) + tx.mutate(() => { + collection.delete(`1`) + }) + await waitForChanges() + + // createEffect with $pendingOperation in where — should find the deleted item + const entered: Array = [] + const effect = createEffect({ + query: (q: any) => + q + .from({ item: collection }) + .where(({ item }: any) => + or( + isNull(item.$pendingOperation), + not(isNull(item.$pendingOperation)), + ), + ), + onEnter: (event: any) => { + entered.push(event.value) + }, + }) + + await waitForChanges() + + // The deleted item should have triggered onEnter with $pendingOperation: 'delete' + const deletedItem = entered.find((v: any) => v.id === `1`) + expect(deletedItem).toBeDefined() + expect(deletedItem.$pendingOperation).toBe(`delete`) + + effect.dispose() + }) +}) diff --git a/packages/db/tests/utils.ts b/packages/db/tests/utils.ts index 2b5c5d0c8..d750ca3b9 100644 --- a/packages/db/tests/utils.ts +++ b/packages/db/tests/utils.ts @@ -23,6 +23,7 @@ export const stripVirtualProps = | undefined>( $origin: _origin, $key: _key, $collectionId: _collectionId, + $pendingOperation: _pendingOperation, ...rest } = value as Record return rest as T @@ -30,12 +31,16 @@ export const stripVirtualProps = | undefined>( export const omitVirtualProps = >( value: T, -): Omit => { +): Omit< + T, + '$synced' | '$origin' | '$key' | '$collectionId' | '$pendingOperation' +> => { const { $synced: _synced, $origin: _origin, $key: _key, $collectionId: _collectionId, + $pendingOperation: _pendingOperation, ...rest } = value as Record return rest as any