Skip to content

Commit 12c2574

Browse files
tac0turtlerenaynay
andauthored
refactor!: syncing (#2798)
## Summary - Flattened the syncer’s P2P handler so it runs directly inside `tryFetchFromP2P`, dropping the goroutine + semaphore watcher layer. - Rewrote the handler to stream a height range synchronously, emitting events only when both header and data are available and the proposer matches genesis. - Added unit tests with tighter coverage for the new flow (missing data/header, proposer mismatch, processed-height skipping, etc.). ### Motivation - The notifier integration already gives us event-driven fan-out; the per-height watcher machinery had become redundant complexity with awkward semaphore limits and cancellation paths. - Simpler, synchronous code is easier to reason about and better matches the new notifier-based design. --------- Co-authored-by: rene <[email protected]>
1 parent 765b127 commit 12c2574

File tree

22 files changed

+564
-518
lines changed

22 files changed

+564
-518
lines changed

.mockery.yaml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,3 @@ packages:
6565
dir: ./block/internal/common
6666
pkgname: common
6767
filename: broadcaster_mock.go
68-
p2pHandler:
69-
config:
70-
dir: ./block/internal/syncing
71-
pkgname: syncing
72-
filename: syncer_mock.go

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,4 +123,5 @@ Pre-release versions: 0.x.y (anything may change)
123123
-->
124124

125125
<!-- Links -->
126+
126127
- [Unreleased]: https://github.com/evstack/ev-node/compare/v1.0.0-beta.1...HEAD

apps/grpc/single/go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ module github.com/evstack/ev-node/apps/grpc/single
22

33
go 1.24.6
44

5+
replace github.com/celestiaorg/go-header => github.com/julienrbrt/go-header v0.0.0-20251008134330-747c8c192fa8 // TODO: to remove after https://github.com/celestiaorg/go-header/pull/347
6+
57
require (
68
github.com/evstack/ev-node v1.0.0-beta.9
79
github.com/evstack/ev-node/core v1.0.0-beta.4

apps/grpc/single/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
2222
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
2323
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
2424
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
25-
github.com/celestiaorg/go-header v0.7.3 h1:3+kIa+YXT789gPGRh3a55qmdYq3yTTBIqTyum26AvN0=
26-
github.com/celestiaorg/go-header v0.7.3/go.mod h1:eX9iTSPthVEAlEDLux40ZT/olXPGhpxHd+mEzJeDhd0=
2725
github.com/celestiaorg/go-libp2p-messenger v0.2.2 h1:osoUfqjss7vWTIZrrDSy953RjQz+ps/vBFE7bychLEc=
2826
github.com/celestiaorg/go-libp2p-messenger v0.2.2/go.mod h1:oTCRV5TfdO7V/k6nkx7QjQzGrWuJbupv+0o1cgnY2i4=
2927
github.com/celestiaorg/go-square/v3 v3.0.2 h1:eSQOgNII8inK9IhiBZ+6GADQeWbRq4HYY72BOgcduA4=
@@ -182,6 +180,8 @@ github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCV
182180
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
183181
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
184182
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
183+
github.com/julienrbrt/go-header v0.0.0-20251008134330-747c8c192fa8 h1:F+gOiipBxG43s+Ho+ri9T8IwumjWjp1XUon4DLWjxfQ=
184+
github.com/julienrbrt/go-header v0.0.0-20251008134330-747c8c192fa8/go.mod h1:eX9iTSPthVEAlEDLux40ZT/olXPGhpxHd+mEzJeDhd0=
185185
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
186186
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
187187
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=

block/internal/common/expected_interfaces.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package common
22

33
import (
44
"context"
5+
56
pubsub "github.com/libp2p/go-libp2p-pubsub"
67

78
"github.com/celestiaorg/go-header"

block/internal/syncing/p2p_handler.go

Lines changed: 62 additions & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@ package syncing
33
import (
44
"bytes"
55
"context"
6-
"errors"
76
"fmt"
8-
"time"
7+
"sync/atomic"
98

109
goheader "github.com/celestiaorg/go-header"
1110
"github.com/rs/zerolog"
@@ -16,16 +15,23 @@ import (
1615
"github.com/evstack/ev-node/types"
1716
)
1817

19-
// P2PHandler handles all P2P operations for the syncer
18+
// P2PHandler coordinates block retrieval from P2P stores for the syncer.
19+
// It waits for both header and data to be available at a given height,
20+
// validates their consistency, and emits events to the syncer for processing.
21+
//
22+
// The handler maintains a processedHeight to track the highest block that has been
23+
// successfully validated and sent to the syncer, preventing duplicate processing.
2024
type P2PHandler struct {
2125
headerStore goheader.Store[*types.SignedHeader]
2226
dataStore goheader.Store[*types.Data]
2327
cache cache.Manager
2428
genesis genesis.Genesis
2529
logger zerolog.Logger
30+
31+
processedHeight atomic.Uint64
2632
}
2733

28-
// NewP2PHandler creates a new P2P handler
34+
// NewP2PHandler creates a new P2P handler.
2935
func NewP2PHandler(
3036
headerStore goheader.Store[*types.SignedHeader],
3137
dataStore goheader.Store[*types.Data],
@@ -42,176 +48,76 @@ func NewP2PHandler(
4248
}
4349
}
4450

45-
// ProcessHeaderRange processes headers from the header store within the given range
46-
func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHeight uint64, heightInCh chan<- common.DAHeightEvent) {
47-
if startHeight > endHeight {
48-
return
49-
}
50-
51-
for height := startHeight; height <= endHeight; height++ {
52-
select {
53-
case <-ctx.Done():
51+
// SetProcessedHeight updates the highest processed block height.
52+
func (h *P2PHandler) SetProcessedHeight(height uint64) {
53+
for {
54+
current := h.processedHeight.Load()
55+
if height <= current {
5456
return
55-
default:
56-
}
57-
58-
// Create a timeout context for each GetByHeight call to prevent blocking
59-
timeoutCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
60-
header, err := h.headerStore.GetByHeight(timeoutCtx, height)
61-
cancel()
62-
63-
if err != nil {
64-
if errors.Is(err, context.DeadlineExceeded) {
65-
h.logger.Debug().Uint64("height", height).Msg("timeout waiting for header from store, will retry later")
66-
// Don't continue processing further heights if we timeout on one
67-
// This prevents blocking on sequential heights
68-
return
69-
}
70-
h.logger.Debug().Uint64("height", height).Err(err).Msg("failed to get header from store")
71-
continue
72-
}
73-
74-
// basic header validation
75-
if err := h.assertExpectedProposer(header.ProposerAddress); err != nil {
76-
h.logger.Debug().Uint64("height", height).Err(err).Msg("invalid header from P2P")
77-
continue
78-
}
79-
80-
// Get corresponding data (empty data are still broadcasted by peers)
81-
var data *types.Data
82-
timeoutCtx, cancel = context.WithTimeout(ctx, 500*time.Millisecond)
83-
retrievedData, err := h.dataStore.GetByHeight(timeoutCtx, height)
84-
cancel()
85-
86-
if err != nil {
87-
if errors.Is(err, context.DeadlineExceeded) {
88-
h.logger.Debug().Uint64("height", height).Msg("timeout waiting for data from store, will retry later")
89-
// Don't continue processing if data is not available
90-
// Store event with header only for later processing
91-
continue
92-
}
93-
h.logger.Debug().Uint64("height", height).Err(err).Msg("could not retrieve data for header from data store")
94-
continue
95-
}
96-
data = retrievedData
97-
98-
// CRITICAL: Validate that data matches the header's DataHash commitment
99-
// This prevents accepting legitimate headers paired with tampered data from different blocks
100-
dataCommitment := data.DACommitment()
101-
if !bytes.Equal(header.DataHash[:], dataCommitment[:]) {
102-
h.logger.Warn().
103-
Uint64("height", height).
104-
Str("header_data_hash", fmt.Sprintf("%x", header.DataHash)).
105-
Str("actual_data_hash", fmt.Sprintf("%x", dataCommitment)).
106-
Msg("DataHash mismatch: header and data do not match from P2P, discarding")
107-
continue
108-
}
109-
110-
// further header validation (signature) is done in validateBlock.
111-
// we need to be sure that the previous block n-1 was executed before validating block n
112-
113-
// Create height event
114-
event := common.DAHeightEvent{
115-
Header: header,
116-
Data: data,
117-
DaHeight: 0, // P2P events don't have DA height context
118-
Source: common.SourceP2P,
11957
}
120-
121-
select {
122-
case heightInCh <- event:
123-
default:
124-
h.cache.SetPendingEvent(event.Header.Height(), &event)
58+
if h.processedHeight.CompareAndSwap(current, height) {
59+
return
12560
}
126-
127-
h.logger.Debug().Uint64("height", height).Str("source", "p2p_headers").Msg("processed header from P2P")
12861
}
12962
}
13063

131-
// ProcessDataRange processes data from the data store within the given range
132-
func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeight uint64, heightInCh chan<- common.DAHeightEvent) {
133-
if startHeight > endHeight {
134-
return
64+
// ProcessHeight retrieves and validates both header and data for the given height from P2P stores.
65+
// It blocks until both are available, validates consistency (proposer address and data hash match),
66+
// then emits the event to heightInCh or stores it as pending. Updates processedHeight on success.
67+
func (h *P2PHandler) ProcessHeight(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent) error {
68+
if height <= h.processedHeight.Load() {
69+
return nil
13570
}
13671

137-
for height := startHeight; height <= endHeight; height++ {
138-
select {
139-
case <-ctx.Done():
140-
return
141-
default:
142-
}
143-
144-
// Create a timeout context for each GetByHeight call to prevent blocking
145-
timeoutCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
146-
data, err := h.dataStore.GetByHeight(timeoutCtx, height)
147-
cancel()
148-
149-
if err != nil {
150-
if errors.Is(err, context.DeadlineExceeded) {
151-
h.logger.Debug().Uint64("height", height).Msg("timeout waiting for data from store, will retry later")
152-
// Don't continue processing further heights if we timeout on one
153-
// This prevents blocking on sequential heights
154-
return
155-
}
156-
h.logger.Debug().Uint64("height", height).Err(err).Msg("failed to get data from store")
157-
continue
158-
}
159-
160-
// Get corresponding header with timeout
161-
timeoutCtx, cancel = context.WithTimeout(ctx, 500*time.Millisecond)
162-
header, err := h.headerStore.GetByHeight(timeoutCtx, height)
163-
cancel()
164-
165-
if err != nil {
166-
if errors.Is(err, context.DeadlineExceeded) {
167-
h.logger.Debug().Uint64("height", height).Msg("timeout waiting for header from store, will retry later")
168-
// Don't continue processing if header is not available
169-
continue
170-
}
171-
h.logger.Debug().Uint64("height", height).Err(err).Msg("could not retrieve header for data from header store")
172-
continue
72+
header, err := h.headerStore.GetByHeight(ctx, height)
73+
if err != nil {
74+
if ctx.Err() == nil {
75+
h.logger.Debug().Uint64("height", height).Err(err).Msg("header unavailable in store")
17376
}
77+
return err
78+
}
79+
if err := h.assertExpectedProposer(header.ProposerAddress); err != nil {
80+
h.logger.Debug().Uint64("height", height).Err(err).Msg("invalid header from P2P")
81+
return err
82+
}
17483

175-
// basic header validation
176-
if err := h.assertExpectedProposer(header.ProposerAddress); err != nil {
177-
h.logger.Debug().Uint64("height", height).Err(err).Msg("invalid header from P2P")
178-
continue
84+
data, err := h.dataStore.GetByHeight(ctx, height)
85+
if err != nil {
86+
if ctx.Err() == nil {
87+
h.logger.Debug().Uint64("height", height).Err(err).Msg("data unavailable in store")
17988
}
89+
return err
90+
}
18091

181-
// CRITICAL: Validate that data matches the header's DataHash commitment
182-
// This prevents accepting legitimate headers paired with tampered data from different blocks
183-
dataCommitment := data.DACommitment()
184-
if !bytes.Equal(header.DataHash[:], dataCommitment[:]) {
185-
h.logger.Warn().
186-
Uint64("height", height).
187-
Str("header_data_hash", fmt.Sprintf("%x", header.DataHash)).
188-
Str("actual_data_hash", fmt.Sprintf("%x", dataCommitment)).
189-
Msg("DataHash mismatch: header and data do not match from P2P, discarding")
190-
continue
191-
}
92+
dataCommitment := data.DACommitment()
93+
if !bytes.Equal(header.DataHash[:], dataCommitment[:]) {
94+
err := fmt.Errorf("data hash mismatch: header %x, data %x", header.DataHash, dataCommitment)
95+
h.logger.Warn().Uint64("height", height).Err(err).Msg("discarding inconsistent block from P2P")
96+
return err
97+
}
19298

193-
// further header validation (signature) is done in validateBlock.
194-
// we need to be sure that the previous block n-1 was executed before validating block n
99+
// further header validation (signature) is done in validateBlock.
100+
// we need to be sure that the previous block n-1 was executed before validating block n
101+
event := common.DAHeightEvent{
102+
Header: header,
103+
Data: data,
104+
DaHeight: 0,
105+
Source: common.SourceP2P,
106+
}
195107

196-
// Create height event
197-
event := common.DAHeightEvent{
198-
Header: header,
199-
Data: data,
200-
DaHeight: 0, // P2P events don't have DA height context
201-
Source: common.SourceP2P,
202-
}
108+
select {
109+
case heightInCh <- event:
110+
default:
111+
h.cache.SetPendingEvent(event.Header.Height(), &event)
112+
}
203113

204-
select {
205-
case heightInCh <- event:
206-
default:
207-
h.cache.SetPendingEvent(event.Header.Height(), &event)
208-
}
114+
h.SetProcessedHeight(height)
209115

210-
h.logger.Debug().Uint64("height", height).Str("source", "p2p_data").Msg("processed data from P2P")
211-
}
116+
h.logger.Debug().Uint64("height", height).Msg("processed event from P2P")
117+
return nil
212118
}
213119

214-
// assertExpectedProposer validates the proposer address
120+
// assertExpectedProposer validates the proposer address.
215121
func (h *P2PHandler) assertExpectedProposer(proposerAddr []byte) error {
216122
if !bytes.Equal(h.genesis.ProposerAddress, proposerAddr) {
217123
return fmt.Errorf("proposer address mismatch: got %x, expected %x",

0 commit comments

Comments
 (0)