Skip to content
This repository was archived by the owner on May 17, 2025. It is now read-only.

Commit e93e318

Browse files
authored
feat: more logging and test onComplete (#33)
- cleanup some tests and types
1 parent b2a3842 commit e93e318

File tree

8 files changed

+191
-72
lines changed

8 files changed

+191
-72
lines changed

README.md

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,28 @@
33

44
This is a fork of [subscriptionless](https://github.com/andyrichardson/subscriptionless) that is built to work with [Architect](https://arc.codes) and tested with the [Architect Sandbox](https://arc.codes/docs/en/reference/cli/sandbox). There's no reason why it wont work with Serverless or other deploy tools but their support is not a goal.
55

6+
## API
67

7-
# Old Readme
8+
### `subscribe(topic: string, options?: SubscribeOptions): SubscribePseudoIterable`
9+
10+
Subscribe is the most important method in the library. It's the primary difference between `graphql-ws` and `graphql-lambda-subscriptions`. It returns a `SubscribePseudoIterable` that pretends to be an async iterator that you put on the `subscribe` resolver for your Subscription. In reality it includes a few properties that we use to subscribe to events and fire lifecycle functions.
11+
12+
```ts
13+
interface SubscribeOptions {
14+
filter?: (...args: TSubscribeArgs) => MaybePromise<Partial<Payload>|void>;
15+
onSubscribe?: (...args: TSubscribeArgs) => MaybePromise<void>;
16+
onAfterSubscribe?: (...args: TSubscribeArgs) => MaybePromise<void>;
17+
onComplete?: (...args: TSubscribeArgs) => MaybePromise<void>;
18+
}
19+
```
20+
21+
- `topic`: The you subscribe to the topic and can filter based upon the topics payload.
22+
- `filter`: An object that the payload will be matched against (or a function that produces the object). If the payload's field matches the subscription will receive the event. If the payload is missing the field the subscription will receive the event.
23+
- `onSubscribe`: A function that gets the subscription information (like arguments) it can throw if you don't want the subscription to subscribe.
24+
- `onAfterSubscribe`: A function that gets the subscription information (like arguments) and can fire initial events or record information.
25+
- `onComplete`: A function that fires at least once when a connection disconnects, a client sends a "complete" message, or the server sends a "complete" message. Because of the nature of aws lambda, it's possible for a client to send a "complete" message and disconnect and those events executing on lambda out of order. Which why this function can be called up to twice.
26+
27+
## Old Readme
828

929
## About
1030

@@ -282,7 +302,7 @@ Wrap any `subscribe` function call in a `withFilter` to provide filter condition
282302
> Note: If a function is provided, it will be called **on subscription start** and must return a serializable object.
283303

284304
```ts
285-
import { withFilter, subscribe } from 'subscriptionless/subscribe';
305+
import { subscribe } from 'subscriptionless/subscribe';
286306
287307
// Subscription agnostic filter
288308
withFilter(subscribe('MY_TOPIC'), {

lib/messages/complete.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { isArray } from '../utils/isArray'
1212
/** Handler function for 'complete' message. */
1313
export const complete: MessageHandler<CompleteMessage> =
1414
async ({ server, event, message }) => {
15+
server.log('messages:complete', { connectionId: event.requestContext.connectionId })
1516
try {
1617
const topicSubscriptions = await collect(server.mapper.query(server.model.Subscription, {
1718
id: `${event.requestContext.connectionId}|${message.id}`,
@@ -38,12 +39,12 @@ export const complete: MessageHandler<CompleteMessage> =
3839
const [field, root, args, context, info] = getResolverAndArgs(server)(execContext)
3940

4041
const onComplete = (field?.subscribe as SubscribePseudoIterable<PubSubEvent>)?.onComplete
41-
if (onComplete) {
42-
await onComplete(root, args, context, info)
43-
}
42+
server.log('messages:complete:onComplete', { onComplete: !!onComplete })
43+
await onComplete?.(root, args, context, info)
4444

4545
await Promise.all(topicSubscriptions.map(sub => server.mapper.delete(sub)))
4646
} catch (err) {
47+
server.log('messages:complete:onError', { err, event })
4748
await server.onError?.(err, { event, message })
4849
await deleteConnection(server)(event.requestContext)
4950
}

lib/messages/disconnect.ts

Lines changed: 54 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -10,67 +10,67 @@ import { collect } from 'streaming-iterables'
1010
import { Connection } from '../model/Connection'
1111

1212
/** Handler function for 'disconnect' message. */
13-
export const disconnect: MessageHandler<null> =
14-
async ({ server, event }) => {
15-
try {
16-
await server.onDisconnect?.({ event })
13+
export const disconnect: MessageHandler<null> = async ({ server, event }) => {
14+
server.log('messages:disconnect', { connectionId: event.requestContext.connectionId })
15+
try {
16+
await server.onDisconnect?.({ event })
1717

18-
const topicSubscriptions = await collect(server.mapper.query(
19-
server.model.Subscription,
20-
{
21-
connectionId: equals(event.requestContext.connectionId),
22-
},
23-
{ indexName: 'ConnectionIndex' },
24-
))
18+
const topicSubscriptions = await collect(server.mapper.query(
19+
server.model.Subscription,
20+
{
21+
connectionId: equals(event.requestContext.connectionId),
22+
},
23+
{ indexName: 'ConnectionIndex' },
24+
))
2525

26-
const completed = {} as Record<string, boolean>
27-
const deletions = [] as Promise<void|Connection>[]
28-
for (const sub of topicSubscriptions) {
29-
deletions.push(
30-
(async () => {
31-
// only call onComplete per subscription
32-
if (!completed[sub.subscriptionId]) {
33-
completed[sub.subscriptionId] = true
26+
const completed = {} as Record<string, boolean>
27+
const deletions = [] as Promise<void|Connection>[]
28+
for (const sub of topicSubscriptions) {
29+
deletions.push(
30+
(async () => {
31+
// only call onComplete per subscription
32+
if (!completed[sub.subscriptionId]) {
33+
completed[sub.subscriptionId] = true
3434

35-
const execContext = buildExecutionContext(
36-
server.schema,
37-
parse(sub.subscription.query),
38-
undefined,
39-
await constructContext({ server, connectionParams: sub.connectionParams, connectionId: sub.connectionId }),
40-
sub.subscription.variables,
41-
sub.subscription.operationName,
42-
undefined,
43-
)
44-
45-
if (isArray(execContext)) {
46-
throw new AggregateError(execContext)
47-
}
35+
const execContext = buildExecutionContext(
36+
server.schema,
37+
parse(sub.subscription.query),
38+
undefined,
39+
await constructContext({ server, connectionParams: sub.connectionParams, connectionId: sub.connectionId }),
40+
sub.subscription.variables,
41+
sub.subscription.operationName,
42+
undefined,
43+
)
4844

45+
if (isArray(execContext)) {
46+
throw new AggregateError(execContext)
47+
}
4948

50-
const [field, root, args, context, info] = getResolverAndArgs(server)(execContext)
5149

52-
const onComplete = (field?.subscribe as SubscribePseudoIterable<PubSubEvent>)?.onComplete
53-
if (onComplete) {
54-
await onComplete(root, args, context, info)
55-
}
56-
}
50+
const [field, root, args, context, info] = getResolverAndArgs(server)(execContext)
5751

58-
await server.mapper.delete(sub)
59-
})(),
60-
)
61-
}
52+
const onComplete = (field?.subscribe as SubscribePseudoIterable<PubSubEvent>)?.onComplete
53+
server.log('messages:disconnect:onComplete', { onComplete: !!onComplete })
54+
await onComplete?.(root, args, context, info)
55+
}
6256

63-
await Promise.all([
64-
// Delete subscriptions
65-
...deletions,
66-
// Delete connection
67-
server.mapper.delete(
68-
Object.assign(new server.model.Connection(), {
69-
id: event.requestContext.connectionId,
70-
}),
71-
),
72-
])
73-
} catch (err) {
74-
await server.onError?.(err, { event })
57+
await server.mapper.delete(sub)
58+
})(),
59+
)
7560
}
61+
62+
await Promise.all([
63+
// Delete subscriptions
64+
...deletions,
65+
// Delete connection
66+
server.mapper.delete(
67+
Object.assign(new server.model.Connection(), {
68+
id: event.requestContext.connectionId,
69+
}),
70+
),
71+
])
72+
} catch (err) {
73+
server.log('messages:disconnect:onError', { err, event })
74+
await server.onError?.(err, { event })
7675
}
76+
}

lib/pubsub/complete.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,8 @@ export const complete = (server: ServerClosure): ServerInstance['complete'] => a
4141
const [field, root, args, context, info] = getResolverAndArgs(server)(execContext)
4242

4343
const onComplete = (field?.subscribe as SubscribePseudoIterable<PubSubEvent>)?.onComplete
44-
if (onComplete) {
45-
await onComplete(root, args, context, info)
46-
}
47-
44+
server.log('pubsub:complete:onComplete', { onComplete: !!onComplete })
45+
await onComplete?.(root, args, context, info)
4846
})
4947
await Promise.all(iters)
5048
}

lib/test/execute-helper.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,17 @@ export const executeQuery = async (query: string): Promise<unknown> => {
2323
})
2424
}
2525

26-
type SubscriptionResult = Promise<{
26+
type SubscriptionResult = {
2727
values: AsyncGenerator<unknown, unknown, unknown>
2828
unsubscribe: () => void
29-
}>
29+
close: () => Promise<void> | void
30+
}
3031

31-
export const executeSubscription = async (query: string): SubscriptionResult => {
32+
export const executeSubscription = (query: string, { lazy }: {lazy?: boolean} = {}): SubscriptionResult => {
3233
const client = createClient({
3334
url,
3435
webSocketImpl: WebSocket,
36+
lazy,
3537
})
3638

3739
const values = deferGenerator()
@@ -50,5 +52,7 @@ export const executeSubscription = async (query: string): SubscriptionResult =>
5052
},
5153
)
5254

53-
return { values: values.generator, unsubscribe }
55+
const close = () => client.dispose()
56+
57+
return { values: values.generator, unsubscribe, close }
5458
}

lib/test/integration-events-test.ts

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,56 @@ describe('Events', () => {
1919
})
2020

2121
it('subscribes', async () => {
22-
const { values } = await executeSubscription('subscription { greetings }')
22+
const { values } = executeSubscription('subscription { greetings }')
2323
const greetings = await collect(map((value: { greetings: string }) => value.greetings, values))
2424
assert.deepEqual(greetings, ['yoyo', 'hows it', 'howdy'])
2525
})
2626
})
2727

2828
describe('Filter Events', () => {
2929
it('subscribes', async () => {
30-
const { values } = await executeSubscription('subscription { filterTest }')
30+
const { values } = executeSubscription('subscription { filterTest }')
3131
const greetings = await collect(map((value: { filterTest: string }) => value.filterTest, values))
3232
assert.deepEqual(greetings, ['oh yes!', 'Missing fields also work'])
3333
})
3434
})
35+
36+
describe('onComplete', () => {
37+
it('fires when the client disconnects at least once', async () => {
38+
const { values } = executeSubscription('subscription { onCompleteSideChannel }')
39+
assert.deepEqual(await values.next(), { done: false, value: { onCompleteSideChannel: 'start' } })
40+
const { unsubscribe } = executeSubscription('subscription { onCompleteTestClientDisconnect }')
41+
assert.deepEqual(await values.next(), { done: false, value: { onCompleteSideChannel: 'subscribed' } })
42+
unsubscribe()
43+
assert.deepEqual((await collect(values))[0], { onCompleteSideChannel: 'onComplete' })
44+
})
45+
it('fires when the client completes', async () => {
46+
// non lazy connections don't disconnect when unsubscribed
47+
const { values, close } = executeSubscription('subscription { onCompleteSideChannel }', { lazy: false })
48+
assert.deepEqual(await values.next(), { done: false, value: { onCompleteSideChannel: 'start' } })
49+
const { unsubscribe } = executeSubscription('subscription { onCompleteTestClientDisconnect }')
50+
assert.deepEqual(await values.next(), { done: false, value: { onCompleteSideChannel: 'subscribed' } })
51+
unsubscribe()
52+
assert.deepEqual((await collect(values)), [{ onCompleteSideChannel: 'onComplete' }])
53+
await close()
54+
})
55+
// confirm behavior with graphql-ws but we don't currently error
56+
// it('fires when the resolver errors', async () => {
57+
// const { values } = executeSubscription('subscription { onCompleteSideChannel }')
58+
// assert.deepEqual(await values.next(), { done: false, value: { onCompleteSideChannel: 'start' } })
59+
// const { values: errorValuesGen } = executeSubscription('subscription { onCompleteTestResolverError }')
60+
// assert.deepEqual(await collect(errorValuesGen), [])
61+
// assert.deepEqual(await collect(values), [{ onCompleteSideChannel: 'onComplete' }])
62+
// })
63+
it('fires when the server completes', async () => {
64+
const { values } = executeSubscription('subscription { onCompleteSideChannel }')
65+
assert.deepEqual(await values.next(), { done: false, value: { onCompleteSideChannel: 'start' } })
66+
const { values: completeValuesGen } = executeSubscription('subscription { onCompleteServerComplete }')
67+
assert.deepEqual(await collect(completeValuesGen), [])
68+
assert.deepEqual((await collect(values)), [
69+
{ onCompleteSideChannel: 'subscribed' },
70+
{ onCompleteSideChannel: 'onComplete' },
71+
])
72+
})
73+
})
3574
})

lib/types.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ export type SubscribeArgs<TRoot = any, TArgs = Record<string, any>, TContext = a
7474
export type SubscriptionFilter<
7575
TSubscribeArgs extends SubscribeArgs = SubscribeArgs,
7676
TReturn extends Record<string, any> = Record<string, any>
77-
> = Partial<TReturn> | void | ((...args: TSubscribeArgs) => MaybePromise<Partial<TReturn>> | MaybePromise<Partial<void>>)
77+
> = Partial<TReturn> | void | ((...args: TSubscribeArgs) => MaybePromise<Partial<TReturn> | void>)
7878

7979
export type SubscriptionDefinition<
8080
T extends PubSubEvent,
@@ -90,8 +90,8 @@ export type SubscribePseudoIterable<T extends PubSubEvent, TSubscribeArgs extend
9090
(...args: TSubscribeArgs): AsyncGenerator<T, never, unknown>
9191
topicDefinitions: SubscriptionDefinition<T, TSubscribeArgs>[]
9292
onSubscribe?: (...args: TSubscribeArgs) => MaybePromise<void>
93-
onComplete?: (...args: TSubscribeArgs) => MaybePromise<void>
9493
onAfterSubscribe?: (...args: TSubscribeArgs) => MaybePromise<void>
94+
onComplete?: (...args: TSubscribeArgs) => MaybePromise<void>
9595
}
9696

9797

mocks/arc-basic-events/lib/graphql.js

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ const typeDefs = `
3030
type Subscription {
3131
greetings: String
3232
filterTest: String
33+
onCompleteSideChannel: String
34+
onCompleteTestClientDisconnect: String
35+
onCompleteTestResolverError: String
36+
onCompleteServerComplete: String
3337
}
3438
`
3539

@@ -65,7 +69,60 @@ const resolvers = {
6569
await complete({ topic: 'filterTest' })
6670
},
6771
}),
68-
resolve: ({ payload }) => {
72+
resolve({ payload }) {
73+
return payload.message
74+
},
75+
},
76+
onCompleteTestClientDisconnect: {
77+
subscribe: subscribe('onCompleteTestResolverError', {
78+
async onComplete(_, __, { publish, complete }){
79+
await publish({ topic: 'onCompleteSideChannel', payload: { message: 'onComplete' } })
80+
await complete({ topic: 'onCompleteSideChannel' })
81+
},
82+
async onAfterSubscribe(_, __, { publish }) {
83+
await publish({ topic: 'onCompleteSideChannel', payload: { message: 'subscribed' } })
84+
},
85+
}),
86+
resolve({ payload }) {
87+
return payload.message
88+
},
89+
},
90+
onCompleteTestResolverError: {
91+
subscribe: subscribe('onCompleteTestResolverError', {
92+
async onComplete(_, __, { publish, complete }){
93+
await publish({ topic: 'onCompleteSideChannel', payload: { message: 'onComplete' } })
94+
await complete({ topic: 'onCompleteSideChannel' })
95+
},
96+
async onAfterSubscribe(_, __, { publish }) {
97+
await publish({ topic: 'onCompleteTestResolverError', payload: { message: 'doesnt really matter does it' } })
98+
},
99+
}),
100+
resolve() {
101+
throw new Error('oh no!')
102+
},
103+
},
104+
onCompleteServerComplete: {
105+
subscribe: subscribe('onCompleteServerComplete', {
106+
async onComplete(_, __, { publish, complete }){
107+
await publish({ topic: 'onCompleteSideChannel', payload: { message: 'onComplete' } })
108+
await complete({ topic: 'onCompleteSideChannel' })
109+
},
110+
async onAfterSubscribe(_, __, { publish, complete }) {
111+
await publish({ topic: 'onCompleteSideChannel', payload: { message: 'subscribed' } })
112+
await complete({ topic: 'onCompleteServerComplete' })
113+
},
114+
}),
115+
resolve({ payload }) {
116+
return payload.message
117+
},
118+
},
119+
onCompleteSideChannel: {
120+
subscribe: subscribe('onCompleteSideChannel', {
121+
async onAfterSubscribe(_, __, { publish }) {
122+
await publish({ topic: 'onCompleteSideChannel', payload: { message: 'start' } })
123+
},
124+
}),
125+
resolve({ payload }) {
69126
return payload.message
70127
},
71128
},

0 commit comments

Comments
 (0)