Skip to content

Commit f4da85b

Browse files
committed
Pings and bigger buffer
1 parent f079042 commit f4da85b

File tree

2 files changed

+18
-2
lines changed

2 files changed

+18
-2
lines changed

pkg/server/server.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ func (s *Server) HandleSubscribe(c echo.Context) error {
112112
}
113113
defer ws.Close()
114114

115+
writeWait := 10 * time.Second
116+
115117
log := slog.With("source", "server_handle_subscribe", "socket_addr", ws.RemoteAddr().String(), "real_ip", subIP)
116118

117119
sub, err := s.AddSubscriber(ws, subIP, subscriberOpts)
@@ -143,6 +145,8 @@ func (s *Server) HandleSubscribe(c echo.Context) error {
143145
cancel()
144146
return
145147
}
148+
case websocket.PongMessage:
149+
log.Debug("received pong message from client")
146150
case websocket.CloseMessage:
147151
log.Info("received close message from client")
148152
cancel()
@@ -241,6 +245,9 @@ func (s *Server) HandleSubscribe(c echo.Context) error {
241245
}
242246

243247
// Read events from the outbox and send them to the subscriber
248+
pingPeriod := 30 * time.Second
249+
t := time.NewTicker(pingPeriod)
250+
defer t.Stop()
244251
for {
245252
select {
246253
case <-ctx.Done():
@@ -253,6 +260,8 @@ func (s *Server) HandleSubscribe(c echo.Context) error {
253260
return fmt.Errorf("failed to wait for rate limiter: %w", err)
254261
}
255262

263+
ws.SetWriteDeadline(time.Now().Add(writeWait))
264+
256265
// When compression is enabled, the msg is a zstd compressed message
257266
if compress {
258267
if err := sub.WriteMessage(websocket.BinaryMessage, *msg); err != nil {
@@ -267,6 +276,12 @@ func (s *Server) HandleSubscribe(c echo.Context) error {
267276
log.Error("failed to write message to websocket", "error", err)
268277
return nil
269278
}
279+
case <-t.C:
280+
ws.SetWriteDeadline(time.Now().Add(writeWait))
281+
if err := sub.WriteMessage(websocket.PingMessage, nil); err != nil {
282+
log.Error("failed to write ping to websocket", "error", err)
283+
return nil
284+
}
270285
}
271286
}
272287
}

pkg/server/subscriber.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"fmt"
77
"log/slog"
8+
"slices"
89
"strconv"
910
"strings"
1011
"sync"
@@ -72,7 +73,7 @@ func emitToSubscriber(ctx context.Context, log *slog.Logger, sub *Subscriber, ti
7273

7374
if playback {
7475
// Copy the event bytes so the playback iterator can reuse the buffer
75-
evtBytes = append([]byte{}, evtBytes...)
76+
evtBytes = slices.Clone(evtBytes)
7677
select {
7778
case <-ctx.Done():
7879
log.Error("failed to send event to subscriber", "error", ctx.Err(), "subscriber", sub.id)
@@ -213,7 +214,7 @@ func (s *Server) AddSubscriber(ws *websocket.Conn, realIP string, opts *Subscrib
213214
sub := Subscriber{
214215
ws: ws,
215216
realIP: realIP,
216-
outbox: make(chan *[]byte, 10_000),
217+
outbox: make(chan *[]byte, 50_000),
217218
hello: make(chan struct{}),
218219
id: s.nextSub,
219220
wantedCollections: opts.WantedCollections,

0 commit comments

Comments
 (0)