Skip to content
This repository was archived by the owner on May 17, 2025. It is now read-only.

Commit 6bcc957

Browse files
authored
feat: fix filters, add logging (#29)
- filter functions are now dynamic - fix types for subscribe options - payloads need to be Record<string, any> - logging is now in a lot of places, you can add your own if you like, it should act like npm's `debug` - you don't have to have payloads for complete messages
1 parent 0d64d44 commit 6bcc957

26 files changed

+387
-197
lines changed

lib/gateway.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@ import { pong } from './messages/pong'
1414

1515
export const handleGatewayEvent = (server: ServerClosure): ApiGatewayHandler<APIGatewayWebSocketEvent, WebsocketResponse> => async (event) => {
1616
if (!event.requestContext) {
17+
server.log('handleGatewayEvent unknown')
1718
return {
1819
statusCode: 200,
1920
body: '',
2021
}
2122
}
2223

2324
if (event.requestContext.eventType === 'CONNECT') {
25+
server.log('handleGatewayEvent CONNECT', { connectionId: event.requestContext.connectionId })
2426
await server.onConnect?.({ event })
2527
return {
2628
statusCode: 200,
@@ -33,6 +35,7 @@ export const handleGatewayEvent = (server: ServerClosure): ApiGatewayHandler<API
3335

3436
if (event.requestContext.eventType === 'MESSAGE') {
3537
const message = event.body === null ? null : JSON.parse(event.body)
38+
server.log('handleGatewayEvent MESSAGE', { connectionId: event.requestContext.connectionId, type: message.type })
3639

3740
if (message.type === MessageType.ConnectionInit) {
3841
await connection_init({ server, event, message })
@@ -76,13 +79,15 @@ export const handleGatewayEvent = (server: ServerClosure): ApiGatewayHandler<API
7679
}
7780

7881
if (event.requestContext.eventType === 'DISCONNECT') {
82+
server.log('handleGatewayEvent DISCONNECT', { connectionId: event.requestContext.connectionId })
7983
await disconnect({ server, event, message: null })
8084
return {
8185
statusCode: 200,
8286
body: '',
8387
}
8488
}
8589

90+
server.log('handleGatewayEvent UNKNOWN', { connectionId: event.requestContext.connectionId })
8691
return {
8792
statusCode: 200,
8893
body: '',

lib/index-test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ describe('createInstance', () => {
1212
})
1313

1414
after(async () => {
15-
tables.end()
15+
await tables.end()
1616
})
1717

1818
it('is type compatible with aws-lambda handler', async () => {

lib/makeServerClosure.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ import { ServerArgs, ServerClosure } from './types'
33
import { createModel } from './model/createModel'
44
import { Subscription } from './model/Subscription'
55
import { Connection } from './model/Connection'
6+
import { log } from './utils/logger'
67

78
export function makeServerClosure(opts: ServerArgs): ServerClosure {
89
return {
10+
log: log,
911
...opts,
1012
model: {
1113
Subscription: createModel({

lib/messages/complete.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { parse } from 'graphql'
33
import { CompleteMessage } from 'graphql-ws'
44
import { buildExecutionContext } from 'graphql/execution/execute'
55
import { collect } from 'streaming-iterables'
6-
import { SubscribePseudoIterable, MessageHandler } from '../types'
6+
import { SubscribePseudoIterable, MessageHandler, PubSubEvent } from '../types'
77
import { deleteConnection } from '../utils/deleteConnection'
88
import { constructContext } from '../utils/constructContext'
99
import { getResolverAndArgs } from '../utils/getResolverAndArgs'
@@ -37,7 +37,7 @@ export const complete: MessageHandler<CompleteMessage> =
3737

3838
const [field, root, args, context, info] = getResolverAndArgs(server)(execContext)
3939

40-
const onComplete = (field?.subscribe as SubscribePseudoIterable)?.onComplete
40+
const onComplete = (field?.subscribe as SubscribePseudoIterable<PubSubEvent>)?.onComplete
4141
if (onComplete) {
4242
await onComplete(root, args, context, info)
4343
}

lib/messages/disconnect.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { equals } from '@aws/dynamodb-expressions'
44
import { buildExecutionContext } from 'graphql/execution/execute'
55
import { constructContext } from '../utils/constructContext'
66
import { getResolverAndArgs } from '../utils/getResolverAndArgs'
7-
import { SubscribePseudoIterable, MessageHandler } from '../types'
7+
import { SubscribePseudoIterable, MessageHandler, PubSubEvent } from '../types'
88
import { isArray } from '../utils/isArray'
99
import { collect } from 'streaming-iterables'
1010
import { Connection } from '../model/Connection'
@@ -49,7 +49,7 @@ export const disconnect: MessageHandler<null> =
4949

5050
const [field, root, args, context, info] = getResolverAndArgs(server)(execContext)
5151

52-
const onComplete = (field?.subscribe as SubscribePseudoIterable)?.onComplete
52+
const onComplete = (field?.subscribe as SubscribePseudoIterable<PubSubEvent>)?.onComplete
5353
if (onComplete) {
5454
await onComplete(root, args, context, info)
5555
}

lib/messages/subscribe-test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ describe('messages/subscribe', () => {
1919
})
2020

2121
afterEach(async () => {
22-
tables.end()
22+
await tables.end()
2323
})
2424

2525
it('executes a query/mutation', async () => {

lib/messages/subscribe.ts

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
assertValidExecutionArguments,
77
execute,
88
} from 'graphql/execution/execute'
9-
import { APIGatewayWebSocketEvent, ServerClosure, SubscribeHandler, MessageHandler } from '../types'
9+
import { APIGatewayWebSocketEvent, ServerClosure, MessageHandler, SubscribePseudoIterable, PubSubEvent } from '../types'
1010
import { constructContext } from '../utils/constructContext'
1111
import { getResolverAndArgs } from '../utils/getResolverAndArgs'
1212
import { sendMessage } from '../utils/sendMessage'
@@ -25,9 +25,11 @@ export const subscribe: MessageHandler<SubscribeMessage> =
2525
}
2626

2727
const setupSubscription: MessageHandler<SubscribeMessage> = async ({ server, event, message }) => {
28+
const connectionId = event.requestContext.connectionId
29+
2830
const connection = await server.mapper.get(
2931
Object.assign(new server.model.Connection(), {
30-
id: event.requestContext.connectionId,
32+
id: connectionId,
3133
}),
3234
)
3335
const connectionParams = connection.payload || {}
@@ -39,7 +41,7 @@ const setupSubscription: MessageHandler<SubscribeMessage> = async ({ server, eve
3941
throw new AggregateError(errors)
4042
}
4143

42-
const contextValue = await constructContext({ server, connectionParams, connectionId: connection.id })
44+
const contextValue = await constructContext({ server, connectionParams, connectionId })
4345

4446
const execContext = buildExecutionContext(
4547
server.schema,
@@ -74,33 +76,33 @@ const setupSubscription: MessageHandler<SubscribeMessage> = async ({ server, eve
7476
throw new Error('No field')
7577
}
7678

77-
const { topicDefinitions, onSubscribe, onAfterSubscribe } = await (field.subscribe as SubscribeHandler)(
78-
root,
79-
args,
80-
context,
81-
info,
82-
)
79+
const { topicDefinitions, onSubscribe, onAfterSubscribe } = field.subscribe as SubscribePseudoIterable<PubSubEvent>
8380

81+
server.log('onSubscribe', { onSubscribe: !!onSubscribe })
8482
await onSubscribe?.(root, args, context, info)
8583

8684
await Promise.all(topicDefinitions.map(async ({ topic, filter }) => {
85+
const filterData = typeof filter === 'function' ? await filter(root, args, context, info) : filter
86+
8787
const subscription = Object.assign(new server.model.Subscription(), {
88-
id: `${event.requestContext.connectionId}|${message.id}`,
88+
id: `${connectionId}|${message.id}`,
8989
topic,
90-
filter: filter || {},
90+
filter: filterData || {},
9191
subscriptionId: message.id,
9292
subscription: {
9393
variableValues: args,
9494
...message.payload,
9595
},
96-
connectionId: event.requestContext.connectionId,
96+
connectionId,
9797
connectionParams,
9898
requestContext: event.requestContext,
9999
ttl: connection.ttl,
100100
})
101+
server.log('subscribe:putSubscription %j', subscription)
101102
await server.mapper.put(subscription)
102103
}))
103104

105+
server.log('onAfterSubscribe', { onAfterSubscribe: !!onAfterSubscribe })
104106
await onAfterSubscribe?.(root, args, context, info)
105107
}
106108

@@ -125,6 +127,8 @@ const validateMessage = (server: ServerClosure) => (message: SubscribeMessage) =
125127

126128
// eslint-disable-next-line @typescript-eslint/no-explicit-any
127129
async function executeQuery(server: ServerClosure, message: SubscribeMessage, contextValue: any, event: APIGatewayWebSocketEvent) {
130+
server.log('executeQuery', { connectionId: event.requestContext.connectionId })
131+
128132
const result = await execute(
129133
server.schema,
130134
parse(message.payload.query),

lib/model/createModel.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { DynamoDbTable } from '@aws/dynamodb-data-mapper'
2-
import { Class } from '../types'
32

4-
export const createModel = <T extends Class>({
3+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
4+
export const createModel = <T extends { new(...args: any[]): any }>({
55
model,
66
table,
77
}: {

lib/pubsub/complete-test.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import { tables } from '@architect/sandbox'
2+
import { mockServerContext } from '../test/mockServer'
3+
import { complete } from './complete'
4+
5+
describe('pubsub:complete', () => {
6+
before(async () => {
7+
await tables.start({ cwd: './mocks/arc-basic-events', quiet: true })
8+
})
9+
10+
after(async () => {
11+
await tables.end()
12+
})
13+
14+
it('takes a topic', async () => {
15+
const server = await mockServerContext()
16+
await complete(server)({ topic: 'Topic12' })
17+
})
18+
})

lib/pubsub/complete.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,17 @@ import AggregateError from 'aggregate-error'
22
import { parse } from 'graphql'
33
import { CompleteMessage, MessageType } from 'graphql-ws'
44
import { buildExecutionContext } from 'graphql/execution/execute'
5-
import { ServerClosure, PubSubEvent, SubscribePseudoIterable } from '../types'
5+
import { ServerClosure, PubSubEvent, SubscribePseudoIterable, PartialBy } from '../types'
66
import { sendMessage } from '../utils/sendMessage'
77
import { constructContext } from '../utils/constructContext'
88
import { getResolverAndArgs } from '../utils/getResolverAndArgs'
99
import { isArray } from '../utils/isArray'
1010
import { getFilteredSubs } from './getFilteredSubs'
1111

12-
export const complete = (server: ServerClosure) => async (event: PubSubEvent): Promise<void> => {
12+
export const complete = (server: ServerClosure) => async (event: PartialBy<PubSubEvent, 'payload'>): Promise<void> => {
1313
const subscriptions = await getFilteredSubs({ server, event })
14+
server.log('pubsub:complete %j', { event, subscriptions })
15+
1416
const iters = subscriptions.map(async (sub) => {
1517
const message: CompleteMessage = {
1618
id: sub.subscriptionId,
@@ -38,7 +40,7 @@ export const complete = (server: ServerClosure) => async (event: PubSubEvent): P
3840

3941
const [field, root, args, context, info] = getResolverAndArgs(server)(execContext)
4042

41-
const onComplete = (field?.subscribe as SubscribePseudoIterable)?.onComplete
43+
const onComplete = (field?.subscribe as SubscribePseudoIterable<PubSubEvent>)?.onComplete
4244
if (onComplete) {
4345
await onComplete(root, args, context, info)
4446
}

0 commit comments

Comments
 (0)