Skip to content
This repository was archived by the owner on May 17, 2025. It is now read-only.

Commit b78b997

Browse files
committed
feat: complete() to end subscriptions
Sends a complete message and deletes the subscription. It's pretty cool.
1 parent d3ad470 commit b78b997

File tree

20 files changed

+2001
-2430
lines changed

20 files changed

+2001
-2430
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ jobs:
1212
with:
1313
node-version: 14
1414
- run: npm ci
15+
- run: npm run build # needs to happen until I figure out a way to hotload it
1516
- name: Test
1617
run: npm run test
1718
lint:

lib/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { DataMapper } from '@aws/dynamodb-data-mapper'
22
import { ServerArgs } from './types'
33
import { publish } from './pubsub/publish'
4+
import { complete } from './pubsub/complete'
45
import { createModel } from './model/createModel'
56
import { Subscription } from './model/Subscription'
67
import { handleGatewayEvent } from './gateway'
@@ -28,6 +29,7 @@ export const createInstance = (opts: ServerArgs) => {
2829
gatewayHandler: handleGatewayEvent(closure),
2930
stateMachineHandler: handleStateMachineEvent(closure),
3031
publish: publish(closure),
32+
complete: complete(closure),
3133
}
3234
}
3335

lib/messages/subscribe.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,12 @@ export const subscribe: MessageHandler<SubscribeMessage> =
9898
await onSubscribe(root, args, context, info)
9999
}
100100

