Skip to content

Commit ea96859

Browse files
authored
Merge pull request #21 from bluesky-social/jetstream_limiters
Add per-IP rate limiters
2 parents 4cb3eef + 2aacb58 commit ea96859

File tree

2 files changed

+18
-9
lines changed

2 files changed

+18
-9
lines changed

pkg/server/server.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,17 @@ import (
1717
"github.com/labstack/echo/v4"
1818
"go.opentelemetry.io/otel"
1919
"golang.org/x/sync/semaphore"
20+
"golang.org/x/time/rate"
2021
)
2122

2223
type Server struct {
23-
Subscribers map[int64]*Subscriber
24-
lk sync.RWMutex
25-
nextSub int64
26-
Consumer *consumer.Consumer
27-
maxSubRate float64
28-
seq int64
24+
Subscribers map[int64]*Subscriber
25+
lk sync.RWMutex
26+
nextSub int64
27+
Consumer *consumer.Consumer
28+
maxSubRate float64
29+
seq int64
30+
perIPLimiters map[string]*rate.Limiter
2931
}
3032

3133
var upgrader = websocket.Upgrader{
@@ -40,8 +42,9 @@ var tracer = otel.Tracer("jetstream-server")
4042

4143
func NewServer(maxSubRate float64) (*Server, error) {
4244
s := Server{
43-
Subscribers: make(map[int64]*Subscriber),
44-
maxSubRate: maxSubRate,
45+
Subscribers: make(map[int64]*Subscriber),
46+
maxSubRate: maxSubRate,
47+
perIPLimiters: make(map[string]*rate.Limiter),
4548
}
4649

4750
return &s, nil

pkg/server/subscriber.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,12 @@ func (s *Server) AddSubscriber(ws *websocket.Conn, realIP string, opts *Subscrib
204204
s.lk.Lock()
205205
defer s.lk.Unlock()
206206

207+
lim := s.perIPLimiters[realIP]
208+
if lim == nil {
209+
lim = rate.NewLimiter(rate.Limit(s.maxSubRate), int(s.maxSubRate))
210+
s.perIPLimiters[realIP] = lim
211+
}
212+
207213
sub := Subscriber{
208214
ws: ws,
209215
realIP: realIP,
@@ -216,7 +222,7 @@ func (s *Server) AddSubscriber(ws *websocket.Conn, realIP string, opts *Subscrib
216222
compress: opts.Compress,
217223
deliveredCounter: eventsDelivered.WithLabelValues(realIP),
218224
bytesCounter: bytesDelivered.WithLabelValues(realIP),
219-
rl: rate.NewLimiter(rate.Limit(s.maxSubRate), int(s.maxSubRate)),
225+
rl: lim,
220226
}
221227

222228
s.Subscribers[s.nextSub] = &sub

0 commit comments

Comments
 (0)