Skip to content

Commit b7926aa

Browse files
authored
Prepare release 1.20.1 (#7136)
* fix panic on health check failure when using stream push (#7116) Signed-off-by: SungJin1212 <[email protected]> * prepare release 1.20.1 Signed-off-by: SungJin1212 <[email protected]> --------- Signed-off-by: SungJin1212 <[email protected]>
1 parent 4e56848 commit b7926aa

File tree

5 files changed

+53
-4
lines changed

5 files changed

+53
-4
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Changelog
22

3-
## master / unreleased
3+
## 1.20.1 2025-12-03
4+
5+
* [BUGFIX] Distributor: Fix panic on health check failure when using stream push. #7116
46

57
## 1.20.0 2025-11-10
68

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.20.0
1+
1.20.1

docs/getting-started/.env

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
CORTEX_VERSION=v1.20.0
1+
CORTEX_VERSION=v1.20.1
22
GRAFANA_VERSION=10.4.2
33
PROMETHEUS_VERSION=v3.2.1
44
SEAWEEDFS_VERSION=3.67

pkg/ingester/client/client.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,10 @@ func (c *closableHealthAndIngesterClient) worker(ctx context.Context) error {
231231
select {
232232
case <-ctx.Done():
233233
return
234-
case job := <-c.streamPushChan:
234+
case job, ok := <-c.streamPushChan:
235+
if !ok {
236+
return
237+
}
235238
err = stream.Send(job.req)
236239
if err == io.EOF {
237240
job.resp = &cortexpb.WriteResponse{}

pkg/ingester/client/client_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/prometheus/client_golang/prometheus"
1111
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/mock"
1213
"github.com/stretchr/testify/require"
1314
"google.golang.org/grpc"
1415

@@ -115,12 +116,18 @@ func createTestIngesterClient(maxInflightPushRequests int64, currentInflightRequ
115116

116117
type mockIngester struct {
117118
IngesterClient
119+
mock.Mock
118120
}
119121

120122
func (m *mockIngester) Push(_ context.Context, _ *cortexpb.WriteRequest, _ ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
121123
return &cortexpb.WriteResponse{}, nil
122124
}
123125

126+
func (m *mockIngester) PushStream(ctx context.Context, opts ...grpc.CallOption) (Ingester_PushStreamClient, error) {
127+
args := m.Called(ctx, opts)
128+
return args.Get(0).(Ingester_PushStreamClient), nil
129+
}
130+
124131
type mockClientConn struct {
125132
ClosableClientConn
126133
}
@@ -227,3 +234,40 @@ func TestClosableHealthAndIngesterClient_Close_WithPendingJobs(t *testing.T) {
227234
assert.True(t, job1Cancelled, "job1 should have been cancelled")
228235
assert.True(t, job2Cancelled, "job2 should have been cancelled")
229236
}
237+
238+
type mockClientStream struct {
239+
mock.Mock
240+
grpc.ClientStream
241+
}
242+
243+
func (m *mockClientStream) Send(msg *cortexpb.StreamWriteRequest) error {
244+
args := m.Called(msg)
245+
return args.Error(0)
246+
}
247+
248+
func (m *mockClientStream) Recv() (*cortexpb.WriteResponse, error) {
249+
return &cortexpb.WriteResponse{}, nil
250+
}
251+
252+
func TestClosableHealthAndIngesterClient_ShouldNotPanicWhenClose(t *testing.T) {
253+
ctx, cancel := context.WithCancel(context.Background())
254+
streamChan := make(chan *streamWriteJob)
255+
256+
mockIngester := &mockIngester{}
257+
mockStream := &mockClientStream{}
258+
mockIngester.On("PushStream", mock.Anything, mock.Anything).Return(mockStream, nil).Once()
259+
260+
client := &closableHealthAndIngesterClient{
261+
IngesterClient: mockIngester,
262+
conn: &mockClientConn{},
263+
addr: "test-addr",
264+
inflightPushRequests: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"ingester"}),
265+
streamCtx: ctx,
266+
streamCancel: cancel,
267+
streamPushChan: streamChan,
268+
}
269+
require.NoError(t, client.worker(context.Background()))
270+
require.NoError(t, client.Close())
271+
272+
time.Sleep(100 * time.Millisecond)
273+
}

0 commit comments

Comments
 (0)