Skip to content
This repository was archived by the owner on May 17, 2025. It is now read-only.

Commit 2bbcab4

Browse files
authored
fix: duplicate subscription ids now properly error & close the connection (#63)
1 parent 204c4bc commit 2bbcab4

File tree

7 files changed

+153
-20
lines changed

7 files changed

+153
-20
lines changed

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,6 @@ const instance = makeServer({
599599

600600
</details>
601601

602-
603602
## Caveats
604603

605604
### Ping/Pong
@@ -609,3 +608,7 @@ For whatever reason, AWS API Gateway does not support WebSocket protocol level p
609608
### Socket idleness
610609

611610
API Gateway considers an idle connection to be one where no messages have been sent on the socket for a fixed duration [(currently 10 minutes)](https://docs.aws.amazon.com/apigateway/latest/developerguide/limits.html#apigateway-execution-service-websocket-limits-table). The WebSocket spec has support for detecting idle connections (ping/pong) but API Gateway doesn't use it. This means, in the case where both parties are connected, and no message is sent on the socket for the defined duration (direction agnostic), API Gateway will close the socket. A fix for this is to set up immediate reconnection on the client side.
611+
612+
### Socket Close Reasons
613+
614+
API Gateway doesn't support custom reasons or codes for WebSockets being closed. So the codes and reason strings wont match `graphql-ws`.

lib/ddb/DDB.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ import { DynamoDB } from 'aws-sdk'
22
import { LoggerFunction, DDBType } from '../types'
33

44
export interface DDBClient<T extends DDBType, TKey> {
5-
get: (Key: TKey) => Promise<T|null>
6-
put: (obj: T) => Promise<T>
5+
get: (Key: TKey) => Promise<T | null>
6+
put: (obj: T, putOptions?: Partial<DynamoDB.DocumentClient.PutItemInput>) => Promise<T>
77
update: (Key: TKey, obj: Partial<T>) => Promise<T>
88
delete: (Key: TKey) => Promise<T>
99
query: (options: Omit<DynamoDB.DocumentClient.QueryInput, 'TableName' | 'Select'>) => AsyncGenerator<T, void, undefined>
@@ -35,13 +35,14 @@ export const DDB = <T extends DDBType, TKey>({
3535
}
3636
}
3737

38-
const put = async (Item: T): Promise<T> => {
38+
const put = async (Item: T, putOptions?: Partial<DynamoDB.DocumentClient.PutItemInput>): Promise<T> => {
3939
log('put', { tableName: tableName, Item })
4040
try {
4141
const { Attributes } = await documentClient.put({
4242
TableName: tableName,
4343
Item,
4444
ReturnValues: 'ALL_OLD',
45+
...putOptions,
4546
}).promise()
4647
return Attributes as T
4748
} catch (e) {

lib/messages/subscribe.ts

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@ export const subscribe: MessageHandler<SubscribeMessage> =
1818
try {
1919
await setupSubscription({ server, event, message })
2020
} catch (err) {
21+
server.log('subscribe:error', { connectionId: event.requestContext.connectionId, error: err.message })
2122
await server.onError?.(err, { event, message })
2223
await deleteConnection(server)(event.requestContext)
2324
}
2425
}
2526

2627
const setupSubscription: MessageHandler<SubscribeMessage> = async ({ server, event, message }) => {
2728
const connectionId = event.requestContext.connectionId
28-
server.log('subscribe', { connectionId, query: message.payload.query })
29+
server.log('subscribe', { connectionId, messageId: message.id, query: message.payload.query })
2930

3031
const connection = await server.models.connection.get({ id: connectionId })
3132
if (!connection) {
@@ -83,7 +84,7 @@ const setupSubscription: MessageHandler<SubscribeMessage> = async ({ server, eve
8384

8485
server.log('onSubscribe', { onSubscribe: !!onSubscribe })
8586
const onSubscribeErrors = await onSubscribe?.(root, args, context, info)
86-
if (onSubscribeErrors){
87+
if (onSubscribeErrors) {
8788
server.log('onSubscribe', { onSubscribeErrors })
8889
return postToConnection(server)({
8990
...event.requestContext,
@@ -97,8 +98,9 @@ const setupSubscription: MessageHandler<SubscribeMessage> = async ({ server, eve
9798

9899
const filterData = typeof filter === 'function' ? await filter(root, args, context, info) : filter
99100

101+
const subscriptionId = `${connection.id}|${message.id}`
100102
const subscription: Subscription = {
101-
id: `${connection.id}|${message.id}`,
103+
id: subscriptionId,
102104
topic,
103105
filter: filterData || {},
104106
subscriptionId: message.id,
@@ -113,7 +115,22 @@ const setupSubscription: MessageHandler<SubscribeMessage> = async ({ server, eve
113115
createdAt: Date.now(),
114116
}
115117
server.log('subscribe:putSubscription', subscription)
116-
await server.models.subscription.put(subscription)
118+
try{
119+
await server.models.subscription.put(subscription, {
120+
ConditionExpression: '#id <> :id',
121+
ExpressionAttributeNames: {
122+
'#id': 'id',
123+
},
124+
ExpressionAttributeValues: {
125+
':id': subscriptionId,
126+
},
127+
})
128+
} catch (error) {
129+
if (error.code === 'ConditionalCheckFailedException') {
130+
throw new Error(`Subscriber for ${message.id} already exists`)
131+
}
132+
throw error
133+
}
117134

118135
server.log('onAfterSubscribe', { onAfterSubscribe: !!onAfterSubscribe })
119136
await onAfterSubscribe?.(root, args, context, info)

lib/test/execute-helper.ts

Lines changed: 90 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,13 @@ export const executeQuery = async function* (query: string, {
1616
url = URL,
1717
stayConnected = false,
1818
timeout = 20_000,
19+
id = 1,
1920
}: {
2021
url?: string
2122
stayConnected?: boolean
2223
timeout?: number
24+
id?: number
2325
} = {}): AsyncGenerator<unknown, void, unknown> {
24-
let id = 1
2526
const ws = new WebSocket(url, 'graphql-transport-ws')
2627

2728
const incomingMessages = deferGenerator()
@@ -61,7 +62,7 @@ export const executeQuery = async function* (query: string, {
6162
}
6263

6364
await send({
64-
id: `${id++}`,
65+
id: `${id}`,
6566
type: 'subscribe',
6667
payload: { query },
6768
})
@@ -84,10 +85,11 @@ export const executeQuery = async function* (query: string, {
8485

8586
export const executeToComplete = async function (query: string, {
8687
url = URL,
88+
id = 1,
8789
}: {
8890
url?: string
91+
id?: number
8992
} = {}): Promise<() => Promise<void>> {
90-
let id = 1
9193
const ws = new WebSocket(url, 'graphql-transport-ws')
9294

9395
const incomingMessages = deferGenerator()
@@ -118,16 +120,14 @@ export const executeToComplete = async function (query: string, {
118120
throw new Error(`Bad ack ${messageToString(connectionAck)}`)
119121
}
120122

121-
const subId = id++
122-
123123
await send({
124-
id: `${subId}`,
124+
id: `${id}`,
125125
type: 'subscribe',
126126
payload: { query },
127127
})
128128

129129
return () => send({
130-
id: `${subId}`,
130+
id,
131131
type: 'complete',
132132
})
133133
}
@@ -136,10 +136,11 @@ export const executeToComplete = async function (query: string, {
136136

137137
export const executeToDisconnect = async function (query: string, {
138138
url = URL,
139+
id = 1,
139140
}: {
140141
url?: string
142+
id?: number
141143
} = {}): Promise<() => void> {
142-
let id = 1
143144
const ws = new WebSocket(url, 'graphql-transport-ws')
144145

145146
const incomingMessages = deferGenerator()
@@ -170,13 +171,91 @@ export const executeToDisconnect = async function (query: string, {
170171
throw new Error(`Bad ack ${messageToString(connectionAck)}`)
171172
}
172173

173-
const subId = id++
174-
175174
await send({
176-
id: `${subId}`,
175+
id: `${id}`,
177176
type: 'subscribe',
178177
payload: { query },
179178
})
180179

181180
return () => ws.close()
182181
}
182+
183+
export const executeDoubleQuery = async function* (query: string, {
184+
url = URL,
185+
stayConnected = false,
186+
timeout = 20_000,
187+
id = 1,
188+
}: {
189+
url?: string
190+
stayConnected?: boolean
191+
timeout?: number
192+
id?: number
193+
} = {}): AsyncGenerator<unknown, void, unknown> {
194+
const ws = new WebSocket(url, 'graphql-transport-ws')
195+
196+
const incomingMessages = deferGenerator()
197+
198+
ws.on('message', data => {
199+
const message = JSON.parse(data.toString())
200+
incomingMessages.queueValue(message)
201+
if (message.type === 'error' || message.type === 'complete') {
202+
incomingMessages.queueReturn()
203+
}
204+
})
205+
206+
ws.on('error', error => {
207+
incomingMessages.queueValue( { type: 'websocketError', value: error.message })
208+
incomingMessages.queueReturn()
209+
})
210+
ws.on('close', (code, reason) => {
211+
incomingMessages.queueValue({ type: 'close', code, reason: reason.toString() })
212+
incomingMessages.queueReturn()
213+
})
214+
215+
let timer: NodeJS.Timeout|null = null
216+
if (timeout) {
217+
timer = setTimeout(() => {
218+
incomingMessages.queueValue({ type: 'timeout', timeout })
219+
incomingMessages.queueReturn()
220+
}, timeout)
221+
}
222+
223+
const send = (data: any) => new Promise<void>(resolve => ws.send(JSON.stringify(data), () => resolve()))
224+
225+
await new Promise(resolve => ws.on('open', resolve))
226+
await send({ type: 'connection_init' })
227+
const connectionAck: any = (await incomingMessages.generator.next()).value
228+
if (connectionAck.type !== 'connection_ack') {
229+
throw new Error(`Bad ack ${messageToString(connectionAck)}`)
230+
}
231+
232+
await send({
233+
id: `${id}`,
234+
type: 'subscribe',
235+
payload: { query },
236+
})
237+
238+
const firstMessage = await incomingMessages.generator.next()
239+
if (firstMessage.done) {
240+
return
241+
}
242+
yield firstMessage.value
243+
244+
await send({
245+
id: `${id}`,
246+
type: 'subscribe',
247+
payload: { query },
248+
})
249+
250+
251+
for await (const message of incomingMessages.generator) {
252+
yield message
253+
}
254+
255+
if (!stayConnected){
256+
ws.close()
257+
}
258+
if (timer) {
259+
clearTimeout(timer)
260+
}
261+
}

lib/test/graphql-ws-schema.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const typeDefs = `
1414
greetings: String
1515
onSubscribeError: String
1616
onResolveError: String
17+
oneEvent: String
1718
}
1819
`
1920

@@ -43,6 +44,14 @@ const resolvers = {
4344
throw new Error('resolver error')
4445
},
4546
},
47+
oneEvent:{
48+
subscribe: async function*(){
49+
yield { oneEvent: 'lets start!' }
50+
// eslint-disable-next-line @typescript-eslint/no-empty-function
51+
await new Promise(() => {})
52+
},
53+
},
54+
4655
},
4756
}
4857

lib/test/integration-events-test.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import { assert } from 'chai'
33
import { start as sandBoxStart, end as sandBoxStop } from '@architect/sandbox'
44
import { collect, map } from 'streaming-iterables'
5-
import { executeQuery, executeToComplete, executeToDisconnect } from './execute-helper'
5+
import { executeDoubleQuery, executeQuery, executeToComplete, executeToDisconnect } from './execute-helper'
66
import { startGqlWSServer } from './graphql-ws-schema'
77

88
describe('Events', () => {
@@ -64,6 +64,18 @@ describe('Events', () => {
6464
assert.deepEqual(lambdaResult, gqlWSResult)
6565
await stop()
6666
})
67+
68+
it('errors when duplicating subscription ids', async () => {
69+
const { url, stop } = await startGqlWSServer()
70+
71+
const lambdaError = await collect(executeDoubleQuery('subscription { oneEvent }', { id: 1 }))
72+
const gqlWSError = await collect(executeDoubleQuery('subscription { oneEvent }', { url, id: 1 }))
73+
assert.deepEqual(lambdaError[0], gqlWSError[0])
74+
// This would be exactly equal but apigateway doesn't support close reasons *eye roll*
75+
assert.containSubset(lambdaError[1], { type: 'close' })
76+
assert.containSubset(gqlWSError[1], { type: 'close' })
77+
await stop()
78+
})
6779
})
6880

6981
describe('Filter Events', () => {

mocks/arc-basic-events/lib/graphql.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ const typeDefs = `
3434
onCompleteTestClientDisconnect: String
3535
onResolveError: String
3636
onCompleteServerComplete: String
37+
oneEvent: String
3738
}
3839
`
3940

@@ -139,6 +140,17 @@ const resolvers = {
139140
return payload.message
140141
},
141142
},
143+
oneEvent:{
144+
subscribe: subscribe('oneEvent', {
145+
async onAfterSubscribe(_, __, { publish }) {
146+
await publish({ topic: 'oneEvent', payload: { message: 'lets start!' } })
147+
},
148+
}),
149+
resolve: ({ payload }) => {
150+
return payload.message
151+
},
152+
},
153+
142154
},
143155
}
144156

0 commit comments

Comments
 (0)