Skip to content
This repository was archived by the owner on May 17, 2025. It is now read-only.

Commit 26d491b

Browse files
authored
feat: new callbacks (#3)
No more do we need to prepare the resolver. The `subscribe` function now takes all the callbacks. We also now have - filter?: object | ((...args: SubscribeArgs) => object) - onSubscribe?: (...args: SubscribeArgs) => void | Promise<void> - onComplete?: (...args: SubscribeArgs) => void | Promise<void> - onAfterSubscribe?: (...args: SubscribeArgs) => PubSubEvent | Promise<PubSubEvent> | undefined | Promise<undefined>
1 parent 8c7dc70 commit 26d491b

File tree

20 files changed

+393
-465
lines changed

20 files changed

+393
-465
lines changed

README.md

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -419,23 +419,6 @@ Side effect handlers can be declared on subscription fields to handle `onSubscri
419419

420420
<details>
421421

422-
<summary>📖 Enabling side effects</summary>
423-
424-
For `onSubscribe` and `onComplete` side effects to work, resolvers must first be passed to `prepareResolvers` prior to schema construction.
425-
426-
```ts
427-
import { prepareResolvers } from 'subscriptionless/subscribe';
428-
429-
const schema = makeExecutableSchema({
430-
typedefs,
431-
resolvers: prepareResolvers(resolvers),
432-
});
433-
```
434-
435-
</details>
436-
437-
<details>
438-
439422
<summary>📖 Adding side-effect handlers</summary>
440423

441424
```ts
@@ -445,13 +428,12 @@ export const resolver = {
445428
resolve: (event, args, context) => {
446429
/* ... */
447430
},
448-
subscribe: subscribe('MY_TOPIC'),
449-
onSubscribe: (root, args) => {
450-
/* Do something on subscription start */
451-
},
452-
onComplete: (root, args) => {
453-
/* Do something on subscription stop */
454-
},
431+
subscribe: subscribe('MY_TOPIC', {
432+
// filter?: object | ((...args: SubscribeArgs) => object)
433+
// onSubscribe?: (...args: SubscribeArgs) => void | Promise<void>
434+
// onComplete?: (...args: SubscribeArgs) => void | Promise<void>
435+
// onAfterSubscribe?: (...args: SubscribeArgs) => PubSubEvent | Promise<PubSubEvent> | undefined | Promise<undefined>
436+
}),
455437
},
456438
},
457439
};

lib/gateway.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,39 +37,39 @@ export const handleGatewayEvent =
3737
const message = JSON.parse(event.body!)
3838

3939
if (message.type === MessageType.ConnectionInit) {
40-
await connection_init(c)({ event, message })
40+
await connection_init({ c, event, message })
4141
return {
4242
statusCode: 200,
4343
body: '',
4444
}
4545
}
4646

4747
if (message.type === MessageType.Subscribe) {
48-
await subscribe(c)({ event, message })
48+
await subscribe({ c, event, message })
4949
return {
5050
statusCode: 200,
5151
body: '',
5252
}
5353
}
5454

5555
if (message.type === MessageType.Complete) {
56-
await complete(c)({ event, message })
56+
await complete({ c, event, message })
5757
return {
5858
statusCode: 200,
5959
body: '',
6060
}
6161
}
6262

6363
if (message.type === MessageType.Ping) {
64-
await ping(c)({ event, message })
64+
await ping({ c, event, message })
6565
return {
6666
statusCode: 200,
6767
body: '',
6868
}
6969
}
7070

