Skip to content

Commit 2aacb58

Browse files
committed
Add per-IP rate limiters
1 parent 75fdbaa commit 2aacb58

File tree

2 files changed

+18
-9
lines changed

2 files changed

+18
-9
lines changed

pkg/server/server.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,17 @@ import (
1717
"github.com/labstack/echo/v4"
1818
"go.opentelemetry.io/otel"
1919
"golang.org/x/sync/semaphore"
20+
"golang.org/x/time/rate"
2021
)
2122

2223
type Server struct {
23-
Subscribers map[int64]*Subscriber
24-
lk sync.RWMutex
25-
nextSub int64
26-
Consumer *consumer.Consumer
27-
maxSubRate float64
28-
seq int64
24+
Subscribers map[int64]*Subscriber
25+
lk sync.RWMutex
26+
nextSub int64
27+
Consumer *consumer.Consumer
28+
maxSubRate float64
29+
seq int64
30+
perIPLimiters map[string]*rate.Limiter
2931
}
3032

3133
var upgrader = websocket.Upgrader{
@@ -40,8 +42,9 @@ var tracer = otel.Tracer("jetstream-server")
4042

4143
func NewServer(maxSubRate float64) (*Server, error) {
4244
s := Server{
43-
Subscribers: make(map[int64]*Subscriber),
44-
maxSubRate: maxSubRate,
45+
Subscribers: make(map[int64]*Subscriber),
46+
maxSubRate: maxSubRate,
47+
perIPLimiters: make(map[string]*rate.Limiter),
4548
}
4649

4750
return &s, nil

pkg/server/subscriber.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,12 @@ func (s *Server) AddSubscriber(ws *websocket.Conn, realIP string, opts *Subscrib
195195
s.lk.Lock()
196196
defer s.lk.Unlock()
197197

198+
lim := s.perIPLimiters[realIP]
199+
if lim == nil {
200+
lim = rate.NewLimiter(rate.Limit(s.maxSubRate), int(s.maxSubRate))
201+
s.perIPLimiters[realIP] = lim
202+
}
203+
198204
sub := Subscriber{
199205
ws: ws,
200206
realIP: realIP,
@@ -207,7 +213,7 @@ func (s *Server) AddSubscriber(ws *websocket.Conn, realIP string, opts *Subscrib
207213
compress: opts.Compress,
208214
deliveredCounter: eventsDelivered.WithLabelValues(realIP),
209215
bytesCounter: bytesDelivered.WithLabelValues(realIP),
210-
rl: rate.NewLimiter(rate.Limit(s.maxSubRate), int(s.maxSubRate)),
216+
rl: lim,
211217
}
212218

213219
s.Subscribers[s.nextSub] = &sub

0 commit comments

Comments
 (0)