Skip to content

Commit 9a9a09a

Browse files
committed
feat: flows
1 parent f74b780 commit 9a9a09a

2 files changed

Lines changed: 294 additions & 51 deletions

File tree

src/index.ts

Lines changed: 205 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,38 @@
11
import type { StandardSchemaV1 } from '@standard-schema/spec'
2-
import type { Job as BullJob, QueueOptions as BullQueueOptions } from 'bullmq'
32
import type {
3+
FlowJob as BullFlowJob,
4+
Job as BullJob,
5+
QueueOptions as BullQueueOptions,
6+
} from 'bullmq'
7+
import type {
8+
DefineFlowOptions,
49
DefineJobOptions,
10+
Flow,
11+
FlowStep,
512
Job,
613
JobDefinitionsObject,
14+
QueueFlowProxyAccessor,
715
QueueJobProxy,
816
QueueJobProxyAccessor,
917
WorkerOptions,
1018
} from './types'
11-
import {} from '@standard-schema/spec'
12-
import { Queue as BullQueue, Worker as BullWorker } from 'bullmq'
19+
import { Queue as BullQueue, Worker as BullWorker, FlowProducer } from 'bullmq'
1320
import IORedis from 'ioredis'
14-
import { internalSymbol, StandardSchemaV1Error } from './types'
21+
import { FlowBuilder, flowSymbol, jobSymbol, StandardSchemaV1Error } from './types'
1522

