Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 0 additions & 47 deletions block/internal/common/broadcaster_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions block/internal/common/expected_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,10 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"

"github.com/celestiaorg/go-header"

syncnotifier "github.com/evstack/ev-node/pkg/sync/notifier"
)

// broadcaster interface for P2P broadcasting
type Broadcaster[H header.Header[H]] interface {
WriteToStoreAndBroadcast(ctx context.Context, payload H, opts ...pubsub.PubOpt) error
Store() header.Store[H]
Notifier() *syncnotifier.Notifier
}
75 changes: 25 additions & 50 deletions block/internal/syncing/p2p_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,74 +55,42 @@ func (h *P2PHandler) SetProcessedHeight(height uint64) {
h.mu.Unlock()
}

// ProcessHeaderRange scans the provided heights and emits events when both the
// header and data are available.
func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHeight uint64, heightInCh chan<- common.DAHeightEvent) {
if startHeight > endHeight {
return
}

for height := startHeight; height <= endHeight; height++ {
h.mu.Lock()
shouldProcess := height > h.processedHeight
h.mu.Unlock()

if !shouldProcess {
continue
}
h.processHeight(ctx, height, heightInCh, "header_range")
}
}

// ProcessDataRange scans the provided heights and emits events when both the
// header and data are available.
func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeight uint64, heightInCh chan<- common.DAHeightEvent) {
if startHeight > endHeight {
return
}

for height := startHeight; height <= endHeight; height++ {
h.mu.Lock()
shouldProcess := height > h.processedHeight
h.mu.Unlock()
// ProcessHeight waits until both header and data for the given height are available.
// Once available, it validates and emits the event to the provided channel or stores it as pending.
func (h *P2PHandler) ProcessHeight(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent) error {
h.mu.Lock()
shouldProcess := height > h.processedHeight
h.mu.Unlock()

if !shouldProcess {
continue
}
h.processHeight(ctx, height, heightInCh, "data_range")
if !shouldProcess {
return nil
}
}

func (h *P2PHandler) processHeight(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent, source string) {
header, err := h.headerStore.GetByHeight(ctx, height)
if err != nil {
if ctx.Err() == nil {
h.logger.Debug().Uint64("height", height).Err(err).Str("source", source).Msg("header unavailable in store")
h.logger.Debug().Uint64("height", height).Err(err).Msg("header unavailable in store")
}
return
return err
}
if err := h.assertExpectedProposer(header.ProposerAddress); err != nil {
h.logger.Debug().Uint64("height", height).Err(err).Str("source", source).Msg("invalid header from P2P")
return
h.logger.Debug().Uint64("height", height).Err(err).Msg("invalid header from P2P")
return err
}

data, err := h.dataStore.GetByHeight(ctx, height)
if err != nil {
if ctx.Err() == nil {
h.logger.Debug().Uint64("height", height).Err(err).Str("source", source).Msg("data unavailable in store")
h.logger.Debug().Uint64("height", height).Err(err).Msg("data unavailable in store")
}
return
return err
}

dataCommitment := data.DACommitment()
if !bytes.Equal(header.DataHash[:], dataCommitment[:]) {
h.logger.Warn().
Uint64("height", height).
Str("header_data_hash", fmt.Sprintf("%x", header.DataHash)).
Str("actual_data_hash", fmt.Sprintf("%x", dataCommitment)).
Str("source", source).
Msg("DataHash mismatch: header and data do not match from P2P, discarding")
return
err := fmt.Errorf("data hash mismatch: header %x, data %x", header.DataHash, dataCommitment)
h.logger.Warn().Uint64("height", height).Err(err).Msg("discarding inconsistent block from P2P")
return err
}

event := common.DAHeightEvent{
Expand All @@ -138,7 +106,14 @@ func (h *P2PHandler) processHeight(ctx context.Context, height uint64, heightInC
h.cache.SetPendingEvent(event.Header.Height(), &event)
}

h.logger.Debug().Uint64("height", height).Str("source", source).Msg("processed event from P2P")
h.mu.Lock()
if height > h.processedHeight {
h.processedHeight = height
}
h.mu.Unlock()

h.logger.Debug().Uint64("height", height).Msg("processed event from P2P")
return nil
}

// assertExpectedProposer validates the proposer address.
Expand Down
43 changes: 28 additions & 15 deletions block/internal/syncing/p2p_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func collectEvents(t *testing.T, ch <-chan common.DAHeightEvent, timeout time.Du
}
}

func TestP2PHandler_ProcessRange_EmitsEventWhenHeaderAndDataPresent(t *testing.T) {
func TestP2PHandler_ProcessHeight_EmitsEventWhenHeaderAndDataPresent(t *testing.T) {
p := setupP2P(t)
ctx := context.Background()

Expand All @@ -149,15 +149,16 @@ func TestP2PHandler_ProcessRange_EmitsEventWhenHeaderAndDataPresent(t *testing.T
p.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(data, nil).Once()

ch := make(chan common.DAHeightEvent, 1)
p.Handler.ProcessHeaderRange(ctx, 5, 5, ch)
err = p.Handler.ProcessHeight(ctx, 5, ch)
require.NoError(t, err)

events := collectEvents(t, ch, 50*time.Millisecond)
require.Len(t, events, 1)
require.Equal(t, uint64(5), events[0].Header.Height())
require.NotNil(t, events[0].Data)
}

func TestP2PHandler_ProcessRange_SkipsWhenDataMissing(t *testing.T) {
func TestP2PHandler_ProcessHeight_SkipsWhenDataMissing(t *testing.T) {
p := setupP2P(t)
ctx := context.Background()

Expand All @@ -174,27 +175,30 @@ func TestP2PHandler_ProcessRange_SkipsWhenDataMissing(t *testing.T) {
p.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(7)).Return(nil, errors.New("missing")).Once()

ch := make(chan common.DAHeightEvent, 1)
p.Handler.ProcessHeaderRange(ctx, 7, 7, ch)
err = p.Handler.ProcessHeight(ctx, 7, ch)
require.Error(t, err)

require.Empty(t, collectEvents(t, ch, 50*time.Millisecond))
}

func TestP2PHandler_ProcessRange_SkipsWhenHeaderMissing(t *testing.T) {
func TestP2PHandler_ProcessHeight_SkipsWhenHeaderMissing(t *testing.T) {
p := setupP2P(t)
ctx := context.Background()

p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(9)).Return(nil, errors.New("missing")).Once()

ch := make(chan common.DAHeightEvent, 1)
p.Handler.ProcessHeaderRange(ctx, 9, 9, ch)
err := p.Handler.ProcessHeight(ctx, 9, ch)
require.Error(t, err)

require.Empty(t, collectEvents(t, ch, 50*time.Millisecond))
p.DataStore.AssertNotCalled(t, "GetByHeight", mock.Anything, uint64(9))
}

func TestP2PHandler_ProcessRange_SkipsOnProposerMismatch(t *testing.T) {
func TestP2PHandler_ProcessHeight_SkipsOnProposerMismatch(t *testing.T) {
p := setupP2P(t)
ctx := context.Background()
var err error

badAddr, pub, signer := buildTestSigner(t)
require.NotEqual(t, string(p.Genesis.ProposerAddress), string(badAddr))
Expand All @@ -205,19 +209,29 @@ func TestP2PHandler_ProcessRange_SkipsOnProposerMismatch(t *testing.T) {
p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(11)).Return(header, nil).Once()

ch := make(chan common.DAHeightEvent, 1)
p.Handler.ProcessHeaderRange(ctx, 11, 11, ch)
err = p.Handler.ProcessHeight(ctx, 11, ch)
require.Error(t, err)

require.Empty(t, collectEvents(t, ch, 50*time.Millisecond))
p.DataStore.AssertNotCalled(t, "GetByHeight", mock.Anything, uint64(11))
}

func TestP2PHandler_ProcessRange_UsesProcessedHeightToSkip(t *testing.T) {
func TestP2PHandler_ProcessedHeightSkipsPreviouslyHandledBlocks(t *testing.T) {
p := setupP2P(t)
ctx := context.Background()

// Mark up to height 5 as processed.
p.Handler.SetProcessedHeight(5)

ch := make(chan common.DAHeightEvent, 1)

// Heights below or equal to 5 should be skipped without touching the stores.
require.NoError(t, p.Handler.ProcessHeight(ctx, 4, ch))
require.Empty(t, collectEvents(t, ch, 50*time.Millisecond))
p.HeaderStore.AssertNotCalled(t, "GetByHeight", mock.Anything, uint64(4))
p.DataStore.AssertNotCalled(t, "GetByHeight", mock.Anything, uint64(4))

// Height 6 should be fetched normally.
header := p2pMakeSignedHeader(t, p.Genesis.ChainID, 6, p.ProposerAddr, p.ProposerPub, p.Signer)
data := makeData(p.Genesis.ChainID, 6, 1)
header.DataHash = data.DACommitment()
Expand All @@ -230,15 +244,14 @@ func TestP2PHandler_ProcessRange_UsesProcessedHeightToSkip(t *testing.T) {
p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(6)).Return(header, nil).Once()
p.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(6)).Return(data, nil).Once()

ch := make(chan common.DAHeightEvent, 1)
p.Handler.ProcessHeaderRange(ctx, 4, 6, ch)
require.NoError(t, p.Handler.ProcessHeight(ctx, 6, ch))

events := collectEvents(t, ch, 50*time.Millisecond)
require.Len(t, events, 1)
require.Equal(t, uint64(6), events[0].Header.Height())
}

func TestP2PHandler_OnHeightProcessedPreventsDuplicates(t *testing.T) {
func TestP2PHandler_SetProcessedHeightPreventsDuplicates(t *testing.T) {
p := setupP2P(t)
ctx := context.Background()

Expand All @@ -255,18 +268,18 @@ func TestP2PHandler_OnHeightProcessedPreventsDuplicates(t *testing.T) {
p.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(8)).Return(data, nil).Once()

ch := make(chan common.DAHeightEvent, 1)
p.Handler.ProcessHeaderRange(ctx, 8, 8, ch)
require.NoError(t, p.Handler.ProcessHeight(ctx, 8, ch))

events := collectEvents(t, ch, 50*time.Millisecond)
require.Len(t, events, 1)

// Mark the height as processed; a subsequent range should skip lookups.
// Mark the height as processed; a subsequent request should skip store access.
p.Handler.SetProcessedHeight(8)

p.HeaderStore.AssertExpectations(t)
p.DataStore.AssertExpectations(t)

// No additional expectations set; if the handler queried the stores again the mock would fail.
p.Handler.ProcessHeaderRange(ctx, 7, 8, ch)
require.NoError(t, p.Handler.ProcessHeight(ctx, 8, ch))
require.Empty(t, collectEvents(t, ch, 50*time.Millisecond))
}
Loading
Loading