diff --git a/.changeset/add-offline-persistence.md b/.changeset/add-offline-persistence.md new file mode 100644 index 000000000..5502acaaf --- /dev/null +++ b/.changeset/add-offline-persistence.md @@ -0,0 +1,29 @@ +--- +"@tanstack/electric-db-collection": minor +--- + +feat: Add offline persistence adapter for Electric collections + +This adds a localStorage-based persistence layer that enables offline-first data access and faster initial loads. + +**What changed:** +- New `persistence` configuration option in `electricCollectionOptions` +- Collection data is automatically saved to localStorage as it syncs +- Persisted data is restored on collection initialization +- Shape handle and offset are persisted for resumable sync +- In `on-demand` mode, collections are marked ready immediately after loading from persistence + +**Why:** +Enables offline-first applications where users can see their data immediately on app launch, even before network sync completes. This improves perceived performance and allows the app to function without network connectivity. + +**How to use:** +```typescript +electricCollectionOptions({ + id: 'my-collection', + syncMode: 'on-demand', + persistence: { + storageKey: 'my-collection-storage', + }, + // ... other options +}) +``` diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 1ec043fd2..7604af755 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -1,12 +1,12 @@ +import DebugModule from "debug" +import { Store } from "@tanstack/store" +import { DeduplicatedLoadSubset } from "@tanstack/db" import { ShapeStream, isChangeMessage, isControlMessage, isVisibleInSnapshot, } from "@electric-sql/client" -import { Store } from "@tanstack/store" -import DebugModule from "debug" -import { DeduplicatedLoadSubset } from "@tanstack/db" import { ExpectedNumberInAwaitTxIdError, StreamAbortedError, @@ -14,6 +14,19 @@ import { TimeoutWaitingForTxIdError, } from "./errors" import { compileSQL } from "./sql-compiler" +import { validateJsonSerializable } from "./persistence/persistenceAdapter" +import { createPersistence } from "./persistence/createPersistence" +import type { ElectricPersistenceConfig } from "./persistence/createPersistence" +import type { + ControlMessage, + GetExtensions, + Message, + Offset, + PostgresSnapshot, + Row, + ShapeStreamOptions, +} from "@electric-sql/client" + import type { BaseCollectionConfig, CollectionConfig, @@ -26,14 +39,6 @@ import type { UtilsRecord, } from "@tanstack/db" import type { StandardSchemaV1 } from "@standard-schema/spec" -import type { - ControlMessage, - GetExtensions, - Message, - PostgresSnapshot, - Row, - ShapeStreamOptions, -} from "@electric-sql/client" // Re-export for user convenience in custom match functions export { isChangeMessage, isControlMessage } from "@electric-sql/client" @@ -98,8 +103,7 @@ type InferSchemaOutput = T extends StandardSchemaV1 : Record : Record -/** - * The mode of sync to use for the collection. +/** The mode of sync to use for the collection. * @default `eager` * @description * - `eager`: @@ -125,19 +129,25 @@ export interface ElectricCollectionConfig< T extends Row = Row, TSchema extends StandardSchemaV1 = never, > extends Omit< - BaseCollectionConfig< - T, - string | number, - TSchema, - ElectricCollectionUtils, - any - >, - `onInsert` | `onUpdate` | `onDelete` | `syncMode` -> { + BaseCollectionConfig< + T, + string | number, + TSchema, + ElectricCollectionUtils, + any + >, + `onInsert` | `onUpdate` | `onDelete` | `syncMode` + > { /** * Configuration options for the ElectricSQL ShapeStream */ shapeOptions: ShapeStreamOptions> + + /** + * Optional persistence configuration for localStorage storage + * When provided, data will be persisted to localStorage or the specified storage and loaded on startup + */ + persistence?: ElectricPersistenceConfig syncMode?: ElectricSyncMode /** @@ -395,6 +405,15 @@ function createLoadSubsetDedupe>({ */ export type AwaitTxIdFn = (txId: Txid, timeout?: number) => Promise +/** + * Type for the clearPersistence utility function + */ +export type ClearPersistenceFn = () => Promise + +/** + * Type for the getPersistenceSize utility function + */ +export type GetPersistenceSizeFn = () => Promise /** * Type for the awaitMatch utility function */ @@ -406,13 +425,21 @@ export type AwaitMatchFn> = ( /** * Electric collection utilities type */ -export interface ElectricCollectionUtils< - T extends Row = Row, -> extends UtilsRecord { +export interface ElectricCollectionUtils = Row> + extends UtilsRecord { awaitTxId: AwaitTxIdFn awaitMatch: AwaitMatchFn } +/** + * Electric collection utilities type with persistence + */ +export interface ElectricCollectionUtilsWithPersistence + extends ElectricCollectionUtils { + clearPersistence: ClearPersistenceFn + getPersistenceSize: GetPersistenceSizeFn +} + /** * Creates Electric collection options for use with a standard Collection * @@ -456,6 +483,9 @@ export function electricCollectionOptions>( schema?: any } { const seenTxids = new Store>(new Set([])) + const persistence = + config.persistence && createPersistence(config.persistence) + const seenSnapshots = new Store>([]) const internalSyncMode = config.syncMode ?? `eager` const finalSyncMode = @@ -510,6 +540,7 @@ export function electricCollectionOptions>( const sync = createElectricSync(config.shapeOptions, { seenTxids, seenSnapshots, + persistence: config.persistence, syncMode: internalSyncMode, pendingMatches, currentBatchMessages, @@ -705,12 +736,25 @@ export function electricCollectionOptions>( ElectricCollectionUtils > ) => { + // Validate that all values in the transaction can be JSON serialized (if persistence enabled) + if (config.persistence) { + params.transaction.mutations.forEach((m) => + validateJsonSerializable(m.modified, `insert`) + ) + } const handlerResult = await config.onInsert!(params) await processMatchingStrategy(handlerResult) + + if (persistence) { + // called outside stream -> snapshot rows, keep prior cursor + persistence.saveCollectionSnapshot(params.collection) + } + return handlerResult } : undefined + // Create wrapper handlers for direct persistence operations that handle txid awaiting const wrappedOnUpdate = config.onUpdate ? async ( params: UpdateMutationFnParams< @@ -719,8 +763,19 @@ export function electricCollectionOptions>( ElectricCollectionUtils > ) => { + // Validate that all values in the transaction can be JSON serialized (if persistence enabled) + if (config.persistence) { + params.transaction.mutations.forEach((m) => + validateJsonSerializable(m.modified, `update`) + ) + } const handlerResult = await config.onUpdate!(params) await processMatchingStrategy(handlerResult) + + if (persistence) { + persistence.saveCollectionSnapshot(params.collection) + } + return handlerResult } : undefined @@ -735,19 +790,44 @@ export function electricCollectionOptions>( ) => { const handlerResult = await config.onDelete!(params) await processMatchingStrategy(handlerResult) + + // Persist to storage if configured + if (persistence) { + // Save collection state to storage adapter + persistence.saveCollectionSnapshot(params.collection) + } + return handlerResult } : undefined + const clearPersistence: ClearPersistenceFn = async () => { + if (!persistence) { + throw new Error(`Persistence is not configured for this collection`) + } + persistence.clear() + } + + const getPersistenceSize: GetPersistenceSizeFn = async () => + persistence ? persistence.size() : 0 + // Extract standard Collection config properties const { shapeOptions: _shapeOptions, + persistence: _persistence, onInsert: _onInsert, onUpdate: _onUpdate, onDelete: _onDelete, ...restConfig } = config + // Build utils object based on whether persistence is configured + const utils: + | ElectricCollectionUtils + | ElectricCollectionUtilsWithPersistence = persistence + ? { awaitTxId, awaitMatch, clearPersistence, getPersistenceSize } + : { awaitTxId, awaitMatch } + return { ...restConfig, syncMode: finalSyncMode, @@ -755,10 +835,7 @@ export function electricCollectionOptions>( onInsert: wrappedOnInsert, onUpdate: wrappedOnUpdate, onDelete: wrappedOnDelete, - utils: { - awaitTxId, - awaitMatch, - }, + utils: utils as ElectricCollectionUtils, } } @@ -787,6 +864,7 @@ function createElectricSync>( removePendingMatches: (matchIds: Array) => void resolveMatchedPendingMatches: () => void collectionId?: string + persistence?: ElectricPersistenceConfig testHooks?: ElectricTestHooks } ): SyncConfig { @@ -799,8 +877,11 @@ function createElectricSync>( removePendingMatches, resolveMatchedPendingMatches, collectionId, + persistence: persistenceConfig, testHooks, } = options + const persistence = + persistenceConfig && createPersistence(persistenceConfig) const MAX_BATCH_MESSAGES = 1000 // Safety limit for message buffer // Store for the relation schema information @@ -827,6 +908,31 @@ function createElectricSync>( sync: (params: Parameters[`sync`]>[0]) => { const { begin, write, commit, markReady, truncate, collection } = params + // Load from persistance adapter if persistence is configured + if (persistence) { + try { + const persistedData = persistence.read() + const hasPersistedData = + !!persistedData?.value && + Object.keys(persistedData.value).length > 0 + + persistence.loadSnapshotInto( + begin, + (op) => write({ ...op, metadata: {} }), + commit + ) + + // In on-demand mode, mark the collection as ready immediately after loading + // from persistence since on-demand works with partial/incremental data + // and doesn't require waiting for server sync to be usable + if (syncMode === `on-demand` && hasPersistedData) { + markReady() + } + } catch (e) { + console.warn(`[ElectricPersistence] load error`, e) + } + } + // Wrap markReady to wait for test hook in progressive mode let progressiveReadyGate: Promise | null = null const wrappedMarkReady = (isBuffering: boolean) => { @@ -876,14 +982,27 @@ function createElectricSync>( }) }) + // Read from persistence if available + const prev = persistence?.read() + + const computedOffset: Offset | undefined = (() => { + const offset = shapeOptions.offset + if (offset != null) return offset + const lastOffset = prev?.lastOffset as Offset | undefined + if (lastOffset != null) return lastOffset + if (syncMode === `on-demand`) return `now` + return undefined + })() + + const computedHandle: string | undefined = + shapeOptions.handle ?? prev?.shapeHandle + const stream = new ShapeStream({ ...shapeOptions, // In on-demand mode, we only want to sync changes, so we set the log to `changes_only` log: syncMode === `on-demand` ? `changes_only` : undefined, - // In on-demand mode, we only need the changes from the point of time the collection was created - // so we default to `now` when there is no saved offset. - offset: - shapeOptions.offset ?? (syncMode === `on-demand` ? `now` : undefined), + offset: computedOffset, + handle: computedHandle, signal: abortController.signal, onError: (errorParams) => { // Just immediately mark ready if there's an error to avoid blocking @@ -934,6 +1053,7 @@ function createElectricSync>( unsubscribeStream = stream.subscribe((messages: Array>) => { let hasUpToDate = false let hasSnapshotEnd = false + let hasSyncedChanges = false for (const message of messages) { // Add message to current batch buffer (for race condition handling) @@ -1009,6 +1129,7 @@ function createElectricSync>( newSnapshots.push(parseSnapshotMessage(message)) } hasSnapshotEnd = true + if (persistenceConfig) hasSyncedChanges = true } else if (isUpToDateMessage(message)) { hasUpToDate = true } else if (isMustRefetchMessage(message)) { @@ -1024,6 +1145,9 @@ function createElectricSync>( truncate() + // Clear persistence storage on truncate + if (persistence) persistence.clear() + // Reset the loadSubset deduplication state since we're starting fresh // This ensures that previously loaded predicates don't prevent refetching after truncate loadSubsetDedupe?.reset() @@ -1096,6 +1220,10 @@ function createElectricSync>( } } + if (persistence && hasSyncedChanges) { + persistence.saveCollectionSnapshot(collection, stream) + } + // Clear the current batch buffer since we're now up-to-date currentBatchMessages.setState(() => []) @@ -1110,7 +1238,7 @@ function createElectricSync>( } // Always commit txids when we receive up-to-date, regardless of transaction state - seenTxids.setState((currentTxids) => { + seenTxids.setState((currentTxids: Set) => { const clonedSeen = new Set(currentTxids) if (newTxids.size > 0) { debug( diff --git a/packages/electric-db-collection/src/index.ts b/packages/electric-db-collection/src/index.ts index ac436bcf7..194292c00 100644 --- a/packages/electric-db-collection/src/index.ts +++ b/packages/electric-db-collection/src/index.ts @@ -2,8 +2,15 @@ export { electricCollectionOptions, type ElectricCollectionConfig, type ElectricCollectionUtils, + type ElectricCollectionUtilsWithPersistence, type Txid, type AwaitTxIdFn, + type ClearPersistenceFn, + type GetPersistenceSizeFn, } from "./electric" +export type { ElectricPersistenceConfig } from "./persistence/createPersistence" + export * from "./errors" + +export type { StorageApi } from "./persistence/persistenceAdapter" diff --git a/packages/electric-db-collection/src/persistence/createPersistence.ts b/packages/electric-db-collection/src/persistence/createPersistence.ts new file mode 100644 index 000000000..a5db1bd1c --- /dev/null +++ b/packages/electric-db-collection/src/persistence/createPersistence.ts @@ -0,0 +1,114 @@ +import type { StorageApi } from "./persistenceAdapter" + +/** + * Configuration interface for Electric collection persistence + * @template T - The type of items in the collection + */ +export interface ElectricPersistenceConfig { + /** + * The key to use for storing the collection data in localStorage/sessionStorage + */ + storageKey: string + + /** + * Storage API to use (defaults to window.localStorage) + * Can be any object that implements the Storage interface (e.g., sessionStorage) + */ + storage?: StorageApi +} + +// Envelope we persist to storage +type PersistedEnvelope = { + v: 1 + value: Record + lastOffset?: number + shapeHandle?: string +} + +export interface ElectricPersistenceConfig { + storageKey: string + storage?: StorageApi +} + +export function createPersistence(cfg: ElectricPersistenceConfig) { + const key = cfg.storageKey + const storage = + cfg.storage || (typeof window !== `undefined` ? window.localStorage : null) + + const safeParse = (raw: string | null): PersistedEnvelope | null => { + if (!raw) return null + try { + const parsed = JSON.parse(raw) + if (parsed && typeof parsed === `object` && parsed.v === 1) { + return parsed as PersistedEnvelope + } + return null + } catch { + return null + } + } + + const read = (): PersistedEnvelope | null => { + if (!storage) return null + return safeParse(storage.getItem(key)) + } + + const write = (next: PersistedEnvelope) => { + if (!storage) return + storage.setItem(key, JSON.stringify(next)) + } + + const clear = () => { + if (!storage) return + storage.removeItem(key) + } + + const size = (): number => { + if (!storage) return 0 + const data = storage.getItem(key) + return data ? new Blob([data]).size : 0 + } + + const saveCollectionSnapshot = (collection: any, stream?: any) => { + if (!storage) return + // 1) snapshot collection state + const value: Record = {} + for (const [k, v] of collection.state) value[String(k)] = v as T + + // 2) load previous envelope (to preserve cursor when no stream present) + const prev = read() ?? { v: 1, value: {} as Record } + + // 3) only advance cursor if we’re called from the stream + const lastOffset = + (stream?.lastOffset as number | undefined) ?? prev.lastOffset + const shapeHandle = stream?.shapeHandle ?? prev.shapeHandle + + const next: PersistedEnvelope = { v: 1, value, lastOffset, shapeHandle } + write(next) + } + + const loadSnapshotInto = ( + begin: () => void, + writeOp: (op: { type: `insert`; value: T }) => void, + commit: () => void + ) => { + const env = read() + if (!env?.value) return + const entries = Object.entries(env.value) + if (!entries.length) return + begin() + for (const [, row] of entries) { + writeOp({ type: `insert`, value: row }) + } + commit() + } + + return { + read, + write, + clear, + size, + saveCollectionSnapshot, + loadSnapshotInto, + } +} diff --git a/packages/electric-db-collection/src/persistence/persistenceAdapter.ts b/packages/electric-db-collection/src/persistence/persistenceAdapter.ts new file mode 100644 index 000000000..b93801a41 --- /dev/null +++ b/packages/electric-db-collection/src/persistence/persistenceAdapter.ts @@ -0,0 +1,127 @@ +import type { Row } from "@electric-sql/client" + +/** + * Storage API interface - subset of DOM Storage that we need + * Matches the pattern used in @tanstack/db local-storage implementation + */ +export type StorageApi = Pick + +/** + * Internal storage format that includes version tracking + * Matches the pattern used in localStorage implementation + */ +export interface StoredItem { + versionKey: string + data: T +} + +/** + * Generate a UUID for version tracking + * @returns A unique identifier string for tracking data versions + */ +export function generateVersionKey(): string { + return crypto.randomUUID() +} + +/** + * Load data from storage and return as a Map + * @param storageKey - The key used to store data in the storage API + * @param storage - The storage API to load from (localStorage, sessionStorage, etc.) + * @returns Map of stored items with version tracking, or empty Map if loading fails + */ +export function loadFromStorage>( + storageKey: string, + storage: StorageApi +): Map> { + try { + const rawData = storage.getItem(storageKey) + if (!rawData) { + return new Map() + } + + const parsed = JSON.parse(rawData) + const dataMap = new Map>() + + // Handle object format where keys map to StoredItem values + if ( + typeof parsed === `object` && + parsed !== null && + !Array.isArray(parsed) + ) { + Object.entries(parsed).forEach(([key, value]) => { + // Runtime check to ensure the value has the expected StoredItem structure + if ( + value && + typeof value === `object` && + `versionKey` in value && + `data` in value + ) { + const storedItem = value as StoredItem + dataMap.set(key, storedItem) + } else { + console.warn( + `[ElectricPersistence] Invalid data format for key "${key}" in storage key "${storageKey}"` + ) + } + }) + } else { + console.warn( + `[ElectricPersistence] Invalid storage object format for key "${storageKey}"` + ) + } + + return dataMap + } catch (error) { + console.warn( + `[ElectricPersistence] Error loading data from storage key "${storageKey}":`, + error + ) + return new Map() + } +} + +/** + * Save data to storage + * @param storageKey - The key to use for storing data + * @param storage - The storage API to save to + * @param dataMap - Map of items with version tracking to save to storage + */ +export function saveToStorage>( + storageKey: string, + storage: StorageApi, + dataMap: Map> +): void { + try { + // Convert Map to object format for storage + const objectData: Record> = {} + dataMap.forEach((storedItem, key) => { + objectData[String(key)] = storedItem + }) + const serialized = JSON.stringify(objectData) + storage.setItem(storageKey, serialized) + } catch (error) { + console.error( + `[ElectricPersistence] Error saving data to storage key "${storageKey}":`, + error + ) + throw error + } +} + +/** + * Validates that a value can be JSON serialized + * @param value - The value to validate for JSON serialization + * @param operation - The operation type being performed (for error messages) + * @throws Error if the value cannot be JSON serialized + */ +export function validateJsonSerializable(value: any, operation: string): void { + try { + JSON.stringify(value) + } catch (error) { + throw new Error( + `Cannot serialize value for ${operation}: ${ + error instanceof Error ? error.message : String(error) + }` + ) + } +}