Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions .changeset/where-callback-subscribe-changes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
---
"@tanstack/db": patch
---

Add `where` callback option to `subscribeChanges` for ergonomic filtering

Instead of manually constructing IR with `PropRef`:

```ts
import { eq, PropRef } from "@tanstack/db"
collection.subscribeChanges(callback, {
whereExpression: eq(new PropRef(["status"]), "active"),
})
```

You can now use a callback with query builder functions:

```ts
import { eq } from "@tanstack/db"
collection.subscribeChanges(callback, {
where: (row) => eq(row.status, "active"),
})
```
23 changes: 21 additions & 2 deletions packages/db/src/collection/changes.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { NegativeActiveSubscribersError } from "../errors"
import {
createSingleRowRefProxy,
toExpression,
} from "../query/builder/ref-proxy.js"
import { CollectionSubscription } from "./subscription.js"
import type { StandardSchemaV1 } from "@standard-schema/spec"
import type { ChangeMessage, SubscribeChangesOptions } from "../types"
Expand Down Expand Up @@ -94,13 +98,28 @@ export class CollectionChangesManager<
*/
public subscribeChanges(
callback: (changes: Array<ChangeMessage<TOutput>>) => void,
options: SubscribeChangesOptions = {}
options: SubscribeChangesOptions<TOutput> = {}
): CollectionSubscription {
// Start sync and track subscriber
this.addSubscriber()

// Compile where callback to whereExpression if provided
if (options.where && options.whereExpression) {
throw new Error(
`Cannot specify both 'where' and 'whereExpression' options. Use one or the other.`
)
}

let whereExpression = options.whereExpression
if (options.where) {
const proxy = createSingleRowRefProxy<TOutput>()
const result = options.where(proxy)
whereExpression = toExpression(result)
}

const subscription = new CollectionSubscription(this.collection, callback, {
...options,
includeInitialState: options.includeInitialState,
whereExpression,
onUnsubscribe: () => {
this.removeSubscriber()
this.changeSubscriptions.delete(subscription)
Expand Down
15 changes: 9 additions & 6 deletions packages/db/src/collection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -848,26 +848,29 @@ export class CollectionImpl<
* }, { includeInitialState: true })
*
* @example
* // Subscribe only to changes matching a condition
* // Subscribe only to changes matching a condition using where callback
* import { eq } from "@tanstack/db"
*
* const subscription = collection.subscribeChanges((changes) => {
* updateUI(changes)
* }, {
* includeInitialState: true,
* where: (row) => row.status === 'active'
* where: (row) => eq(row.status, "active")
* })
*
* @example
* // Subscribe using a pre-compiled expression
* // Using multiple conditions with and()
* import { and, eq, gt } from "@tanstack/db"
*
* const subscription = collection.subscribeChanges((changes) => {
* updateUI(changes)
* }, {
* includeInitialState: true,
* whereExpression: eq(row.status, 'active')
* where: (row) => and(eq(row.status, "active"), gt(row.priority, 5))
* })
*/
public subscribeChanges(
callback: (changes: Array<ChangeMessage<TOutput>>) => void,
options: SubscribeChangesOptions = {}
options: SubscribeChangesOptions<TOutput> = {}
): CollectionSubscription {
return this._changes.subscribeChanges(callback, options)
}
Expand Down
27 changes: 22 additions & 5 deletions packages/db/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { StandardSchemaV1 } from "@standard-schema/spec"
import type { Transaction } from "./transactions"
import type { BasicExpression, OrderBy } from "./query/ir.js"
import type { EventEmitter } from "./event-emitter.js"
import type { SingleRowRefProxy } from "./query/builder/ref-proxy.js"

/**
* Interface for a collection-like object that provides the necessary methods
Expand Down Expand Up @@ -720,17 +721,33 @@ export type NamespacedAndKeyedStream = IStreamBuilder<KeyedNamespacedRow>
/**
* Options for subscribing to collection changes
*/
export interface SubscribeChangesOptions {
export interface SubscribeChangesOptions<
T extends object = Record<string, unknown>,
> {
/** Whether to include the current state as initial changes */
includeInitialState?: boolean
/**
* Callback function for filtering changes using a row proxy.
* The callback receives a proxy object that records property access,
* allowing you to use query builder functions like `eq`, `gt`, etc.
*
* @example
* ```ts
* import { eq } from "@tanstack/db"
*
* collection.subscribeChanges(callback, {
* where: (row) => eq(row.status, "active")
* })
* ```
*/
where?: (row: SingleRowRefProxy<T>) => any
/** Pre-compiled expression for filtering changes */
whereExpression?: BasicExpression<boolean>
}

export interface SubscribeChangesSnapshotOptions extends Omit<
SubscribeChangesOptions,
`includeInitialState`
> {
export interface SubscribeChangesSnapshotOptions<
T extends object = Record<string, unknown>,
> extends Omit<SubscribeChangesOptions<T>, `includeInitialState`> {
orderBy?: OrderBy
limit?: number
}
Expand Down
167 changes: 166 additions & 1 deletion packages/db/tests/collection-subscribe-changes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { describe, expect, it, vi } from "vitest"
import mitt from "mitt"
import { createCollection } from "../src/collection/index.js"
import { createTransaction } from "../src/transactions"
import { eq } from "../src/query/builder/functions"
import { and, eq, gt } from "../src/query/builder/functions"
import { PropRef } from "../src/query/ir"
import type {
ChangeMessage,
Expand Down Expand Up @@ -1914,4 +1914,169 @@ describe(`Collection.subscribeChanges`, () => {
expect(collection.status).toBe(`ready`)
expect(collection.size).toBe(2)
})

it(`should support where callback for filtering changes`, () => {
const callback = vi.fn()

// Create collection with items that have a status field
const collection = createCollection<{
id: number
value: string
status: `active` | `inactive`
}>({
id: `where-callback-test`,
getKey: (item) => item.id,
sync: {
sync: ({ begin, write, commit }) => {
// Start with some initial data
begin()
write({
type: `insert`,
value: { id: 1, value: `item1`, status: `inactive` },
})
write({
type: `insert`,
value: { id: 2, value: `item2`, status: `active` },
})
commit()
},
},
})

const mutationFn: MutationFn = async () => {}

// Subscribe to changes with a where callback for active items only
const subscription = collection.subscribeChanges(callback, {
includeInitialState: true,
where: (row) => eq(row.status, `active`),
})

// Should only receive the active item in initial state
expect(callback).toHaveBeenCalledTimes(1)
const initialChanges = callback.mock.calls[0]![0] as ChangesPayload<{
id: number
value: string
status: `active` | `inactive`
}>
expect(initialChanges).toHaveLength(1)
expect(initialChanges[0]!.key).toBe(2)
expect(initialChanges[0]!.type).toBe(`insert`)

// Reset mock
callback.mockReset()

// Update an inactive item to active (should emit insert)
const tx1 = createTransaction({ mutationFn })
tx1.mutate(() =>
collection.update(1, (draft) => {
draft.status = `active`
})
)

// Should emit an insert event for the newly active item
expect(callback).toHaveBeenCalledTimes(1)
const insertChanges = callback.mock.calls[0]![0] as ChangesPayload<{
id: number
value: string
status: `active` | `inactive`
}>
expect(insertChanges).toHaveLength(1)
expect(insertChanges[0]!.type).toBe(`insert`)
expect(insertChanges[0]!.key).toBe(1)
expect(insertChanges[0]!.value.status).toBe(`active`)

// Reset mock
callback.mockReset()

// Update an active item to inactive (should emit delete)
const tx2 = createTransaction({ mutationFn })
tx2.mutate(() =>
collection.update(2, (draft) => {
draft.status = `inactive`
})
)

// Should emit a delete event for the newly inactive item
expect(callback).toHaveBeenCalledTimes(1)
const deleteChanges = callback.mock.calls[0]![0] as ChangesPayload<{
id: number
value: string
status: `active` | `inactive`
}>
expect(deleteChanges).toHaveLength(1)
expect(deleteChanges[0]!.type).toBe(`delete`)
expect(deleteChanges[0]!.key).toBe(2)

// Clean up
subscription.unsubscribe()
})

it(`should support where callback with multiple conditions`, () => {
const callback = vi.fn()

// Create collection with items
const collection = createCollection<{
id: number
value: string
status: `active` | `inactive`
priority: number
}>({
id: `where-callback-and-test`,
getKey: (item) => item.id,
sync: {
sync: ({ begin, write, commit }) => {
begin()
write({
type: `insert`,
value: { id: 1, value: `item1`, status: `active`, priority: 3 },
})
write({
type: `insert`,
value: { id: 2, value: `item2`, status: `active`, priority: 8 },
})
write({
type: `insert`,
value: { id: 3, value: `item3`, status: `inactive`, priority: 10 },
})
commit()
},
},
})

// Subscribe with where callback using and() for multiple conditions
const subscription = collection.subscribeChanges(callback, {
includeInitialState: true,
where: (row) => and(eq(row.status, `active`), gt(row.priority, 5)),
})

// Should only receive item2 (active AND priority > 5)
expect(callback).toHaveBeenCalledTimes(1)
const initialChanges = callback.mock.calls[0]![0] as ChangesPayload<any>
expect(initialChanges).toHaveLength(1)
expect(initialChanges[0]!.key).toBe(2)
expect(initialChanges[0]!.value).toEqual({
id: 2,
value: `item2`,
status: `active`,
priority: 8,
})

// Clean up
subscription.unsubscribe()
})

it(`should throw if both where and whereExpression are provided`, () => {
const collection = createCollection<{ id: number; status: string }>({
id: `where-both-error-test`,
getKey: (item) => item.id,
sync: { sync: () => {} },
})

expect(() => {
collection.subscribeChanges(() => {}, {
where: (row) => eq(row.status, `active`),
whereExpression: eq(new PropRef([`status`]), `active`),
})
}).toThrow(`Cannot specify both 'where' and 'whereExpression' options`)
})
})
Loading