diff --git a/CHANGELOG.md b/CHANGELOG.md index 14ff201dd..7cb2f6d2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Fixed race in `readerReconnector` + ## v3.117.1 * Fixed scan a column of type `Decimal(precision,scale)` into a struct field of type `types.Decimal{}` using `ScanStruct()` * Fixed race in integration test `TestTopicWriterLogMessagesWithoutData` diff --git a/internal/topic/topicreaderinternal/stream_reconnector.go b/internal/topic/topicreaderinternal/stream_reconnector.go index 46ef40626..65d150f9e 100644 --- a/internal/topic/topicreaderinternal/stream_reconnector.go +++ b/internal/topic/topicreaderinternal/stream_reconnector.go @@ -212,9 +212,21 @@ func (r *readerReconnector) CloseWithError(ctx context.Context, reason error) er r.closeOnce.Do(func() { closeErr = r.background.Close(ctx, reason) - if r.streamVal != nil { - streamCloseErr := r.streamVal.CloseWithError(ctx, xerrors.WithStackTrace(errReaderClosed)) - r.streamContextCancel(errReaderClosed) + // Get references under lock + var streamVal batchedStreamReader + var streamCancel context.CancelCauseFunc + + r.m.WithLock(func() { + streamVal = r.streamVal + streamCancel = r.streamContextCancel + }) + + // Make I/O calls outside the lock + if streamVal != nil { + streamCloseErr := streamVal.CloseWithError(ctx, xerrors.WithStackTrace(errReaderClosed)) + if streamCancel != nil { + streamCancel(errReaderClosed) + } if closeErr == nil { closeErr = streamCloseErr }