diff --git a/.changeset/witty-animals-agree.md b/.changeset/witty-animals-agree.md new file mode 100644 index 000000000..be350c75e --- /dev/null +++ b/.changeset/witty-animals-agree.md @@ -0,0 +1,5 @@ +--- +"@tanstack/electric-db-collection": patch +--- + +Support tagged rows and move out events in Electric collection. diff --git a/packages/db-collection-e2e/docker/docker-compose.yml b/packages/db-collection-e2e/docker/docker-compose.yml index 4f8ee4549..ef8523498 100644 --- a/packages/db-collection-e2e/docker/docker-compose.yml +++ b/packages/db-collection-e2e/docker/docker-compose.yml @@ -29,6 +29,7 @@ services: environment: DATABASE_URL: postgresql://postgres:password@postgres:5432/e2e_test?sslmode=disable ELECTRIC_INSECURE: true + ELECTRIC_FEATURE_FLAGS: "allow_subqueries,tagged_subqueries" ports: - "3000:3000" depends_on: diff --git a/packages/db-collection-e2e/src/index.ts b/packages/db-collection-e2e/src/index.ts index ae6031196..05ae6ea55 100644 --- a/packages/db-collection-e2e/src/index.ts +++ b/packages/db-collection-e2e/src/index.ts @@ -26,3 +26,4 @@ export { createCollationTestSuite } from "./suites/collation.suite" export { createMutationsTestSuite } from "./suites/mutations.suite" export { createLiveUpdatesTestSuite } from "./suites/live-updates.suite" export { createProgressiveTestSuite } from "./suites/progressive.suite" +export { createMovesTestSuite as createTagsTestSuite } from "./suites/moves.suite" diff --git a/packages/db-collection-e2e/src/suites/moves.suite.ts b/packages/db-collection-e2e/src/suites/moves.suite.ts new file mode 100644 index 000000000..eed4832c4 --- /dev/null +++ b/packages/db-collection-e2e/src/suites/moves.suite.ts @@ -0,0 +1,673 @@ +/** + * Tags Test Suite + * + * Tests Electric collection tag behavior with subqueries + * Only Electric collection supports tags (via shapes with subqueries) + */ + +import { randomUUID } from "node:crypto" +import { beforeAll, describe, expect, it } from "vitest" +import { createCollection } 
from "@tanstack/db" +import { electricCollectionOptions } from "@tanstack/electric-db-collection" +import { waitFor } from "../utils/helpers" +import type { E2ETestConfig } from "../types" +import type { Client } from "pg" +import type { Collection } from "@tanstack/db" +import type { ElectricCollectionUtils } from "@tanstack/electric-db-collection" + +interface TagsTestConfig extends E2ETestConfig { + tagsTestSetup: { + dbClient: Client + baseUrl: string + testSchema: string + usersTable: string + postsTable: string + } +} + +export function createMovesTestSuite(getConfig: () => Promise) { + describe(`Moves Suite`, () => { + let usersTable: string + let postsTable: string + let dbClient: Client + let baseUrl: string + let testSchema: string + let config: TagsTestConfig + + beforeAll(async () => { + config = await getConfig() + const setup = config.tagsTestSetup + dbClient = setup.dbClient + baseUrl = setup.baseUrl + testSchema = setup.testSchema + usersTable = setup.usersTable + postsTable = setup.postsTable + }) + + // Helper to create a collection on posts table with WHERE clause that has nested subquery + // This creates a shape: posts WHERE userId IN (SELECT id FROM users WHERE isActive = true) + // When a user's isActive changes, posts will move in/out of this shape + function createPostsByActiveUsersCollection( + id: string = `tags-posts-active-users-${Date.now()}` + ): Collection { + // Remove quotes from table names for the WHERE clause SQL + const usersTableUnquoted = usersTable.replace(/"/g, ``) + + return createCollection( + electricCollectionOptions({ + id, + shapeOptions: { + url: `${baseUrl}/v1/shape`, + params: { + table: `${testSchema}.${postsTable}`, + // WHERE clause with nested subquery + // Posts will move in/out when users' isActive changes + // Column reference should be just the column name, not the full table path + where: `"userId" IN (SELECT id FROM ${testSchema}.${usersTableUnquoted} WHERE "isActive" = true)`, + }, + }, + syncMode: 
`eager`, + getKey: (item: any) => item.id, + startSync: true, + }) + ) as any + } + + // Helper to wait for collection to be ready + async function waitForReady( + collection: Collection + ) { + await collection.preload() + await waitFor(() => collection.status === `ready`, { + timeout: 30000, + message: `Collection did not become ready`, + }) + } + + // Helper to wait for a specific item to appear + async function waitForItem( + collection: Collection, + itemId: string, + timeout: number = 10000 + ) { + await waitFor(() => collection.has(itemId), { + timeout, + message: `Item ${itemId} did not appear in collection`, + }) + } + + // Helper to wait for a specific item to disappear + async function waitForItemRemoved( + collection: Collection, + itemId: string, + timeout: number = 2000 + ) { + await waitFor(() => !collection.has(itemId), { + timeout, + message: `Item ${itemId} was not removed from collection`, + }) + } + + it(`Initial snapshot contains only posts from active users`, async () => { + // Create collection on posts with WHERE clause: userId IN (SELECT id FROM users WHERE isActive = true) + const collection = createPostsByActiveUsersCollection() + + if (!config.mutations) { + throw new Error(`Mutations not configured`) + } + + // Insert 2 active users and 1 inactive user + const userId1 = randomUUID() + const userId2 = randomUUID() + const userId3 = randomUUID() + + await config.mutations.insertUser({ + id: userId1, + name: `Active User 1`, + email: `user1@test.com`, + age: 25, + isActive: true, + createdAt: new Date(), + metadata: null, + deletedAt: null, + }) + + await config.mutations.insertUser({ + id: userId2, + name: `Active User 2`, + email: `user2@test.com`, + age: 30, + isActive: true, + createdAt: new Date(), + metadata: null, + deletedAt: null, + }) + + await config.mutations.insertUser({ + id: userId3, + name: `Inactive User`, + email: `user3@test.com`, + age: 42, + isActive: false, + createdAt: new Date(), + metadata: null, + deletedAt: null, 
+ }) + + // Insert posts for these users + const postId1 = randomUUID() + const postId2 = randomUUID() + const postId3 = randomUUID() + + await config.mutations.insertPost({ + id: postId1, + userId: userId1, + title: `Post 1`, + content: `Content 1`, + viewCount: 0, + largeViewCount: BigInt(0), + publishedAt: null, + deletedAt: null, + }) + + await config.mutations.insertPost({ + id: postId2, + userId: userId2, + title: `Post 2`, + content: `Content 2`, + viewCount: 0, + largeViewCount: BigInt(0), + publishedAt: null, + deletedAt: null, + }) + + await config.mutations.insertPost({ + id: postId3, + userId: userId3, + title: `Post 3`, + content: `Content 3`, + viewCount: 0, + largeViewCount: BigInt(0), + publishedAt: null, + deletedAt: null, + }) + + // Wait for collection to sync + await waitForReady(collection) + + // Wait for both posts to appear (users are active, so posts match the subquery) + await waitForItem(collection, postId1) + await waitForItem(collection, postId2) + + // Verify only posts 1 and 2 are in the collection + expect(collection.has(postId1)).toBe(true) + expect(collection.has(postId2)).toBe(true) + expect(collection.has(postId3)).toBe(false) + + // Wait a bit to make sure post 3 is not coming in later + await new Promise((resolve) => setTimeout(resolve, 50)) + expect(collection.has(postId3)).toBe(false) + + // Note: Tags are internal to Electric and may not be directly accessible + // The test verifies that posts with matching conditions appear in snapshot + + // Clean up only the rows this test inserted (an unscoped DELETE would also remove any other rows in these shared tables) + await dbClient.query(`DELETE FROM ${postsTable} WHERE id = ANY($1)`, [[postId1, postId2, postId3]]) + await dbClient.query(`DELETE FROM ${usersTable} WHERE id = ANY($1)`, [[userId1, userId2, userId3]]) + await collection.cleanup() + }) + + it(`Move-in: row becomes eligible for subquery`, async () => { + const collection = createPostsByActiveUsersCollection() + await waitForReady(collection) + + if (!config.mutations) { + throw new Error(`Mutations not configured`) + } + + // Insert user with isActive = false + const userId = randomUUID() + await
config.mutations.insertUser({ + id: userId, + name: `Inactive User`, + email: `inactive@test.com`, + age: 25, + isActive: false, + createdAt: new Date(), + metadata: null, + deletedAt: null, + }) + + // Insert post for this user + const postId = randomUUID() + await config.mutations.insertPost({ + id: postId, + userId, + title: `Inactive User Post`, + content: `Content`, + viewCount: 0, + largeViewCount: BigInt(0), + publishedAt: null, + deletedAt: null, + }) + + // Wait a bit to ensure post doesn't appear (user is inactive, so post doesn't match subquery) + await new Promise((resolve) => setTimeout(resolve, 500)) + expect(collection.has(postId)).toBe(false) + + // Update user to isActive = true (move-in for the post) + await config.mutations.updateUser(userId, { isActive: true }) + + // Wait for post to appear (move-in) + await waitForItem(collection, postId, 1000) + expect(collection.has(postId)).toBe(true) + expect(collection.get(postId)?.title).toBe(`Inactive User Post`) + + // Clean up + await dbClient.query(`DELETE FROM ${postsTable} WHERE id = $1`, [postId]) + await config.mutations.deleteUser(userId) + await collection.cleanup() + }) + + it(`Move-out: row becomes ineligible for subquery`, async () => { + const collection = createPostsByActiveUsersCollection() + await waitForReady(collection) + + if (!config.mutations) { + throw new Error(`Mutations not configured`) + } + + // Insert user with isActive = true + const userId = randomUUID() + await config.mutations.insertUser({ + id: userId, + name: `Active User`, + email: `active@test.com`, + age: 25, + isActive: true, + createdAt: new Date(), + metadata: null, + deletedAt: null, + }) + + // Insert post for this user + const postId = randomUUID() + await config.mutations.insertPost({ + id: postId, + userId, + title: `Active User Post`, + content: `Content`, + viewCount: 0, + largeViewCount: BigInt(0), + publishedAt: null, + deletedAt: null, + }) + + // Wait for post to appear (user is active, so post matches 
subquery) + await waitForItem(collection, postId) + expect(collection.has(postId)).toBe(true) + + // Update user to isActive = false (move-out for the post) + await config.mutations.updateUser(userId, { isActive: false }) + + // Wait for post to be removed (move-out) + await waitForItemRemoved(collection, postId) + expect(collection.has(postId)).toBe(false) + + // Clean up + await dbClient.query(`DELETE FROM ${postsTable} WHERE id = $1`, [postId]) + await config.mutations.deleteUser(userId) + await collection.cleanup() + }) + + it(`Move-out → move-in cycle ("flapping row")`, async () => { + const collection = createPostsByActiveUsersCollection() + await waitForReady(collection) + + if (!config.mutations) { + throw new Error(`Mutations not configured`) + } + + // Insert user with isActive = true + const userId = randomUUID() + await config.mutations.insertUser({ + id: userId, + name: `Flapping User`, + email: `flapping@test.com`, + age: 25, + isActive: true, + createdAt: new Date(), + metadata: null, + deletedAt: null, + }) + + // Insert post for this user + const postId = randomUUID() + await config.mutations.insertPost({ + id: postId, + userId, + title: `Flapping Post`, + content: `Content`, + viewCount: 0, + largeViewCount: BigInt(0), + publishedAt: null, + deletedAt: null, + }) + + await waitForItem(collection, postId) + expect(collection.has(postId)).toBe(true) + + // Move-out: isActive = false + await config.mutations.updateUser(userId, { isActive: false }) + await waitForItemRemoved(collection, postId, 15000) + expect(collection.has(postId)).toBe(false) + + // Move-in: isActive = true + await config.mutations.updateUser(userId, { isActive: true }) + await waitForItem(collection, postId, 15000) + expect(collection.has(postId)).toBe(true) + + // Clean up + await dbClient.query(`DELETE FROM ${postsTable} WHERE id = $1`, [postId]) + await config.mutations.deleteUser(userId) + await collection.cleanup() + }) + + it(`Tags-only update (row stays within subquery)`, 
async () => { + const collection = createPostsByActiveUsersCollection() + await waitForReady(collection) + + if (!config.mutations) { + throw new Error(`Mutations not configured`) + } + + // Insert user with isActive = true + const userId = randomUUID() + await config.mutations.insertUser({ + id: userId, + name: `Active User`, + email: `active@test.com`, + age: 25, + isActive: true, + createdAt: new Date(), + metadata: null, + deletedAt: null, + }) + + // Insert post for this user + const postId = randomUUID() + await config.mutations.insertPost({ + id: postId, + userId, + title: `Tagged Post`, + content: `Content`, + viewCount: 0, + largeViewCount: BigInt(0), + publishedAt: null, + deletedAt: null, + }) + + await waitForItem(collection, postId) + expect(collection.has(postId)).toBe(true) + + // Update post title (tags might change but post stays in subquery since user is still active) + await dbClient.query( + `UPDATE ${postsTable} SET title = $1 WHERE id = $2`, + [`Updated Tagged Post`, postId] + ) + + // Wait a bit and verify post still exists + await new Promise((resolve) => setTimeout(resolve, 500)) + expect(collection.has(postId)).toBe(true) + expect(collection.get(postId)?.title).toBe(`Updated Tagged Post`) + + // Clean up + await dbClient.query(`DELETE FROM ${postsTable} WHERE id = $1`, [postId]) + await config.mutations.deleteUser(userId) + await collection.cleanup() + }) + + it(`Database DELETE leads to row being removed from collection`, async () => { + const collection = createPostsByActiveUsersCollection() + await waitForReady(collection) + + if (!config.mutations) { + throw new Error(`Mutations not configured`) + } + + // Insert user with isActive = true + const userId = randomUUID() + await config.mutations.insertUser({ + id: userId, + name: `Active User`, + email: `active@test.com`, + age: 25, + isActive: true, + createdAt: new Date(), + metadata: null, + deletedAt: null, + }) + + // Insert post for this user + const postId = randomUUID() + await 
config.mutations.insertPost({ + id: postId, + userId, + title: `To Be Deleted`, + content: `Content`, + viewCount: 0, + largeViewCount: BigInt(0), + publishedAt: null, + deletedAt: null, + }) + + await waitForItem(collection, postId) + expect(collection.has(postId)).toBe(true) + + // Delete post in Postgres + await dbClient.query(`DELETE FROM ${postsTable} WHERE id = $1`, [postId]) + + // Wait for post to be removed + await waitForItemRemoved(collection, postId) + expect(collection.has(postId)).toBe(false) + + // Clean up + await config.mutations.deleteUser(userId) + await collection.cleanup() + }) + + it(`Snapshot after move-out should not re-include removed rows`, async () => { + if (!config.mutations) { + throw new Error(`Mutations not configured`) + } + + // Create first collection + const collection1 = createPostsByActiveUsersCollection() + await waitForReady(collection1) + + // Insert user with isActive = true + const userId = randomUUID() + await config.mutations.insertUser({ + id: userId, + name: `Snapshot Test User`, + email: `snapshot@test.com`, + age: 25, + isActive: true, + createdAt: new Date(), + metadata: null, + deletedAt: null, + }) + + // Insert post for this user + const postId = randomUUID() + await config.mutations.insertPost({ + id: postId, + userId, + title: `Snapshot Test Post`, + content: `Content`, + viewCount: 0, + largeViewCount: BigInt(0), + publishedAt: null, + deletedAt: null, + }) + + await waitForItem(collection1, postId) + expect(collection1.has(postId)).toBe(true) + + // Update user → post moves out + await config.mutations.updateUser(userId, { isActive: false }) + + await waitForItemRemoved(collection1, postId) + expect(collection1.has(postId)).toBe(false) + + // Clean up first collection + await collection1.cleanup() + + // Create fresh collection (new subscription) + const collection2 = createPostsByActiveUsersCollection() + await waitForReady(collection2) + + // Wait a bit to ensure snapshot is complete + await new 
Promise((resolve) => setTimeout(resolve, 1000)) + + // Snapshot should NOT include the removed post (user is inactive) + expect(collection2.has(postId)).toBe(false) + + // Clean up + await dbClient.query(`DELETE FROM ${postsTable} WHERE id = $1`, [postId]) + await config.mutations.deleteUser(userId) + await collection2.cleanup() + }) + + it(`Multi-row transaction: some rows move in, some move out`, async () => { + const collection = createPostsByActiveUsersCollection() + await waitForReady(collection) + + if (!config.mutations) { + throw new Error(`Mutations not configured`) + } + + // Insert 3 users: user1 inactive (its post starts outside the shape), user2 and user3 active + const userId1 = randomUUID() + const userId2 = randomUUID() + const userId3 = randomUUID() + + await config.mutations.insertUser({ + id: userId1, + name: `User 1`, + email: `user1@test.com`, + age: 25, + isActive: false, + createdAt: new Date(), + metadata: null, + deletedAt: null, + }) + + await config.mutations.insertUser({ + id: userId2, + name: `User 2`, + email: `user2@test.com`, + age: 30, + isActive: true, + createdAt: new Date(), + metadata: null, + deletedAt: null, + }) + + await config.mutations.insertUser({ + id: userId3, + name: `User 3`, + email: `user3@test.com`, + age: 35, + isActive: true, + createdAt: new Date(), + metadata: null, + deletedAt: null, + }) + + // Insert posts for these users + const postId1 = randomUUID() + const postId2 = randomUUID() + const postId3 = randomUUID() + + await config.mutations.insertPost({ + id: postId1, + userId: userId1, + title: `Post 1`, + content: `Content 1`, + viewCount: 0, + largeViewCount: BigInt(0), + publishedAt: null, + deletedAt: null, + }) + + await config.mutations.insertPost({ + id: postId2, + userId: userId2, + title: `Post 2`, + content: `Content 2`, + viewCount: 0, + largeViewCount: BigInt(0), + publishedAt: null, + deletedAt: null, + }) + + await config.mutations.insertPost({ + id: postId3, + userId: userId3, + title: `Post 3`, + content: `Content 3`, + viewCount: 0, +
largeViewCount: BigInt(0), + publishedAt: null, + deletedAt: null, + }) + + // Wait for posts 2 and 3 to appear + await waitForItem(collection, postId2) + await waitForItem(collection, postId3) + + expect(collection.has(postId1)).toBe(false) + + // In one SQL transaction: + // user1: isActive → true (post1 moves in) + // post2: title change (stays in since user2 is still active) + // user3: isActive → false (post3 moves out) + await dbClient.query(`BEGIN`) + try { + await dbClient.query( + `UPDATE ${usersTable} SET "isActive" = $1 WHERE id = $2`, + [true, userId1] + ) + await dbClient.query( + `UPDATE ${postsTable} SET title = $1 WHERE id = $2`, + [`Updated Post 2`, postId2] + ) + await dbClient.query( + `UPDATE ${usersTable} SET "isActive" = $1 WHERE id = $2`, + [false, userId3] + ) + await dbClient.query(`COMMIT`) + } catch (error) { + await dbClient.query(`ROLLBACK`) + throw error + } + + // Wait until every effect of the transaction is visible; waiting only for the move-out can race the move-in and the title update + await waitFor(() => !collection.has(postId3) && collection.has(postId1) && collection.get(postId2)?.title === `Updated Post 2`, { timeout: 10000, message: `Multi-row transaction changes did not propagate` }) + expect(collection.has(postId1)).toBe(true) // post1: moved in (user1 active) + expect(collection.has(postId2)).toBe(true) // post2: still in (user2 active) + expect(collection.get(postId2)?.title).toBe(`Updated Post 2`) + expect(collection.has(postId3)).toBe(false) // post3: moved out (user3 inactive) + + // Clean up + await dbClient.query(`DELETE FROM ${postsTable} WHERE id = $1`, [postId1]) + await dbClient.query(`DELETE FROM ${postsTable} WHERE id = $1`, [postId2]) + await dbClient.query(`DELETE FROM ${postsTable} WHERE id = $1`, [postId3]) + await config.mutations.deleteUser(userId1) + await config.mutations.deleteUser(userId2) + await config.mutations.deleteUser(userId3) + await collection.cleanup() + }) + }) +} diff --git a/packages/electric-db-collection/e2e/electric.e2e.test.ts b/packages/electric-db-collection/e2e/electric.e2e.test.ts index 67154bf21..d16552d86 100644 --- a/packages/electric-db-collection/e2e/electric.e2e.test.ts +++
b/packages/electric-db-collection/e2e/electric.e2e.test.ts @@ -17,6 +17,7 @@ import { createPaginationTestSuite, createPredicatesTestSuite, createProgressiveTestSuite, + createTagsTestSuite, generateSeedData, } from "../../db-collection-e2e/src/index" import { waitFor } from "../../db-collection-e2e/src/utils/helpers" @@ -31,7 +32,15 @@ declare module "vitest" { } describe(`Electric Collection E2E Tests`, () => { - let config: E2ETestConfig + let config: E2ETestConfig & { + tagsTestSetup?: { + dbClient: Client + baseUrl: string + testSchema: string + usersTable: string + postsTable: string + } + } let dbClient: Client let usersTable: string let postsTable: string @@ -433,6 +442,13 @@ describe(`Electric Collection E2E Tests`, () => { commentsUpToDateControl.current?.() }, }, + tagsTestSetup: { + dbClient, + baseUrl, + testSchema, + usersTable, + postsTable, + }, getTxid: async () => { // Get the current transaction ID from the last operation // This uses pg_current_xact_id_if_assigned() which returns the txid @@ -578,4 +594,5 @@ describe(`Electric Collection E2E Tests`, () => { createMutationsTestSuite(getConfig) createLiveUpdatesTestSuite(getConfig) createProgressiveTestSuite(getConfig) + createTagsTestSuite(getConfig as any) }) diff --git a/packages/electric-db-collection/package.json b/packages/electric-db-collection/package.json index a4ff4c24d..165869fd0 100644 --- a/packages/electric-db-collection/package.json +++ b/packages/electric-db-collection/package.json @@ -3,7 +3,7 @@ "description": "ElectricSQL collection for TanStack DB", "version": "0.2.10", "dependencies": { - "@electric-sql/client": "^1.2.0", + "@electric-sql/client": "https://pkg.pr.new/@electric-sql/client@3497", "@standard-schema/spec": "^1.0.0", "@tanstack/db": "workspace:*", "@tanstack/store": "^0.8.0", diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 1ec043fd2..e2c965b49 100644 --- 
a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -14,8 +14,25 @@ import { TimeoutWaitingForTxIdError, } from "./errors" import { compileSQL } from "./sql-compiler" +import { + addTagToIndex, + findRowsMatchingPattern, + getTagLength, + isMoveOutMessage, + removeTagFromIndex, + tagMatchesPattern, +} from "./tag-index" +import type { + MoveOutPattern, + MoveTag, + ParsedMoveTag, + RowId, + TagIndex, +} from "./tag-index" import type { BaseCollectionConfig, + ChangeMessage, + Collection, CollectionConfig, DeleteMutationFnParams, InsertMutationFnParams, @@ -806,6 +823,235 @@ function createElectricSync>( // Store for the relation schema information const relationSchema = new Store(undefined) + const tagCache = new Map() + + // Parses a tag string into a MoveTag. + // It memoizes the result parsed tag such that future calls + // for the same tag string return the same MoveTag array. + const parseTag = (tag: MoveTag): ParsedMoveTag => { + const cachedTag = tagCache.get(tag) + if (cachedTag) { + return cachedTag + } + + const parsedTag = tag.split(`|`) + tagCache.set(tag, parsedTag) + return parsedTag + } + + // Tag tracking state + const rowTagSets = new Map>() + const tagIndex: TagIndex = [] + let tagLength: number | undefined = undefined + + /** + * Initialize the tag index with the correct length + */ + const initializeTagIndex = (length: number): void => { + if (tagIndex.length < length) { + // Extend the index array to the required length + for (let i = tagIndex.length; i < length; i++) { + tagIndex[i] = new Map() + } + } + } + + /** + * Add tags to a row and update the tag index + */ + const addTagsToRow = ( + tags: Array, + rowId: RowId, + rowTagSet: Set + ): void => { + for (const tag of tags) { + const parsedTag = parseTag(tag) + + // Infer tag length from first tag + if (tagLength === undefined) { + tagLength = getTagLength(parsedTag) + initializeTagIndex(tagLength) + } + + // Validate tag length matches 
+ const currentTagLength = getTagLength(parsedTag) + if (currentTagLength !== tagLength) { + debug( + `${collectionId ? `[${collectionId}] ` : ``}Tag length mismatch: expected ${tagLength}, got ${currentTagLength}` + ) + continue + } + + rowTagSet.add(tag) + addTagToIndex(parsedTag, rowId, tagIndex, tagLength) + } + } + + /** + * Remove tags from a row and update the tag index + */ + const removeTagsFromRow = ( + removedTags: Array, + rowId: RowId, + rowTagSet: Set + ): void => { + if (tagLength === undefined) { + return + } + + for (const tag of removedTags) { + const parsedTag = parseTag(tag) + rowTagSet.delete(tag) + removeTagFromIndex(parsedTag, rowId, tagIndex, tagLength) + // We aggressively evict the tag from the parse cache. + // If the tag is shared with another row + // and has not been removed from that other row, + // it is simply parsed again the next time we encounter it. + tagCache.delete(tag) + } + } + + /** + * Process tags for a change message (add and remove tags) + */ + const processTagsForChangeMessage = ( + tags: Array | undefined, + removedTags: Array | undefined, + rowId: RowId + ): Set => { + // Initialize tag set for this row if it doesn't exist (needed for checking deletion) + if (!rowTagSets.has(rowId)) { + rowTagSets.set(rowId, new Set()) + } + const rowTagSet = rowTagSets.get(rowId)!
+ + // Add new tags + if (tags) { + addTagsToRow(tags, rowId, rowTagSet) + } + + // Remove tags + if (removedTags) { + removeTagsFromRow(removedTags, rowId, rowTagSet) + } + + return rowTagSet + } + + /** + * Clear all tag tracking state, including the parsed-tag cache (used when truncating) + */ + const clearTagTrackingState = (): void => { + rowTagSets.clear() + tagCache.clear() + tagIndex.length = 0 + tagLength = undefined + } + + /** + * Remove all tags for a row from both the tag set and the index + * Used when a row is deleted + */ + const clearTagsForRow = (rowId: RowId): void => { + if (tagLength === undefined) { + return + } + + const rowTagSet = rowTagSets.get(rowId) + if (!rowTagSet) { + return + } + + // Remove each tag from the index + for (const tag of rowTagSet) { + const parsedTag = parseTag(tag) + const currentTagLength = getTagLength(parsedTag) + if (currentTagLength === tagLength) { + removeTagFromIndex(parsedTag, rowId, tagIndex, tagLength) + } + tagCache.delete(tag) + } + + // Remove the row from the tag sets map + rowTagSets.delete(rowId) + } + + /** + * Remove matching tags from a row based on a pattern + * Returns true if the row's tag set is now empty + */ + const removeMatchingTagsFromRow = ( + rowId: RowId, + pattern: MoveOutPattern + ): boolean => { + const rowTagSet = rowTagSets.get(rowId) + if (!rowTagSet) { + return false + } + + // Find tags that match this pattern and remove them (evicting them from the parse cache like the other removal paths) + for (const tag of rowTagSet) { + const parsedTag = parseTag(tag) + if (tagMatchesPattern(parsedTag, pattern)) { + rowTagSet.delete(tag) + removeTagFromIndex(parsedTag, rowId, tagIndex, tagLength!) + tagCache.delete(tag)
+ } + } + + // Check if row's tag set is now empty + if (rowTagSet.size === 0) { + rowTagSets.delete(rowId) + return true + } + + return false + } + + /** + * Process move-out event: remove matching tags from rows and delete rows with empty tag sets + */ + const processMoveOutEvent = ( + patterns: Array, + collection: Collection, + begin: () => void, + write: (message: Omit, `key`>) => void, + transactionStarted: boolean + ): boolean => { + if (tagLength === undefined) { + debug( + `${collectionId ? `[${collectionId}] ` : ``}Received move-out message but no tag length set yet, ignoring` + ) + return transactionStarted + } + + let txStarted = transactionStarted + + // Process all patterns and collect rows to delete + for (const pattern of patterns) { + // Find all rows that match this pattern + const affectedRowIds = findRowsMatchingPattern(pattern, tagIndex) + + for (const rowId of affectedRowIds) { + if (removeMatchingTagsFromRow(rowId, pattern)) { + // Delete rows with empty tag sets + if (!txStarted) { + begin() + txStarted = true + } + + const rowValue = collection.get(rowId) + if (rowValue !== undefined) { + write({ + type: `delete`, + value: rowValue, + }) + } + } + } + } + + return txStarted + } + /** * Get the sync metadata for insert operations * @returns Record containing relation information @@ -918,6 +1164,36 @@ function createElectricSync>( syncMode === `progressive` && !hasReceivedUpToDate const bufferedMessages: Array> = [] // Buffer change messages during initial sync + /** + * Process a change message: handle tags and write the mutation + */ + const processChangeMessage = (changeMessage: Message) => { + if (!isChangeMessage(changeMessage)) { + return + } + + // Process tags if present + const tags = changeMessage.headers.tags + const removedTags = changeMessage.headers.removed_tags + const hasTags = tags || removedTags + + const rowId = collection.getKeyFromItem(changeMessage.value) + + if (changeMessage.headers.operation === `delete`) { + 
clearTagsForRow(rowId) + } else if (hasTags) { + processTagsForChangeMessage(tags, removedTags, rowId) + } + + write({ + type: changeMessage.headers.operation, + value: changeMessage.value, + metadata: { + ...changeMessage.headers, + }, + }) + } + // Create deduplicated loadSubset wrapper for non-eager modes // This prevents redundant snapshot requests when multiple concurrent // live queries request overlapping or subset predicates @@ -994,14 +1270,7 @@ function createElectricSync>( transactionStarted = true } - write({ - type: message.headers.operation, - value: message.value, - // Include the primary key and relation info in the metadata - metadata: { - ...message.headers, - }, - }) + processChangeMessage(message) } } else if (isSnapshotEndMessage(message)) { // Skip snapshot-end tracking during buffered initial sync (will be extracted during atomic swap) @@ -1011,6 +1280,15 @@ function createElectricSync>( hasSnapshotEnd = true } else if (isUpToDateMessage(message)) { hasUpToDate = true + } else if (isMoveOutMessage(message)) { + // Handle move-out event: remove matching tags from rows + transactionStarted = processMoveOutEvent( + message.headers.patterns, + collection, + begin, + write, + transactionStarted + ) } else if (isMustRefetchMessage(message)) { debug( `${collectionId ? 
`[${collectionId}] ` : ``}Received must-refetch message, starting transaction with truncate` @@ -1024,6 +1302,9 @@ function createElectricSync>( truncate() + // Clear tag tracking state + clearTagTrackingState() + // Reset the loadSubset deduplication state since we're starting fresh // This ensures that previously loaded predicates don't prevent refetching after truncate loadSubsetDedupe?.reset() @@ -1049,16 +1330,13 @@ function createElectricSync>( // Truncate to clear all snapshot data truncate() + // Clear tag tracking state for atomic swap + clearTagTrackingState() + // Apply all buffered change messages and extract txids/snapshots for (const bufferedMsg of bufferedMessages) { if (isChangeMessage(bufferedMsg)) { - write({ - type: bufferedMsg.headers.operation, - value: bufferedMsg.value, - metadata: { - ...bufferedMsg.headers, - }, - }) + processChangeMessage(bufferedMsg) // Extract txids from buffered messages (will be committed to store after transaction) if (hasTxids(bufferedMsg)) { diff --git a/packages/electric-db-collection/src/tag-index.ts b/packages/electric-db-collection/src/tag-index.ts new file mode 100644 index 000000000..80b1fc737 --- /dev/null +++ b/packages/electric-db-collection/src/tag-index.ts @@ -0,0 +1,160 @@ +// Import Row and Message types for the isMoveOutMessage type guard +import type { Message, Row } from "@electric-sql/client" + +export type RowId = string | number +export type MoveTag = string +export type ParsedMoveTag = Array +export type Position = number +export type Value = string +export type MoveOutPattern = { + pos: Position + value: Value +} + +const TAG_WILDCARD = `_` + +/** + * Event message type for move-out events + */ +export interface EventMessage { + headers: { + event: `move-out` + patterns: Array + } +} + +/** + * Tag index structure: array indexed by position, maps value to set of row IDs.
+ * For example: + * ```example + * const tag1 = [a, b, c] + * const tag2 = [a, b, d] + * const tag3 = [a, d, e] + * + * // Index is: + * [ + * new Map([a -> {rows with tag1, tag2 or tag3}]) + * new Map([b -> {rows with tag1 or tag2}, d -> {rows with tag3}]) + * new Map([c -> {rows with tag1}, d -> {rows with tag2}, e -> {rows with tag3}]) + * ] + * ``` + */ +export type TagIndex = Array>> + +/** + * Abstraction to get the value at a specific position in a tag + */ +export function getValue(tag: ParsedMoveTag, position: Position): Value { + if (position >= tag.length) { + throw new Error(`Position out of bounds`) + } + return tag[position]! +} + +/** + * Abstraction to extract position and value from a pattern. + */ +function getPositionalValue(pattern: MoveOutPattern): { + pos: number + value: string +} { + return pattern +} + +/** + * Abstraction to get the length of a tag + */ +export function getTagLength(tag: ParsedMoveTag): number { + return tag.length +} + +/** + * Check if a tag matches a pattern. + * A tag matches if the value at the pattern's position equals the pattern's value, + * or if the value at that position is "_" (wildcard). + */ +export function tagMatchesPattern( + tag: ParsedMoveTag, + pattern: MoveOutPattern +): boolean { + const { pos, value } = getPositionalValue(pattern) + const tagValue = getValue(tag, pos) + return tagValue === value || tagValue === TAG_WILDCARD +} + +/** + * Add a tag to the index for efficient pattern matching + */ +export function addTagToIndex( + tag: ParsedMoveTag, + rowId: RowId, + index: TagIndex, + tagLength: number +): void { + for (let i = 0; i < tagLength; i++) { + const value = getValue(tag, i) + + // Only index non-wildcard values + if (value !== TAG_WILDCARD) { + const positionIndex = index[i]! + if (!positionIndex.has(value)) { + positionIndex.set(value, new Set()) + } + + const tags = positionIndex.get(value)!
+ tags.add(rowId) + } + } +} + +/** + * Remove a tag from the index + */ +export function removeTagFromIndex( + tag: ParsedMoveTag, + rowId: RowId, + index: TagIndex, + tagLength: number +): void { + for (let i = 0; i < tagLength; i++) { + const value = getValue(tag, i) + + // Only remove non-wildcard values + if (value !== TAG_WILDCARD) { + const positionIndex = index[i] + if (positionIndex) { + const rowSet = positionIndex.get(value) + if (rowSet) { + rowSet.delete(rowId) + + // Clean up empty sets + if (rowSet.size === 0) { + positionIndex.delete(value) + } + } + } + } + } +} + +/** + * Find all rows that match a given pattern + */ +export function findRowsMatchingPattern( + pattern: MoveOutPattern, + index: TagIndex +): Set { + const { pos, value } = getPositionalValue(pattern) + const positionIndex = index[pos] + const rowSet = positionIndex?.get(value) + return rowSet ?? new Set() +} + +/** + * Check if a message is an event message with move-out event + */ +export function isMoveOutMessage>( + message: Message +): message is Message & EventMessage { + return message.headers.event === `move-out` +} diff --git a/packages/electric-db-collection/tests/tags.test.ts b/packages/electric-db-collection/tests/tags.test.ts new file mode 100644 index 000000000..794fce7d3 --- /dev/null +++ b/packages/electric-db-collection/tests/tags.test.ts @@ -0,0 +1,1241 @@ +import { beforeEach, describe, expect, it, vi } from "vitest" +import { createCollection } from "@tanstack/db" +import { electricCollectionOptions } from "../src/electric" +import type { ElectricCollectionUtils } from "../src/electric" +import type { Collection } from "@tanstack/db" +import type { Message, Row } from "@electric-sql/client" +import type { StandardSchemaV1 } from "@standard-schema/spec" +import type { MoveOutPattern } from "../src/tag-index" + +// Mock the ShapeStream module +const mockSubscribe = vi.fn() +const mockRequestSnapshot = vi.fn() +const mockFetchSnapshot = vi.fn() +const mockStream = { + 
subscribe: mockSubscribe, + requestSnapshot: mockRequestSnapshot, + fetchSnapshot: mockFetchSnapshot, +} + +vi.mock(`@electric-sql/client`, async () => { + const actual = await vi.importActual(`@electric-sql/client`) + return { + ...actual, + ShapeStream: vi.fn(() => mockStream), + } +}) + +describe(`Electric Tag Tracking and GC`, () => { + let collection: Collection< + Row, + string | number, + ElectricCollectionUtils, + StandardSchemaV1, + Row + > + let subscriber: (messages: Array>) => void + + beforeEach(() => { + vi.clearAllMocks() + + // Reset mock subscriber + mockSubscribe.mockImplementation((callback) => { + subscriber = callback + return () => {} + }) + + // Reset mock requestSnapshot + mockRequestSnapshot.mockResolvedValue(undefined) + + // Create collection with Electric configuration + const config = { + id: `test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + startSync: true, + getKey: (item: Row) => item.id as number, + } + + // Get the options with utilities + const options = electricCollectionOptions(config) + + // Create collection with Electric configuration + collection = createCollection(options) as unknown as Collection< + Row, + string | number, + ElectricCollectionUtils, + StandardSchemaV1, + Row + > + }) + + it(`should track tags when rows are inserted with tags`, () => { + const tag1 = `hash1|hash2|hash3` + const tag2 = `hash4|hash5|hash6` + + // Insert row with tags + subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { + operation: `insert`, + tags: [tag1, tag2], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(collection.state).toEqual( + new Map([[1, { id: 1, name: `Test User` }]]) + ) + expect(collection.status).toEqual(`ready`) + + // Remove first tag - row should still exist + subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { + operation: `update`, + removed_tags: [tag1], + }, + }, + { + headers: { 
control: `up-to-date` }, + }, + ]) + + expect(collection.state).toEqual( + new Map([[1, { id: 1, name: `Test User` }]]) + ) + + // Remove last tag - row should be garbage collected + subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { + operation: `delete`, + removed_tags: [tag2], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(collection.state).toEqual(new Map()) + }) + + it(`should track tags when rows are updated with new tags`, () => { + const tag1 = `hash1|hash2|hash3` + + // Insert row with tags + subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { + operation: `insert`, + tags: [tag1], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(collection.state).toEqual( + new Map([[1, { id: 1, name: `Test User` }]]) + ) + + // Update with additional tags + const tag2 = `hash4|hash5|hash6` + subscriber([ + { + key: `1`, + value: { id: 1, name: `Updated User` }, + headers: { + operation: `update`, + tags: [tag2], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(collection.state).toEqual( + new Map([[1, { id: 1, name: `Updated User` }]]) + ) + + // Remove first tag - row should still exist + subscriber([ + { + key: `1`, + value: { id: 1, name: `Updated User` }, + headers: { + operation: `update`, + removed_tags: [tag1], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(collection.state).toEqual( + new Map([[1, { id: 1, name: `Updated User` }]]) + ) + + // Remove last tag - row should be garbage collected + subscriber([ + { + key: `1`, + value: { id: 1, name: `Updated User` }, + headers: { + operation: `delete`, + removed_tags: [tag2], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(collection.state).toEqual(new Map()) + }) + + it(`should track tags that are structurally equal`, () => { + const tag1 = `hash1|hash2|hash3` + const tag1Copy = `hash1|hash2|hash3` + + // Insert row with tags + 
subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { + operation: `insert`, + tags: [tag1], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(collection.state).toEqual( + new Map([[1, { id: 1, name: `Test User` }]]) + ) + + // Remove first tag - row should be gone + subscriber([ + { + key: `1`, + value: { id: 1, name: `Updated User` }, + headers: { + operation: `delete`, + removed_tags: [tag1Copy], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(collection.state).toEqual(new Map()) + }) + + it(`should not interfere between rows with distinct tags`, () => { + const tag1 = `hash1|hash2|hash3` + const tag2 = `hash4|hash5|hash6` + const tag3 = `hash7|hash8|hash9` + const tag4 = `hash10|hash11|hash12` + + // Insert multiple rows with some shared tags + // Row 1: tag1, tag2 + // Row 2: tag2 (shared with row 1), tag3 + // Row 3: tag3 (shared with row 2), tag4 + subscriber([ + { + key: `1`, + value: { id: 1, name: `User 1` }, + headers: { + operation: `insert`, + tags: [tag1, tag2], + }, + }, + { + key: `2`, + value: { id: 2, name: `User 2` }, + headers: { + operation: `insert`, + tags: [tag2, tag3], + }, + }, + { + key: `3`, + value: { id: 3, name: `User 3` }, + headers: { + operation: `insert`, + tags: [tag3, tag4], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // All rows should exist + expect(collection.state.size).toBe(3) + expect(collection.state.get(1)).toEqual({ id: 1, name: `User 1` }) + expect(collection.state.get(2)).toEqual({ id: 2, name: `User 2` }) + expect(collection.state.get(3)).toEqual({ id: 3, name: `User 3` }) + + // Remove tag1 from row 1 - row 1 should still exist (has tag2), others unaffected + subscriber([ + { + key: `1`, + value: { id: 1, name: `User 1` }, + headers: { + operation: `update`, + removed_tags: [tag1], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row 1 should still exist (has tag2), rows 2 and 3 unaffected + 
expect(collection.state.size).toBe(3) + expect(collection.state.get(1)).toEqual({ id: 1, name: `User 1` }) + expect(collection.state.get(2)).toEqual({ id: 2, name: `User 2` }) + expect(collection.state.get(3)).toEqual({ id: 3, name: `User 3` }) + + // Remove tag2 from row 1 (shared tag) - row 1 should be deleted + // Row 2 should still exist because it has tag3 (tag2 removal only affects row 1) + subscriber([ + { + key: `1`, + value: { id: 1, name: `User 1` }, + headers: { + operation: `delete`, + removed_tags: [tag2], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row 1 should be garbage collected, rows 2 and 3 should remain + // Row 2 still has tag2 and tag3, so removing tag2 from row 1 doesn't affect it + expect(collection.state.size).toBe(2) + expect(collection.state.has(1)).toBe(false) + expect(collection.state.get(2)).toEqual({ id: 2, name: `User 2` }) + expect(collection.state.get(3)).toEqual({ id: 3, name: `User 3` }) + + // Remove tag3 from row 2 - row 2 should still exist (has tag2) + // Row 3 should still exist because it has tag4 (tag3 removal only affects row 2) + subscriber([ + { + key: `2`, + value: { id: 2, name: `User 2` }, + headers: { + operation: `update`, + removed_tags: [tag3], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row 2 should still exist (has tag2), row 3 unaffected + expect(collection.state.size).toBe(2) + expect(collection.state.has(1)).toBe(false) + expect(collection.state.get(2)).toEqual({ id: 2, name: `User 2` }) + expect(collection.state.get(3)).toEqual({ id: 3, name: `User 3` }) + + // Remove tag2 from row 2 (shared tag) - row 2 should be deleted + subscriber([ + { + key: `2`, + value: { id: 2, name: `User 2` }, + headers: { + operation: `delete`, + removed_tags: [tag2], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row 2 should be garbage collected, row 3 should remain + // Row 3 still has tag3 and tag4 + expect(collection.state.size).toBe(1) + 
expect(collection.state.has(1)).toBe(false) + expect(collection.state.has(2)).toBe(false) + expect(collection.state.get(3)).toEqual({ id: 3, name: `User 3` }) + }) + + it(`should require exact match in removed_tags for tags with wildcards (underscore)`, () => { + const tagWithWildcard = `hash1|_|hash3` + const tagWithoutWildcard = `hash1|hash2|hash3` + + // Insert row with wildcard tag + subscriber([ + { + key: `1`, + value: { id: 1, name: `User 1` }, + headers: { + operation: `insert`, + tags: [tagWithWildcard], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(collection.state.get(1)).toEqual({ id: 1, name: `User 1` }) + + // Try to remove with non-matching tag (has specific value instead of wildcard) + // Should NOT remove because it doesn't match exactly + subscriber([ + { + key: `1`, + value: { id: 1, name: `User 1` }, + headers: { + operation: `update`, + removed_tags: [tagWithoutWildcard], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row should still exist because the tag didn't match exactly + expect(collection.state.get(1)).toEqual({ id: 1, name: `User 1` }) + + // Remove with exact match (wildcard tag) + subscriber([ + { + key: `1`, + value: { id: 1, name: `User 1` }, + headers: { + operation: `delete`, + removed_tags: [tagWithWildcard], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row should be garbage collected because exact match was removed + expect(collection.state.size).toBe(0) + expect(collection.state.has(1)).toBe(false) + + // Insert row with specific value tag (no wildcard) + subscriber([ + { + key: `2`, + value: { id: 2, name: `User 2` }, + headers: { + operation: `insert`, + tags: [tagWithoutWildcard], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(collection.state.get(2)).toEqual({ id: 2, name: `User 2` }) + + // Try to remove with wildcard tag - should NOT match + subscriber([ + { + key: `2`, + value: { id: 2, name: `User 2` }, + headers: { + 
operation: `update`, + removed_tags: [tagWithWildcard], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row should still exist because wildcard doesn't match specific value + expect(collection.state.get(2)).toEqual({ id: 2, name: `User 2` }) + + // Remove with exact match (specific value tag) + subscriber([ + { + key: `2`, + value: { id: 2, name: `User 2` }, + headers: { + operation: `delete`, + removed_tags: [tagWithoutWildcard], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row should be garbage collected + expect(collection.state.size).toBe(0) + expect(collection.state.has(2)).toBe(false) + + // Test with multiple tags including wildcards + const tagWildcard1 = `hash1|_|hash3` + const tagWildcard2 = `hash4|_|hash6` + const tagSpecific = `hash1|hash2|hash3` + + subscriber([ + { + key: `3`, + value: { id: 3, name: `User 3` }, + headers: { + operation: `insert`, + tags: [tagWildcard1, tagWildcard2, tagSpecific], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(collection.state.get(3)).toEqual({ id: 3, name: `User 3` }) + + // Remove one wildcard tag with exact match + subscriber([ + { + key: `3`, + value: { id: 3, name: `User 3` }, + headers: { + operation: `update`, + removed_tags: [tagWildcard1], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row should still exist (has tagWildcard2 and tagSpecific) + expect(collection.state.get(3)).toEqual({ id: 3, name: `User 3` }) + + // Try to remove wildcard tag with non-matching specific value + subscriber([ + { + key: `3`, + value: { id: 3, name: `User 3` }, + headers: { + operation: `update`, + removed_tags: [tagWithoutWildcard], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row should still exist because tagWithoutWildcard doesn't match tagWildcard2 + expect(collection.state.get(3)).toEqual({ id: 3, name: `User 3` }) + + // Remove specific tag with exact match + subscriber([ + { + key: `3`, + value: { id: 3, 
name: `User 3` }, + headers: { + operation: `update`, + removed_tags: [tagSpecific], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row should still exist (has tagWildcard2) + expect(collection.state.get(3)).toEqual({ id: 3, name: `User 3` }) + + // Remove last wildcard tag with exact match + subscriber([ + { + key: `3`, + value: { id: 3, name: `User 3` }, + headers: { + operation: `delete`, + removed_tags: [tagWildcard2], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row should be garbage collected + expect(collection.state.size).toBe(0) + expect(collection.state.has(3)).toBe(false) + }) + + it(`should handle move-out events that remove matching tags`, () => { + const tag1 = `hash1|hash2|hash3` + const tag2 = `hash1|hash2|hash4` + const tag3 = `hash5|hash6|hash1` + + // Insert rows with tags + subscriber([ + { + key: `1`, + value: { id: 1, name: `User 1` }, + headers: { + operation: `insert`, + tags: [tag1], + }, + }, + { + key: `2`, + value: { id: 2, name: `User 2` }, + headers: { + operation: `insert`, + tags: [tag2], + }, + }, + { + key: `3`, + value: { id: 3, name: `User 3` }, + headers: { + operation: `insert`, + tags: [tag3], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(collection.state.size).toBe(3) + + // Send move-out event with pattern matching hash1 at position 0 + const pattern: MoveOutPattern = { + pos: 0, + value: `hash1`, + } + + subscriber([ + { + headers: { + event: `move-out`, + patterns: [pattern], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Rows 1 and 2 should be deleted (they have hash1 at position 0) + // Row 3 should remain (has hash5 at position 0) + expect(collection.state.size).toBe(1) + expect(collection.state.has(3)).toBe(true) + expect(collection.state.get(3)).toEqual({ id: 3, name: `User 3` }) + }) + + it(`should remove shared tags from all rows when move-out pattern matches`, () => { + // Create tags where some are shared between rows + 
const sharedTag1 = `hash1|hash2|hash3` // Shared by rows 1 and 2 + const sharedTag2 = `hash4|hash5|hash6` // Shared by rows 2 and 3 + const uniqueTag1 = `hash7|hash8|hash9` // Only in row 1 + const uniqueTag2 = `hash10|hash11|hash12` // Only in row 3 + + // Insert rows with multiple tags, some shared + // Row 1: sharedTag1, uniqueTag1 + // Row 2: sharedTag1, sharedTag2 + // Row 3: sharedTag2, uniqueTag2 + subscriber([ + { + key: `1`, + value: { id: 1, name: `User 1` }, + headers: { + operation: `insert`, + tags: [sharedTag1, uniqueTag1], + }, + }, + { + key: `2`, + value: { id: 2, name: `User 2` }, + headers: { + operation: `insert`, + tags: [sharedTag1, sharedTag2], + }, + }, + { + key: `3`, + value: { id: 3, name: `User 3` }, + headers: { + operation: `insert`, + tags: [sharedTag2, uniqueTag2], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(collection.state.size).toBe(3) + expect(collection.state.get(1)).toEqual({ id: 1, name: `User 1` }) + expect(collection.state.get(2)).toEqual({ id: 2, name: `User 2` }) + expect(collection.state.get(3)).toEqual({ id: 3, name: `User 3` }) + + // Send move-out event matching sharedTag1 (hash1 at position 0) + // This should remove sharedTag1 from both row 1 and row 2 + const pattern: MoveOutPattern = { + pos: 0, + value: `hash1`, + } + + subscriber([ + { + headers: { + event: `move-out`, + patterns: [pattern], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // sharedTag1 matches the pattern, so it is removed from every row that carries it (rows 1 and 2) 
+ // Neither row is deleted, because each of them still has another tag: + // Row 1 has [sharedTag1, uniqueTag1], so after removing sharedTag1, it still has uniqueTag1 + // Row 2 has [sharedTag1, sharedTag2], so after removing sharedTag1, it still has sharedTag2 + // So both rows should still exist + expect(collection.state.size).toBe(3) + expect(collection.state.get(1)).toEqual({ id: 1, name: `User 1` }) + expect(collection.state.get(2)).toEqual({ id: 2, name: `User 2` }) + expect(collection.state.get(3)).toEqual({ id: 3, name: `User 3` }) + + // Send move-out event matching sharedTag2 (hash4 at position 0) + // This should remove sharedTag2 from both row 2 and row 3 + const pattern2: MoveOutPattern = { + pos: 0, + value: `hash4`, + } + + subscriber([ + { + headers: { + event: `move-out`, + patterns: [pattern2], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row 2 should be deleted (had sharedTag1 and sharedTag2, both removed) + // Row 3 should still exist (has uniqueTag2) + // Row 1 should still exist (has uniqueTag1) + expect(collection.state.size).toBe(2) + expect(collection.state.has(2)).toBe(false) + expect(collection.state.get(1)).toEqual({ id: 1, name: `User 1` }) + expect(collection.state.get(3)).toEqual({ id: 3, name: `User 3` }) + + // Send move-out event matching uniqueTag1 (hash7 at position 0) + // This should remove uniqueTag1 from row 1 + const pattern3: MoveOutPattern = { + pos: 0, + value: `hash7`, + } + + subscriber([ + { + headers: { + event: `move-out`, + patterns: [pattern3], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row 1 should be deleted (no tags left) + // Row 3 should still exist (has uniqueTag2) + expect(collection.state.size).toBe(1) + expect(collection.state.has(1)).toBe(false) + expect(collection.state.has(2)).toBe(false) + expect(collection.state.get(3)).toEqual({ id: 3, name: `User 3` }) + }) + + it(`should not remove tags with underscores when pattern matches 
non-indexed position`, () => { + // Tag with underscore at position 1: a|_|c + // This tag is NOT indexed at position 1 (because of underscore) + const tagWithUnderscore = `a|_|c` + + // Insert row with tag containing underscore + subscriber([ + { + key: `1`, + value: { id: 1, name: `User 1` }, + headers: { + operation: `insert`, + tags: [tagWithUnderscore], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(collection.state.size).toBe(1) + expect(collection.state.get(1)).toEqual({ id: 1, name: `User 1` }) + + // Send move-out event with pattern matching position 1 (where underscore is) + // Since the tag is not indexed at position 1, it won't be found in the index + // and the tag should remain + const patternNonIndexed: MoveOutPattern = { + pos: 1, + value: `b`, + } + + subscriber([ + { + headers: { + event: `move-out`, + patterns: [patternNonIndexed], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row should still exist because the tag wasn't found in the index + expect(collection.state.size).toBe(1) + expect(collection.state.get(1)).toEqual({ id: 1, name: `User 1` }) + + // Send move-out event with pattern matching position 2 (where 'c' is) + // Position 2 is indexed (has value 'c'), so it will be found in the index + // The pattern matching position 2 with value 'c' matches the tag a|_|c, so the tag is removed + const patternIndexed: MoveOutPattern = { + pos: 2, + value: `c`, + } + + subscriber([ + { + headers: { + event: `move-out`, + patterns: [patternIndexed], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row should be garbage collected because the tag was removed + // (tagset becomes empty) + expect(collection.state.size).toBe(0) + expect(collection.state.has(1)).toBe(false) + }) + + it(`should handle move-out events with multiple patterns`, () => { + const tag1 = `hash1|hash2|hash3` + const tag2 = `hash4|hash5|hash6` + const tag3 = `hash7|hash8|hash9` + + // Insert rows with tags + 
subscriber([ + { + key: `1`, + value: { id: 1, name: `User 1` }, + headers: { + operation: `insert`, + tags: [tag1], + }, + }, + { + key: `2`, + value: { id: 2, name: `User 2` }, + headers: { + operation: `insert`, + tags: [tag2], + }, + }, + { + key: `3`, + value: { id: 3, name: `User 3` }, + headers: { + operation: `insert`, + tags: [tag3], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(collection.state.size).toBe(3) + + // Send move-out event with multiple patterns + const pattern1: MoveOutPattern = { + pos: 0, + value: `hash1`, + } + const pattern2: MoveOutPattern = { + pos: 0, + value: `hash4`, + } + + subscriber([ + { + headers: { + event: `move-out`, + patterns: [pattern1, pattern2], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Rows 1 and 2 should be deleted, row 3 should remain + expect(collection.state.size).toBe(1) + expect(collection.state.has(3)).toBe(true) + }) + + it(`should clear tag state on must-refetch`, () => { + const tag1 = `hash1|hash2|hash3` + const tag2 = `hash4|hash5|hash6` + + // Insert row with tag + subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { + operation: `insert`, + tags: [tag1], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(collection.state).toEqual( + new Map([[1, { id: 1, name: `Test User` }]]) + ) + + // Send must-refetch + subscriber([ + { + headers: { control: `must-refetch` }, + }, + ]) + + // The collection should still have old data because truncate is in pending + // transaction. This is the intended behavior of the collection, you should have + // the old data until the next up-to-date message. 
+ expect(collection.state.size).toBe(1) + expect(collection.state.has(1)).toBe(true) + expect(collection.state.get(1)).toEqual({ id: 1, name: `Test User` }) + + // Send new data after must-refetch + subscriber([ + { + key: `2`, + value: { id: 2, name: `User 2` }, + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Collection should now have the new data + expect(collection.state).toEqual(new Map([[2, { id: 2, name: `User 2` }]])) + + // Re-insert with new tag + subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { + operation: `insert`, + tags: [tag2], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(collection.state).toEqual( + new Map([ + [1, { id: 1, name: `Test User` }], + [2, { id: 2, name: `User 2` }], + ]) + ) + + // Remove tag2 and check that the row is gone + subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { + operation: `delete`, + removed_tags: [tag2], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row should be garbage collected + expect(collection.state.size).toBe(1) + expect(collection.state.has(1)).toBe(false) + expect(collection.state.has(2)).toBe(true) + expect(collection.state.get(2)).toEqual({ id: 2, name: `User 2` }) + }) + + it(`should handle rows with no tags (not deleted)`, () => { + // Insert row without tags + subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { + operation: `insert`, + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row should exist even without tags + expect(collection.state).toEqual( + new Map([[1, { id: 1, name: `Test User` }]]) + ) + + // Update the row without tags + subscriber([ + { + key: `1`, + old_value: { id: 1, name: `Test User` }, + value: { id: 1, name: `Updated Test User` }, + headers: { + operation: `update`, + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row should still exist + 
expect(collection.state).toEqual( + new Map([[1, { id: 1, name: `Updated Test User` }]]) + ) + + // Insert a row with tags + const tag = `hash1|hash2|hash3` + subscriber([ + { + key: `2`, + value: { id: 2, name: `User 2` }, + headers: { + operation: `insert`, + tags: [tag], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row should exist + expect(collection.state).toEqual( + new Map([ + [1, { id: 1, name: `Updated Test User` }], + [2, { id: 2, name: `User 2` }], + ]) + ) + + // Move out that matches the tag + const pattern: MoveOutPattern = { + pos: 1, + value: `hash2`, + } + + subscriber([ + { + headers: { event: `move-out`, patterns: [pattern] }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // User 2 should be gone but user 1 should still exist because it was never tagged + expect(collection.state.size).toBe(1) + expect(collection.state.has(1)).toBe(true) + expect(collection.state.has(2)).toBe(false) + expect(collection.state.get(1)).toEqual({ + id: 1, + name: `Updated Test User`, + }) + }) + + it(`should handle adding and removing tags in same update`, () => { + const tag1 = `hash1|hash2|hash3` + const tag2 = `hash4|hash5|hash6` + + // Insert row with tag1 + subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { + operation: `insert`, + tags: [tag1], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(collection.state).toEqual( + new Map([[1, { id: 1, name: `Test User` }]]) + ) + + // Update: remove tag1, add tag2 + subscriber([ + { + key: `1`, + value: { id: 1, name: `Updated User` }, + headers: { + operation: `update`, + tags: [tag2], + removed_tags: [tag1], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row should still exist (has tag2) + expect(collection.state).toEqual( + new Map([[1, { id: 1, name: `Updated User` }]]) + ) + }) + + it(`should not recover old tags when row is deleted and re-inserted`, () => { + const tag1 = `hash1|hash2|hash3` + const 
tag2 = `hash4|hash5|hash6` + + // Insert row with tag1 + subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { + operation: `insert`, + tags: [tag1], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(collection.state).toEqual( + new Map([[1, { id: 1, name: `Test User` }]]) + ) + + // Delete the row (without tags) + subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { + operation: `delete`, + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row should be deleted + expect(collection.state.size).toBe(0) + expect(collection.state.has(1)).toBe(false) + + // Insert the row again with a new tag (tag2) + subscriber([ + { + key: `1`, + value: { id: 1, name: `Re-inserted User` }, + headers: { + operation: `insert`, + tags: [tag2], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row should exist with new tag + expect(collection.state).toEqual( + new Map([[1, { id: 1, name: `Re-inserted User` }]]) + ) + + // Update the row with removed_tags including its new tag (tag2) + // The row should NOT have the old tag1, only tag2 + subscriber([ + { + key: `1`, + value: { id: 1, name: `Re-inserted User` }, + headers: { + operation: `delete`, + removed_tags: [tag2], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Row should be gone because tag2 was removed and it doesn't have old tag1 + expect(collection.state.size).toBe(0) + expect(collection.state.has(1)).toBe(false) + }) +}) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2aa84b14d..88a35f0a9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -603,7 +603,7 @@ importers: version: 0.44.7(@opentelemetry/api@1.9.0)(@types/pg@8.15.6)(gel@2.1.1)(kysely@0.28.8)(pg@8.16.3)(postgres@3.4.7) drizzle-zod: specifier: ^0.8.3 - version: 0.8.3(drizzle-orm@0.44.7(@opentelemetry/api@1.9.0)(@types/pg@8.15.6)(gel@2.1.1)(kysely@0.28.8)(pg@8.16.3)(postgres@3.4.7))(zod@3.25.76) + version: 
0.8.3(drizzle-orm@0.44.7(@opentelemetry/api@1.9.0)(@types/pg@8.15.6)(gel@2.1.1)(kysely@0.28.8)(pg@8.16.3)(postgres@3.4.7))(zod@4.1.11) express: specifier: ^4.21.2 version: 4.21.2 @@ -790,8 +790,8 @@ importers: packages/electric-db-collection: dependencies: '@electric-sql/client': - specifier: ^1.2.0 - version: 1.2.0 + specifier: https://pkg.pr.new/@electric-sql/client@3497 + version: https://pkg.pr.new/@electric-sql/client@3497 '@standard-schema/spec': specifier: ^1.0.0 version: 1.0.0 @@ -1587,6 +1587,10 @@ packages: '@electric-sql/client@1.2.0': resolution: {integrity: sha512-K/MEjti3UF4aPKJJqO6Tp4f5noqc2/3icU1NPdpKfQaHwbzGtEX2aJmL2vxTEUJbfyrISkPKbOPnrz/lAvw1Vg==} + '@electric-sql/client@https://pkg.pr.new/@electric-sql/client@3497': + resolution: {tarball: https://pkg.pr.new/@electric-sql/client@3497} + version: 1.1.5 + '@emnapi/core@1.5.0': resolution: {integrity: sha512-sbP8GzB1WDzacS8fgNPpHlp6C9VZe+SJP3F90W9rLemaQj2PzIuTEl1qDOYQf58YIpyjViI24y9aPWCjEzY2cg==} @@ -9919,6 +9923,12 @@ snapshots: optionalDependencies: '@rollup/rollup-darwin-arm64': 4.52.5 + '@electric-sql/client@https://pkg.pr.new/@electric-sql/client@3497': + dependencies: + '@microsoft/fetch-event-source': 2.0.1 + optionalDependencies: + '@rollup/rollup-darwin-arm64': 4.52.5 + '@emnapi/core@1.5.0': dependencies: '@emnapi/wasi-threads': 1.1.0 @@ -14043,11 +14053,6 @@ snapshots: pg: 8.16.3 postgres: 3.4.7 - drizzle-zod@0.8.3(drizzle-orm@0.44.7(@opentelemetry/api@1.9.0)(@types/pg@8.15.6)(gel@2.1.1)(kysely@0.28.8)(pg@8.16.3)(postgres@3.4.7))(zod@3.25.76): - dependencies: - drizzle-orm: 0.44.7(@opentelemetry/api@1.9.0)(@types/pg@8.15.6)(gel@2.1.1)(kysely@0.28.8)(pg@8.16.3)(postgres@3.4.7) - zod: 3.25.76 - drizzle-zod@0.8.3(drizzle-orm@0.44.7(@opentelemetry/api@1.9.0)(@types/pg@8.15.6)(gel@2.1.1)(kysely@0.28.8)(pg@8.16.3)(postgres@3.4.7))(zod@4.1.11): dependencies: drizzle-orm: 0.44.7(@opentelemetry/api@1.9.0)(@types/pg@8.15.6)(gel@2.1.1)(kysely@0.28.8)(pg@8.16.3)(postgres@3.4.7)