Skip to content

Commit 3ea9614

Browse files
committed
Add concurrent request chanel check
1 parent a2f2c88 commit 3ea9614

1 file changed

Lines changed: 26 additions & 12 deletions

File tree

adsource/multisource_wrapper.go

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"errors"
4040
"fmt"
4141
"strings"
42+
"sync/atomic"
4243
"time"
4344

4445
"github.com/demdxx/rpool/v2"
@@ -126,11 +127,12 @@ func (wrp *MultisourceWrapper) Bid(request *adtype.BidRequest) (response adtype.
126127
return adtype.NewEmptyResponse(request, nil, errors.New("wrapper is nil"))
127128
}
128129
var (
129-
count = wrp.maxParallelRequest
130-
queue = make(chan respItem, count)
131-
span, _ = gtracing.StartSpanFromContext(request.Ctx, "ssp.bid")
132-
trafaret trafaret.Filler
133-
err error
130+
count = wrp.maxParallelRequest
131+
isQueueClosed atomic.Bool
132+
queue = make(chan respItem, count)
133+
span, _ = gtracing.StartSpanFromContext(request.Ctx, "ssp.bid")
134+
trafaret trafaret.Filler
135+
err error
134136
)
135137

136138
if span != nil {
@@ -144,12 +146,22 @@ func (wrp *MultisourceWrapper) Bid(request *adtype.BidRequest) (response adtype.
144146
}
145147

146148
// Ensure that the queue is closed when the function exits
147-
defer close(queue)
149+
defer func() {
150+
isQueueClosed.Store(true)
151+
close(queue)
152+
}()
148153

149154
// Source request loop
150155
for prior, src := range wrp.sources.Iterator(request) {
151156
count--
157+
if isQueueClosed.Load() {
158+
break
159+
}
152160
wrp.execpool.Go(func() {
161+
if isQueueClosed.Load() {
162+
return
163+
}
164+
153165
startTime := fasttime.UnixTimestampNano()
154166

155167
// Send request to the source for the advertising
@@ -159,12 +171,14 @@ func (wrp *MultisourceWrapper) Bid(request *adtype.BidRequest) (response adtype.
159171
wrp.metrics.IncrementBidRequestCount(src,
160172
request, time.Duration(startTime-fasttime.UnixTimestampNano()))
161173

162-
// Send response to the channel if it is still open
163-
select {
164-
case queue <- respItem{priority: prior, resp: resp}:
165-
// Successfully sent to the channel
166-
default:
167-
// Channel is closed or full, skip sending
174+
if !isQueueClosed.Load() {
175+
// Send response to the channel if it is still open
176+
select {
177+
case queue <- respItem{priority: prior, resp: resp}:
178+
// Successfully sent to the channel
179+
default:
180+
// Channel is closed or full, skip sending
181+
}
168182
}
169183

170184
// Store bidding information

0 commit comments

Comments
 (0)