Skip to content
This repository was archived by the owner on May 17, 2025. It is now read-only.

Commit a8fe398

Browse files
authored
fix: args that are inline weren't passed to callback functions (#64)
eg ```graphql subscription { greetings(name: "Jonas") } ``` Will now work as expected.
1 parent 2bbcab4 commit a8fe398

File tree

14 files changed

+215
-80
lines changed

14 files changed

+215
-80
lines changed

.eslintrc.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ module.exports = {
1414
},
1515
plugins: [
1616
'@typescript-eslint',
17+
'mocha-no-only',
1718
],
1819
rules: {
1920
'@typescript-eslint/member-delimiter-style': ['error', {
@@ -32,6 +33,7 @@ module.exports = {
3233
'object-curly-spacing': ['error', 'always'],
3334
'quote-props': ['error', 'as-needed'],
3435
'space-infix-ops': ['error'],
36+
"mocha-no-only/mocha-no-only": ["error"],
3537
indent: ['error', 2],
3638
quotes: ['error', 'single'],
3739
semi: 'off',

lib/messages/complete.ts

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,38 +9,37 @@ import { getResolverAndArgs } from '../utils/getResolverAndArgs'
99
import { isArray } from '../utils/isArray'
1010

1111
/** Handler function for 'complete' message. */
12-
export const complete: MessageHandler<CompleteMessage> =
13-
async ({ server, event, message }) => {
14-
server.log('messages:complete', { connectionId: event.requestContext.connectionId })
15-
try {
16-
const subscription = await server.models.subscription.get({ id: `${event.requestContext.connectionId}|${message.id}` })
17-
if (!subscription) {
18-
return
19-
}
20-
const execContext = buildExecutionContext(
21-
server.schema,
22-
parse(subscription.subscription.query),
23-
undefined,
24-
await buildContext({ server, connectionInitPayload: subscription.connectionInitPayload, connectionId: subscription.connectionId }),
25-
subscription.subscription.variables,
26-
subscription.subscription.operationName,
27-
undefined,
28-
)
12+
export const complete: MessageHandler<CompleteMessage> = async ({ server, event, message }) => {
13+
server.log('messages:complete', { connectionId: event.requestContext.connectionId })
14+
try {
15+
const subscription = await server.models.subscription.get({ id: `${event.requestContext.connectionId}|${message.id}` })
16+
if (!subscription) {
17+
return
18+
}
19+
const execContext = buildExecutionContext(
20+
server.schema,
21+
parse(subscription.subscription.query),
22+
undefined,
23+
await buildContext({ server, connectionInitPayload: subscription.connectionInitPayload, connectionId: subscription.connectionId }),
24+
subscription.subscription.variables,
25+
subscription.subscription.operationName,
26+
undefined,
27+
)
2928

30-
if (isArray(execContext)) {
31-
throw new AggregateError(execContext)
32-
}
29+
if (isArray(execContext)) {
30+
throw new AggregateError(execContext)
31+
}
3332

34-
const [field, root, args, context, info] = getResolverAndArgs(server)(execContext)
33+
const { field, root, args, context, info } = getResolverAndArgs({ server, execContext })
3534

36-
const onComplete = (field?.subscribe as SubscribePseudoIterable<PubSubEvent>)?.onComplete
37-
server.log('messages:complete:onComplete', { onComplete: !!onComplete })
38-
await onComplete?.(root, args, context, info)
35+
const onComplete = (field?.subscribe as SubscribePseudoIterable<PubSubEvent>)?.onComplete
36+
server.log('messages:complete:onComplete', { onComplete: !!onComplete })
37+
await onComplete?.(root, args, context, info)
3938

40-
await server.models.subscription.delete({ id: subscription.id })
41-
} catch (err) {
42-
server.log('messages:complete:onError', { err, event })
43-
await server.onError?.(err, { event, message })
44-
await deleteConnection(server)(event.requestContext)
45-
}
39+
await server.models.subscription.delete({ id: subscription.id })
40+
} catch (err) {
41+
server.log('messages:complete:onError', { err, event })
42+
await server.onError?.(err, { event, message })
43+
await deleteConnection(server)(event.requestContext)
4644
}
45+
}

lib/messages/disconnect.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ export const disconnect: MessageHandler<null> = async ({ server, event }) => {
3838
throw new AggregateError(execContext)
3939
}
4040

41-
const [field, root, args, context, info] = getResolverAndArgs(server)(execContext)
41+
const { field, root, args, context, info } = getResolverAndArgs({ server, execContext })
4242

4343
const onComplete = (field?.subscribe as SubscribePseudoIterable<PubSubEvent>)?.onComplete
4444
server.log('messages:disconnect:onComplete', { onComplete: !!onComplete })
@@ -47,7 +47,7 @@ export const disconnect: MessageHandler<null> = async ({ server, event }) => {
4747
})
4848

4949
// do this first so that we don't create any more subscriptions for this connection
50-
await server.models.connection.delete({ id: connectionId }),
50+
await server.models.connection.delete({ id: connectionId })
5151
await Promise.all(deletions)
5252
} catch (err) {
5353
server.log('messages:disconnect:onError', { err, event })

lib/messages/subscribe-test.ts

Lines changed: 106 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,15 @@ describe('messages/subscribe', () => {
8585
assert.deepEqual(state, {
8686
post: [
8787
{ ConnectionId, Data: JSON.stringify({ type: 'connection_ack' }) },
88-
{ ConnectionId, Data: JSON.stringify({ type: 'error', id: 'abcdefg', payload: [{
89-
message: 'Cannot query field "HIHOWEAREYOU" on type "Query".',
90-
locations: [{ line:1, column:3 }],
88+
{
89+
ConnectionId, Data: JSON.stringify({
90+
type: 'error', id: 'abcdefg', payload: [{
91+
message: 'Cannot query field "HIHOWEAREYOU" on type "Query".',
92+
locations: [{ line: 1, column: 3 }],
93+
},
94+
],
95+
}),
9196
},
92-
] }) },
9397
],
9498
delete: [],
9599
})
@@ -102,7 +106,7 @@ describe('messages/subscribe', () => {
102106
const server = await mockServerContext({
103107
apiGatewayManagementApi: {
104108
// eslint-disable-next-line @typescript-eslint/no-empty-function
105-
postToConnection: () => ({ promise: async () => { if(sendErr) { throw new Error('postToConnection Error') } } }),
109+
postToConnection: () => ({ promise: async () => { if (sendErr) { throw new Error('postToConnection Error') } } }),
106110
// eslint-disable-next-line @typescript-eslint/no-empty-function
107111
deleteConnection: () => ({ promise: async () => { } }),
108112
},
@@ -112,7 +116,7 @@ describe('messages/subscribe', () => {
112116
await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) })
113117
sendErr = true
114118
await subscribe({ server, event, message: JSON.parse(event.body) })
115-
assert.match(error.message, /postToConnection Error/ )
119+
assert.match(error.message, /postToConnection Error/)
116120
})
117121

118122
describe('callbacks', () => {
@@ -132,7 +136,7 @@ describe('messages/subscribe', () => {
132136
hello: () => 'Hello World!',
133137
},
134138
Subscription: {
135-
greetings:{
139+
greetings: {
136140
subscribe: pubsubSubscribe('greetings', {
137141
onSubscribe() {
138142
onSubscribe.push('We did it!')
@@ -146,13 +150,8 @@ describe('messages/subscribe', () => {
146150
},
147151
}
148152

149-
const schema = makeExecutableSchema({
150-
typeDefs,
151-
resolvers,
152-
})
153-
const server = await mockServerContext({
154-
schema,
155-
})
153+
const schema = makeExecutableSchema({ typeDefs, resolvers })
154+
const server = await mockServerContext({ schema })
156155
const event: any = { requestContext: { connectedAt: 1628889984369, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'el4MNdOJy', requestId: '0yd7bkvXz', requestTimeEpoch: 1628889984774, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"1234","type":"subscribe","payload":{"query":"subscription { greetings }"}}' }
157156

158157
await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) })
@@ -172,6 +171,96 @@ describe('messages/subscribe', () => {
172171
assert.isEmpty(subscriptions)
173172
})
174173

174+
it('fires onSubscribe with variable args', async () => {
175+
const collectedArgs: any[] = []
176+
177+
const typeDefs = `
178+
type Query {
179+
hello(name: String!): String
180+
}
181+
type Subscription {
182+
greetings(name: String!): String
183+
}
184+
`
185+
const resolvers = {
186+
Query: {
187+
hello: (_, { name }) => `Hello ${name}!`,
188+
},
189+
Subscription: {
190+
greetings: {
191+
subscribe: pubsubSubscribe('greetings', {
192+
onSubscribe(_, args) {
193+
collectedArgs.push(args)
194+
},
195+
}),
196+
resolve: ({ payload }) => {
197+
return payload
198+
},
199+
},
200+
},
201+
}
202+
203+
const schema = makeExecutableSchema({ typeDefs, resolvers })
204+
const server = await mockServerContext({ schema })
205+
const event: any = { requestContext: { connectedAt: 1628889984369, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'el4MNdOJy', requestId: '0yd7bkvXz', requestTimeEpoch: 1628889984774, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"1234","type":"subscribe","payload":{"query":"subscription($name: String!) { greetings(name: $name) }", "variables":{"name":"Jonas"}}}' }
206+
207+
await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) })
208+
await subscribe({ server, event, message: JSON.parse(event.body) })
209+
assert.deepEqual(collectedArgs[0], { name: 'Jonas' })
210+
const [subscription] = await collect(server.models.subscription.query({
211+
IndexName: 'ConnectionIndex',
212+
ExpressionAttributeNames: { '#a': 'connectionId' },
213+
ExpressionAttributeValues: { ':1': event.requestContext.connectionId },
214+
KeyConditionExpression: '#a = :1',
215+
}))
216+
assert.containSubset(subscription, { connectionId, subscriptionId: '1234', subscription: JSON.parse(event.body).payload })
217+
})
218+
219+
it('fires onSubscribe with inline args', async () => {
220+
const collectedArgs: any[] = []
221+
222+
const typeDefs = `
223+
type Query {
224+
hello(name: String!): String
225+
}
226+
type Subscription {
227+
greetings(name: String!): String
228+
}
229+
`
230+
const resolvers = {
231+
Query: {
232+
hello: (_, { name }) => `Hello ${name}!`,
233+
},
234+
Subscription: {
235+
greetings: {
236+
subscribe: pubsubSubscribe('greetings', {
237+
onSubscribe(_, args) {
238+
collectedArgs.push(args)
239+
},
240+
}),
241+
resolve: ({ payload }) => {
242+
return payload
243+
},
244+
},
245+
},
246+
}
247+
248+
const schema = makeExecutableSchema({ typeDefs, resolvers })
249+
const server = await mockServerContext({ schema })
250+
const event: any = { requestContext: { connectedAt: 1628889984369, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'el4MNdOJy', requestId: '0yd7bkvXz', requestTimeEpoch: 1628889984774, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"1234","type":"subscribe","payload":{"query":"subscription { greetings(name: \\"Jonas\\") }"}}' }
251+
252+
await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) })
253+
await subscribe({ server, event, message: JSON.parse(event.body) })
254+
assert.deepEqual(collectedArgs[0], { name: 'Jonas' })
255+
const [subscription] = await collect(server.models.subscription.query({
256+
IndexName: 'ConnectionIndex',
257+
ExpressionAttributeNames: { '#a': 'connectionId' },
258+
ExpressionAttributeValues: { ':1': event.requestContext.connectionId },
259+
KeyConditionExpression: '#a = :1',
260+
}))
261+
assert.containSubset(subscription, { connectionId, subscriptionId: '1234', subscription: JSON.parse(event.body).payload })
262+
})
263+
175264
it('fires onAfterSubscribe after subscribing', async () => {
176265
const events: string[] = []
177266

@@ -188,7 +277,7 @@ describe('messages/subscribe', () => {
188277
hello: () => 'Hello World!',
189278
},
190279
Subscription: {
191-
greetings:{
280+
greetings: {
192281
subscribe: pubsubSubscribe('greetings', {
193282
onSubscribe() {
194283
events.push('onSubscribe')
@@ -204,13 +293,8 @@ describe('messages/subscribe', () => {
204293
},
205294
}
206295

207-
const schema = makeExecutableSchema({
208-
typeDefs,
209-
resolvers,
210-
})
211-
const server = await mockServerContext({
212-
schema,
213-
})
296+
const schema = makeExecutableSchema({ typeDefs, resolvers })
297+
const server = await mockServerContext({ schema })
214298
const event: any = { requestContext: { connectedAt: 1628889984369, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'el4MNdOJy', requestId: '0yd7bkvXz', requestTimeEpoch: 1628889984774, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"1234","type":"subscribe","payload":{"query":"subscription { greetings }"}}' }
215299

216300
await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) })

lib/messages/subscribe.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,17 @@ const setupSubscription: MessageHandler<SubscribeMessage> = async ({ server, eve
7070
})
7171
}
7272

73+
const subscriptionId = `${connection.id}|${message.id}`
74+
if (await server.models.subscription.get({ id: subscriptionId })) {
75+
throw new Error(`Subscriber for ${message.id} already exists`)
76+
}
77+
7378
if (execContext.operation.operation !== 'subscription') {
7479
await executeQuery(server, message, contextValue, event)
7580
return
7681
}
7782

78-
const [field, root, args, context, info] = getResolverAndArgs(server)(execContext)
83+
const { field, root, args, context, info } = getResolverAndArgs({ server, execContext })
7984
if (!field) {
8085
throw new Error('No field')
8186
}
@@ -98,24 +103,20 @@ const setupSubscription: MessageHandler<SubscribeMessage> = async ({ server, eve
98103

99104
const filterData = typeof filter === 'function' ? await filter(root, args, context, info) : filter
100105

101-
const subscriptionId = `${connection.id}|${message.id}`
102106
const subscription: Subscription = {
103107
id: subscriptionId,
104108
topic,
105109
filter: filterData || {},
106110
subscriptionId: message.id,
107-
subscription: {
108-
variableValues: args,
109-
...message.payload,
110-
},
111+
subscription: message.payload,
111112
connectionId: connection.id,
112113
connectionInitPayload: connection.payload,
113114
requestContext: event.requestContext,
114115
ttl: connection.ttl,
115116
createdAt: Date.now(),
116117
}
117118
server.log('subscribe:putSubscription', subscription)
118-
try{
119+
try {
119120
await server.models.subscription.put(subscription, {
120121
ConditionExpression: '#id <> :id',
121122
ExpressionAttributeNames: {

lib/pubsub/complete.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ export const complete = (serverPromise: Promise<ServerClosure> | ServerClosure):
3939
throw new AggregateError(execContext)
4040
}
4141

42-
const [field, root, args, context, info] = getResolverAndArgs(server)(execContext)
42+
const { field, root, args, context, info } = getResolverAndArgs({ server, execContext })
4343

4444
const onComplete = (field?.subscribe as SubscribePseudoIterable<PubSubEvent>)?.onComplete
4545
server.log('pubsub:complete:onComplete', { onComplete: !!onComplete })

lib/test/execute-helper.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,13 @@ export const executeDoubleQuery = async function* (query: string, {
185185
stayConnected = false,
186186
timeout = 20_000,
187187
id = 1,
188+
skipWaitingForFirstMessage = false,
188189
}: {
189190
url?: string
190191
stayConnected?: boolean
191192
timeout?: number
192193
id?: number
194+
skipWaitingForFirstMessage?: boolean
193195
} = {}): AsyncGenerator<unknown, void, unknown> {
194196
const ws = new WebSocket(url, 'graphql-transport-ws')
195197

@@ -235,11 +237,13 @@ export const executeDoubleQuery = async function* (query: string, {
235237
payload: { query },
236238
})
237239

238-
const firstMessage = await incomingMessages.generator.next()
239-
if (firstMessage.done) {
240-
return
240+
if (!skipWaitingForFirstMessage) {
241+
const firstMessage = await incomingMessages.generator.next()
242+
if (firstMessage.done) {
243+
return
244+
}
245+
yield firstMessage.value
241246
}
242-
yield firstMessage.value
243247

244248
await send({
245249
id: `${id}`,

lib/test/graphql-ws-schema.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ const PORT = 4000
99
const typeDefs = `
1010
type Query {
1111
hello: String
12+
dontResolve: String
1213
}
1314
type Subscription {
1415
greetings: String
@@ -21,6 +22,8 @@ const typeDefs = `
2122
const resolvers = {
2223
Query: {
2324
hello: () => 'Hello World!',
25+
// eslint-disable-next-line @typescript-eslint/no-empty-function
26+
dontResolve: () => new Promise(() => {}),
2427
},
2528
Subscription: {
2629
greetings:{

0 commit comments

Comments
 (0)