16-
export function defineJob<Schema extends StandardSchemaV1>(
17-
opts: DefineJobOptions<Schema>,
18-
): Job<Schema> {
23+
export function defineJob<Schema extends StandardSchemaV1, Output>(
24+
opts: DefineJobOptions<Schema, Output>,
25+
): Job<Schema, Output> {
1926
return {
2027
...opts,
21-
[internalSymbol]: {
22-
async addToQueue(queue: BullQueue, payload: unknown) {
28+
[jobSymbol]: <Job<Schema, Output>[typeof jobSymbol]>{
29+
async addToQueue(queue, payload) {
2330
const parsed = await opts.schema['~standard'].validate(payload)
2431
if (parsed.issues) throw new StandardSchemaV1Error(parsed.issues)
2532

2633
return queue.add(queue.name, parsed.value)
2734
},
28-
async addToQueueBulk(queue: BullQueue, payloads: unknown[]) {
35+
async addToQueueBulk(queue, payloads) {
2936
return queue.addBulk(
3037
await Promise.all(
3138
payloads.map(async (payload) => {
@@ -44,47 +51,149 @@ export function defineJob<Schema extends StandardSchemaV1>(
4451
}
4552
}
4653

54+
// const flowProducter = new FlowProducer()
55+
function buildFlowJobStack(opts: {
56+
rootInputPayload: unknown
57+
steps: FlowStep<any, any>[]
58+
flowName: string
59+
}) {
60+
if (opts.steps.length === 0) throw new Error(`Flow ${opts.flowName} has no steps`)
61+
const firstStep = opts.steps[0]!
62+
63+
let currentStep: BullFlowJob = {
64+
name: `${opts.flowName}_${firstStep.name}`,
65+
queueName: `${opts.flowName}_${firstStep.name}`,
66+
data: opts.rootInputPayload,
67+
}
68+
69+
for (const step of opts.steps.slice(1)) {
70+
currentStep = {
71+
name: `${opts.flowName}_${step.name}`,
72+
queueName: `${opts.flowName}_${step.name}`,
73+
children: [currentStep],
74+
}
75+
}
76+
77+
return {
78+
name: `${opts.flowName}`,
79+
queueName: `${opts.flowName}`,
80+
children: [currentStep],
81+
} satisfies BullFlowJob
82+
}
83+
84+
export function defineFlow<Schema extends StandardSchemaV1, Output>(
85+
opts: DefineFlowOptions<Schema, Output>,
86+
): Flow<Schema, Output> {
87+
const steps = opts.flow(new FlowBuilder()).steps
88+
return {
89+
...opts,
90+
[flowSymbol]: <Flow<Schema, Output>[typeof flowSymbol]>{
91+
steps,
92+
async addToQueue(flowName, flowProducer, payload) {
93+
const parsed = await opts.schema['~standard'].validate(payload)
94+
if (parsed.issues) throw new StandardSchemaV1Error(parsed.issues)
95+
96+
return flowProducer.add(
97+
buildFlowJobStack({
98+
rootInputPayload: parsed.value,
99+
steps,
100+
flowName,
101+
}),
102+
)
103+
},
104+
async addToQueueBulk(flowName, flowProducer, payloads) {
105+
return flowProducer.addBulk(
106+
await Promise.all(
107+
payloads.map(async (payload) => {
108+
const parsed = await opts.schema['~standard'].validate(payload)
109+
if (parsed.issues) throw new StandardSchemaV1Error(parsed.issues)
110+
111+
return buildFlowJobStack({
112+
flowName,
113+
rootInputPayload: parsed.value,
114+
steps,
115+
})
116+
}),
117+
),
118+
)
119+
},
120+
},
121+
}
122+
}
123+
47124
export function defineQueues<J extends JobDefinitionsObject>(jobs: J, opts?: BullQueueOptions) {
48125
const connection =
49126
opts?.connection ??
50127
new IORedis({
51128
maxRetriesPerRequest: null,
52129
})
53-
const queues = new Map<string, BullQueue>()
54130

131+
const queues = new Map<string, BullQueue<BullJob<any, any, string>>>()
55132
async function getQueue(jobName: string) {
56-
let queue = queues.get(jobName)
57-
if (!queue) {
58-
queue = new BullQueue(jobName, {
59-
...opts,
60-
connection,
61-
})
62-
queues.set(jobName, queue)
63-
await queue.waitUntilReady()
64-
}
133+
if (queues.has(jobName)) return queues.get(jobName)!
134+
135+
const queue = new BullQueue<BullJob<any, any, string>>(jobName, {
136+
...opts,
137+
defaultJobOptions: {
138+
removeOnComplete: true,
139+
attempts: 3,
140+
backoff: {
141+
type: 'exponential',
142+
delay: 2000,
143+
},
144+
...opts?.defaultJobOptions,
145+
},
146+
connection,
147+
})
148+
queues.set(jobName, queue)
149+
await queue.waitUntilReady()
65150

66151
return queue
67152
}
68153

154+
let flowProducer: FlowProducer | null = null
155+
async function getFlowProducer() {
156+
if (flowProducer) return flowProducer
157+
158+
flowProducer = new FlowProducer({
159+
connection,
160+
})
161+
await flowProducer.waitUntilReady()
162+
return flowProducer
163+
}
164+
69165
function createProxy(obj: JobDefinitionsObject, path: string[]) {
70166
return new Proxy(obj, {
71167
get(target, p, receiver) {
72168
if (typeof p !== 'string') return
73169

74-
const fullPath = [...path, p]
75170
const jobOrJobs = Reflect.get(target, p, receiver)
171+
if (typeof jobOrJobs !== 'object') return
76172

77-
if (jobOrJobs && typeof jobOrJobs === 'object' && internalSymbol in jobOrJobs) {
173+
const fullPath = [...path, p]
174+
175+
if (jobSymbol in jobOrJobs) {
78176
const jobName = fullPath.join('-')
79177

80178
const accessor = async (payload: unknown) => {
81-
return jobOrJobs[internalSymbol].addToQueue(await getQueue(jobName), payload)
179+
return jobOrJobs[jobSymbol].addToQueue(await getQueue(jobName), payload)
180+
}
181+
accessor.bulk = async (payloads: unknown[]) => {
182+
return jobOrJobs[jobSymbol].addToQueueBulk(await getQueue(jobName), payloads)
183+
}
184+
185+
return accessor satisfies QueueJobProxyAccessor<any, any>
186+
} else if (flowSymbol in jobOrJobs) {
187+
const flowName = fullPath.join('-')
188+
189+
const accessor = async (payload: unknown) => {
190+
return jobOrJobs[flowSymbol].addToQueue(flowName, await getFlowProducer(), payload)
82191
}
83192
accessor.bulk = async (payloads: unknown[]) => {
84-
return jobOrJobs[internalSymbol].addToQueueBulk(await getQueue(jobName), payloads)
193+
return jobOrJobs[flowSymbol].addToQueueBulk(flowName, await getFlowProducer(), payloads)
85194
}
86195

87-
return accessor satisfies QueueJobProxyAccessor<any>
196+
return accessor satisfies QueueFlowProxyAccessor<any>
88197
}
89198

90199
return createProxy(jobOrJobs, fullPath)
@@ -94,7 +203,10 @@ export function defineQueues<J extends JobDefinitionsObject>(jobs: J, opts?: Bul
94203
return createProxy(jobs, []) as unknown as QueueJobProxy<J>
95204
}
96205

97-
export async function startWorkers<J extends JobDefinitionsObject>(jobs: J, opts?: WorkerOptions) {
206+
export async function startWorkers<J extends JobDefinitionsObject>(
207+
jobs: J,
208+
opts?: WorkerOptions<any, any>,
209+
) {
98210
const connection =
99211
opts?.connection ??
100212
new IORedis({
@@ -108,15 +220,14 @@ export async function startWorkers<J extends JobDefinitionsObject>(jobs: J, opts
108220

109221
const fullPath = [...path, key]
110222

111-
if (typeof value === 'object' && internalSymbol in value) {
223+
if (jobSymbol in value) {
112224
const jobName = fullPath.join('-')
113225

114-
const hooks = value.workerOptions?.hooks ?? opts?.hooks
115-
116226
const worker = new BullWorker(
117227
jobName,
118228
async (job) => {
119-
return value.handler(job.data, job as BullJob<any, void, string>)
229+
// eslint-disable-next-line ts/no-unsafe-return
230+
return value.run(job.data, job)
120231
},
121232
{
122233
...opts,
@@ -125,12 +236,76 @@ export async function startWorkers<J extends JobDefinitionsObject>(jobs: J, opts
125236
},
126237
)
127238

239+
const hooks = value.workerOptions?.hooks ?? opts?.hooks
128240
if (hooks)
129241
for (const [hookName, hook] of Object.entries(hooks)) {
130242
worker.addListener(hookName, hook)
131243
}
132244

133245
workers.set(jobName, worker)
246+
} else if (flowSymbol in value) {
247+
const flowName = fullPath.join('-')
248+
249+
const worker = new BullWorker(
250+
flowName,
251+
async (job) => {
252+
// eslint-disable-next-line ts/no-unsafe-return
253+
const results = await job.getChildrenValues().then((res) => Object.values(res))
254+
if (results.length !== 1)
255+
throw new Error('Flow root job should have exactly one child job')
256+
257+
// eslint-disable-next-line ts/no-unsafe-return
258+
return results[0]
259+
},
260+
{
261+
...opts,
262+
...value.workerOptions,
263+
connection,
264+
},
265+
)
266+
267+
const hooks = value.workerOptions?.hooks ?? opts?.hooks
268+
if (hooks)
269+
for (const [hookName, hook] of Object.entries(hooks)) {
270+
worker.addListener(hookName, hook)
271+
}
272+
273+
workers.set(flowName, worker)
274+
275+
// add workers for each step
276+
for (const step of value[flowSymbol].steps) {
277+
const jobName = `${flowName}_${step.name}`
278+
const stepWorker = new BullWorker(
279+
jobName,
280+
async (job) => {
281+
// first step gets job data as input
282+
// eslint-disable-next-line ts/no-unsafe-return
283+
if (value[flowSymbol].steps.indexOf(step) === 0) return step.run(job.data, job)
284+
285+
// eslint-disable-next-line ts/no-unsafe-return
286+
const results = await job.getChildrenValues().then((res) => Object.values(res))
287+
if (results.length !== 1)
288+
throw new Error('Flow job should have exactly one child job')
289+
290+
// eslint-disable-next-line ts/no-unsafe-assignment
291+
const input = results[0]
292+
// eslint-disable-next-line ts/no-unsafe-return
293+
return step.run(input, job)
294+
},
295+
{
296+
...opts,
297+
...step.workerOptions,
298+
connection,
299+
},
300+
)
301+
302+
if (hooks)
303+
for (const [hookName, hook] of Object.entries(hooks)) {
304+
stepWorker.addListener(hookName, hook)
305+
}
306+
307+
workers.set(jobName, stepWorker)
308+
}
134309
} else {
135310
traverse(value, fullPath)
136311
}

0 commit comments

Comments
 (0)