7171
if (message.type === MessageType.Pong) {
72-
await pong(c)({ event, message })
72+
await pong({ c, event, message })
7373
return {
7474
statusCode: 200,
7575
body: '',
@@ -78,7 +78,7 @@ export const handleGatewayEvent =
7878
}
7979

8080
if (event.requestContext.eventType === 'DISCONNECT') {
81-
await disconnect(c)({ event, message: null })
81+
await disconnect({ c, event, message: null })
8282
return {
8383
statusCode: 200,
8484
body: '',

lib/index.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ export const createInstance = (opts: ServerArgs) => {
3333
}
3434
}
3535

36-
export { prepareResolvers } from './utils/graphql'
3736
export * from './pubsub/subscribe'
3837

3938
export * from './types'

lib/messages/complete.ts

Lines changed: 39 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,55 @@
11
import { parse } from 'graphql'
22
import { CompleteMessage } from 'graphql-ws'
33
import { buildExecutionContext } from 'graphql/execution/execute'
4+
import { SubscribePsuedoIterable } from '../types'
45
import { deleteConnection } from '../utils/aws'
56
import { constructContext, getResolverAndArgs } from '../utils/graphql'
67
import { MessageHandler } from './types'
78

89
/** Handler function for 'complete' message. */
910
export const complete: MessageHandler<CompleteMessage> =
10-
(c) =>
11-
async ({ event, message }) => {
12-
try {
13-
await c.onComplete?.({ event, message })
14-
15-
const topicSubscriptions = await c.mapper.query(c.model.Subscription, {
16-
id: `${event.requestContext.connectionId!}|${message.id}`,
17-
})
18-
19-
let deletions = [] as Promise<any>[]
20-
for await (const entity of topicSubscriptions) {
21-
deletions = [
22-
...deletions,
23-
(async () => {
11+
async ({ c, event, message }) => {
12+
try {
13+
const topicSubscriptions = await c.mapper.query(c.model.Subscription, {
14+
id: `${event.requestContext.connectionId!}|${message.id}`,
15+
})
16+
let deletions = [] as Promise<any>[]
17+
for await (const entity of topicSubscriptions) {
18+
deletions = [
19+
...deletions,
20+
(async () => {
2421
// only call onComplete per subscription
25-
if (deletions.length === 0) {
26-
const execContext = buildExecutionContext(
27-
c.schema,
28-
parse(entity.subscription.query),
29-
undefined,
30-
await constructContext(c)(entity),
31-
entity.subscription.variables,
32-
entity.subscription.operationName,
33-
undefined,
34-
)
35-
36-
if (!('operation' in execContext)) {
37-
throw execContext
38-
}
22+
if (deletions.length === 0) {
23+
const execContext = buildExecutionContext(
24+
c.schema,
25+
parse(entity.subscription.query),
26+
undefined,
27+
await constructContext(c)(entity),
28+
entity.subscription.variables,
29+
entity.subscription.operationName,
30+
undefined,
31+
)
32+
33+
if (!('operation' in execContext)) {
34+
throw execContext
35+
}
3936

40-
const [field, root, args, context, info] =
41-
getResolverAndArgs(c)(execContext)
37+
const [field, root, args, context, info] = getResolverAndArgs(c)(execContext)
4238

43-
const onComplete = field.resolve.onComplete
44-
if (onComplete) {
45-
await onComplete(root, args, context, info)
46-
}
39+
const onComplete = (field?.subscribe as SubscribePsuedoIterable)?.onComplete
40+
if (onComplete) {
41+
await onComplete(root, args, context, info)
4742
}
43+
}
4844

49-
await c.mapper.delete(entity)
50-
})(),
51-
]
52-
}
53-
54-
await Promise.all(deletions)
55-
} catch (err) {
56-
await c.onError?.(err, { event, message })
57-
await deleteConnection(c)(event.requestContext)
45+
await c.mapper.delete(entity)
46+
})(),
47+
]
5848
}
49+
50+
await Promise.all(deletions)
51+
} catch (err) {
52+
await c.onError?.(err, { event, message })
53+
await deleteConnection(c)(event.requestContext)
5954
}
55+
}

lib/messages/connection_init.ts

Lines changed: 36 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -6,43 +6,42 @@ import { MessageHandler } from './types'
66

77
/** Handler function for 'connection_init' message. */
88
export const connection_init: MessageHandler<ConnectionInitMessage> =
9-
(c) =>
10-
async ({ event, message }) => {
11-
try {
12-
const res = c.onConnectionInit
13-
? await c.onConnectionInit({ event, message })
14-
: message.payload
9+
async ({ c, event, message }) => {
10+
try {
11+
const res = c.onConnectionInit
12+
? await c.onConnectionInit({ event, message })
13+
: message.payload
1514

16-
if (c.pingpong) {
17-
await new StepFunctions()
18-
.startExecution({
19-
stateMachineArn: c.pingpong.machine,
20-
name: event.requestContext.connectionId!,
21-
input: JSON.stringify({
22-
connectionId: event.requestContext.connectionId!,
23-
domainName: event.requestContext.domainName!,
24-
stage: event.requestContext.stage,
25-
state: 'PING',
26-
choice: 'WAIT',
27-
seconds: c.pingpong.delay,
28-
} as StateFunctionInput),
29-
})
30-
.promise()
31-
}
32-
33-
// Write to persistence
34-
const connection = Object.assign(new c.model.Connection(), {
35-
id: event.requestContext.connectionId!,
36-
requestContext: event.requestContext,
37-
payload: res,
38-
})
39-
await c.mapper.put(connection)
40-
return sendMessage(c)({
41-
...event.requestContext,
42-
message: { type: MessageType.ConnectionAck },
43-
})
44-
} catch (err) {
45-
await c.onError?.(err, { event, message })
46-
await deleteConnection(c)(event.requestContext)
15+
if (c.pingpong) {
16+
await new StepFunctions()
17+
.startExecution({
18+
stateMachineArn: c.pingpong.machine,
19+
name: event.requestContext.connectionId!,
20+
input: JSON.stringify({
21+
connectionId: event.requestContext.connectionId!,
22+
domainName: event.requestContext.domainName!,
23+
stage: event.requestContext.stage,
24+
state: 'PING',
25+
choice: 'WAIT',
26+
seconds: c.pingpong.delay,
27+
} as StateFunctionInput),
28+
})
29+
.promise()
4730
}
31+
32+
// Write to persistence
33+
const connection = Object.assign(new c.model.Connection(), {
34+
id: event.requestContext.connectionId!,
35+
requestContext: event.requestContext,
36+
payload: res,
37+
})
38+
await c.mapper.put(connection)
39+
return sendMessage(c)({
40+
...event.requestContext,
41+
message: { type: MessageType.ConnectionAck },
42+
})
43+
} catch (err) {
44+
await c.onError?.(err, { event, message })
45+
await deleteConnection(c)(event.requestContext)
4846
}
47+
}

0 commit comments

Comments
 (0)