Skip to content

Commit cbc5baf

Browse files
committed
convert deduper to a class, with reset, and dedupe inflight
1 parent 2d98811 commit cbc5baf

File tree

3 files changed

+238
-77
lines changed

3 files changed

+238
-77
lines changed

packages/db/src/query/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,4 @@ export {
7070
unionPredicates,
7171
} from "./predicate-utils.js"
7272

73-
export { createDeduplicatedLoadSubset } from "./subset-dedupe.js"
73+
export { DeduplicatedLoadSubset } from "./subset-dedupe.js"

packages/db/src/query/subset-dedupe.ts

Lines changed: 178 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,50 +7,83 @@ import type { BasicExpression } from "./ir.js"
77
import type { LoadSubsetOptions } from "../types.js"
88

99
/**
10-
* Creates a deduplicated wrapper around a loadSubset function.
11-
* Tracks what data has been loaded and avoids redundant calls.
12-
*
13-
* @param loadSubset - The underlying loadSubset function to wrap
14-
* @returns A wrapped function that deduplicates calls based on loaded predicates
10+
* Deduplicated wrapper for a loadSubset function.
11+
* Tracks what data has been loaded and avoids redundant calls by applying
12+
* subset logic to predicates.
1513
*
1614
* @example
17-
* const deduplicatedLoadSubset = createDeduplicatedLoadSubset(myLoadSubset)
15+
* const dedupe = new DeduplicatedLoadSubset(myLoadSubset)
1816
*
1917
* // First call - fetches data
20-
* await deduplicatedLoadSubset({ where: gt(ref('age'), val(10)) })
18+
* await dedupe.loadSubset({ where: gt(ref('age'), val(10)) })
2119
*
2220
* // Second call - subset of first, returns true immediately
23-
* await deduplicatedLoadSubset({ where: gt(ref('age'), val(20)) })
21+
* await dedupe.loadSubset({ where: gt(ref('age'), val(20)) })
22+
*
23+
* // Clear state to start fresh
24+
* dedupe.reset()
2425
*/
25-
export function createDeduplicatedLoadSubset(
26-
loadSubset: (options: LoadSubsetOptions) => true | Promise<void>
27-
): (options: LoadSubsetOptions) => true | Promise<void> {
26+
export class DeduplicatedLoadSubset {
27+
// The underlying loadSubset function to wrap
28+
private readonly _loadSubset: (
29+
options: LoadSubsetOptions
30+
) => true | Promise<void>
31+
2832
// Combined where predicate for all unlimited calls (no limit)
29-
let unlimitedWhere: BasicExpression<boolean> | undefined = undefined
33+
private unlimitedWhere: BasicExpression<boolean> | undefined = undefined
3034

3135
// Flag to track if we've loaded all data (unlimited call with no where clause)
32-
let hasLoadedAllData = false
36+
private hasLoadedAllData = false
3337

3438
// List of all limited calls (with limit, possibly with orderBy)
35-
const limitedCalls: Array<LoadSubsetOptions> = []
39+
// We clone options before storing to prevent mutation of stored predicates
40+
private limitedCalls: Array<LoadSubsetOptions> = []
41+
42+
// Track in-flight calls to prevent concurrent duplicate requests
43+
// We store both the options and the promise so we can apply subset logic
44+
private inflightCalls: Array<{
45+
options: LoadSubsetOptions
46+
promise: Promise<void>
47+
}> = []
48+
49+
// Generation counter to invalidate in-flight requests after reset()
50+
// When reset() is called, this increments, and any in-flight completion handlers
51+
// check if their captured generation matches before updating tracking state
52+
private generation = 0
53+
54+
constructor(
55+
loadSubset: (options: LoadSubsetOptions) => true | Promise<void>
56+
) {
57+
this._loadSubset = loadSubset
58+
}
3659

37-
return (options: LoadSubsetOptions) => {
60+
/**
61+
* Load a subset of data, with automatic deduplication based on previously
62+
* loaded predicates and in-flight requests.
63+
*
64+
* This method is auto-bound, so it can be safely passed as a callback without
65+
* losing its `this` context (e.g., `loadSubset: dedupe.loadSubset` in a sync config).
66+
*
67+
* @param options - The predicate options (where, orderBy, limit)
68+
* @returns true if data is already loaded, or a Promise that resolves when data is loaded
69+
*/
70+
loadSubset = (options: LoadSubsetOptions): true | Promise<void> => {
3871
// If we've loaded all data, everything is covered
39-
if (hasLoadedAllData) {
72+
if (this.hasLoadedAllData) {
4073
return true
4174
}
4275

4376
// Check against unlimited combined predicate
4477
// If we've loaded all data matching a where clause, we don't need to refetch subsets
45-
if (unlimitedWhere !== undefined && options.where !== undefined) {
46-
if (isWhereSubset(options.where, unlimitedWhere)) {
78+
if (this.unlimitedWhere !== undefined && options.where !== undefined) {
79+
if (isWhereSubset(options.where, this.unlimitedWhere)) {
4780
return true // Data already loaded via unlimited call
4881
}
4982
}
5083

5184
// Check against limited calls
5285
if (options.limit !== undefined) {
53-
const alreadyLoaded = limitedCalls.some((loaded) =>
86+
const alreadyLoaded = this.limitedCalls.some((loaded) =>
5487
isPredicateSubset(options, loaded)
5588
)
5689

@@ -59,40 +92,152 @@ export function createDeduplicatedLoadSubset(
5992
}
6093
}
6194

95+
// Check against in-flight calls using the same subset logic as resolved calls
96+
// This prevents duplicate requests when concurrent calls have subset relationships
97+
const matchingInflight = this.inflightCalls.find((inflight) => {
98+
// For unlimited calls, check if the incoming where is a subset of the in-flight where
99+
if (inflight.options.limit === undefined && options.limit === undefined) {
100+
// Both unlimited - check where subset
101+
if (inflight.options.where === undefined) {
102+
// In-flight is loading all data, so incoming is covered
103+
return true
104+
}
105+
if (options.where !== undefined) {
106+
return isWhereSubset(options.where, inflight.options.where)
107+
}
108+
return false
109+
}
110+
111+
// For limited calls, use the full predicate subset check (where + orderBy + limit)
112+
if (inflight.options.limit !== undefined && options.limit !== undefined) {
113+
return isPredicateSubset(options, inflight.options)
114+
}
115+
116+
// Mixed unlimited/limited - limited calls can be covered by unlimited calls
117+
if (inflight.options.limit === undefined && options.limit !== undefined) {
118+
// In-flight is unlimited, incoming is limited
119+
if (inflight.options.where === undefined) {
120+
// In-flight is loading all data
121+
return true
122+
}
123+
if (options.where !== undefined) {
124+
return isWhereSubset(options.where, inflight.options.where)
125+
}
126+
}
127+
128+
return false
129+
})
130+
131+
if (matchingInflight !== undefined) {
132+
// An in-flight call will load data that covers this request
133+
// Return the same promise so this caller waits for the data to load
134+
// The in-flight promise already handles tracking updates when it completes
135+
return matchingInflight.promise
136+
}
137+
62138
// Not covered by existing data - call underlying loadSubset
63-
const resultPromise = loadSubset(options)
139+
const resultPromise = this._loadSubset(options)
64140

65141
// Handle both sync (true) and async (Promise<void>) return values
66142
if (resultPromise === true) {
67143
// Sync return - update tracking synchronously
68-
updateTracking(options)
144+
// Clone options before storing to protect against caller mutation
145+
this.updateTracking(cloneOptions(options))
69146
return true
70147
} else {
71-
// Async return - update tracking after promise resolves
72-
return resultPromise.then((result) => {
73-
updateTracking(options)
74-
return result
75-
})
148+
// Async return - track the promise and update tracking after it resolves
149+
// Clone options BEFORE entering async context to prevent mutation issues
150+
const clonedOptions = cloneOptions(options)
151+
152+
// Capture the current generation - this lets us detect if reset() was called
153+
// while this request was in-flight, so we can skip updating tracking state
154+
const capturedGeneration = this.generation
155+
156+
// We need to create a reference to the in-flight entry so we can remove it later
157+
const inflightEntry = {
158+
options: clonedOptions, // Store cloned options for subset matching
159+
promise: resultPromise
160+
.then((result) => {
161+
// Only update tracking if this request is still from the current generation
162+
// If reset() was called, the generation will have incremented and we should
163+
// not repopulate the state that was just cleared
164+
if (capturedGeneration === this.generation) {
165+
// Use the cloned options that we captured before any caller mutations
166+
// This ensures we track exactly what was loaded, not what the caller changed
167+
this.updateTracking(clonedOptions)
168+
}
169+
return result
170+
})
171+
.finally(() => {
172+
// Always remove from in-flight array on completion OR rejection
173+
// This ensures failed requests can be retried instead of being cached forever
174+
const index = this.inflightCalls.indexOf(inflightEntry)
175+
if (index !== -1) {
176+
this.inflightCalls.splice(index, 1)
177+
}
178+
}),
179+
}
180+
181+
// Store the in-flight entry so concurrent subset calls can wait for it
182+
this.inflightCalls.push(inflightEntry)
183+
return inflightEntry.promise
76184
}
77185
}
78186

79-
function updateTracking(options: LoadSubsetOptions) {
187+
/**
188+
* Reset all tracking state.
189+
* Clears the history of loaded predicates and in-flight calls.
190+
* Use this when you want to start fresh, for example after clearing the underlying data store.
191+
*
192+
* Note: Any in-flight requests will still complete, but they will not update the tracking
193+
* state after the reset. This prevents old requests from repopulating cleared state.
194+
*/
195+
reset(): void {
196+
this.unlimitedWhere = undefined
197+
this.hasLoadedAllData = false
198+
this.limitedCalls = []
199+
this.inflightCalls = []
200+
// Increment generation to invalidate any in-flight completion handlers
201+
// This ensures requests that were started before reset() don't repopulate the state
202+
this.generation++
203+
}
204+
205+
private updateTracking(options: LoadSubsetOptions): void {
80206
// Update tracking based on whether this was a limited or unlimited call
81207
if (options.limit === undefined) {
82208
// Unlimited call - update combined where predicate
83209
// We ignore orderBy for unlimited calls as mentioned in requirements
84210
if (options.where === undefined) {
85211
// No where clause = all data loaded
86-
hasLoadedAllData = true
87-
unlimitedWhere = undefined
88-
} else if (unlimitedWhere === undefined) {
89-
unlimitedWhere = options.where
212+
this.hasLoadedAllData = true
213+
this.unlimitedWhere = undefined
214+
} else if (this.unlimitedWhere === undefined) {
215+
this.unlimitedWhere = options.where
90216
} else {
91-
unlimitedWhere = unionWherePredicates([unlimitedWhere, options.where])
217+
this.unlimitedWhere = unionWherePredicates([
218+
this.unlimitedWhere,
219+
options.where,
220+
])
92221
}
93222
} else {
94223
// Limited call - add to list for future subset checks
95-
limitedCalls.push(options)
224+
// Options are already cloned by caller to prevent mutation issues
225+
this.limitedCalls.push(options)
96226
}
97227
}
98228
}
229+
230+
/**
231+
* Clones a LoadSubsetOptions object to prevent mutation of stored predicates.
232+
* This is crucial because callers often reuse the same options object and mutate
233+
* properties like limit or where between calls. Without cloning, our stored history
234+
* would reflect the mutated values rather than what was actually loaded.
235+
*/
236+
function cloneOptions(options: LoadSubsetOptions): LoadSubsetOptions {
237+
return {
238+
where: options.where,
239+
orderBy: options.orderBy,
240+
limit: options.limit,
241+
// Note: We don't clone subscription as it's not part of predicate matching
242+
}
243+
}

0 commit comments

Comments
 (0)