From abbdbc39ae985f305d1d9aeec5e788eda8f654a0 Mon Sep 17 00:00:00 2001 From: Minh Au Date: Mon, 13 May 2024 16:40:49 -0400 Subject: [PATCH 01/21] basic structure for RealtimeChannel --- realtime/realtime_channel.go | 44 ++++++++++++++++++++++++++++++++++++ realtime/realtime_client.go | 12 ++++++++++ 2 files changed, 56 insertions(+) create mode 100644 realtime/realtime_channel.go diff --git a/realtime/realtime_channel.go b/realtime/realtime_channel.go new file mode 100644 index 0000000..ad3ad8e --- /dev/null +++ b/realtime/realtime_channel.go @@ -0,0 +1,44 @@ +package realtime + +import "fmt" + +type realtimeTopic string // internal string type for representing topics + +type RealtimeChannel struct { + topic realtimeTopic + client *RealtimeClient + hasJoined bool +} + +// Initialize a new channel +func CreateRealtimeChannel(client *RealtimeClient, topic realtimeTopic) *RealtimeChannel { + return &RealtimeChannel{ + client: client, + topic: topic, + } +} + +// Perform callbacks on specific events. Successive calls to On() +// will result in multiple callbacks acting at the event +func (channel *RealtimeChannel) On() { + +} + +// Subscribe to the channel and start listening to events +func (channel *RealtimeChannel) Subscribe() error { + if channel.hasJoined { + return fmt.Errorf("The channel has already been subscribed") + } + + if channel.client.isClientAlive() { + + } + + return nil +} + +func (channel *RealtimeChannel) Unsubscribe() { + if channel.client.isClientAlive() { + + } +} diff --git a/realtime/realtime_client.go b/realtime/realtime_client.go index 8a3603e..cb66c1b 100644 --- a/realtime/realtime_client.go +++ b/realtime/realtime_client.go @@ -25,6 +25,8 @@ type RealtimeClient struct { reconnectInterval time.Duration heartbeatDuration time.Duration heartbeatInterval time.Duration + + topics map[realtimeTopic]*RealtimeChannel } // Create a new RealtimeClient with user's speicfications @@ -44,6 +46,7 @@ func CreateRealtimeClient(projectRef string, apiKey string) *RealtimeClient { heartbeatDuration: 5 * time.Second, heartbeatInterval: 20 * time.Second, reconnectInterval: 500 * time.Millisecond, + topics: make(map[realtimeTopic]*RealtimeChannel), } } @@ -93,6 +96,15 @@ func (client *RealtimeClient) Disconnect() error { return nil } +// Create a new channel with given topic string +func (client *RealtimeClient) Channel(topicStr string) *RealtimeChannel { + newTopic := realtimeTopic(topicStr) + newChannel := CreateRealtimeChannel(client, newTopic) + client.topics[newTopic] = newChannel + + return newChannel +} + // Start sending heartbeats to the server to maintain connection func (client *RealtimeClient) startHeartbeats() { for client.isClientAlive() { From ff241df47899967076b8a1d66577bd626e10ac85 Mon Sep 17 00:00:00 2001 From: Minh Au Date: Tue, 14 May 2024 22:59:12 -0400 Subject: [PATCH 02/21] un-export event constants --- realtime/events.go | 24 +++++++++++++----------- realtime/realtime_client.go | 2 +- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/realtime/events.go b/realtime/events.go index 5ef5018..941073b 100644 --- a/realtime/events.go +++ b/realtime/events.go @@ -1,18 +1,20 @@ -package realtime; +package realtime -// Channel Events -const JOIN_EVENT = "phx_join" -const REPLY_EVENT = "phx_reply" +import ( + "fmt" + "reflect" +) -// DB Subscription Events -const POSTGRES_CHANGE_EVENT = "postgres_changes" +// Events that are used to communicate with the server +const ( + joinEvent string = "phx_join" + replyEvent string = "phx_reply" -// Broadcast Events -const BROADCAST_EVENT = "broadcast" + // DB Subscription Events + postgresChangesEvent string = "postgres_changes" -// Presence Events -const PRESENCE_STATE_EVENT = "presence_state" -const PRESENCE_DIFF_EVENT ="presence_diff" + // Broadcast Events + broadcastEvent string = "broadcast" // Other Events const SYS_EVENT = "system" diff --git a/realtime/realtime_client.go b/realtime/realtime_client.go index cb66c1b..010a210 100644 --- a/realtime/realtime_client.go +++ b/realtime/realtime_client.go @@ -136,7 +136,7 @@ func (client *RealtimeClient) startHeartbeats() { func (client *RealtimeClient) sendHeartbeat() error { msg := HearbeatMsg{ TemplateMsg: TemplateMsg{ - Event: HEARTBEAT_EVENT, + Event: heartbeatEvent, Topic: "phoenix", Ref: "", }, From 5d3880a420b7c772b9174e89527d279808d00caa Mon Sep 17 00:00:00 2001 From: Minh Au Date: Tue, 14 May 2024 23:47:20 -0400 Subject: [PATCH 03/21] add eventType and filter criteria checking --- realtime/events.go | 93 ++++++++++++++++++++++++++++++++++-- realtime/realtime_channel.go | 10 +++- 2 files changed, 98 insertions(+), 5 deletions(-) diff --git a/realtime/events.go b/realtime/events.go index 941073b..8d3b6cb 100644 --- a/realtime/events.go +++ b/realtime/events.go @@ -3,6 +3,7 @@ package realtime import ( "fmt" "reflect" + "strings" ) // Events that are used to communicate with the server @@ -16,7 +17,91 @@ const ( // Broadcast Events broadcastEvent string = "broadcast" -// Other Events -const SYS_EVENT = "system" -const HEARTBEAT_EVENT = "heartbeat" -const ACCESS_TOKEN_EVENT = "access_token" + // Presence Events + presenceStateEvent string = "presence_state" + presenceDiffEvent string ="presence_diff" + + // Other Events + systemEvent string = "system" + heartbeatEvent string = "heartbeat" + accessTokennEvent string = "access_token" +) + +// Event "type" that the user can specify for channel to listen to +const ( + presenceEventType string = "presence" + broadcastEventType string = "broadcast" + postgresChangesEventType string = "postgres_changes" +) + +type eventFilter interface { + verifyFilter(map[string]string) +} + +type postgresFilter struct { + Event string + Schema string + Table string + Filter string +} + +type broadcastFilter struct { + event string +} + +type presenceFilter struct { + event string +} + +// Verify if the given event type is supported +func verifyEventType(eventType string) bool { + switch eventType { + case presenceEventType: + case broadcastEventType: + case postgresChangesEventType: + return true + } + + return false +} + + +// Enforce client's filter object to follow a specific message +// structure of certain events. Check messages.go for more +// information on the struct of each event. By default, +// non-supported events will return an error +// Only the following events are currently supported: +// + postgres_changes, broadcast, presence +func verifyFilter(eventType string, filter map[string]string) error { + var filterType reflect.Type + var missingFields []string + + switch eventType { + case postgresChangesEvent: + filterType = reflect.TypeOf(postgresFilter{}) + break + case broadcastEvent: + filterType = reflect.TypeOf(broadcastFilter{}) + break + case presenceEventType: + filterType = reflect.TypeOf(presenceFilter{}) + default: + return fmt.Errorf("Unsupported event type: %s", eventType) + } + + missingFields = make([]string, 0, filterType.NumField()) + for i := 0; i < filterType.NumField(); i++ { + currFieldName := filterType.Field(i).Name + currFieldName = strings.ToLower(currFieldName) + + if _, ok := filter[currFieldName]; !ok { + missingFields = append(missingFields, currFieldName) + } + } + + if len(missingFields) != 0 { + return fmt.Errorf("Criteria for %s is missing: %+v", eventType, missingFields) + } + + return nil +} diff --git a/realtime/realtime_channel.go b/realtime/realtime_channel.go index ad3ad8e..920d435 100644 --- a/realtime/realtime_channel.go +++ b/realtime/realtime_channel.go @@ -20,8 +20,16 @@ func CreateRealtimeChannel(client *RealtimeClient, topic realtimeTopic) *Realtim // Perform callbacks on specific events. Successive calls to On() // will result in multiple callbacks acting at the event -func (channel *RealtimeChannel) On() { +func (channel *RealtimeChannel) On(eventType string, filter map[string]string, callback func(interface{})) error { + if !verifyEventType(eventType) { + return fmt.Errorf("invalid event type: %s", eventType) + } + if err := verifyFilter(eventType, filter); err != nil { + return fmt.Errorf("Invalid filter criteria for %s event type: %w", eventType, err) + } + + return nil } // Subscribe to the channel and start listening to events From 71fe108c880ae4d7edee3927a15acabf225649c4 Mon Sep 17 00:00:00 2001 From: Minh Au Date: Tue, 14 May 2024 23:53:29 -0400 Subject: [PATCH 04/21] allow optional criteria --- realtime/events.go | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/realtime/events.go b/realtime/events.go index 8d3b6cb..0a1a722 100644 --- a/realtime/events.go +++ b/realtime/events.go @@ -34,23 +34,19 @@ const ( postgresChangesEventType string = "postgres_changes" ) -type eventFilter interface { - verifyFilter(map[string]string) -} - type postgresFilter struct { - Event string - Schema string - Table string - Filter string + Event string `supabase:"required"` + Schema string `supabase:"required"` + Table string `supabase:"optional"` + Filter string `supabase:"optional"` } type broadcastFilter struct { - event string + event string `supabase:"required"` } type presenceFilter struct { - event string + event string `supabase:"required"` } // Verify if the given event type is supported @@ -91,10 +87,11 @@ func verifyFilter(eventType string, filter map[string]string) error { missingFields = make([]string, 0, filterType.NumField()) for i := 0; i < filterType.NumField(); i++ { - currFieldName := filterType.Field(i).Name - currFieldName = strings.ToLower(currFieldName) + currField := filterType.Field(i) + currFieldName := strings.ToLower(currField.Name) + isRequired := currField.Tag.Get("supabase") == "required" - if _, ok := filter[currFieldName]; !ok { + if _, ok := filter[currFieldName]; !ok && isRequired { missingFields = append(missingFields, currFieldName) } } From 5ae49b0a90b70e313466bcaacbe2bb02e56e776e Mon Sep 17 00:00:00 2001 From: Minh Au Date: Wed, 15 May 2024 23:45:40 -0400 Subject: [PATCH 05/21] set field and construct filters --- realtime/events.go | 54 ++++++++++++++++++++++++++++-------- realtime/realtime_channel.go | 6 ++-- 2 files changed, 46 insertions(+), 14 deletions(-) diff --git a/realtime/events.go b/realtime/events.go index 0a1a722..f310569 100644 --- a/realtime/events.go +++ b/realtime/events.go @@ -34,6 +34,10 @@ const ( postgresChangesEventType string = "postgres_changes" ) +type eventFilter interface { + constructPayload() string +} + type postgresFilter struct { Event string `supabase:"required"` Schema string `supabase:"required"` @@ -49,6 +53,18 @@ type presenceFilter struct { event string `supabase:"required"` } +func (filter *postgresFilter) constructPayload() string { + return "" +} + +func (filter *broadcastFilter) constructPayload() string { + return "" +} + +func (filter *presenceFilter) constructPayload() string{ + return "" +} + // Verify if the given event type is supported func verifyEventType(eventType string) bool { switch eventType { @@ -64,41 +80,55 @@ func verifyEventType(eventType string) bool { // Enforce client's filter object to follow a specific message // structure of certain events. Check messages.go for more -// information on the struct of each event. By default, -// non-supported events will return an error +// information on the struct of each event. // Only the following events are currently supported: // + postgres_changes, broadcast, presence -func verifyFilter(eventType string, filter map[string]string) error { - var filterType reflect.Type +func createFilter(eventType string, filter map[string]string) (eventFilter, error) { + var filterType reflect.Type // Type for filter + var filterConValue reflect.Value // Concrete value + var filterPtrValue reflect.Value // Pointer value to the concrete value var missingFields []string switch eventType { case postgresChangesEvent: - filterType = reflect.TypeOf(postgresFilter{}) + filterPtrValue = reflect.ValueOf(&postgresFilter{}) break case broadcastEvent: - filterType = reflect.TypeOf(broadcastFilter{}) + filterPtrValue = reflect.ValueOf(&broadcastFilter{}) break case presenceEventType: - filterType = reflect.TypeOf(presenceFilter{}) + filterPtrValue = reflect.ValueOf(&presenceFilter{}) default: - return fmt.Errorf("Unsupported event type: %s", eventType) + return nil, fmt.Errorf("Unsupported event type: %s", eventType) } - missingFields = make([]string, 0, filterType.NumField()) + // Get the underlying filter type to identify missing fields + filterConValue = filterPtrValue.Elem() + filterType = filterConValue.Type() + missingFields = make([]string, 0, filterType.NumField()) + for i := 0; i < filterType.NumField(); i++ { currField := filterType.Field(i) currFieldName := strings.ToLower(currField.Name) isRequired := currField.Tag.Get("supabase") == "required" - if _, ok := filter[currFieldName]; !ok && isRequired { + val, ok := filter[currFieldName] + if !ok && isRequired { missingFields = append(missingFields, currFieldName) } + + // Set field to empty string when value for currFieldName is missing + filterConValue.Field(i).SetString(val) } if len(missingFields) != 0 { - return fmt.Errorf("Criteria for %s is missing: %+v", eventType, missingFields) + return nil, fmt.Errorf("Criteria for %s is missing: %+v", eventType, missingFields) + } + + filterFinal, ok := filterPtrValue.Interface().(eventFilter) + if !ok { + return nil, fmt.Errorf("Unexpected Error: cannot create event filter") } - return nil + return filterFinal, nil } diff --git a/realtime/realtime_channel.go b/realtime/realtime_channel.go index 920d435..ace1547 100644 --- a/realtime/realtime_channel.go +++ b/realtime/realtime_channel.go @@ -24,11 +24,13 @@ func (channel *RealtimeChannel) On(eventType string, filter map[string]string, c if !verifyEventType(eventType) { return fmt.Errorf("invalid event type: %s", eventType) } - - if err := verifyFilter(eventType, filter); err != nil { + eventFilter, err := createFilter(eventType, filter) + if err != nil { return fmt.Errorf("Invalid filter criteria for %s event type: %w", eventType, err) } + fmt.Println(eventFilter) + return nil } From 7450fb3c9e13d951d3d3cde69e354bc089e95198 Mon Sep 17 00:00:00 2001 From: Minh Au Date: Fri, 17 May 2024 01:06:29 -0400 Subject: [PATCH 06/21] implement connection message constructing based on eventFilter type --- realtime/events.go | 176 ++++++++++++++++------------------- realtime/messages.go | 52 ++++++++++- realtime/realtime_channel.go | 11 ++- realtime/realtime_client.go | 11 ++- 4 files changed, 150 insertions(+), 100 deletions(-) diff --git a/realtime/events.go b/realtime/events.go index f310569..9a5bf43 100644 --- a/realtime/events.go +++ b/realtime/events.go @@ -8,127 +8,113 @@ import ( // Events that are used to communicate with the server const ( - joinEvent string = "phx_join" - replyEvent string = "phx_reply" + joinEvent string = "phx_join" + replyEvent string = "phx_reply" - // DB Subscription Events - postgresChangesEvent string = "postgres_changes" + // DB Subscription Events + postgresChangesEvent string = "postgres_changes" - // Broadcast Events - broadcastEvent string = "broadcast" + // Broadcast Events + broadcastEvent string = "broadcast" - // Presence Events - presenceStateEvent string = "presence_state" - presenceDiffEvent string ="presence_diff" + // Presence Events + presenceStateEvent string = "presence_state" + presenceDiffEvent string = "presence_diff" - // Other Events - systemEvent string = "system" - heartbeatEvent string = "heartbeat" - accessTokennEvent string = "access_token" + // Other Events + systemEvent string = "system" + heartbeatEvent string = "heartbeat" + accessTokennEvent string = "access_token" ) // Event "type" that the user can specify for channel to listen to const ( - presenceEventType string = "presence" - broadcastEventType string = "broadcast" - postgresChangesEventType string = "postgres_changes" + presenceEventType string = "presence" + broadcastEventType string = "broadcast" + postgresChangesEventType string = "postgres_changes" ) -type eventFilter interface { - constructPayload() string -} +// type eventFilter struct {} +type eventFilter interface {} type postgresFilter struct { - Event string `supabase:"required"` - Schema string `supabase:"required"` - Table string `supabase:"optional"` - Filter string `supabase:"optional"` + Event string `supabase:"required" json:"event"` + Schema string `supabase:"required" json:"schema"` + Table string `supabase:"optional" json:"table"` + Filter string `supabase:"optional" json:"filter"` } type broadcastFilter struct { - event string `supabase:"required"` + Event string `supabase:"required"` } type presenceFilter struct { - event string `supabase:"required"` -} - -func (filter *postgresFilter) constructPayload() string { - return "" -} - -func (filter *broadcastFilter) constructPayload() string { - return "" -} - -func (filter *presenceFilter) constructPayload() string{ - return "" + Event string `supabase:"required"` } // Verify if the given event type is supported func verifyEventType(eventType string) bool { - switch eventType { - case presenceEventType: - case broadcastEventType: - case postgresChangesEventType: - return true - } - - return false + switch eventType { + case presenceEventType: + case broadcastEventType: + case postgresChangesEventType: + return true + } + + return false } - // Enforce client's filter object to follow a specific message // structure of certain events. Check messages.go for more // information on the struct of each event. // Only the following events are currently supported: -// + postgres_changes, broadcast, presence -func createFilter(eventType string, filter map[string]string) (eventFilter, error) { - var filterType reflect.Type // Type for filter - var filterConValue reflect.Value // Concrete value - var filterPtrValue reflect.Value // Pointer value to the concrete value - var missingFields []string - - switch eventType { - case postgresChangesEvent: - filterPtrValue = reflect.ValueOf(&postgresFilter{}) - break - case broadcastEvent: - filterPtrValue = reflect.ValueOf(&broadcastFilter{}) - break - case presenceEventType: - filterPtrValue = reflect.ValueOf(&presenceFilter{}) - default: - return nil, fmt.Errorf("Unsupported event type: %s", eventType) - } - - // Get the underlying filter type to identify missing fields - filterConValue = filterPtrValue.Elem() - filterType = filterConValue.Type() - missingFields = make([]string, 0, filterType.NumField()) - - for i := 0; i < filterType.NumField(); i++ { - currField := filterType.Field(i) - currFieldName := strings.ToLower(currField.Name) - isRequired := currField.Tag.Get("supabase") == "required" - - val, ok := filter[currFieldName] - if !ok && isRequired { - missingFields = append(missingFields, currFieldName) - } - - // Set field to empty string when value for currFieldName is missing - filterConValue.Field(i).SetString(val) - } - - if len(missingFields) != 0 { - return nil, fmt.Errorf("Criteria for %s is missing: %+v", eventType, missingFields) - } - - filterFinal, ok := filterPtrValue.Interface().(eventFilter) - if !ok { - return nil, fmt.Errorf("Unexpected Error: cannot create event filter") - } - - return filterFinal, nil +// - postgres_changes, broadcast, presence +func createEventFilter(eventType string, filter map[string]string) (eventFilter, error) { + var filterType reflect.Type // Type for filter + var filterConValue reflect.Value // Concrete value + var filterPtrValue reflect.Value // Pointer value to the concrete value + var missingFields []string + + switch eventType { + case postgresChangesEvent: + filterPtrValue = reflect.ValueOf(&postgresFilter{}) + break + case broadcastEvent: + filterPtrValue = reflect.ValueOf(&broadcastFilter{}) + break + case presenceEventType: + filterPtrValue = reflect.ValueOf(&presenceFilter{}) + default: + return nil, fmt.Errorf("Unsupported event type: %s", eventType) + } + + // Get the underlying filter type to identify missing fields + filterConValue = filterPtrValue.Elem() + filterType = filterConValue.Type() + missingFields = make([]string, 0, filterType.NumField()) + + for i := 0; i < filterType.NumField(); i++ { + currField := filterType.Field(i) + currFieldName := strings.ToLower(currField.Name) + isRequired := currField.Tag.Get("supabase") == "required" + + val, ok := filter[currFieldName] + if !ok && isRequired { + missingFields = append(missingFields, currFieldName) + } + + // Set field to empty string when value for currFieldName is missing + filterConValue.Field(i).SetString(val) + } + + if len(missingFields) != 0 { + return nil, fmt.Errorf("Criteria for %s is missing: %+v", eventType, missingFields) + } + + filterFinal, ok := filterConValue.Interface().(eventFilter) + if !ok { + return nil, fmt.Errorf("Unexpected Error: cannot create event filter") + } + + return filterFinal, nil } diff --git a/realtime/messages.go b/realtime/messages.go index 21b9872..8e76e6a 100644 --- a/realtime/messages.go +++ b/realtime/messages.go @@ -7,7 +7,25 @@ type TemplateMsg struct { } type ConnectionMsg struct { - TemplateMsg + *TemplateMsg + + Payload struct { + Config struct { + Broadcast struct { + Self bool `json:"self"` + } `json:"broadcast"` + + Presence struct { + Key string `json:"key"` + } `json:"presence"` + + PostgresChanges []postgresFilter `json:"postgres_changes"` + } `json:"config"` + } `json:"payload"` +} + +type PostgresCDCMsg struct { + *TemplateMsg Payload struct { Data struct { @@ -23,8 +41,38 @@ type ConnectionMsg struct { } type HearbeatMsg struct { - TemplateMsg + *TemplateMsg Payload struct { } `json:"payload"` } + +// create a template message +func createTemplateMessage(event string, topic realtimeTopic) *TemplateMsg { + return &TemplateMsg{ + Event: event, + Topic: string(topic), + Ref: "", + } +} + +// create a connection message depending on event type +func createConnectionMessage(topic realtimeTopic, filter eventFilter) *ConnectionMsg { + msg := &ConnectionMsg{} + + // Common part across the three event type + msg.TemplateMsg = createTemplateMessage(joinEvent, topic) + switch filter.(type) { + case postgresFilter: + msg.Payload.Config.PostgresChanges = []postgresFilter{filter.(postgresFilter)} + break + case broadcastFilter: + msg.Payload.Config.Broadcast.Self = true + break + case presenceFilter: + msg.Payload.Config.Presence.Key = "" + break + } + + return msg +} diff --git a/realtime/realtime_channel.go b/realtime/realtime_channel.go index ace1547..e3d7869 100644 --- a/realtime/realtime_channel.go +++ b/realtime/realtime_channel.go @@ -24,12 +24,19 @@ func (channel *RealtimeChannel) On(eventType string, filter map[string]string, c if !verifyEventType(eventType) { return fmt.Errorf("invalid event type: %s", eventType) } - eventFilter, err := createFilter(eventType, filter) + eventFilter, err := createEventFilter(eventType, filter) if err != nil { return fmt.Errorf("Invalid filter criteria for %s event type: %w", eventType, err) } - fmt.Println(eventFilter) + fmt.Printf("%+v\n", eventFilter) + msg := createConnectionMessage(channel.topic, eventFilter) + fmt.Printf("%+v\n", msg) + // binding{ + // msg: msg, + // callback: callback, + // channel: channel, + // } return nil } diff --git a/realtime/realtime_client.go b/realtime/realtime_client.go index 010a210..34076b2 100644 --- a/realtime/realtime_client.go +++ b/realtime/realtime_client.go @@ -1,6 +1,7 @@ package realtime import ( + "container/list" "context" "errors" "fmt" @@ -26,9 +27,17 @@ type RealtimeClient struct { heartbeatDuration time.Duration heartbeatInterval time.Duration + msgQueue *list.List + topics map[realtimeTopic]*RealtimeChannel } +type binding struct { + msg *ConnectionMsg + callback func(interface{}) + channel *RealtimeChannel +} + // Create a new RealtimeClient with user's speicfications func CreateRealtimeClient(projectRef string, apiKey string) *RealtimeClient { realtimeUrl := fmt.Sprintf( @@ -135,7 +144,7 @@ func (client *RealtimeClient) startHeartbeats() { // Send the heartbeat to the realtime server func (client *RealtimeClient) sendHeartbeat() error { msg := HearbeatMsg{ - TemplateMsg: TemplateMsg{ + TemplateMsg: &TemplateMsg{ Event: heartbeatEvent, Topic: "phoenix", Ref: "", From 9dee7c7cec917c47fec028bf8ac83a02b11238ca Mon Sep 17 00:00:00 2001 From: Minh Au Date: Sat, 25 May 2024 17:19:47 -0400 Subject: [PATCH 07/21] implement addBinding with multiple queues, remove realtimeTopic type, prevent same topic channel --- realtime/messages.go | 6 +++--- realtime/realtime_channel.go | 19 +++++++++--------- realtime/realtime_client.go | 37 ++++++++++++++++++++++++++---------- 3 files changed, 39 insertions(+), 23 deletions(-) diff --git a/realtime/messages.go b/realtime/messages.go index 8e76e6a..40d4451 100644 --- a/realtime/messages.go +++ b/realtime/messages.go @@ -48,16 +48,16 @@ type HearbeatMsg struct { } // create a template message -func createTemplateMessage(event string, topic realtimeTopic) *TemplateMsg { +func createTemplateMessage(event string, topic string) *TemplateMsg { return &TemplateMsg{ Event: event, - Topic: string(topic), + Topic: topic, Ref: "", } } // create a connection message depending on event type -func createConnectionMessage(topic realtimeTopic, filter eventFilter) *ConnectionMsg { +func createConnectionMessage(topic string, filter eventFilter) *ConnectionMsg { msg := &ConnectionMsg{} // Common part across the three event type diff --git a/realtime/realtime_channel.go b/realtime/realtime_channel.go index e3d7869..5eaadfa 100644 --- a/realtime/realtime_channel.go +++ b/realtime/realtime_channel.go @@ -2,16 +2,14 @@ package realtime import "fmt" -type realtimeTopic string // internal string type for representing topics - type RealtimeChannel struct { - topic realtimeTopic + topic string client *RealtimeClient hasJoined bool } // Initialize a new channel -func CreateRealtimeChannel(client *RealtimeClient, topic realtimeTopic) *RealtimeChannel { +func CreateRealtimeChannel(client *RealtimeClient, topic string) *RealtimeChannel { return &RealtimeChannel{ client: client, topic: topic, @@ -20,7 +18,7 @@ func CreateRealtimeChannel(client *RealtimeClient, topic realtimeTopic) *Realtim // Perform callbacks on specific events. Successive calls to On() // will result in multiple callbacks acting at the event -func (channel *RealtimeChannel) On(eventType string, filter map[string]string, callback func(interface{})) error { +func (channel *RealtimeChannel) On(eventType string, filter map[string]string, callback func(any)) error { if !verifyEventType(eventType) { return fmt.Errorf("invalid event type: %s", eventType) } @@ -32,11 +30,12 @@ func (channel *RealtimeChannel) On(eventType string, filter map[string]string, c fmt.Printf("%+v\n", eventFilter) msg := createConnectionMessage(channel.topic, eventFilter) fmt.Printf("%+v\n", msg) - // binding{ - // msg: msg, - // callback: callback, - // channel: channel, - // } + newBinding := binding{ + msg: msg, + callback: callback, + } + + channel.client.addBinding(channel.topic, newBinding) return nil } diff --git a/realtime/realtime_client.go b/realtime/realtime_client.go index 34076b2..787ab7c 100644 --- a/realtime/realtime_client.go +++ b/realtime/realtime_client.go @@ -27,15 +27,14 @@ type RealtimeClient struct { heartbeatDuration time.Duration heartbeatInterval time.Duration - msgQueue *list.List - - topics map[realtimeTopic]*RealtimeChannel + currentTopics map[string]struct{} + bindingQueue map[string]*list.List + bindingSubscription map[string]*list.List } type binding struct { msg *ConnectionMsg - callback func(interface{}) - channel *RealtimeChannel + callback func(any) } // Create a new RealtimeClient with user's speicfications @@ -55,7 +54,7 @@ func CreateRealtimeClient(projectRef string, apiKey string) *RealtimeClient { heartbeatDuration: 5 * time.Second, heartbeatInterval: 20 * time.Second, reconnectInterval: 500 * time.Millisecond, - topics: make(map[realtimeTopic]*RealtimeChannel), + currentTopics: make(map[string]struct{}), } } @@ -106,12 +105,15 @@ func (client *RealtimeClient) Disconnect() error { } // Create a new channel with given topic string -func (client *RealtimeClient) Channel(topicStr string) *RealtimeChannel { - newTopic := realtimeTopic(topicStr) +func (client *RealtimeClient) Channel(newTopic string) (*RealtimeChannel, error) { + if _, ok := client.currentTopics[newTopic]; !ok { + return nil, fmt.Errorf("Error: channel with %v topic already created", newTopic) + } + newChannel := CreateRealtimeChannel(client, newTopic) - client.topics[newTopic] = newChannel + client.currentTopics[newTopic] = struct{}{} - return newChannel + return newChannel, nil } // Start sending heartbeats to the server to maintain connection @@ -225,6 +227,21 @@ func (client *RealtimeClient) isClientAlive() bool { return true } +// Add event bindings to the bindingQueue +func (client *RealtimeClient) addBinding(topic string, newBinding binding) { + queue, ok := client.bindingQueue[topic] + + // Add a queue for the topic if not already existed + if !ok { + queue = list.New() + client.bindingQueue[topic] = queue + } + + client.mu.Lock() + queue.PushBack(newBinding) + client.mu.Unlock() +} + // The underlying package of websocket returns an error if the connection is // terminated on the server side. Therefore, the state of the connection can // be achieved by investigating the error From 37982ce3594ba7be06b186e75c6cc37dc80dac25 Mon Sep 17 00:00:00 2001 From: Minh Au Date: Mon, 10 Jun 2024 22:31:15 -0400 Subject: [PATCH 08/21] send connection message and process response from server --- realtime/events.go | 4 +- realtime/messages.go | 142 ++++++++++++++++++++------------ realtime/realtime_channel.go | 14 +--- realtime/realtime_client.go | 155 +++++++++++++++++++++++++++++------ 4 files changed, 223 insertions(+), 92 deletions(-) diff --git a/realtime/events.go b/realtime/events.go index 9a5bf43..95bb174 100644 --- a/realtime/events.go +++ b/realtime/events.go @@ -40,8 +40,8 @@ type eventFilter interface {} type postgresFilter struct { Event string `supabase:"required" json:"event"` Schema string `supabase:"required" json:"schema"` - Table string `supabase:"optional" json:"table"` - Filter string `supabase:"optional" json:"filter"` + Table string `supabase:"optional" json:"table,omitempty"` + Filter string `supabase:"optional" json:"filter,omitempty"` } type broadcastFilter struct { diff --git a/realtime/messages.go b/realtime/messages.go index 40d4451..04ac9d8 100644 --- a/realtime/messages.go +++ b/realtime/messages.go @@ -1,78 +1,112 @@ -package realtime; +package realtime + +import ( + "encoding/json" +) type TemplateMsg struct { - Event string `json:"event"` - Topic string `json:"topic"` - Ref string `json:"ref"` + Event string `json:"event"` + Topic string `json:"topic"` + Ref string `json:"ref"` +} + +type AbstractMsg struct { + *TemplateMsg + Payload json.RawMessage `json:"payload"` } type ConnectionMsg struct { - *TemplateMsg + *TemplateMsg - Payload struct { - Config struct { - Broadcast struct { - Self bool `json:"self"` - } `json:"broadcast"` + Payload struct { + Config struct { + Broadcast struct { + Self bool `json:"self"` + } `json:"broadcast,omitempty"` - Presence struct { - Key string `json:"key"` - } `json:"presence"` + Presence struct { + Key string `json:"key"` + } `json:"presence,omitempty"` - PostgresChanges []postgresFilter `json:"postgres_changes"` - } `json:"config"` - } `json:"payload"` + PostgresChanges []postgresFilter `json:"postgres_changes,omitempty"` + } `json:"config"` + } `json:"payload"` } type PostgresCDCMsg struct { - *TemplateMsg - - Payload struct { - Data struct { - Schema string `json:"schema"` - Table string `json:"table"` - CommitTime string `json:"commit_timestamp"` - EventType string `json:"eventType"` - New map[string]string `json:"new"` - Old map[string]string `json:"old"` - Errors string `json:"errors"` - } `json:"data"` - } `json:"payload"` + *TemplateMsg + + Payload struct { + Data struct { + Schema string `json:"schema"` + Table string `json:"table"` + CommitTime string `json:"commit_timestamp"` + EventType string `json:"eventType"` + New map[string]string `json:"new"` + Old map[string]string `json:"old"` + Errors string `json:"errors"` + } `json:"data"` + } `json:"payload"` +} + +type ReplyPayload struct { + Response struct { + PostgresChanges []struct{ + ID int `json:"id"` + postgresFilter + } `json:"postgres_changes"` + } `json:"response"` + Status string `json:"status"` +} + +type SystemPayload struct { + Channel string `json:"channel"` + Extension string `json:"extension"` + Message string `json:"message"` + Status string `json:"status"` +} + +// presence_state can contain any key. Hence map type instead of struct +type PresenceStatePayload map[string]struct{ + Metas []struct{ + Ref string `json:"phx_ref"` + Name string `json:"name"` + T float64 `json:"t"` + } `json:"metas,omitempty"` } type HearbeatMsg struct { - *TemplateMsg + *TemplateMsg - Payload struct { - } `json:"payload"` + Payload struct { + } `json:"payload"` } // create a template message func createTemplateMessage(event string, topic string) *TemplateMsg { - return &TemplateMsg{ - Event: event, - Topic: topic, - Ref: "", - } + return &TemplateMsg{ + Event: event, + Topic: topic, + Ref: "", + } } // create a connection message depending on event type func createConnectionMessage(topic string, filter eventFilter) *ConnectionMsg { - msg := &ConnectionMsg{} - - // Common part across the three event type - msg.TemplateMsg = createTemplateMessage(joinEvent, topic) - switch filter.(type) { - case postgresFilter: - msg.Payload.Config.PostgresChanges = []postgresFilter{filter.(postgresFilter)} - break - case broadcastFilter: - msg.Payload.Config.Broadcast.Self = true - break - case presenceFilter: - msg.Payload.Config.Presence.Key = "" - break - } - - return msg + msg := &ConnectionMsg{} + + // Common part across the three event type + msg.TemplateMsg = createTemplateMessage(joinEvent, topic) + switch filter.(type) { + case postgresFilter: + msg.Payload.Config.PostgresChanges = []postgresFilter{filter.(postgresFilter)} + break + case broadcastFilter: + msg.Payload.Config.Broadcast.Self = true + break + case presenceFilter: + msg.Payload.Config.Presence.Key = "" + break + } + return msg } diff --git a/realtime/realtime_channel.go b/realtime/realtime_channel.go index 5eaadfa..45723dc 100644 --- a/realtime/realtime_channel.go +++ b/realtime/realtime_channel.go @@ -27,28 +27,22 @@ func (channel *RealtimeChannel) On(eventType string, filter map[string]string, c return fmt.Errorf("Invalid filter criteria for %s event type: %w", eventType, err) } - fmt.Printf("%+v\n", eventFilter) msg := createConnectionMessage(channel.topic, eventFilter) - fmt.Printf("%+v\n", msg) newBinding := binding{ msg: msg, callback: callback, } - channel.client.addBinding(channel.topic, newBinding) + channel.client.addBinding(newBinding) return nil } // Subscribe to the channel and start listening to events func (channel *RealtimeChannel) Subscribe() error { - if channel.hasJoined { - return fmt.Errorf("The channel has already been subscribed") - } - - if channel.client.isClientAlive() { - - } + if err := channel.client.subscribe(); err != nil { + return fmt.Errorf("Channel %s failed to subscribe: %w", channel.topic, err) + } return nil } diff --git a/realtime/realtime_client.go b/realtime/realtime_client.go index 787ab7c..ddb7b61 100644 --- a/realtime/realtime_client.go +++ b/realtime/realtime_client.go @@ -3,6 +3,7 @@ package realtime import ( "container/list" "context" + "encoding/json" "errors" "fmt" "io" @@ -27,9 +28,10 @@ type RealtimeClient struct { heartbeatDuration time.Duration heartbeatInterval time.Duration - currentTopics map[string]struct{} - bindingQueue map[string]*list.List - bindingSubscription map[string]*list.List + bindingQueue *list.List + bindingSubscription map[string]binding + replyChan chan string + currentTopics map[string][]string } type binding struct { @@ -54,7 +56,9 @@ func CreateRealtimeClient(projectRef string, apiKey string) *RealtimeClient { heartbeatDuration: 5 * time.Second, heartbeatInterval: 20 * time.Second, reconnectInterval: 500 * time.Millisecond, - currentTopics: make(map[string]struct{}), + + currentTopics: make(map[string][]string), + bindingQueue: list.New(), } } @@ -64,18 +68,18 @@ func (client *RealtimeClient) Connect() error { return nil } + // Change status of client to alive + client.closed = make(chan struct{}) + // Attempt to dial the server err := client.dialServer() if err != nil { + close(client.closed) return fmt.Errorf("Cannot connect to the server: %w", err) } - // client is only alive after the connection has been made - client.mu.Lock() - client.closed = make(chan struct{}) - client.mu.Unlock() - go client.startHeartbeats() + go client.startListening() return nil } @@ -104,14 +108,50 @@ func (client *RealtimeClient) Disconnect() error { return nil } +// Begins subscribing to events in the bindingQueue +func (client *RealtimeClient) subscribe() error { + if !client.isClientAlive() { + client.Connect() + } + + if client.replyChan != nil { + client.replyChan = make(chan string) + } + bindNode := client.bindingQueue.Front() + + // TODO: take in a context to allow user to cancel subcribing + // TODO: add a way to either roll back (unscribe) if an error encounters or + // disconnect to client and close connection + for bindNode != nil { + bind := bindNode.Value.(binding) + + err := wsjson.Write(context.Background(), client.conn, bind.msg) + if err != nil { + return fmt.Errorf("Unable to subscribe to the event %v: %v", *bind.msg, err) + } + + select { + case rep, ok := <- client.replyChan: + if !ok { + return fmt.Errorf("Error: Unable to subscribe to the event %v succesfully", bind.msg) + } + client.bindingSubscription[rep] = bind + break + } + bindNode = bindNode.Next() + } + + return nil +} + // Create a new channel with given topic string func (client *RealtimeClient) Channel(newTopic string) (*RealtimeChannel, error) { - if _, ok := client.currentTopics[newTopic]; !ok { + if _, ok := client.currentTopics[newTopic]; ok { return nil, fmt.Errorf("Error: channel with %v topic already created", newTopic) } - - newChannel := CreateRealtimeChannel(client, newTopic) - client.currentTopics[newTopic] = struct{}{} + newChannel := CreateRealtimeChannel(client, "realtime:" + newTopic) + // TODO: Fix currentTopics + client.currentTopics[newTopic] = []string{} return newChannel, nil } @@ -167,12 +207,85 @@ func (client *RealtimeClient) sendHeartbeat() error { return nil } +// Keep reading from the connection from the connection +func (client *RealtimeClient) startListening() { + ctx := context.Background() + + for client.isClientAlive() { + var msg AbstractMsg + + // Read from the connection + err := wsjson.Read(ctx, client.conn, &msg) + + // Check if there's a way to partially marshal bytes into an object + // Or check if polymorphism in go (from TemplateMsg to another type of messg) + if err != nil { + if client.isConnectionAlive(err) { + client.logger.Printf("Unexpected error while listening: %v", err) + } else { + // Quick sleep to prevent taking up CPU cycles. + // Client should be able to reconnect automatically if it's still alive + time.Sleep(client.reconnectInterval) + } + } else { + // Spawn a new thread to process the server's respond + go client.processMessage(msg) + } + } +} + +// Process the given message according certain events +func (client *RealtimeClient) processMessage(msg AbstractMsg) { + client.logger.Printf("Header: %+v", *msg.TemplateMsg) + client.logger.Printf("Payload: %+v", string(msg.Payload)) + genericPayload, err := client.unmarshalPayload(msg) + if err != nil { + client.logger.Printf("Unable to process received message: %v", err) + client.logger.Printf("%v", genericPayload) + return + } + + switch payload := genericPayload.(type) { + case *ReplyPayload: + client.logger.Printf("Processing: %+v", payload) + break + } +} + +func (client *RealtimeClient) unmarshalPayload(msg AbstractMsg) (any, error) { + var payload any + var err error + + // Parse the payload depending on the event type + switch msg.Event { + case replyEvent: + payload = new(ReplyPayload) + break + case postgresChangesEvent: + break + case systemEvent: + payload = new(SystemPayload) + break + case presenceStateEvent: + payload = new(PresenceStatePayload) + break + default: + return struct{}{}, fmt.Errorf("Error: Unsupported event %v", msg.Event) + } + + err = json.Unmarshal(msg.Payload, payload) + if err != nil { + return struct{}{}, fmt.Errorf("Error: Unable to unmarshal payload: %v", err) + } + return payload, nil +} + // Dial the server with a certain timeout in seconds func (client *RealtimeClient) dialServer() error { client.mu.Lock() defer client.mu.Unlock() - if client.isClientAlive() { + if !client.isClientAlive() { return nil } @@ -228,18 +341,8 @@ func (client *RealtimeClient) isClientAlive() bool { } // Add event bindings to the bindingQueue -func (client *RealtimeClient) addBinding(topic string, newBinding binding) { - queue, ok := client.bindingQueue[topic] - - // Add a queue for the topic if not already existed - if !ok { - queue = list.New() - client.bindingQueue[topic] = queue - } - - client.mu.Lock() - queue.PushBack(newBinding) - client.mu.Unlock() +func (client *RealtimeClient) addBinding(newBinding binding) { + client.bindingQueue.PushBack(newBinding) } // The underlying package of websocket returns an error if the connection is From 46a6323c4bdd570f8db92c084eda145e4bab4685 Mon Sep 17 00:00:00 2001 From: Minh Au Date: Mon, 10 Jun 2024 23:21:01 -0400 Subject: [PATCH 09/21] route ID to the correct binding --- realtime/messages.go | 13 ++++++++++++ realtime/realtime_client.go | 40 +++++++++++++++++++++++++++++-------- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/realtime/messages.go b/realtime/messages.go index 04ac9d8..73932f9 100644 --- a/realtime/messages.go +++ b/realtime/messages.go @@ -66,6 +66,19 @@ type SystemPayload struct { Status string `json:"status"` } +type PostgresCDCPayload struct { + Data struct { + Schema string `json:"schema"` + Table string `json:"table"` + CommitTime string `json:"commit_timestamp"` + EventType string `json:"eventType"` + New map[string]string `json:"new"` + Old map[string]string `json:"old"` + Errors string `json:"errors"` + } `json:"data"` + IDs []int `json:"ids"` +} + // presence_state can contain any key. Hence map type instead of struct type PresenceStatePayload map[string]struct{ Metas []struct{ diff --git a/realtime/realtime_client.go b/realtime/realtime_client.go index ddb7b61..b4cd315 100644 --- a/realtime/realtime_client.go +++ b/realtime/realtime_client.go @@ -29,8 +29,8 @@ type RealtimeClient struct { heartbeatInterval time.Duration bindingQueue *list.List - bindingSubscription map[string]binding - replyChan chan string + bindingSubscription map[int]binding + replyChan chan int currentTopics map[string][]string } @@ -57,8 +57,9 @@ func CreateRealtimeClient(projectRef string, apiKey string) *RealtimeClient { heartbeatInterval: 20 * time.Second, reconnectInterval: 500 * time.Millisecond, - currentTopics: make(map[string][]string), bindingQueue: list.New(), + currentTopics: make(map[string][]string), + bindingSubscription: make(map[int]binding), } } @@ -114,14 +115,15 @@ func (client *RealtimeClient) subscribe() error { client.Connect() } - if client.replyChan != nil { - client.replyChan = make(chan string) + if client.replyChan == nil { + client.replyChan = make(chan int) } bindNode := client.bindingQueue.Front() // TODO: take in a context to allow user to cancel subcribing // TODO: add a way to either roll back (unscribe) if an error encounters or // disconnect to client and close connection + // TODO: add a way to detect error if system returns status error for bindNode != nil { bind := bindNode.Value.(binding) @@ -130,11 +132,13 @@ func (client *RealtimeClient) subscribe() error { return fmt.Errorf("Unable to subscribe to the event %v: %v", *bind.msg, err) } + client.logger.Print("Waiting for a reply") select { case rep, ok := <- client.replyChan: if !ok { return fmt.Errorf("Error: Unable to subscribe to the event %v succesfully", bind.msg) } + client.logger.Printf("Register Postgres ID: %v", rep) client.bindingSubscription[rep] = bind break } @@ -236,8 +240,6 @@ func (client *RealtimeClient) startListening() { // Process the given message according certain events func (client *RealtimeClient) processMessage(msg AbstractMsg) { - client.logger.Printf("Header: %+v", *msg.TemplateMsg) - client.logger.Printf("Payload: %+v", string(msg.Payload)) genericPayload, err := client.unmarshalPayload(msg) if err != nil { client.logger.Printf("Unable to process received message: %v", err) @@ -247,7 +249,28 @@ func (client *RealtimeClient) processMessage(msg AbstractMsg) { switch payload := genericPayload.(type) { case *ReplyPayload: - client.logger.Printf("Processing: %+v", payload) + changes := payload.Response.PostgresChanges + status := payload.Status + + if len(changes) == 0 || status != "ok" || changes[0].ID == 0 { + client.logger.Printf("Received: %+v", payload) + } else { + client.replyChan <- changes[0].ID + } + break + case *PostgresCDCPayload: + if len(payload.IDs) == 0 { + client.logger.Print("Unexected error: CDC message doesn't have any ids") + } + for _, id := range payload.IDs { + bind, ok := client.bindingSubscription[id] + if !ok { + client.logger.Printf("Unexpected error: ID %v is not recognized", id) + continue + } + bind.callback(payload.Data) + } + // client.bindingSubscription[payload.IDs[0]] break } } @@ -262,6 +285,7 @@ func (client *RealtimeClient) unmarshalPayload(msg AbstractMsg) (any, error) { payload = new(ReplyPayload) break case postgresChangesEvent: + payload = new(PostgresCDCPayload) break case systemEvent: payload = new(SystemPayload) From 44ac1b6632c4d87ffd73eb0d54afc077cb2d7419 Mon Sep 17 00:00:00 2001 From: Minh Au Date: Mon, 10 Jun 2024 23:36:46 -0400 Subject: [PATCH 10/21] correctly unmarshal postgres changes --- realtime/messages.go | 18 +++++++++++------- realtime/realtime_client.go | 1 + 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/realtime/messages.go b/realtime/messages.go index 73932f9..7b18936 100644 --- a/realtime/messages.go +++ b/realtime/messages.go @@ -68,13 +68,17 @@ type SystemPayload struct { type PostgresCDCPayload struct { Data struct { - Schema string `json:"schema"` - Table string `json:"table"` - CommitTime string `json:"commit_timestamp"` - EventType string `json:"eventType"` - New map[string]string `json:"new"` - Old map[string]string `json:"old"` - Errors string `json:"errors"` + Schema string `json:"schema"` + Table string `json:"table"` + CommitTime string `json:"commit_timestamp"` + Record map[string]any `json:"record"` + Columns []struct{ + Name string `json:"name"` + Type string `json:"type"` + } `json:"columns"` + EventType string `json:"type"` + Old map[string]any `json:"old_record"` + Errors string `json:"errors"` } `json:"data"` IDs []int `json:"ids"` } diff --git a/realtime/realtime_client.go b/realtime/realtime_client.go index b4cd315..d55e290 100644 --- a/realtime/realtime_client.go +++ b/realtime/realtime_client.go @@ -259,6 +259,7 @@ func (client *RealtimeClient) processMessage(msg AbstractMsg) { } break case *PostgresCDCPayload: + client.logger.Printf("Processing: %+v", string(msg.Payload)) if len(payload.IDs) == 0 { client.logger.Print("Unexected error: CDC message doesn't have any ids") } From 7a4aab3d4a051c37942375b7561d167c4c7f78fd Mon Sep 17 00:00:00 2001 From: Minh Au Date: Tue, 11 Jun 2024 23:41:23 -0400 Subject: [PATCH 11/21] able to subscribe to multiple events --- realtime/messages.go | 48 ++++++++++++++------- realtime/realtime_channel.go | 70 +++++++++++++++++++++++++++---- realtime/realtime_client.go | 81 ++++++++++++++---------------------- 3 files changed, 126 insertions(+), 73 deletions(-) diff --git a/realtime/messages.go b/realtime/messages.go index 7b18936..40c4954 100644 --- a/realtime/messages.go +++ b/realtime/messages.go @@ -1,6 +1,7 @@ package realtime import ( + "container/list" "encoding/json" ) @@ -109,21 +110,40 @@ func createTemplateMessage(event string, topic string) *TemplateMsg { } // create a connection message depending on event type -func createConnectionMessage(topic string, filter eventFilter) *ConnectionMsg { +func createConnectionMessage(topic string, bindings *list.List) *ConnectionMsg { msg := &ConnectionMsg{} + bindNode := bindings.Front() + + // Fill out the message template + msg.TemplateMsg = createTemplateMessage(joinEvent, topic) + + // Fill out the payload + for bindNode != nil { + bind, ok := bindNode.Value.(binding) + if !ok { + panic("TYPE ASSERTION FAILED: expecting type binding") + } + + filter := bind.filter + switch filter.(type) { + case postgresFilter: + if msg.Payload.Config.PostgresChanges == nil { + msg.Payload.Config.PostgresChanges = make([]postgresFilter, 0, 1) + } + msg.Payload.Config.PostgresChanges = append(msg.Payload.Config.PostgresChanges, filter.(postgresFilter)) + break + case broadcastFilter: + msg.Payload.Config.Broadcast.Self = true + break + case presenceFilter: + msg.Payload.Config.Presence.Key = "" + break + default: + panic("TYPE ASSERTION FAILED: expecting one of postgresFilter, broadcastFilter, or presenceFilter") + } + + bindNode = bindNode.Next() + } - // Common part across the three event type - msg.TemplateMsg = createTemplateMessage(joinEvent, topic) - switch filter.(type) { - case postgresFilter: - msg.Payload.Config.PostgresChanges = []postgresFilter{filter.(postgresFilter)} - break - case broadcastFilter: - msg.Payload.Config.Broadcast.Self = true - break - case presenceFilter: - msg.Payload.Config.Presence.Key = "" - break - } return msg } diff --git a/realtime/realtime_channel.go b/realtime/realtime_channel.go index 45723dc..b0c8905 100644 --- a/realtime/realtime_channel.go +++ b/realtime/realtime_channel.go @@ -1,11 +1,18 @@ package realtime -import "fmt" +import ( + "container/list" + "context" + "fmt" +) type RealtimeChannel struct { topic string client *RealtimeClient - hasJoined bool + + bindings *list.List + bindingsMap map[int]binding + hasSubscribed bool } // Initialize a new channel @@ -13,6 +20,9 @@ func CreateRealtimeChannel(client *RealtimeClient, topic string) *RealtimeChanne return &RealtimeChannel{ client: client, topic: topic, + bindings: list.New(), + bindingsMap: make(map[int]binding), + hasSubscribed: false, } } @@ -22,28 +32,59 @@ func (channel *RealtimeChannel) On(eventType string, filter map[string]string, c if !verifyEventType(eventType) { return fmt.Errorf("invalid event type: %s", eventType) } + eventFilter, err := createEventFilter(eventType, filter) if err != nil { return fmt.Errorf("Invalid filter criteria for %s event type: %w", eventType, err) } - msg := createConnectionMessage(channel.topic, eventFilter) newBinding := binding{ - msg: msg, + eventType: eventType, + filter: eventFilter, callback: callback, } - - channel.client.addBinding(newBinding) + channel.bindings.PushBack(newBinding) return nil } // Subscribe to the channel and start listening to events -func (channel *RealtimeChannel) Subscribe() error { - if err := channel.client.subscribe(); err != nil { - return fmt.Errorf("Channel %s failed to subscribe: %w", channel.topic, err) +func (channel *RealtimeChannel) Subscribe(ctx context.Context) error { + if channel.hasSubscribed { + return fmt.Errorf("Error: Channel %s can only be subscribed once", channel.topic) + } + + // Do nothing if there are no bindings + if channel.bindings.Len() == 0 { + return nil } + ids, err := channel.client.subscribe(channel.topic, channel.bindings, ctx) + if err != nil { + return fmt.Errorf("Channel %s failed to subscribe: %v", channel.topic, err) + } + + bindNode := channel.bindings.Front() + for _, id := range ids { + if bindNode == nil { + channel.Unsubscribe() + return fmt.Errorf("Error: the number subscribed events are not equal to the total events") + } + + binding, ok := bindNode.Value.(binding) + if !ok { + panic("TYPE ASSERTION FAILED: expecting type binding") + } + + if binding.eventType == postgresChangesEvent { + channel.bindingsMap[id] = binding; + } + + bindNode = bindNode.Next() + } + + channel.hasSubscribed = true + return nil } @@ -51,4 +92,15 @@ func (channel *RealtimeChannel) Unsubscribe() { if channel.client.isClientAlive() { } + channel.hasSubscribed = false +} + +// Route the id of triggered event to appropriate callback +func (channel *RealtimeChannel) routePostgresEvent(id int, payload *PostgresCDCPayload) { + binding, ok := channel.bindingsMap[id] + if !ok { + channel.client.logger.Printf("Error: Unrecognized id %v", id) + } + + go binding.callback(payload) } diff --git a/realtime/realtime_client.go b/realtime/realtime_client.go index d55e290..d6f5069 100644 --- a/realtime/realtime_client.go +++ b/realtime/realtime_client.go @@ -28,14 +28,13 @@ type RealtimeClient struct { heartbeatDuration time.Duration heartbeatInterval time.Duration - bindingQueue *list.List - bindingSubscription map[int]binding - replyChan chan int - currentTopics map[string][]string + replyChan chan []int + currentTopics map[string]*RealtimeChannel } type binding struct { - msg *ConnectionMsg + eventType string + filter eventFilter callback func(any) } @@ -57,9 +56,7 @@ func CreateRealtimeClient(projectRef string, apiKey string) *RealtimeClient { heartbeatInterval: 20 * time.Second, reconnectInterval: 500 * time.Millisecond, - bindingQueue: list.New(), - currentTopics: make(map[string][]string), - bindingSubscription: make(map[int]binding), + currentTopics: make(map[string]*RealtimeChannel), } } @@ -110,42 +107,29 @@ func (client *RealtimeClient) Disconnect() error { } // Begins subscribing to events in the bindingQueue -func (client *RealtimeClient) subscribe() error { +func (client *RealtimeClient) subscribe(topic string, bindings *list.List, ctx context.Context) ([]int, error) { if !client.isClientAlive() { client.Connect() } if client.replyChan == nil { - client.replyChan = make(chan int) + client.replyChan = make(chan []int) } - bindNode := client.bindingQueue.Front() - // TODO: take in a context to allow user to cancel subcribing - // TODO: add a way to either roll back (unscribe) if an error encounters or - // disconnect to client and close connection - // TODO: add a way to detect error if system returns status error - for bindNode != nil { - bind := bindNode.Value.(binding) - - err := wsjson.Write(context.Background(), client.conn, bind.msg) - if err != nil { - return fmt.Errorf("Unable to subscribe to the event %v: %v", *bind.msg, err) - } - - client.logger.Print("Waiting for a reply") - select { - case rep, ok := <- client.replyChan: - if !ok { - return fmt.Errorf("Error: Unable to subscribe to the event %v succesfully", bind.msg) - } - client.logger.Printf("Register Postgres ID: %v", rep) - client.bindingSubscription[rep] = bind - break - } - bindNode = bindNode.Next() + msg := createConnectionMessage(topic, bindings) + err := wsjson.Write(context.Background(), client.conn, msg) + if err != nil { + return nil, fmt.Errorf("Unable to send the connection message: %v", err) + } + select { + case rep, ok := <- client.replyChan: + if !ok { + return nil, fmt.Errorf("Error: Unable to subscribe to the channel %v succesfully", msg.Topic) + } + return rep, nil + case <- ctx.Done(): + return nil, fmt.Errorf("Error: Subscribing to to the channel %v has been canceled", msg.Topic) } - - return nil } // Create a new channel with given topic string @@ -154,8 +138,7 @@ func (client *RealtimeClient) Channel(newTopic string) (*RealtimeChannel, error) return nil, fmt.Errorf("Error: channel with %v topic already created", newTopic) } newChannel := CreateRealtimeChannel(client, "realtime:" + newTopic) - // TODO: Fix currentTopics - client.currentTopics[newTopic] = []string{} + client.currentTopics["realtime:" + newTopic] = newChannel return newChannel, nil } @@ -255,23 +238,26 @@ func (client *RealtimeClient) processMessage(msg AbstractMsg) { if len(changes) == 0 || status != "ok" || changes[0].ID == 0 { client.logger.Printf("Received: %+v", payload) } else { - client.replyChan <- changes[0].ID + ids := make([]int, len(changes)) + for i, change := range changes { + ids[i] = change.ID + } + client.replyChan <- ids } break case *PostgresCDCPayload: - client.logger.Printf("Processing: %+v", string(msg.Payload)) if len(payload.IDs) == 0 { - client.logger.Print("Unexected error: CDC message doesn't have any ids") + client.logger.Print("Unexpected error: CDC message doesn't have any ids") } for _, id := range payload.IDs { - bind, ok := client.bindingSubscription[id] + targetedChannel, ok := client.currentTopics[msg.Topic] if !ok { - client.logger.Printf("Unexpected error: ID %v is not recognized", id) + client.logger.Printf("Error: Unrecognized topic %v", msg.Topic) continue } - bind.callback(payload.Data) + + targetedChannel.routePostgresEvent(id, payload) } - // client.bindingSubscription[payload.IDs[0]] break } } @@ -365,11 +351,6 @@ func (client *RealtimeClient) isClientAlive() bool { return true } -// Add event bindings to the bindingQueue -func (client *RealtimeClient) addBinding(newBinding binding) { - client.bindingQueue.PushBack(newBinding) -} - // The underlying package of websocket returns an error if the connection is // terminated on the server side. Therefore, the state of the connection can // be achieved by investigating the error From 2b1d69a4502a12d0bafd69ed640483d024f1d38b Mon Sep 17 00:00:00 2001 From: Minh Au Date: Wed, 12 Jun 2024 23:12:06 -0400 Subject: [PATCH 12/21] match action/event type for postgres event at route --- realtime/messages.go | 2 +- realtime/realtime_channel.go | 18 ++++++++++++++++-- realtime/realtime_client.go | 2 +- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/realtime/messages.go b/realtime/messages.go index 40c4954..dc9c886 100644 --- a/realtime/messages.go +++ b/realtime/messages.go @@ -77,7 +77,7 @@ type PostgresCDCPayload struct { Name string `json:"name"` Type string `json:"type"` } `json:"columns"` - EventType string `json:"type"` + ActionType string `json:"type"` Old map[string]any `json:"old_record"` Errors string `json:"errors"` } `json:"data"` diff --git a/realtime/realtime_channel.go b/realtime/realtime_channel.go index b0c8905..4265bbc 100644 --- a/realtime/realtime_channel.go +++ b/realtime/realtime_channel.go @@ -43,6 +43,7 @@ func (channel *RealtimeChannel) On(eventType string, filter map[string]string, c filter: eventFilter, callback: callback, } + channel.bindings.PushBack(newBinding) return nil @@ -63,7 +64,6 @@ func (channel *RealtimeChannel) Subscribe(ctx context.Context) error { if err != nil { return fmt.Errorf("Channel %s failed to subscribe: %v", channel.topic, err) } - bindNode := channel.bindings.Front() for _, id := range ids { if bindNode == nil { @@ -101,6 +101,20 @@ func (channel *RealtimeChannel) routePostgresEvent(id int, payload *PostgresCDCP if !ok { channel.client.logger.Printf("Error: Unrecognized id %v", id) } + + bindFilter, ok := binding.filter.(postgresFilter) + if !ok { + panic("TYPE ASSERTION FAILED: expecting type postgresFilter") + } - go binding.callback(payload) + // Match * | INSERT | UPDATE | DELETE + switch bindFilter.Event { + case "*": + fallthrough + case payload.Data.ActionType: + go binding.callback(payload) + break + default: + return + } } diff --git a/realtime/realtime_client.go b/realtime/realtime_client.go index d6f5069..d2faee6 100644 --- a/realtime/realtime_client.go +++ b/realtime/realtime_client.go @@ -33,7 +33,7 @@ type RealtimeClient struct { } type binding struct { - eventType string + eventType string filter eventFilter callback func(any) } From 9aa6d7b8d089505f409fe9b7e689f977dce2467d Mon Sep 17 00:00:00 2001 From: Minh Au Date: Fri, 14 Jun 2024 00:53:00 -0400 Subject: [PATCH 13/21] add postgres event validation after initial connection message --- realtime/messages.go | 14 ++------ realtime/realtime_channel.go | 64 +++++++++++++++++++++--------------- realtime/realtime_client.go | 13 +++----- 3 files changed, 44 insertions(+), 47 deletions(-) diff --git a/realtime/messages.go b/realtime/messages.go index dc9c886..a83e289 100644 --- a/realtime/messages.go +++ b/realtime/messages.go @@ -1,7 +1,6 @@ package realtime import ( - "container/list" "encoding/json" ) @@ -110,20 +109,13 @@ func createTemplateMessage(event string, topic string) *TemplateMsg { } // create a connection message depending on event type -func createConnectionMessage(topic string, bindings *list.List) *ConnectionMsg { +func createConnectionMessage(topic string, bindings []*binding) *ConnectionMsg { msg := &ConnectionMsg{} - bindNode := bindings.Front() - // Fill out the message template msg.TemplateMsg = createTemplateMessage(joinEvent, topic) // Fill out the payload - for bindNode != nil { - bind, ok := bindNode.Value.(binding) - if !ok { - panic("TYPE ASSERTION FAILED: expecting type binding") - } - + for _, bind := range bindings { filter := bind.filter switch filter.(type) { case postgresFilter: @@ -141,8 +133,6 @@ func createConnectionMessage(topic string, bindings *list.List) *ConnectionMsg { default: panic("TYPE ASSERTION FAILED: expecting one of postgresFilter, broadcastFilter, or presenceFilter") } - - bindNode = bindNode.Next() } return msg diff --git a/realtime/realtime_channel.go b/realtime/realtime_channel.go index 4265bbc..6354ecf 100644 --- a/realtime/realtime_channel.go +++ b/realtime/realtime_channel.go @@ -1,18 +1,18 @@ package realtime import ( - "container/list" "context" "fmt" ) type RealtimeChannel struct { - topic string - client *RealtimeClient - - bindings *list.List - bindingsMap map[int]binding + topic string + client *RealtimeClient hasSubscribed bool + + numBindings int + bindings map[string][]*binding + postgresBindingRoute map[int]*binding } // Initialize a new channel @@ -20,8 +20,9 @@ func CreateRealtimeChannel(client *RealtimeClient, topic string) *RealtimeChanne return &RealtimeChannel{ client: client, topic: topic, - bindings: list.New(), - bindingsMap: make(map[int]binding), + numBindings: 0, + bindings: make(map[string][]*binding), + postgresBindingRoute: make(map[int]*binding), hasSubscribed: false, } } @@ -38,13 +39,14 @@ func (channel *RealtimeChannel) On(eventType string, filter map[string]string, c return fmt.Errorf("Invalid filter criteria for %s event type: %w", eventType, err) } - newBinding := binding{ + newBinding := &binding{ eventType: eventType, filter: eventFilter, callback: callback, } - channel.bindings.PushBack(newBinding) + channel.numBindings += 1 + channel.bindings[eventType] = append(channel.bindings[eventType], newBinding) return nil } @@ -56,31 +58,41 @@ func (channel *RealtimeChannel) Subscribe(ctx context.Context) error { } // Do nothing if there are no bindings - if channel.bindings.Len() == 0 { + if channel.numBindings == 0 { return nil } - ids, err := channel.client.subscribe(channel.topic, channel.bindings, ctx) + // Flatten all type of bindings into one slice + allBindings := make([]*binding, channel.numBindings) + for _, eventType := range []string{postgresChangesEventType, broadcastEventType, presenceEventType} { + copy(allBindings, channel.bindings[eventType]) + } + + respPayload, err := channel.client.subscribe(channel.topic, allBindings, ctx) if err != nil { return fmt.Errorf("Channel %s failed to subscribe: %v", channel.topic, err) } - bindNode := channel.bindings.Front() - for _, id := range ids { - if bindNode == nil { - channel.Unsubscribe() - return fmt.Errorf("Error: the number subscribed events are not equal to the total events") - } - binding, ok := bindNode.Value.(binding) + // Verify and map postgres events. If there are any mismatch, channel will + // rollback, and unsubscribe to the events. + changes := respPayload.Response.PostgresChanges + postgresBindings := channel.bindings[postgresChangesEventType] + if len(postgresBindings) != len(changes) { + channel.Unsubscribe() + return fmt.Errorf("Server returns the wrong number of subscribed events: %v events", len(changes)) + } + + for i, change := range changes { + bindingFilter, ok := postgresBindings[i].filter.(postgresFilter) if !ok { - panic("TYPE ASSERTION FAILED: expecting type binding") + panic("TYPE ASSERTION FAILED: expecting type postgresFilter") } - - if binding.eventType == postgresChangesEvent { - channel.bindingsMap[id] = binding; + if change.Schema != bindingFilter.Schema || change.Event != bindingFilter.Event || + change.Table != bindingFilter.Table || change.Filter != bindingFilter.Filter { + channel.Unsubscribe() + return fmt.Errorf("Configuration mismatch between server's event and channel's event") } - - bindNode = bindNode.Next() + channel.postgresBindingRoute[change.ID] = postgresBindings[i] } channel.hasSubscribed = true @@ -97,7 +109,7 @@ func (channel *RealtimeChannel) Unsubscribe() { // Route the id of triggered event to appropriate callback func (channel *RealtimeChannel) routePostgresEvent(id int, payload *PostgresCDCPayload) { - binding, ok := channel.bindingsMap[id] + binding, ok := channel.postgresBindingRoute[id] if !ok { channel.client.logger.Printf("Error: Unrecognized id %v", id) } diff --git a/realtime/realtime_client.go b/realtime/realtime_client.go index d2faee6..771c166 100644 --- a/realtime/realtime_client.go +++ b/realtime/realtime_client.go @@ -1,7 +1,6 @@ package realtime import ( - "container/list" "context" "encoding/json" "errors" @@ -28,7 +27,7 @@ type RealtimeClient struct { heartbeatDuration time.Duration heartbeatInterval time.Duration - replyChan chan []int + replyChan chan *ReplyPayload currentTopics map[string]*RealtimeChannel } @@ -107,13 +106,13 @@ func (client *RealtimeClient) Disconnect() error { } // Begins subscribing to events in the bindingQueue -func (client *RealtimeClient) subscribe(topic string, bindings *list.List, ctx context.Context) ([]int, error) { +func (client *RealtimeClient) subscribe(topic string, bindings []*binding, ctx context.Context) (*ReplyPayload, error) { if !client.isClientAlive() { client.Connect() } if client.replyChan == nil { - client.replyChan = make(chan []int) + client.replyChan = make(chan *ReplyPayload) } msg := createConnectionMessage(topic, bindings) @@ -238,11 +237,7 @@ func (client *RealtimeClient) processMessage(msg AbstractMsg) { if len(changes) == 0 || status != "ok" || changes[0].ID == 0 { client.logger.Printf("Received: %+v", payload) } else { - ids := make([]int, len(changes)) - for i, change := range changes { - ids[i] = change.ID - } - client.replyChan <- ids + client.replyChan <- payload } break case *PostgresCDCPayload: From 8715df0c8796acee6df5c102007c791ed13f5f4e Mon Sep 17 00:00:00 2001 From: Minh Au Date: Fri, 14 Jun 2024 01:29:22 -0400 Subject: [PATCH 14/21] implement RealtimeChannel.Unsubscribe (with potential race condition --- realtime/events.go | 2 ++ realtime/messages.go | 3 ++- realtime/realtime_channel.go | 16 +++++++++++----- realtime/realtime_client.go | 32 ++++++++++++++++++++++++-------- 4 files changed, 39 insertions(+), 14 deletions(-) diff --git a/realtime/events.go b/realtime/events.go index 95bb174..3e7e579 100644 --- a/realtime/events.go +++ b/realtime/events.go @@ -10,6 +10,8 @@ import ( const ( joinEvent string = "phx_join" replyEvent string = "phx_reply" + leaveEvent string = "phx_leave" + closeEvent string = "phx_close" // DB Subscription Events postgresChangesEvent string = "postgres_changes" diff --git a/realtime/messages.go b/realtime/messages.go index a83e289..d2aacec 100644 --- a/realtime/messages.go +++ b/realtime/messages.go @@ -92,7 +92,8 @@ type PresenceStatePayload map[string]struct{ } `json:"metas,omitempty"` } -type HearbeatMsg struct { +// Messages that have empty payload +type BlankMsg struct { *TemplateMsg Payload struct { diff --git a/realtime/realtime_channel.go b/realtime/realtime_channel.go index 6354ecf..12bab9d 100644 --- a/realtime/realtime_channel.go +++ b/realtime/realtime_channel.go @@ -78,7 +78,7 @@ func (channel *RealtimeChannel) Subscribe(ctx context.Context) error { changes := respPayload.Response.PostgresChanges postgresBindings := channel.bindings[postgresChangesEventType] if len(postgresBindings) != len(changes) { - channel.Unsubscribe() + channel.Unsubscribe(ctx) return fmt.Errorf("Server returns the wrong number of subscribed events: %v events", len(changes)) } @@ -89,7 +89,7 @@ func (channel *RealtimeChannel) Subscribe(ctx context.Context) error { } if change.Schema != bindingFilter.Schema || change.Event != bindingFilter.Event || change.Table != bindingFilter.Table || change.Filter != bindingFilter.Filter { - channel.Unsubscribe() + channel.Unsubscribe(ctx) return fmt.Errorf("Configuration mismatch between server's event and channel's event") } channel.postgresBindingRoute[change.ID] = postgresBindings[i] @@ -100,10 +100,15 @@ func (channel *RealtimeChannel) Subscribe(ctx context.Context) error { return nil } -func (channel *RealtimeChannel) Unsubscribe() { - if channel.client.isClientAlive() { - +// Unsubscribe from the channel and stop listening to events +func (channel *RealtimeChannel) Unsubscribe(ctx context.Context) { + if !channel.hasSubscribed { + return } + + // Refresh all the binding routes + clear(channel.postgresBindingRoute) + channel.client.unsubscribe(channel.topic, ctx) channel.hasSubscribed = false } @@ -112,6 +117,7 @@ func (channel *RealtimeChannel) routePostgresEvent(id int, payload *PostgresCDCP binding, ok := channel.postgresBindingRoute[id] if !ok { channel.client.logger.Printf("Error: Unrecognized id %v", id) + return } bindFilter, ok := binding.filter.(postgresFilter) diff --git a/realtime/realtime_client.go b/realtime/realtime_client.go index 771c166..4c7ebe1 100644 --- a/realtime/realtime_client.go +++ b/realtime/realtime_client.go @@ -105,7 +105,7 @@ func (client *RealtimeClient) Disconnect() error { return nil } -// Begins subscribing to events in the bindingQueue +// Begins subscribing to events func (client *RealtimeClient) subscribe(topic string, bindings []*binding, ctx context.Context) (*ReplyPayload, error) { if !client.isClientAlive() { client.Connect() @@ -131,6 +131,24 @@ func (client *RealtimeClient) subscribe(topic string, bindings []*binding, ctx c } } +// Unsubscribe from events +func (client *RealtimeClient) unsubscribe(topic string, ctx context.Context) { + // There's no connection, so no need to unsubscribe from anything + if !client.isClientAlive() { + return + } + + leaveMsg := BlankMsg{ + TemplateMsg: createTemplateMessage(leaveEvent, topic), + Payload: struct{}{}, + } + + err := wsjson.Write(context.Background(), client.conn, leaveMsg) + if err != nil { + fmt.Printf("Unexpected error: %v", err) + } +} + // Create a new channel with given topic string func (client *RealtimeClient) Channel(newTopic string) (*RealtimeChannel, error) { if _, ok := client.currentTopics[newTopic]; ok { @@ -171,12 +189,8 @@ func (client *RealtimeClient) startHeartbeats() { // Send the heartbeat to the realtime server func (client *RealtimeClient) sendHeartbeat() error { - msg := HearbeatMsg{ - TemplateMsg: &TemplateMsg{ - Event: heartbeatEvent, - Topic: "phoenix", - Ref: "", - }, + msg := BlankMsg{ + TemplateMsg: createTemplateMessage(heartbeatEvent, "phoenix"), Payload: struct{}{}, } @@ -235,7 +249,7 @@ func (client *RealtimeClient) processMessage(msg AbstractMsg) { status := payload.Status if len(changes) == 0 || status != "ok" || changes[0].ID == 0 { - client.logger.Printf("Received: %+v", payload) + client.logger.Printf("Received %v: %+v", msg.Event, payload) } else { client.replyChan <- payload } @@ -263,6 +277,8 @@ func (client *RealtimeClient) unmarshalPayload(msg AbstractMsg) (any, error) { // Parse the payload depending on the event type switch msg.Event { + case closeEvent: + fallthrough case replyEvent: payload = new(ReplyPayload) break From 4080fb0a9dfb8916cec2cd6ee56f273e387a51f0 Mon Sep 17 00:00:00 2001 From: Minh Au Date: Fri, 14 Jun 2024 16:46:31 -0400 Subject: [PATCH 15/21] add rwMutex to prevent clearing route map during a callback --- realtime/realtime_channel.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/realtime/realtime_channel.go b/realtime/realtime_channel.go index 12bab9d..ed2b43f 100644 --- a/realtime/realtime_channel.go +++ b/realtime/realtime_channel.go @@ -3,6 +3,7 @@ package realtime import ( "context" "fmt" + "sync" ) type RealtimeChannel struct { @@ -10,6 +11,7 @@ type RealtimeChannel struct { client *RealtimeClient hasSubscribed bool + rwMu sync.RWMutex numBindings int bindings map[string][]*binding postgresBindingRoute map[int]*binding @@ -107,14 +109,20 @@ func (channel *RealtimeChannel) Unsubscribe(ctx context.Context) { } // Refresh all the binding routes + channel.rwMu.Lock() clear(channel.postgresBindingRoute) + channel.rwMu.Unlock() + channel.client.unsubscribe(channel.topic, ctx) channel.hasSubscribed = false } // Route the id of triggered event to appropriate callback func (channel *RealtimeChannel) routePostgresEvent(id int, payload *PostgresCDCPayload) { + channel.rwMu.RLock() binding, ok := channel.postgresBindingRoute[id] + channel.rwMu.RUnlock() + if !ok { channel.client.logger.Printf("Error: Unrecognized id %v", id) return From 417899c31adf0a5c947631b6526fa641f25a13f7 Mon Sep 17 00:00:00 2001 From: Minh Au Date: Fri, 14 Jun 2024 17:28:13 -0400 Subject: [PATCH 16/21] cleanup messages and add docs --- realtime/messages.go | 133 +++++++++++++++++++----------------- realtime/realtime_client.go | 16 ++--- 2 files changed, 79 insertions(+), 70 deletions(-) diff --git a/realtime/messages.go b/realtime/messages.go index d2aacec..78150a7 100644 --- a/realtime/messages.go +++ b/realtime/messages.go @@ -4,51 +4,57 @@ import ( "encoding/json" ) -type TemplateMsg struct { - Event string `json:"event"` - Topic string `json:"topic"` - Ref string `json:"ref"` +// This is a general message strucutre. It follows the message protocol +// of the phoenix server: +/* + { + event: string, + topic: string, + payload: [key: string]: boolean | int | string | any, + ref: string + } +*/ +type Msg struct { + Metadata + Payload any `json:"payload"` } -type AbstractMsg struct { - *TemplateMsg +// Generic message that contains raw payload. It can be used +// as a tagged union, where the event field can be used to +// determine the structure of the payload. +type RawMsg struct { + Metadata Payload json.RawMessage `json:"payload"` } -type ConnectionMsg struct { - *TemplateMsg - - Payload struct { - Config struct { - Broadcast struct { - Self bool `json:"self"` - } `json:"broadcast,omitempty"` +// The other fields besides the payload that make up a message. +// It describes other information about a message such as type of event, +// the topic the message belongs to, and its reference. +type Metadata struct { + Event string `json:"event"` + Topic string `json:"topic"` + Ref string `json:"ref"` +} - Presence struct { - Key string `json:"key"` - } `json:"presence,omitempty"` +// Payload for the conection message for when client first joins the channel. +// More info: https://supabase.com/docs/guides/realtime/protocol#connection +type ConnectionPayload struct { + Config struct { + Broadcast struct { + Self bool `json:"self"` + } `json:"broadcast,omitempty"` - PostgresChanges []postgresFilter `json:"postgres_changes,omitempty"` - } `json:"config"` - } `json:"payload"` -} + Presence struct { + Key string `json:"key"` + } `json:"presence,omitempty"` -type PostgresCDCMsg struct { - *TemplateMsg - - Payload struct { - Data struct { - Schema string `json:"schema"` - Table string `json:"table"` - CommitTime string `json:"commit_timestamp"` - EventType string `json:"eventType"` - New map[string]string `json:"new"` - Old map[string]string `json:"old"` - Errors string `json:"errors"` - } `json:"data"` - } `json:"payload"` + PostgresChanges []postgresFilter `json:"postgres_changes,omitempty"` + } `json:"config"` } +// Payload of the server's first response of three upon joining channel. +// It contains details about subscribed postgres events. +// More info: https://supabase.com/docs/guides/realtime/protocol#connection type ReplyPayload struct { Response struct { PostgresChanges []struct{ @@ -59,6 +65,9 @@ type ReplyPayload struct { Status string `json:"status"` } +// Payload of the server's second response of three upon joining channel. +// It contains details about the status of subscribing to PostgresSQL. +// More info: https://supabase.com/docs/guides/realtime/protocol#system-messages type SystemPayload struct { Channel string `json:"channel"` Extension string `json:"extension"` @@ -66,6 +75,19 @@ type SystemPayload struct { Status string `json:"status"` } +// Payload of the server's third response of three upon joining channel. +// It contains details about the Presence feature of Supabase. +// More info: https://supabase.com/docs/guides/realtime/protocol#state-update +type PresenceStatePayload map[string]struct{ + Metas []struct{ + Ref string `json:"phx_ref"` + Name string `json:"name"` + T float64 `json:"t"` + } `json:"metas,omitempty"` +} + +// Payload of the server's response when there is a postgres_changes event. +// More info: https://supabase.com/docs/guides/realtime/protocol#system-messages type PostgresCDCPayload struct { Data struct { Schema string `json:"schema"` @@ -83,26 +105,9 @@ type PostgresCDCPayload struct { IDs []int `json:"ids"` } -// presence_state can contain any key. Hence map type instead of struct -type PresenceStatePayload map[string]struct{ - Metas []struct{ - Ref string `json:"phx_ref"` - Name string `json:"name"` - T float64 `json:"t"` - } `json:"metas,omitempty"` -} - -// Messages that have empty payload -type BlankMsg struct { - *TemplateMsg - - Payload struct { - } `json:"payload"` -} - // create a template message -func createTemplateMessage(event string, topic string) *TemplateMsg { - return &TemplateMsg{ +func createMsgMetadata(event string, topic string) *Metadata { + return &Metadata{ Event: event, Topic: topic, Ref: "", @@ -110,31 +115,35 @@ func createTemplateMessage(event string, topic string) *TemplateMsg { } // create a connection message depending on event type -func createConnectionMessage(topic string, bindings []*binding) *ConnectionMsg { - msg := &ConnectionMsg{} +func createConnectionMessage(topic string, bindings []*binding) *Msg { + msg := &Msg{} + // Fill out the message template - msg.TemplateMsg = createTemplateMessage(joinEvent, topic) + msg.Metadata = *createMsgMetadata(joinEvent, topic) // Fill out the payload + payload := &ConnectionPayload{} for _, bind := range bindings { filter := bind.filter switch filter.(type) { case postgresFilter: - if msg.Payload.Config.PostgresChanges == nil { - msg.Payload.Config.PostgresChanges = make([]postgresFilter, 0, 1) + if payload.Config.PostgresChanges == nil { + payload.Config.PostgresChanges = make([]postgresFilter, 0, 1) } - msg.Payload.Config.PostgresChanges = append(msg.Payload.Config.PostgresChanges, filter.(postgresFilter)) + payload.Config.PostgresChanges = append(payload.Config.PostgresChanges, filter.(postgresFilter)) break case broadcastFilter: - msg.Payload.Config.Broadcast.Self = true + payload.Config.Broadcast.Self = true break case presenceFilter: - msg.Payload.Config.Presence.Key = "" + payload.Config.Presence.Key = "" break default: panic("TYPE ASSERTION FAILED: expecting one of postgresFilter, broadcastFilter, or presenceFilter") } } + msg.Payload = payload + return msg } diff --git a/realtime/realtime_client.go b/realtime/realtime_client.go index 4c7ebe1..0376220 100644 --- a/realtime/realtime_client.go +++ b/realtime/realtime_client.go @@ -138,12 +138,12 @@ func (client *RealtimeClient) unsubscribe(topic string, ctx context.Context) { return } - leaveMsg := BlankMsg{ - TemplateMsg: createTemplateMessage(leaveEvent, topic), + leaveMsg := &Msg{ + Metadata: *createMsgMetadata(leaveEvent, topic), Payload: struct{}{}, } - err := wsjson.Write(context.Background(), client.conn, leaveMsg) + err := wsjson.Write(ctx, client.conn, leaveMsg) if err != nil { fmt.Printf("Unexpected error: %v", err) } @@ -189,8 +189,8 @@ func (client *RealtimeClient) startHeartbeats() { // Send the heartbeat to the realtime server func (client *RealtimeClient) sendHeartbeat() error { - msg := BlankMsg{ - TemplateMsg: createTemplateMessage(heartbeatEvent, "phoenix"), + msg := &Msg{ + Metadata: *createMsgMetadata(heartbeatEvent, "phoenix"), Payload: struct{}{}, } @@ -212,7 +212,7 @@ func (client *RealtimeClient) startListening() { ctx := context.Background() for client.isClientAlive() { - var msg AbstractMsg + var msg RawMsg // Read from the connection err := wsjson.Read(ctx, client.conn, &msg) @@ -235,7 +235,7 @@ func (client *RealtimeClient) startListening() { } // Process the given message according certain events -func (client *RealtimeClient) processMessage(msg AbstractMsg) { +func (client *RealtimeClient) processMessage(msg RawMsg) { genericPayload, err := client.unmarshalPayload(msg) if err != nil { client.logger.Printf("Unable to process received message: %v", err) @@ -271,7 +271,7 @@ func (client *RealtimeClient) processMessage(msg AbstractMsg) { } } -func (client *RealtimeClient) unmarshalPayload(msg AbstractMsg) (any, error) { +func (client *RealtimeClient) unmarshalPayload(msg RawMsg) (any, error) { var payload any var err error From 6c31f0d5628075881cb536395a7503201429a06c Mon Sep 17 00:00:00 2001 From: Minh Au Date: Fri, 14 Jun 2024 17:41:11 -0400 Subject: [PATCH 17/21] move binding definition into channel --- realtime/realtime_channel.go | 7 +++++++ realtime/realtime_client.go | 6 ------ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/realtime/realtime_channel.go b/realtime/realtime_channel.go index ed2b43f..aa0ecad 100644 --- a/realtime/realtime_channel.go +++ b/realtime/realtime_channel.go @@ -17,6 +17,13 @@ type RealtimeChannel struct { postgresBindingRoute map[int]*binding } +// Bind an event with the user's callback function +type binding struct { + eventType string + filter eventFilter + callback func(any) +} + // Initialize a new channel func CreateRealtimeChannel(client *RealtimeClient, topic string) *RealtimeChannel { return &RealtimeChannel{ diff --git a/realtime/realtime_client.go b/realtime/realtime_client.go index 0376220..e66b844 100644 --- a/realtime/realtime_client.go +++ b/realtime/realtime_client.go @@ -31,12 +31,6 @@ type RealtimeClient struct { currentTopics map[string]*RealtimeChannel } -type binding struct { - eventType string - filter eventFilter - callback func(any) -} - // Create a new RealtimeClient with user's speicfications func CreateRealtimeClient(projectRef string, apiKey string) *RealtimeClient { realtimeUrl := fmt.Sprintf( From a2e11cf1459cf95ca26723cd41966b5868e2dd53 Mon Sep 17 00:00:00 2001 From: Minh Au Date: Mon, 17 Jun 2024 18:56:52 -0400 Subject: [PATCH 18/21] add ref to msg for msg identifications --- realtime/messages.go | 5 ++++- realtime/realtime_client.go | 21 +++++++++++---------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/realtime/messages.go b/realtime/messages.go index 78150a7..3bb2c87 100644 --- a/realtime/messages.go +++ b/realtime/messages.go @@ -2,9 +2,11 @@ package realtime import ( "encoding/json" + "strconv" + "time" ) -// This is a general message strucutre. It follows the message protocol +// This is a general message strucutre. It follows the message protocol // of the phoenix server: /* { @@ -120,6 +122,7 @@ func createConnectionMessage(topic string, bindings []*binding) *Msg { // Fill out the message template msg.Metadata = *createMsgMetadata(joinEvent, topic) + msg.Metadata.Ref = strconv.FormatInt(time.Now().Unix(), 10) // Fill out the payload payload := &ConnectionPayload{} diff --git a/realtime/realtime_client.go b/realtime/realtime_client.go index e66b844..1e2c5e9 100644 --- a/realtime/realtime_client.go +++ b/realtime/realtime_client.go @@ -50,6 +50,7 @@ func CreateRealtimeClient(projectRef string, apiKey string) *RealtimeClient { reconnectInterval: 500 * time.Millisecond, currentTopics: make(map[string]*RealtimeChannel), + replyChan: make(chan *ReplyPayload), } } @@ -105,18 +106,14 @@ func (client *RealtimeClient) subscribe(topic string, bindings []*binding, ctx c client.Connect() } - if client.replyChan == nil { - client.replyChan = make(chan *ReplyPayload) - } - msg := createConnectionMessage(topic, bindings) err := wsjson.Write(context.Background(), client.conn, msg) if err != nil { return nil, fmt.Errorf("Unable to send the connection message: %v", err) } select { - case rep, ok := <- client.replyChan: - if !ok { + case rep := <- client.replyChan: + if rep == nil { return nil, fmt.Errorf("Error: Unable to subscribe to the channel %v succesfully", msg.Topic) } return rep, nil @@ -187,6 +184,7 @@ func (client *RealtimeClient) sendHeartbeat() error { Metadata: *createMsgMetadata(heartbeatEvent, "phoenix"), Payload: struct{}{}, } + msg.Metadata.Ref = heartbeatEvent ctx, cancel := context.WithTimeout(context.Background(), client.heartbeatDuration) defer cancel() @@ -239,12 +237,15 @@ func (client *RealtimeClient) processMessage(msg RawMsg) { switch payload := genericPayload.(type) { case *ReplyPayload: - changes := payload.Response.PostgresChanges status := payload.Status - if len(changes) == 0 || status != "ok" || changes[0].ID == 0 { - client.logger.Printf("Received %v: %+v", msg.Event, payload) - } else { + if msg.Ref == heartbeatEvent && status != "ok" { + client.logger.Printf("Heartbeat failure from server: %v", payload) + } else if msg.Ref == heartbeatEvent && status == "ok" { + client.logger.Printf("Heartbeat success from server: %v", payload) + } else if msg.Ref != heartbeatEvent && status != "ok" { + client.replyChan <- nil + } else if msg.Ref != heartbeatEvent && status == "ok" { client.replyChan <- payload } break From 39379947c29fdb1aba8e7f34e37d3a792a95fdb4 Mon Sep 17 00:00:00 2001 From: Minh Au Date: Wed, 19 Jun 2024 14:43:03 -0400 Subject: [PATCH 19/21] prevent overwrite other eventType bindings --- realtime/realtime_channel.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/realtime/realtime_channel.go b/realtime/realtime_channel.go index aa0ecad..3d5a75d 100644 --- a/realtime/realtime_channel.go +++ b/realtime/realtime_channel.go @@ -73,8 +73,10 @@ func (channel *RealtimeChannel) Subscribe(ctx context.Context) error { // Flatten all type of bindings into one slice allBindings := make([]*binding, channel.numBindings) + startIdx := 0 for _, eventType := range []string{postgresChangesEventType, broadcastEventType, presenceEventType} { - copy(allBindings, channel.bindings[eventType]) + copy(allBindings[startIdx:], channel.bindings[eventType]) + startIdx += len(channel.bindings) } respPayload, err := channel.client.subscribe(channel.topic, allBindings, ctx) From 1f5b15afda414b46e9c1a30b90c8b61ea7624393 Mon Sep 17 00:00:00 2001 From: Minh Au Date: Wed, 19 Jun 2024 14:54:31 -0400 Subject: [PATCH 20/21] add case insensitive checking --- realtime/realtime_channel.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/realtime/realtime_channel.go b/realtime/realtime_channel.go index 3d5a75d..482b151 100644 --- a/realtime/realtime_channel.go +++ b/realtime/realtime_channel.go @@ -3,6 +3,7 @@ package realtime import ( "context" "fmt" + "strings" "sync" ) @@ -39,6 +40,7 @@ func CreateRealtimeChannel(client *RealtimeClient, topic string) *RealtimeChanne // Perform callbacks on specific events. Successive calls to On() // will result in multiple callbacks acting at the event func (channel *RealtimeChannel) On(eventType string, filter map[string]string, callback func(any)) error { + eventType = strings.ToLower(eventType) if !verifyEventType(eventType) { return fmt.Errorf("invalid event type: %s", eventType) } @@ -98,8 +100,10 @@ func (channel *RealtimeChannel) Subscribe(ctx context.Context) error { if !ok { panic("TYPE ASSERTION FAILED: expecting type postgresFilter") } - if change.Schema != bindingFilter.Schema || change.Event != bindingFilter.Event || - change.Table != bindingFilter.Table || change.Filter != bindingFilter.Filter { + if strings.ToLower(change.Schema) != strings.ToLower(bindingFilter.Schema) || + strings.ToUpper(change.Event) != strings.ToUpper(bindingFilter.Event) || + strings.ToLower(change.Table) != strings.ToLower(bindingFilter.Table) || + strings.ToLower(change.Filter) != strings.ToLower(bindingFilter.Filter) { channel.Unsubscribe(ctx) return fmt.Errorf("Configuration mismatch between server's event and channel's event") } @@ -143,7 +147,7 @@ func (channel *RealtimeChannel) routePostgresEvent(id int, payload *PostgresCDCP } // Match * | INSERT | UPDATE | DELETE - switch bindFilter.Event { + switch strings.ToUpper(bindFilter.Event) { case "*": fallthrough case payload.Data.ActionType: From cb8e48b108efe2023705fba238f13d9e8f9dcb29 Mon Sep 17 00:00:00 2001 From: Minh Au Date: Mon, 1 Jul 2024 13:44:55 -0400 Subject: [PATCH 21/21] fix out-of-bound error --- realtime/realtime_channel.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/realtime/realtime_channel.go b/realtime/realtime_channel.go index 482b151..f059381 100644 --- a/realtime/realtime_channel.go +++ b/realtime/realtime_channel.go @@ -77,8 +77,12 @@ func (channel *RealtimeChannel) Subscribe(ctx context.Context) error { allBindings := make([]*binding, channel.numBindings) startIdx := 0 for _, eventType := range []string{postgresChangesEventType, broadcastEventType, presenceEventType} { + if startIdx >= channel.numBindings { + break + } + copy(allBindings[startIdx:], channel.bindings[eventType]) - startIdx += len(channel.bindings) + startIdx += len(channel.bindings[eventType]) } respPayload, err := channel.client.subscribe(channel.topic, allBindings, ctx)