101-
const topicDefinitions = (field.subscribe as SubscribeHandler)(
101+
const {definitions: topicDefinitions} = await (field.subscribe as SubscribeHandler)(
102102
root,
103103
args,
104104
context,
105105
info,
106-
).definitions // Access subscribe instance
106+
)
107107
await Promise.all(
108108
topicDefinitions.map(async ({ topic, filter }) => {
109109
const subscription = Object.assign(new c.model.Subscription(), {

lib/pubsub/complete.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import { CompleteMessage, MessageType } from 'graphql-ws'
2+
import { ServerClosure, PubSubEvent } from '../types'
3+
import { sendMessage } from '../utils/aws'
4+
import { getFilteredSubs } from './getFilteredSubs'
5+
6+
export const complete = (c: ServerClosure) => async (event: PubSubEvent): Promise<void> => {
7+
const subscriptions = await getFilteredSubs(c)(event)
8+
const iters = subscriptions.map(async (sub) => {
9+
const message: CompleteMessage = {
10+
id: sub.subscriptionId,
11+
type: MessageType.Complete,
12+
}
13+
await sendMessage(c)({
14+
...sub.requestContext,
15+
message,
16+
})
17+
await c.mapper.delete(sub)
18+
})
19+
await Promise.all(iters)
20+
}

lib/pubsub/getFilteredSubs.ts

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/* eslint-disable @typescript-eslint/ban-types */
2+
import {
3+
attributeNotExists,
4+
equals,
5+
ConditionExpression,
6+
} from '@aws/dynamodb-expressions'
7+
import { Subscription } from '../model/Subscription'
8+
import { ServerClosure, PubSubEvent } from '../types'
9+
10+
export const getFilteredSubs = (c: Omit<ServerClosure, 'gateway'>) =>
11+
async (event: PubSubEvent): Promise<Subscription[]> => {
12+
const flattenPayload = flatten(event.payload)
13+
const iterator = c.mapper.query(
14+
c.model.Subscription,
15+
{ topic: equals(event.topic) },
16+
{
17+
filter: {
18+
type: 'And',
19+
conditions: Object.entries(flattenPayload).reduce(
20+
(p, [key, value]) => [
21+
...p,
22+
{
23+
type: 'Or',
24+
conditions: [
25+
{
26+
...attributeNotExists(),
27+
subject: `filter.${key}`,
28+
},
29+
{
30+
...equals(value),
31+
subject: `filter.${key}`,
32+
},
33+
],
34+
},
35+
],
36+
[] as ConditionExpression[],
37+
),
38+
},
39+
indexName: 'TopicIndex',
40+
},
41+
)
42+
43+
// Aggregate all targets
44+
const subs: Subscription[] = []
45+
for await (const sub of iterator) {
46+
subs.push(sub)
47+
}
48+
49+
return subs
50+
}
51+
52+
export const flatten = (
53+
obj: object,
54+
): Record<string, number | string | boolean> => Object.entries(obj).reduce((p, [k1, v1]) => {
55+
if (v1 && typeof v1 === 'object') {
56+
const next = Object.entries(v1).reduce(
57+
(prev, [k2, v2]) => ({
58+
...prev,
59+
[`${k1}.${k2}`]: v2,
60+
}),
61+
{},
62+
)
63+
return {
64+
...p,
65+
...flatten(next),
66+
}
67+
}
68+
69+
if (typeof v1 === 'string' ||
70+
typeof v1 === 'number' ||
71+
typeof v1 === 'boolean') {
72+
return { ...p, [k1]: v1 }
73+
}
74+
75+
return p
76+
}, {})

lib/pubsub/publish.ts

Lines changed: 12 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,11 @@
1-
/* eslint-disable @typescript-eslint/ban-types */
2-
import {
3-
attributeNotExists,
4-
equals,
5-
ConditionExpression,
6-
} from '@aws/dynamodb-expressions'
71
import { parse, execute } from 'graphql'
8-
import { MessageType } from 'graphql-ws'
9-
import { Subscription } from '../model/Subscription'
10-
import { ServerClosure } from '../types'
2+
import { MessageType, NextMessage } from 'graphql-ws'
3+
import { PubSubEvent, ServerClosure } from '../types'
114
import { sendMessage } from '../utils/aws'
125
import { constructContext } from '../utils/graphql'
6+
import { getFilteredSubs } from './getFilteredSubs'
137

14-
type PubSubEvent = {
15-
topic: string
16-
payload: any
17-
}
18-
19-
export const publish = (c: ServerClosure) => async (event: PubSubEvent) => {
8+
export const publish = (c: ServerClosure) => async (event: PubSubEvent): Promise<void> => {
209
const subscriptions = await getFilteredSubs(c)(event)
2110
const iters = subscriptions.map(async (sub) => {
2211
const payload = await execute(
@@ -29,86 +18,16 @@ export const publish = (c: ServerClosure) => async (event: PubSubEvent) => {
2918
undefined,
3019
)
3120

21+
const message: NextMessage = {
22+
id: sub.subscriptionId,
23+
type: MessageType.Next,
24+
payload,
25+
}
26+
3227
await sendMessage(c)({
3328
...sub.requestContext,
34-
message: {
35-
id: sub.subscriptionId,
36-
type: MessageType.Next,
37-
payload,
38-
},
29+
message,
3930
})
4031
})
41-
return await Promise.all(iters)
32+
await Promise.all(iters)
4233
}
43-
44-
const getFilteredSubs =
45-
(c: Omit<ServerClosure, 'gateway'>) =>
46-
async (event: PubSubEvent): Promise<Subscription[]> => {
47-
const flattenPayload = flatten(event.payload)
48-
const iterator = c.mapper.query(
49-
c.model.Subscription,
50-
{ topic: equals(event.topic) },
51-
{
52-
filter: {
53-
type: 'And',
54-
conditions: Object.entries(flattenPayload).reduce(
55-
(p, [key, value]) => [
56-
...p,
57-
{
58-
type: 'Or',
59-
conditions: [
60-
{
61-
...attributeNotExists(),
62-
subject: `filter.${key}`,
63-
},
64-
{
65-
...equals(value),
66-
subject: `filter.${key}`,
67-
},
68-
],
69-
},
70-
],
71-
[] as ConditionExpression[],
72-
),
73-
},
74-
indexName: 'TopicIndex',
75-
},
76-
)
77-
78-
// Aggregate all targets
79-
const subs: Subscription[] = []
80-
for await (const sub of iterator) {
81-
subs.push(sub)
82-
}
83-
84-
return subs
85-
}
86-
87-
export const flatten = (
88-
obj: object,
89-
): Record<string, number | string | boolean> =>
90-
Object.entries(obj).reduce((p, [k1, v1]) => {
91-
if (v1 && typeof v1 === 'object') {
92-
const next = Object.entries(v1).reduce(
93-
(prev, [k2, v2]) => ({
94-
...prev,
95-
[`${k1}.${k2}`]: v2,
96-
}),
97-
{},
98-
)
99-
return {
100-
...p,
101-
...flatten(next),
102-
}
103-
}
104-
105-
if (
106-
typeof v1 === 'string' ||
107-
typeof v1 === 'number' ||
108-
typeof v1 === 'boolean'
109-
) {
110-
return { ...p, [k1]: v1 }
111-
}
112-
113-
return p
114-
}, {})

lib/serial-test.ts

Lines changed: 0 additions & 7 deletions
This file was deleted.

lib/tests/basic-events-test.ts

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import { assert } from 'chai'
2+
import fetch from 'node-fetch'
3+
import { start as sandBoxStart, end as sandBoxEnd } from '@architect/sandbox'
4+
import { createClient } from 'graphql-ws'
5+
import WebSocket from 'ws'
6+
import { deferGenerator } from 'inside-out-async'
7+
import { collect, map } from 'streaming-iterables'
8+
9+
before(async () => {
10+
await sandBoxStart({ port: '3339', cwd: './mocks/arc-basic-events' } as any)
11+
})
12+
13+
after(async () => {
14+
await sandBoxEnd()
15+
})
16+
17+
const executeQuery = async (query: string) => {
18+
const client = createClient({
19+
url: 'ws://localhost:3339',
20+
webSocketImpl: WebSocket,
21+
})
22+
23+
return new Promise((resolve, reject) => {
24+
let result
25+
client.subscribe(
26+
{ query },
27+
{
28+
next: ({ data }) => (result = data),
29+
error: reject,
30+
complete: () => resolve(result),
31+
},
32+
)
33+
})
34+
}
35+
36+
const executeSubscription = async (query: string) => {
37+
const client = createClient({
38+
url: 'ws://localhost:3339',
39+
webSocketImpl: WebSocket,
40+
})
41+
42+
const values = deferGenerator()
43+
44+
const unsubscribe = client.subscribe(
45+
{ query },
46+
{
47+
next: ({data}) => {
48+
values.queueValue(data)
49+
},
50+
error: (error: Error) => {
51+
values.queueError(error)
52+
},
53+
complete: () => values.queueReturn(),
54+
},
55+
)
56+
57+
return { values: values.generator, unsubscribe }
58+
}
59+
60+
describe('Basic Events', () => {
61+
it('queries', async () => {
62+
const result = await executeQuery('{ hello }')
63+
assert.deepEqual(result, { hello: 'Hello World!' })
64+
})
65+
66+
it('subscribes', async () => {
67+
const { values, unsubscribe } = await executeSubscription('subscription { greetings }')
68+
// this timeout sucks
69+
await new Promise(resolve => setTimeout(resolve, 2000))
70+
await fetch('http://localhost:3339/')
71+
const greetings = await collect(map((value: { greetings: string }) => value.greetings, values))
72+
assert.deepEqual(greetings, ['hi', 'hey!'])
73+
unsubscribe()
74+
})
75+
})

lib/types.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,3 +107,8 @@ export interface APIGatewayWebSocketRequestContext
107107
export interface APIGatewayWebSocketEvent extends APIGatewayProxyEvent {
108108
requestContext: APIGatewayWebSocketRequestContext
109109
}
110+
111+
export type PubSubEvent = {
112+
topic: string
113+
payload: any
114+
}

lib/utils/graphql.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ export const prepareResolvers = <T extends object | object[]>(arg: T) =>
7171

7272
const visit = <T = object>(node: T, handler: (node: T) => any) =>
7373
Object.values(node).forEach((value) => {
74-
console.log(value)
7574
if (typeof value !== 'object') {
7675
return
7776
}

0 commit comments

Comments
 (0)