diff --git a/block/internal/common/broadcaster_mock.go b/block/internal/common/broadcaster_mock.go index 3c1d0c7ef..298347807 100644 --- a/block/internal/common/broadcaster_mock.go +++ b/block/internal/common/broadcaster_mock.go @@ -8,7 +8,6 @@ import ( "context" "github.com/celestiaorg/go-header" - "github.com/evstack/ev-node/pkg/sync/notifier" "github.com/libp2p/go-libp2p-pubsub" mock "github.com/stretchr/testify/mock" ) @@ -40,52 +39,6 @@ func (_m *MockBroadcaster[H]) EXPECT() *MockBroadcaster_Expecter[H] { return &MockBroadcaster_Expecter[H]{mock: &_m.Mock} } -// Notifier provides a mock function for the type MockBroadcaster -func (_mock *MockBroadcaster[H]) Notifier() *notifier.Notifier { - ret := _mock.Called() - - if len(ret) == 0 { - panic("no return value specified for Notifier") - } - - var r0 *notifier.Notifier - if returnFunc, ok := ret.Get(0).(func() *notifier.Notifier); ok { - r0 = returnFunc() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*notifier.Notifier) - } - } - return r0 -} - -// MockBroadcaster_Notifier_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Notifier' -type MockBroadcaster_Notifier_Call[H header.Header[H]] struct { - *mock.Call -} - -// Notifier is a helper method to define mock.On call -func (_e *MockBroadcaster_Expecter[H]) Notifier() *MockBroadcaster_Notifier_Call[H] { - return &MockBroadcaster_Notifier_Call[H]{Call: _e.mock.On("Notifier")} -} - -func (_c *MockBroadcaster_Notifier_Call[H]) Run(run func()) *MockBroadcaster_Notifier_Call[H] { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MockBroadcaster_Notifier_Call[H]) Return(notifier1 *notifier.Notifier) *MockBroadcaster_Notifier_Call[H] { - _c.Call.Return(notifier1) - return _c -} - -func (_c *MockBroadcaster_Notifier_Call[H]) RunAndReturn(run func() *notifier.Notifier) *MockBroadcaster_Notifier_Call[H] { - _c.Call.Return(run) - return _c -} - // Store provides a mock function for the type MockBroadcaster func (_mock *MockBroadcaster[H]) Store() header.Store[H] { ret := _mock.Called() diff --git a/block/internal/common/expected_interfaces.go b/block/internal/common/expected_interfaces.go index 5891c4e20..8f36af624 100644 --- a/block/internal/common/expected_interfaces.go +++ b/block/internal/common/expected_interfaces.go @@ -6,13 +6,10 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/celestiaorg/go-header" - - syncnotifier "github.com/evstack/ev-node/pkg/sync/notifier" ) // broadcaster interface for P2P broadcasting type Broadcaster[H header.Header[H]] interface { WriteToStoreAndBroadcast(ctx context.Context, payload H, opts ...pubsub.PubOpt) error Store() header.Store[H] - Notifier() *syncnotifier.Notifier } diff --git a/block/internal/syncing/p2p_handler.go b/block/internal/syncing/p2p_handler.go index 35c9e464f..f51f8090b 100644 --- a/block/internal/syncing/p2p_handler.go +++ b/block/internal/syncing/p2p_handler.go @@ -55,74 +55,42 @@ func (h *P2PHandler) SetProcessedHeight(height uint64) { h.mu.Unlock() } -// ProcessHeaderRange scans the provided heights and emits events when both the -// header and data are available. -func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHeight uint64, heightInCh chan<- common.DAHeightEvent) { - if startHeight > endHeight { - return - } - - for height := startHeight; height <= endHeight; height++ { - h.mu.Lock() - shouldProcess := height > h.processedHeight - h.mu.Unlock() - - if !shouldProcess { - continue - } - h.processHeight(ctx, height, heightInCh, "header_range") - } -} - -// ProcessDataRange scans the provided heights and emits events when both the -// header and data are available. -func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeight uint64, heightInCh chan<- common.DAHeightEvent) { - if startHeight > endHeight { - return - } - - for height := startHeight; height <= endHeight; height++ { - h.mu.Lock() - shouldProcess := height > h.processedHeight - h.mu.Unlock() +// ProcessHeight waits until both header and data for the given height are available. +// Once available, it validates and emits the event to the provided channel or stores it as pending. +func (h *P2PHandler) ProcessHeight(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent) error { + h.mu.Lock() + shouldProcess := height > h.processedHeight + h.mu.Unlock() - if !shouldProcess { - continue - } - h.processHeight(ctx, height, heightInCh, "data_range") + if !shouldProcess { + return nil } -} -func (h *P2PHandler) processHeight(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent, source string) { header, err := h.headerStore.GetByHeight(ctx, height) if err != nil { if ctx.Err() == nil { - h.logger.Debug().Uint64("height", height).Err(err).Str("source", source).Msg("header unavailable in store") + h.logger.Debug().Uint64("height", height).Err(err).Msg("header unavailable in store") } - return + return err } if err := h.assertExpectedProposer(header.ProposerAddress); err != nil { - h.logger.Debug().Uint64("height", height).Err(err).Str("source", source).Msg("invalid header from P2P") - return + h.logger.Debug().Uint64("height", height).Err(err).Msg("invalid header from P2P") + return err } data, err := h.dataStore.GetByHeight(ctx, height) if err != nil { if ctx.Err() == nil { - h.logger.Debug().Uint64("height", height).Err(err).Str("source", source).Msg("data unavailable in store") + h.logger.Debug().Uint64("height", height).Err(err).Msg("data unavailable in store") } - return + return err } dataCommitment := data.DACommitment() if !bytes.Equal(header.DataHash[:], dataCommitment[:]) { - h.logger.Warn(). - Uint64("height", height). - Str("header_data_hash", fmt.Sprintf("%x", header.DataHash)). - Str("actual_data_hash", fmt.Sprintf("%x", dataCommitment)). - Str("source", source). - Msg("DataHash mismatch: header and data do not match from P2P, discarding") - return + err := fmt.Errorf("data hash mismatch: header %x, data %x", header.DataHash, dataCommitment) + h.logger.Warn().Uint64("height", height).Err(err).Msg("discarding inconsistent block from P2P") + return err } event := common.DAHeightEvent{ @@ -138,7 +106,14 @@ func (h *P2PHandler) processHeight(ctx context.Context, height uint64, heightInC h.cache.SetPendingEvent(event.Header.Height(), &event) } - h.logger.Debug().Uint64("height", height).Str("source", source).Msg("processed event from P2P") + h.mu.Lock() + if height > h.processedHeight { + h.processedHeight = height + } + h.mu.Unlock() + + h.logger.Debug().Uint64("height", height).Msg("processed event from P2P") + return nil } // assertExpectedProposer validates the proposer address. diff --git a/block/internal/syncing/p2p_handler_test.go b/block/internal/syncing/p2p_handler_test.go index 7519aa40d..bd9c4178a 100644 --- a/block/internal/syncing/p2p_handler_test.go +++ b/block/internal/syncing/p2p_handler_test.go @@ -130,7 +130,7 @@ func collectEvents(t *testing.T, ch <-chan common.DAHeightEvent, timeout time.Du } } -func TestP2PHandler_ProcessRange_EmitsEventWhenHeaderAndDataPresent(t *testing.T) { +func TestP2PHandler_ProcessHeight_EmitsEventWhenHeaderAndDataPresent(t *testing.T) { p := setupP2P(t) ctx := context.Background() @@ -149,7 +149,8 @@ func TestP2PHandler_ProcessRange_EmitsEventWhenHeaderAndDataPresent(t *testing.T p.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(data, nil).Once() ch := make(chan common.DAHeightEvent, 1) - p.Handler.ProcessHeaderRange(ctx, 5, 5, ch) + err = p.Handler.ProcessHeight(ctx, 5, ch) + require.NoError(t, err) events := collectEvents(t, ch, 50*time.Millisecond) require.Len(t, events, 1) @@ -157,7 +158,7 @@ func TestP2PHandler_ProcessRange_EmitsEventWhenHeaderAndDataPresent(t *testing.T require.NotNil(t, events[0].Data) } -func TestP2PHandler_ProcessRange_SkipsWhenDataMissing(t *testing.T) { +func TestP2PHandler_ProcessHeight_SkipsWhenDataMissing(t *testing.T) { p := setupP2P(t) ctx := context.Background() @@ -174,27 +175,30 @@ func TestP2PHandler_ProcessRange_SkipsWhenDataMissing(t *testing.T) { p.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(7)).Return(nil, errors.New("missing")).Once() ch := make(chan common.DAHeightEvent, 1) - p.Handler.ProcessHeaderRange(ctx, 7, 7, ch) + err = p.Handler.ProcessHeight(ctx, 7, ch) + require.Error(t, err) require.Empty(t, collectEvents(t, ch, 50*time.Millisecond)) } -func TestP2PHandler_ProcessRange_SkipsWhenHeaderMissing(t *testing.T) { +func TestP2PHandler_ProcessHeight_SkipsWhenHeaderMissing(t *testing.T) { p := setupP2P(t) ctx := context.Background() p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(9)).Return(nil, errors.New("missing")).Once() ch := make(chan common.DAHeightEvent, 1) - p.Handler.ProcessHeaderRange(ctx, 9, 9, ch) + err := p.Handler.ProcessHeight(ctx, 9, ch) + require.Error(t, err) require.Empty(t, collectEvents(t, ch, 50*time.Millisecond)) p.DataStore.AssertNotCalled(t, "GetByHeight", mock.Anything, uint64(9)) } -func TestP2PHandler_ProcessRange_SkipsOnProposerMismatch(t *testing.T) { +func TestP2PHandler_ProcessHeight_SkipsOnProposerMismatch(t *testing.T) { p := setupP2P(t) ctx := context.Background() + var err error badAddr, pub, signer := buildTestSigner(t) require.NotEqual(t, string(p.Genesis.ProposerAddress), string(badAddr)) @@ -205,19 +209,29 @@ func TestP2PHandler_ProcessRange_SkipsOnProposerMismatch(t *testing.T) { p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(11)).Return(header, nil).Once() ch := make(chan common.DAHeightEvent, 1) - p.Handler.ProcessHeaderRange(ctx, 11, 11, ch) + err = p.Handler.ProcessHeight(ctx, 11, ch) + require.Error(t, err) require.Empty(t, collectEvents(t, ch, 50*time.Millisecond)) p.DataStore.AssertNotCalled(t, "GetByHeight", mock.Anything, uint64(11)) } -func TestP2PHandler_ProcessRange_UsesProcessedHeightToSkip(t *testing.T) { +func TestP2PHandler_ProcessedHeightSkipsPreviouslyHandledBlocks(t *testing.T) { p := setupP2P(t) ctx := context.Background() // Mark up to height 5 as processed. p.Handler.SetProcessedHeight(5) + ch := make(chan common.DAHeightEvent, 1) + + // Heights below or equal to 5 should be skipped without touching the stores. + require.NoError(t, p.Handler.ProcessHeight(ctx, 4, ch)) + require.Empty(t, collectEvents(t, ch, 50*time.Millisecond)) + p.HeaderStore.AssertNotCalled(t, "GetByHeight", mock.Anything, uint64(4)) + p.DataStore.AssertNotCalled(t, "GetByHeight", mock.Anything, uint64(4)) + + // Height 6 should be fetched normally. header := p2pMakeSignedHeader(t, p.Genesis.ChainID, 6, p.ProposerAddr, p.ProposerPub, p.Signer) data := makeData(p.Genesis.ChainID, 6, 1) header.DataHash = data.DACommitment() @@ -230,15 +244,14 @@ func TestP2PHandler_ProcessRange_UsesProcessedHeightToSkip(t *testing.T) { p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(6)).Return(header, nil).Once() p.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(6)).Return(data, nil).Once() - ch := make(chan common.DAHeightEvent, 1) - p.Handler.ProcessHeaderRange(ctx, 4, 6, ch) + require.NoError(t, p.Handler.ProcessHeight(ctx, 6, ch)) events := collectEvents(t, ch, 50*time.Millisecond) require.Len(t, events, 1) require.Equal(t, uint64(6), events[0].Header.Height()) } -func TestP2PHandler_OnHeightProcessedPreventsDuplicates(t *testing.T) { +func TestP2PHandler_SetProcessedHeightPreventsDuplicates(t *testing.T) { p := setupP2P(t) ctx := context.Background() @@ -255,18 +268,18 @@ func TestP2PHandler_OnHeightProcessedPreventsDuplicates(t *testing.T) { p.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(8)).Return(data, nil).Once() ch := make(chan common.DAHeightEvent, 1) - p.Handler.ProcessHeaderRange(ctx, 8, 8, ch) + require.NoError(t, p.Handler.ProcessHeight(ctx, 8, ch)) events := collectEvents(t, ch, 50*time.Millisecond) require.Len(t, events, 1) - // Mark the height as processed; a subsequent range should skip lookups. + // Mark the height as processed; a subsequent request should skip store access. p.Handler.SetProcessedHeight(8) p.HeaderStore.AssertExpectations(t) p.DataStore.AssertExpectations(t) // No additional expectations set; if the handler queried the stores again the mock would fail. - p.Handler.ProcessHeaderRange(ctx, 7, 8, ch) + require.NoError(t, p.Handler.ProcessHeight(ctx, 8, ch)) require.Empty(t, collectEvents(t, ch, 50*time.Millisecond)) } diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 609ba938c..47ffce600 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -21,7 +21,6 @@ import ( "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/store" - syncnotifier "github.com/evstack/ev-node/pkg/sync/notifier" "github.com/evstack/ev-node/types" ) @@ -30,8 +29,7 @@ type daRetriever interface { } type p2pHandler interface { - ProcessHeaderRange(ctx context.Context, fromHeight, toHeight uint64, heightInCh chan<- common.DAHeightEvent) - ProcessDataRange(ctx context.Context, fromHeight, toHeight uint64, heightInCh chan<- common.DAHeightEvent) + ProcessHeight(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent) error SetProcessedHeight(height uint64) } @@ -77,9 +75,9 @@ type Syncer struct { cancel context.CancelFunc wg sync.WaitGroup - // Notifier subscriptions - headerSub *syncnotifier.Subscription - dataSub *syncnotifier.Subscription + // P2P wait coordination + p2pWaitMu sync.Mutex + p2pWaitState p2pWaitState } // NewSyncer creates a new block syncer @@ -110,7 +108,7 @@ func NewSyncer( dataStore: dataStore, lastState: &atomic.Pointer[types.State]{}, daHeight: &atomic.Uint64{}, - heightInCh: make(chan common.DAHeightEvent, 10_000), + heightInCh: make(chan common.DAHeightEvent, 1_000), errorCh: errorCh, logger: logger.With().Str("component", "syncer").Logger(), } @@ -134,10 +132,6 @@ func (s *Syncer) Start(ctx context.Context) error { s.p2pHandler.SetProcessedHeight(currentHeight) } - if err := s.startP2PListeners(); err != nil { - return fmt.Errorf("failed to start P2P listeners: %w", err) - } - // Start main processing loop s.wg.Add(1) go func() { @@ -157,12 +151,7 @@ func (s *Syncer) Stop() error { if s.cancel != nil { s.cancel() } - if s.headerSub != nil { - s.headerSub.Cancel() - } - if s.dataSub != nil { - s.dataSub.Cancel() - } + s.cancelP2PWait(0) s.wg.Wait() s.logger.Info().Msg("syncer stopped") return nil @@ -262,107 +251,117 @@ func (s *Syncer) processLoop() { } func (s *Syncer) startSyncWorkers() { - s.wg.Add(2) + s.wg.Add(3) go s.daWorkerLoop() go s.pendingWorkerLoop() + go s.p2pWorkerLoop() } -func (s *Syncer) startP2PListeners() error { - headerNotifier := s.headerStore.Notifier() - if headerNotifier == nil { - return errors.New("header notifier not configured") - } - - dataNotifier := s.dataStore.Notifier() - if dataNotifier == nil { - return errors.New("data notifier not configured") - } - - s.headerSub = headerNotifier.Subscribe() - s.wg.Add(1) - go s.consumeNotifierEvents(s.headerSub, syncnotifier.EventTypeHeader) - - s.dataSub = dataNotifier.Subscribe() - s.wg.Add(1) - go s.consumeNotifierEvents(s.dataSub, syncnotifier.EventTypeData) - - return nil -} +const ( + fastDAPollInterval = 10 * time.Millisecond + futureHeightBackoff = 6 * time.Second // current celestia block time +) -func (s *Syncer) consumeNotifierEvents(sub *syncnotifier.Subscription, expected syncnotifier.EventType) { +func (s *Syncer) daWorkerLoop() { defer s.wg.Done() if !s.waitForGenesis() { return } - logger := s.logger.With().Str("event_type", string(expected)).Logger() - logger.Info().Msg("starting P2P notifier worker") - defer logger.Info().Msg("P2P notifier worker stopped") + s.logger.Info().Msg("starting DA worker") + defer s.logger.Info().Msg("DA worker stopped") + + nextDARequestAt := &time.Time{} + pollInterval := fastDAPollInterval for { + s.tryFetchFromDA(nextDARequestAt) select { case <-s.ctx.Done(): return - case evt, ok := <-sub.C: - if !ok { - return - } - if evt.Type != expected { - logger.Debug().Str("received_type", string(evt.Type)).Msg("ignoring notifier event with unexpected type") - continue - } - s.logger.Debug().Str("event_type", string(evt.Type)).Str("source", string(evt.Source)).Uint64("height", evt.Height).Msg("received store event") - s.tryFetchFromP2P() + case <-time.After(pollInterval): } } } -func (s *Syncer) daWorkerLoop() { +func (s *Syncer) pendingWorkerLoop() { defer s.wg.Done() if !s.waitForGenesis() { return } - s.logger.Info().Msg("starting DA worker") - defer s.logger.Info().Msg("DA worker stopped") + s.logger.Info().Msg("starting pending worker") + defer s.logger.Info().Msg("pending worker stopped") - nextDARequestAt := &time.Time{} - pollInterval := min(10*time.Millisecond, s.config.Node.BlockTime.Duration) - if pollInterval <= 0 { - pollInterval = 10 * time.Millisecond - } + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() for { - s.tryFetchFromDA(nextDARequestAt) select { case <-s.ctx.Done(): return - case <-time.After(pollInterval): + case <-ticker.C: + s.processPendingEvents() } } } -func (s *Syncer) pendingWorkerLoop() { +func (s *Syncer) p2pWorkerLoop() { defer s.wg.Done() if !s.waitForGenesis() { return } - s.logger.Info().Msg("starting pending worker") - defer s.logger.Info().Msg("pending worker stopped") - - ticker := time.NewTicker(10 * time.Millisecond) - defer ticker.Stop() + logger := s.logger.With().Str("worker", "p2p").Logger() + logger.Info().Msg("starting P2P worker") + defer logger.Info().Msg("P2P worker stopped") for { select { case <-s.ctx.Done(): return - case <-ticker.C: - s.processPendingEvents() + default: + } + + currentHeight, err := s.store.Height(s.ctx) + if err != nil { + logger.Error().Err(err).Msg("failed to get current height for P2P worker") + if !s.sleepOrDone(50 * time.Millisecond) { + return + } + continue + } + + targetHeight := currentHeight + 1 + waitCtx, cancel := context.WithCancel(s.ctx) + s.setP2PWaitState(targetHeight, cancel) + + err = s.p2pHandler.ProcessHeight(waitCtx, targetHeight, s.heightInCh) + s.cancelP2PWait(targetHeight) + + if err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + continue + } + + if waitCtx.Err() == nil { + logger.Warn().Err(err).Uint64("height", targetHeight).Msg("P2P handler failed to process height") + } + + if !s.sleepOrDone(50 * time.Millisecond) { + return + } + continue + } + + if err := s.waitForStoreHeight(targetHeight); err != nil { + if errors.Is(err, context.Canceled) { + return + } + logger.Error().Err(err).Uint64("height", targetHeight).Msg("failed waiting for height commit") } } } @@ -396,24 +395,30 @@ func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) { // DaHeight is only increased on successful retrieval, it will retry on failure at the next iteration events, err := s.daRetriever.RetrieveFromDA(s.ctx, daHeight) if err != nil { - if errors.Is(err, coreda.ErrBlobNotFound) { + switch { + case errors.Is(err, coreda.ErrBlobNotFound): // no data at this height, increase DA height s.SetDAHeight(daHeight + 1) // Reset backoff on success *nextDARequestAt = time.Time{} return - } - - // Back off exactly by DA block time to avoid overloading - backoffDelay := s.config.DA.BlockTime.Duration - if backoffDelay <= 0 { - backoffDelay = 2 * time.Second - } - *nextDARequestAt = now.Add(backoffDelay) + case errors.Is(err, coreda.ErrHeightFromFuture): + delay := futureHeightBackoff + *nextDARequestAt = now.Add(delay) + s.logger.Debug().Err(err).Dur("delay", delay).Uint64("da_height", daHeight).Msg("DA is ahead of local target; backing off future height requests") + return + default: + // Back off exactly by DA block time to avoid overloading + backoffDelay := s.config.DA.BlockTime.Duration + if backoffDelay <= 0 { + backoffDelay = 2 * time.Second + } + *nextDARequestAt = now.Add(backoffDelay) - s.logger.Error().Err(err).Dur("delay", backoffDelay).Uint64("da_height", daHeight).Msg("failed to retrieve from DA; backing off DA requests") + s.logger.Error().Err(err).Dur("delay", backoffDelay).Uint64("da_height", daHeight).Msg("failed to retrieve from DA; backing off DA requests") - return + return + } } // Reset backoff on success @@ -432,29 +437,6 @@ func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) { s.SetDAHeight(daHeight + 1) } -// tryFetchFromP2P attempts to fetch events from P2P stores. -// It processes both header and data ranges when the block ticker fires. -// Returns true if any events were successfully processed. -func (s *Syncer) tryFetchFromP2P() { - currentHeight, err := s.store.Height(s.ctx) - if err != nil { - s.logger.Error().Err(err).Msg("failed to get current height") - return - } - - // Process headers - newHeaderHeight := s.headerStore.Store().Height() - if newHeaderHeight > currentHeight { - s.p2pHandler.ProcessHeaderRange(s.ctx, currentHeight+1, newHeaderHeight, s.heightInCh) - } - - // Process data (if not already processed by headers) - newDataHeight := s.dataStore.Store().Height() - if newDataHeight > currentHeight { - s.p2pHandler.ProcessDataRange(s.ctx, currentHeight+1, newDataHeight, s.heightInCh) - } -} - func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { height := event.Header.Height() headerHash := event.Header.Hash().String() @@ -513,6 +495,9 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { return } + // Cancel any P2P wait that might still be blocked on this height + s.cancelP2PWait(height) + // only save to p2p stores if the event came from DA if event.Source == common.SourceDA { g, ctx := errgroup.WithContext(s.ctx) @@ -748,3 +733,58 @@ func (s *Syncer) processPendingEvents() { nextHeight++ } } + +func (s *Syncer) waitForStoreHeight(target uint64) error { + for { + currentHeight, err := s.store.Height(s.ctx) + if err != nil { + return err + } + + if currentHeight >= target { + return nil + } + + if !s.sleepOrDone(10 * time.Millisecond) { + if s.ctx.Err() != nil { + return s.ctx.Err() + } + } + } +} + +func (s *Syncer) sleepOrDone(duration time.Duration) bool { + timer := time.NewTimer(duration) + defer timer.Stop() + + select { + case <-s.ctx.Done(): + return false + case <-timer.C: + return true + } +} + +type p2pWaitState struct { + height uint64 + cancel context.CancelFunc +} + +func (s *Syncer) setP2PWaitState(height uint64, cancel context.CancelFunc) { + s.p2pWaitMu.Lock() + defer s.p2pWaitMu.Unlock() + s.p2pWaitState = p2pWaitState{ + height: height, + cancel: cancel, + } +} + +func (s *Syncer) cancelP2PWait(height uint64) { + s.p2pWaitMu.Lock() + defer s.p2pWaitMu.Unlock() + + if s.p2pWaitState.cancel != nil && (height == 0 || s.p2pWaitState.height <= height) { + s.p2pWaitState.cancel() + s.p2pWaitState = p2pWaitState{} + } +} diff --git a/block/internal/syncing/syncer_backoff_test.go b/block/internal/syncing/syncer_backoff_test.go index 38d9c1f79..9d58d226e 100644 --- a/block/internal/syncing/syncer_backoff_test.go +++ b/block/internal/syncing/syncer_backoff_test.go @@ -71,9 +71,9 @@ func TestSyncer_BackoffOnDAError(t *testing.T) { // Setup mocks daRetriever := newMockdaRetriever(t) p2pHandler := newMockp2pHandler(t) + p2pHandler.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() syncer.daRetriever = daRetriever syncer.p2pHandler = p2pHandler - p2pHandler.On("OnHeightProcessed", mock.Anything).Return().Maybe() p2pHandler.On("SetProcessedHeight", mock.Anything).Return().Maybe() // Create mock stores for P2P @@ -167,11 +167,9 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) { daRetriever := newMockdaRetriever(t) p2pHandler := newMockp2pHandler(t) + p2pHandler.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() syncer.daRetriever = daRetriever syncer.p2pHandler = p2pHandler - p2pHandler.On("OnHeightProcessed", mock.Anything).Return().Maybe() - p2pHandler.On("SetProcessedHeight", mock.Anything).Return().Maybe() - p2pHandler.On("OnHeightProcessed", mock.Anything).Return().Maybe() p2pHandler.On("SetProcessedHeight", mock.Anything).Return().Maybe() // Create mock stores for P2P @@ -260,6 +258,7 @@ func TestSyncer_BackoffBehaviorIntegration(t *testing.T) { daRetriever := newMockdaRetriever(t) p2pHandler := newMockp2pHandler(t) + p2pHandler.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() syncer.daRetriever = daRetriever syncer.p2pHandler = p2pHandler @@ -279,7 +278,6 @@ func TestSyncer_BackoffBehaviorIntegration(t *testing.T) { syncer.dataStore = dataStore var callTimes []time.Time - p2pHandler.On("OnHeightProcessed", mock.Anything).Return().Maybe() p2pHandler.On("SetProcessedHeight", mock.Anything).Return().Maybe() // First call - error (triggers backoff) diff --git a/block/internal/syncing/syncer_benchmark_test.go b/block/internal/syncing/syncer_benchmark_test.go index e076f8df8..7404de3c5 100644 --- a/block/internal/syncing/syncer_benchmark_test.go +++ b/block/internal/syncing/syncer_benchmark_test.go @@ -150,7 +150,6 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay // Attach mocks s.daRetriever = daR mockP2P := newMockp2pHandler(b) // not used directly in this benchmark path - mockP2P.On("OnHeightProcessed", mock.Anything).Return().Maybe() mockP2P.On("SetProcessedHeight", mock.Anything).Return().Maybe() s.p2pHandler = mockP2P headerP2PStore := common.NewMockBroadcaster[*types.SignedHeader](b) diff --git a/block/internal/syncing/syncer_mock.go b/block/internal/syncing/syncer_mock.go index 7696d5a91..6238a5836 100644 --- a/block/internal/syncing/syncer_mock.go +++ b/block/internal/syncing/syncer_mock.go @@ -133,125 +133,37 @@ func (_m *mockp2pHandler) EXPECT() *mockp2pHandler_Expecter { return &mockp2pHandler_Expecter{mock: &_m.Mock} } -// OnHeightProcessed provides a mock function for the type mockp2pHandler -func (_mock *mockp2pHandler) OnHeightProcessed(height uint64) { - _mock.Called(height) - return -} - -// mockp2pHandler_OnHeightProcessed_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnHeightProcessed' -type mockp2pHandler_OnHeightProcessed_Call struct { - *mock.Call -} - -// OnHeightProcessed is a helper method to define mock.On call -// - height uint64 -func (_e *mockp2pHandler_Expecter) OnHeightProcessed(height interface{}) *mockp2pHandler_OnHeightProcessed_Call { - return &mockp2pHandler_OnHeightProcessed_Call{Call: _e.mock.On("OnHeightProcessed", height)} -} - -func (_c *mockp2pHandler_OnHeightProcessed_Call) Run(run func(height uint64)) *mockp2pHandler_OnHeightProcessed_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 uint64 - if args[0] != nil { - arg0 = args[0].(uint64) - } - run( - arg0, - ) - }) - return _c -} - -func (_c *mockp2pHandler_OnHeightProcessed_Call) Return() *mockp2pHandler_OnHeightProcessed_Call { - _c.Call.Return() - return _c -} - -func (_c *mockp2pHandler_OnHeightProcessed_Call) RunAndReturn(run func(height uint64)) *mockp2pHandler_OnHeightProcessed_Call { - _c.Run(run) - return _c -} - -// ProcessDataRange provides a mock function for the type mockp2pHandler -func (_mock *mockp2pHandler) ProcessDataRange(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent) { - _mock.Called(ctx, fromHeight, toHeight, heightInCh) - return -} - -// mockp2pHandler_ProcessDataRange_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessDataRange' -type mockp2pHandler_ProcessDataRange_Call struct { - *mock.Call -} - -// ProcessDataRange is a helper method to define mock.On call -// - ctx context.Context -// - fromHeight uint64 -// - toHeight uint64 -// - heightInCh chan<- common.DAHeightEvent -func (_e *mockp2pHandler_Expecter) ProcessDataRange(ctx interface{}, fromHeight interface{}, toHeight interface{}, heightInCh interface{}) *mockp2pHandler_ProcessDataRange_Call { - return &mockp2pHandler_ProcessDataRange_Call{Call: _e.mock.On("ProcessDataRange", ctx, fromHeight, toHeight, heightInCh)} -} - -func (_c *mockp2pHandler_ProcessDataRange_Call) Run(run func(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent)) *mockp2pHandler_ProcessDataRange_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 context.Context - if args[0] != nil { - arg0 = args[0].(context.Context) - } - var arg1 uint64 - if args[1] != nil { - arg1 = args[1].(uint64) - } - var arg2 uint64 - if args[2] != nil { - arg2 = args[2].(uint64) - } - var arg3 chan<- common.DAHeightEvent - if args[3] != nil { - arg3 = args[3].(chan<- common.DAHeightEvent) - } - run( - arg0, - arg1, - arg2, - arg3, - ) - }) - return _c -} +// ProcessHeight provides a mock function for the type mockp2pHandler +func (_mock *mockp2pHandler) ProcessHeight(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent) error { + ret := _mock.Called(ctx, height, heightInCh) -func (_c *mockp2pHandler_ProcessDataRange_Call) Return() *mockp2pHandler_ProcessDataRange_Call { - _c.Call.Return() - return _c -} - -func (_c *mockp2pHandler_ProcessDataRange_Call) RunAndReturn(run func(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent)) *mockp2pHandler_ProcessDataRange_Call { - _c.Run(run) - return _c -} + if len(ret) == 0 { + panic("no return value specified for ProcessHeight") + } -// ProcessHeaderRange provides a mock function for the type mockp2pHandler -func (_mock *mockp2pHandler) ProcessHeaderRange(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent) { - _mock.Called(ctx, fromHeight, toHeight, heightInCh) - return + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context, uint64, chan<- common.DAHeightEvent) error); ok { + r0 = returnFunc(ctx, height, heightInCh) + } else { + r0 = ret.Error(0) + } + return r0 } -// mockp2pHandler_ProcessHeaderRange_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessHeaderRange' -type mockp2pHandler_ProcessHeaderRange_Call struct { +// mockp2pHandler_ProcessHeight_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessHeight' +type mockp2pHandler_ProcessHeight_Call struct { *mock.Call } -// ProcessHeaderRange is a helper method to define mock.On call +// ProcessHeight is a helper method to define mock.On call // - ctx context.Context -// - fromHeight uint64 -// - toHeight uint64 +// - height uint64 // - heightInCh chan<- common.DAHeightEvent -func (_e *mockp2pHandler_Expecter) ProcessHeaderRange(ctx interface{}, fromHeight interface{}, toHeight interface{}, heightInCh interface{}) *mockp2pHandler_ProcessHeaderRange_Call { - return &mockp2pHandler_ProcessHeaderRange_Call{Call: _e.mock.On("ProcessHeaderRange", ctx, fromHeight, toHeight, heightInCh)} +func (_e *mockp2pHandler_Expecter) ProcessHeight(ctx interface{}, height interface{}, heightInCh interface{}) *mockp2pHandler_ProcessHeight_Call { + return &mockp2pHandler_ProcessHeight_Call{Call: _e.mock.On("ProcessHeight", ctx, height, heightInCh)} } -func (_c *mockp2pHandler_ProcessHeaderRange_Call) Run(run func(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent)) *mockp2pHandler_ProcessHeaderRange_Call { +func (_c *mockp2pHandler_ProcessHeight_Call) Run(run func(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent)) *mockp2pHandler_ProcessHeight_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { @@ -261,31 +173,26 @@ func (_c *mockp2pHandler_ProcessHeaderRange_Call) Run(run func(ctx context.Conte if args[1] != nil { arg1 = args[1].(uint64) } - var arg2 uint64 + var arg2 chan<- common.DAHeightEvent if args[2] != nil { - arg2 = args[2].(uint64) - } - var arg3 chan<- common.DAHeightEvent - if args[3] != nil { - arg3 = args[3].(chan<- common.DAHeightEvent) + arg2 = args[2].(chan<- common.DAHeightEvent) } run( arg0, arg1, arg2, - arg3, ) }) return _c } -func (_c *mockp2pHandler_ProcessHeaderRange_Call) Return() *mockp2pHandler_ProcessHeaderRange_Call { - _c.Call.Return() +func (_c *mockp2pHandler_ProcessHeight_Call) Return(err error) *mockp2pHandler_ProcessHeight_Call { + _c.Call.Return(err) return _c } -func (_c *mockp2pHandler_ProcessHeaderRange_Call) RunAndReturn(run func(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent)) *mockp2pHandler_ProcessHeaderRange_Call { - _c.Run(run) +func (_c *mockp2pHandler_ProcessHeight_Call) RunAndReturn(run func(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent) error) *mockp2pHandler_ProcessHeight_Call { + _c.Call.Return(run) return _c } @@ -328,36 +235,3 @@ func (_c *mockp2pHandler_SetProcessedHeight_Call) RunAndReturn(run func(height u _c.Run(run) return _c } - -// Shutdown provides a mock function for the type mockp2pHandler -func (_mock *mockp2pHandler) Shutdown() { - _mock.Called() - return -} - -// mockp2pHandler_Shutdown_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Shutdown' -type mockp2pHandler_Shutdown_Call struct { - *mock.Call -} - -// Shutdown is a helper method to define mock.On call -func (_e *mockp2pHandler_Expecter) Shutdown() *mockp2pHandler_Shutdown_Call { - return &mockp2pHandler_Shutdown_Call{Call: _e.mock.On("Shutdown")} -} - -func (_c *mockp2pHandler_Shutdown_Call) Run(run func()) *mockp2pHandler_Shutdown_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *mockp2pHandler_Shutdown_Call) Return() *mockp2pHandler_Shutdown_Call { - _c.Call.Return() - return _c -} - -func (_c *mockp2pHandler_Shutdown_Call) RunAndReturn(run func()) *mockp2pHandler_Shutdown_Call { - _c.Run(run) - return _c -} diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 4ebea4296..93e7ae38b 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -27,7 +27,6 @@ import ( "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/store" - syncnotifier "github.com/evstack/ev-node/pkg/sync/notifier" extmocks "github.com/evstack/ev-node/test/mocks/external" "github.com/evstack/ev-node/types" ) @@ -101,33 +100,6 @@ func makeData(chainID string, height uint64, txs int) *types.Data { return d } -type stubP2PHandler struct { - headerCalls chan [2]uint64 - dataCalls chan [2]uint64 -} - -func newStubP2PHandler() *stubP2PHandler { - return &stubP2PHandler{ - headerCalls: make(chan [2]uint64, 4), - dataCalls: make(chan [2]uint64, 4), - } -} - -func (s *stubP2PHandler) ProcessHeaderRange(_ context.Context, fromHeight, toHeight uint64, _ chan<- common.DAHeightEvent) { - s.headerCalls <- [2]uint64{fromHeight, toHeight} -} - -func (s *stubP2PHandler) ProcessDataRange(_ context.Context, fromHeight, toHeight uint64, _ chan<- common.DAHeightEvent) { - s.dataCalls <- [2]uint64{fromHeight, toHeight} -} - -func (s *stubP2PHandler) SetProcessedHeight(uint64) {} - -func (s *stubP2PHandler) OnHeightProcessed(uint64) {} - -func (s *stubP2PHandler) Shutdown() { -} - func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) @@ -302,152 +274,6 @@ func TestSequentialBlockSync(t *testing.T) { requireEmptyChan(t, errChan) } -func TestSyncerNotifierTriggersHeaderRange(t *testing.T) { - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - cfg := config.DefaultConfig() - gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: []byte("proposer")} - - cm, err := cache.NewManager(cfg, st, zerolog.Nop()) - require.NoError(t, err) - - mockExec := testmocks.NewMockExecutor(t) - mockExec.EXPECT().InitChain(mock.Anything, gen.StartTime, gen.InitialHeight, gen.ChainID).Return([]byte("app0"), uint64(1024), nil).Once() - - headerNotifier := syncnotifier.New(8, zerolog.Nop()) - dataNotifier := syncnotifier.New(8, zerolog.Nop()) - - headerStoreMock := extmocks.NewMockStore[*types.SignedHeader](t) - headerStoreMock.EXPECT().Height().Return(uint64(5)).Maybe() - - dataStoreMock := extmocks.NewMockStore[*types.Data](t) - dataStoreMock.EXPECT().Height().Return(uint64(0)).Maybe() - - headerBroadcaster := common.NewMockBroadcaster[*types.SignedHeader](t) - headerBroadcaster.EXPECT().Notifier().Return(headerNotifier).Once() - headerBroadcaster.EXPECT().Store().Return(headerStoreMock).Maybe() - - dataBroadcaster := common.NewMockBroadcaster[*types.Data](t) - dataBroadcaster.EXPECT().Notifier().Return(dataNotifier).Once() - dataBroadcaster.EXPECT().Store().Return(dataStoreMock).Maybe() - - s := NewSyncer( - st, - mockExec, - nil, - cm, - common.NopMetrics(), - cfg, - gen, - headerBroadcaster, - dataBroadcaster, - zerolog.Nop(), - common.DefaultBlockOptions(), - make(chan error, 1), - ) - require.NoError(t, s.initializeState()) - - s.ctx, s.cancel = context.WithCancel(context.Background()) - defer func() { - require.NoError(t, s.Stop()) - }() - - stubHandler := newStubP2PHandler() - s.p2pHandler = stubHandler - - require.NoError(t, s.startP2PListeners()) - - published := headerNotifier.Publish(syncnotifier.Event{Type: syncnotifier.EventTypeHeader, Height: 5, Source: syncnotifier.SourceP2P, Timestamp: time.Now()}) - require.True(t, published) - - select { - case call := <-stubHandler.headerCalls: - require.Equal(t, uint64(1), call[0]) - require.Equal(t, uint64(5), call[1]) - case <-time.After(time.Second): - t.Fatal("timeout waiting for header range processing") - } - - select { - case <-stubHandler.dataCalls: - t.Fatal("unexpected data range processing for header-only event") - default: - } -} - -func TestSyncerNotifierTriggersDataRange(t *testing.T) { - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - cfg := config.DefaultConfig() - gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: []byte("proposer")} - - cm, err := cache.NewManager(cfg, st, zerolog.Nop()) - require.NoError(t, err) - - mockExec := testmocks.NewMockExecutor(t) - mockExec.EXPECT().InitChain(mock.Anything, gen.StartTime, gen.InitialHeight, gen.ChainID).Return([]byte("app0"), uint64(1024), nil).Once() - - headerNotifier := syncnotifier.New(8, zerolog.Nop()) - dataNotifier := syncnotifier.New(8, zerolog.Nop()) - - headerStoreMock := extmocks.NewMockStore[*types.SignedHeader](t) - headerStoreMock.EXPECT().Height().Return(uint64(0)).Maybe() - - dataStoreMock := extmocks.NewMockStore[*types.Data](t) - dataStoreMock.EXPECT().Height().Return(uint64(7)).Maybe() - - headerBroadcaster := common.NewMockBroadcaster[*types.SignedHeader](t) - headerBroadcaster.EXPECT().Notifier().Return(headerNotifier).Once() - headerBroadcaster.EXPECT().Store().Return(headerStoreMock).Maybe() - - dataBroadcaster := common.NewMockBroadcaster[*types.Data](t) - dataBroadcaster.EXPECT().Notifier().Return(dataNotifier).Once() - dataBroadcaster.EXPECT().Store().Return(dataStoreMock).Maybe() - - s := NewSyncer( - st, - mockExec, - nil, - cm, - common.NopMetrics(), - cfg, - gen, - headerBroadcaster, - dataBroadcaster, - zerolog.Nop(), - common.DefaultBlockOptions(), - make(chan error, 1), - ) - require.NoError(t, s.initializeState()) - - s.ctx, s.cancel = context.WithCancel(context.Background()) - defer func() { - require.NoError(t, s.Stop()) - }() - - stubHandler := newStubP2PHandler() - s.p2pHandler = stubHandler - - require.NoError(t, s.startP2PListeners()) - - published := dataNotifier.Publish(syncnotifier.Event{Type: syncnotifier.EventTypeData, Height: 7, Source: syncnotifier.SourceP2P, Timestamp: time.Now()}) - require.True(t, published) - - select { - case call := <-stubHandler.dataCalls: - require.Equal(t, uint64(1), call[0]) - require.Equal(t, uint64(7), call[1]) - case <-time.After(time.Second): - t.Fatal("timeout waiting for data range processing") - } - - select { - case <-stubHandler.headerCalls: - t.Fatal("unexpected header range processing for data-only event") - default: - } -} - func TestSyncer_sendNonBlockingSignal(t *testing.T) { s := &Syncer{logger: zerolog.Nop()} ch := make(chan struct{}, 1) @@ -510,6 +336,8 @@ func TestSyncLoopPersistState(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) cfg := config.DefaultConfig() + t.Setenv("HOME", t.TempDir()) + cfg.RootDir = t.TempDir() cfg.ClearCache = true cacheMgr, err := cache.NewManager(cfg, st, zerolog.Nop()) @@ -556,9 +384,7 @@ func TestSyncLoopPersistState(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) syncerInst1.ctx = ctx daRtrMock, p2pHndlMock := newMockdaRetriever(t), newMockp2pHandler(t) - p2pHndlMock.On("ProcessHeaderRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Maybe() - p2pHndlMock.On("ProcessDataRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Maybe() - p2pHndlMock.On("OnHeightProcessed", mock.Anything).Return().Maybe() + p2pHndlMock.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() p2pHndlMock.On("SetProcessedHeight", mock.Anything).Return().Maybe() syncerInst1.daRetriever, syncerInst1.p2pHandler = daRtrMock, p2pHndlMock @@ -650,9 +476,7 @@ func TestSyncLoopPersistState(t *testing.T) { t.Cleanup(cancel) syncerInst2.ctx = ctx daRtrMock, p2pHndlMock = newMockdaRetriever(t), newMockp2pHandler(t) - p2pHndlMock.On("ProcessHeaderRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Maybe() - p2pHndlMock.On("ProcessDataRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Maybe() - p2pHndlMock.On("OnHeightProcessed", mock.Anything).Return().Maybe() + p2pHndlMock.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() p2pHndlMock.On("SetProcessedHeight", mock.Anything).Return().Maybe() syncerInst2.daRetriever, syncerInst2.p2pHandler = daRtrMock, p2pHndlMock diff --git a/node/full.go b/node/full.go index 153df3237..2097c24b5 100644 --- a/node/full.go +++ b/node/full.go @@ -29,7 +29,6 @@ import ( "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/store" evsync "github.com/evstack/ev-node/pkg/sync" - syncnotifier "github.com/evstack/ev-node/pkg/sync/notifier" ) // prefixes used in KV store to separate rollkit data from execution environment data (if the same data base is reused) @@ -39,9 +38,6 @@ const ( // genesisChunkSize is the maximum size, in bytes, of each // chunk in the genesis structure for the chunked API genesisChunkSize = 16 * 1024 * 1024 // 16 MiB - - // syncNotifierBufferSize bounds event fan-out queues. - syncNotifierBufferSize = 128 ) var _ Node = &FullNode{} @@ -159,13 +155,8 @@ func initHeaderSyncService( logger zerolog.Logger, ) (*evsync.HeaderSyncService, error) { componentLogger := logger.With().Str("component", "HeaderSyncService").Logger() - var opts []evsync.ServiceOption - if !nodeConfig.Node.Aggregator { - n := syncnotifier.New(syncNotifierBufferSize, componentLogger.With().Str("subcomponent", "notifier").Logger()) - opts = append(opts, evsync.WithNotifier(n)) - } - headerSyncService, err := evsync.NewHeaderSyncService(mainKV, nodeConfig, genesis, p2pClient, componentLogger, opts...) + headerSyncService, err := evsync.NewHeaderSyncService(mainKV, nodeConfig, genesis, p2pClient, componentLogger) if err != nil { return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err) } @@ -180,13 +171,8 @@ func initDataSyncService( logger zerolog.Logger, ) (*evsync.DataSyncService, error) { componentLogger := logger.With().Str("component", "DataSyncService").Logger() - var opts []evsync.ServiceOption - if !nodeConfig.Node.Aggregator { - n := syncnotifier.New(syncNotifierBufferSize, componentLogger.With().Str("subcomponent", "notifier").Logger()) - opts = append(opts, evsync.WithNotifier(n)) - } - dataSyncService, err := evsync.NewDataSyncService(mainKV, nodeConfig, genesis, p2pClient, componentLogger, opts...) + dataSyncService, err := evsync.NewDataSyncService(mainKV, nodeConfig, genesis, p2pClient, componentLogger) if err != nil { return nil, fmt.Errorf("error while initializing DataSyncService: %w", err) } diff --git a/node/light.go b/node/light.go index b2626b4e7..89fc0230f 100644 --- a/node/light.go +++ b/node/light.go @@ -17,13 +17,10 @@ import ( "github.com/evstack/ev-node/pkg/service" "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/pkg/sync" - syncnotifier "github.com/evstack/ev-node/pkg/sync/notifier" ) var _ Node = &LightNode{} -const lightNotifierBufferSize = 128 - // LightNode is a chain node that only needs the header service type LightNode struct { service.BaseService @@ -46,8 +43,7 @@ func newLightNode( logger zerolog.Logger, ) (ln *LightNode, err error) { componentLogger := logger.With().Str("component", "HeaderSyncService").Logger() - n := syncnotifier.New(lightNotifierBufferSize, componentLogger.With().Str("subcomponent", "notifier").Logger()) - headerSyncService, err := sync.NewHeaderSyncService(database, conf, genesis, p2pClient, componentLogger, sync.WithNotifier(n)) + headerSyncService, err := sync.NewHeaderSyncService(database, conf, genesis, p2pClient, componentLogger) if err != nil { return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err) } diff --git a/pkg/sync/notifier/notifier.go b/pkg/sync/notifier/notifier.go deleted file mode 100644 index 483aa4621..000000000 --- a/pkg/sync/notifier/notifier.go +++ /dev/null @@ -1,145 +0,0 @@ -package notifier - -import ( - "sync" - "time" - - "github.com/rs/zerolog" -) - -// EventType identifies whether a notification refers to a header or block data artifact. -type EventType string - -const ( - // EventTypeHeader marks header events. - EventTypeHeader EventType = "header" - // EventTypeData marks block data events. - EventTypeData EventType = "data" -) - -// EventSource describes where the artifact originated from. -type EventSource string - -const ( - // SourceUnknown marks an event where the origin could not be determined. - SourceUnknown EventSource = "unknown" - // SourceLocal marks events generated by the local node (executor, bootstrap, DA replay). - SourceLocal EventSource = "local" - // SourceP2P marks events retrieved from the P2P network. - SourceP2P EventSource = "p2p" -) - -// Event carries metadata about a stored artifact height. -type Event struct { - Type EventType - Height uint64 - Hash string - Source EventSource - Timestamp time.Time -} - -// Subscription wraps a read-only event channel with an unsubscribe function. -type Subscription struct { - C <-chan Event - cancel func() - once sync.Once - notifier *Notifier - id uint64 -} - -// Cancel stops delivery for this subscription. It is safe to call multiple times. -func (s *Subscription) Cancel() { - if s == nil { - return - } - s.once.Do(func() { - if s.cancel != nil { - s.cancel() - } - }) -} - -// Notifier fan-outs store events to subscribers. -type Notifier struct { - logger zerolog.Logger - size int - - mu sync.RWMutex - subs map[uint64]chan Event - next uint64 -} - -// New creates a notifier with the given buffer size per subscriber. -func New(bufferSize int, logger zerolog.Logger) *Notifier { - if bufferSize <= 0 { - bufferSize = 1 - } - return &Notifier{ - logger: logger, - size: bufferSize, - subs: make(map[uint64]chan Event), - } -} - -// Publish fan-outs the event to all subscribers using best-effort delivery. -// Returns true if at least one subscriber received the event without dropping it. -func (n *Notifier) Publish(evt Event) bool { - n.mu.RLock() - defer n.mu.RUnlock() - - if len(n.subs) == 0 { - return false - } - - delivered := false - for id, ch := range n.subs { - select { - case ch <- evt: - delivered = true - default: - n.logger.Debug(). - Str("type", string(evt.Type)). - Uint64("height", evt.Height). - Str("source", string(evt.Source)). - Uint64("subscriber_id", id). - Msg("dropping event: subscriber channel full") - } - } - - return delivered -} - -// Subscribe registers a new subscriber and returns a subscription handle. -func (n *Notifier) Subscribe() *Subscription { - n.mu.Lock() - defer n.mu.Unlock() - - id := n.next - n.next++ - - ch := make(chan Event, n.size) - n.subs[id] = ch - - sub := &Subscription{ - C: ch, - notifier: n, - id: id, - } - sub.cancel = func() { - n.mu.Lock() - defer n.mu.Unlock() - if existing, ok := n.subs[id]; ok { - delete(n.subs, id) - close(existing) - } - } - - return sub -} - -// SubscriberCount reports the current number of active subscribers. -func (n *Notifier) SubscriberCount() int { - n.mu.RLock() - defer n.mu.RUnlock() - return len(n.subs) -} diff --git a/pkg/sync/notifier/notifier_test.go b/pkg/sync/notifier/notifier_test.go deleted file mode 100644 index ac7dea1a7..000000000 --- a/pkg/sync/notifier/notifier_test.go +++ /dev/null @@ -1,518 +0,0 @@ -package notifier - -import ( - "sync" - "testing" - "time" - - "github.com/rs/zerolog" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestNew(t *testing.T) { - tests := []struct { - name string - bufferSize int - wantSize int - }{ - { - name: "positive buffer size", - bufferSize: 10, - wantSize: 10, - }, - { - name: "zero buffer size defaults to 1", - bufferSize: 0, - wantSize: 1, - }, - { - name: "negative buffer size defaults to 1", - bufferSize: -5, - wantSize: 1, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - logger := zerolog.Nop() - n := New(tt.bufferSize, logger) - - require.NotNil(t, n) - assert.Equal(t, tt.wantSize, n.size) - assert.NotNil(t, n.subs) - assert.Equal(t, 0, len(n.subs)) - assert.Equal(t, uint64(0), n.next) - }) - } -} - -func TestNotifier_Subscribe(t *testing.T) { - logger := zerolog.Nop() - n := New(5, logger) - - t.Run("single subscription", func(t *testing.T) { - sub := n.Subscribe() - require.NotNil(t, sub) - assert.NotNil(t, sub.C) - assert.Equal(t, 1, n.SubscriberCount()) - assert.Equal(t, uint64(0), sub.id) - }) - - t.Run("multiple subscriptions increment IDs", func(t *testing.T) { - sub2 := n.Subscribe() - sub3 := n.Subscribe() - - assert.Equal(t, 3, n.SubscriberCount()) - assert.Equal(t, uint64(1), sub2.id) - assert.Equal(t, uint64(2), sub3.id) - }) -} - -func TestNotifier_Publish_NoSubscribers(t *testing.T) { - logger := zerolog.Nop() - n := New(5, logger) - - evt := Event{ - Type: EventTypeHeader, - Height: 100, - Hash: "test-hash", - Source: SourceLocal, - Timestamp: time.Now(), - } - - delivered := n.Publish(evt) - assert.False(t, delivered, "should return false when no subscribers") -} - -func TestNotifier_Publish_SingleSubscriber(t *testing.T) { - logger := zerolog.Nop() - n := New(5, logger) - - sub := n.Subscribe() - defer sub.Cancel() - - evt := Event{ - Type: EventTypeHeader, - Height: 100, - Hash: "test-hash", - Source: SourceLocal, - Timestamp: time.Now(), - } - - delivered := n.Publish(evt) - assert.True(t, delivered, "should deliver to subscriber") - - select { - case received := <-sub.C: - assert.Equal(t, evt.Type, received.Type) - assert.Equal(t, evt.Height, received.Height) - assert.Equal(t, evt.Hash, received.Hash) - assert.Equal(t, evt.Source, received.Source) - case <-time.After(time.Second): - t.Fatal("timeout waiting for event") - } -} - -func TestNotifier_Publish_MultipleSubscribers(t *testing.T) { - logger := zerolog.Nop() - n := New(5, logger) - - sub1 := n.Subscribe() - sub2 := n.Subscribe() - sub3 := n.Subscribe() - defer sub1.Cancel() - defer sub2.Cancel() - defer sub3.Cancel() - - evt := Event{ - Type: EventTypeData, - Height: 200, - Hash: "block-hash", - Source: SourceP2P, - Timestamp: time.Now(), - } - - delivered := n.Publish(evt) - assert.True(t, delivered, "should deliver to at least one subscriber") - - // Verify all three subscribers receive the event - var wg sync.WaitGroup - wg.Add(3) - - checkSubscriber := func(sub *Subscription) { - defer wg.Done() - select { - case received := <-sub.C: - assert.Equal(t, evt.Type, received.Type) - assert.Equal(t, evt.Height, received.Height) - case <-time.After(time.Second): - t.Error("timeout waiting for event") - } - } - - go checkSubscriber(sub1) - go checkSubscriber(sub2) - go checkSubscriber(sub3) - - wg.Wait() -} - -func TestNotifier_Publish_FullBuffer(t *testing.T) { - logger := zerolog.Nop() - n := New(2, logger) // Small buffer - - sub := n.Subscribe() - defer sub.Cancel() - - // Fill the buffer - evt1 := Event{Type: EventTypeHeader, Height: 1, Hash: "hash1", Source: SourceLocal, Timestamp: time.Now()} - evt2 := Event{Type: EventTypeHeader, Height: 2, Hash: "hash2", Source: SourceLocal, Timestamp: time.Now()} - - assert.True(t, n.Publish(evt1)) - assert.True(t, n.Publish(evt2)) - - // This should be dropped due to full buffer - evt3 := Event{Type: EventTypeHeader, Height: 3, Hash: "hash3", Source: SourceLocal, Timestamp: time.Now()} - delivered := n.Publish(evt3) - - // Should still return true if at least one subscriber got it (or false if all dropped) - // In this case, buffer is full so it should be dropped - // The function returns true if ANY subscriber received it without dropping - // Since we have a full buffer, this will be dropped - assert.False(t, delivered, "should not deliver when buffer is full") -} - -func TestNotifier_Publish_MixedDelivery(t *testing.T) { - logger := zerolog.Nop() - n := New(1, logger) // Buffer size of 1 - - // Create two subscribers - sub1 := n.Subscribe() - sub2 := n.Subscribe() - defer sub1.Cancel() - defer sub2.Cancel() - - evt1 := Event{Type: EventTypeHeader, Height: 1, Hash: "hash1", Source: SourceLocal, Timestamp: time.Now()} - evt2 := Event{Type: EventTypeHeader, Height: 2, Hash: "hash2", Source: SourceLocal, Timestamp: time.Now()} - - // Fill both buffers - assert.True(t, n.Publish(evt1)) - - // Read from sub1 only, leaving sub2 full - select { - case <-sub1.C: - // sub1 buffer is now empty - case <-time.After(time.Millisecond * 100): - t.Fatal("timeout reading from sub1") - } - - // Publish another event - should deliver to sub1 but drop for sub2 - delivered := n.Publish(evt2) - assert.True(t, delivered, "should deliver to at least one subscriber (sub1)") -} - -func TestSubscription_Cancel(t *testing.T) { - logger := zerolog.Nop() - n := New(5, logger) - - t.Run("cancel removes subscriber", func(t *testing.T) { - sub := n.Subscribe() - assert.Equal(t, 1, n.SubscriberCount()) - - sub.Cancel() - assert.Equal(t, 0, n.SubscriberCount()) - }) - - t.Run("cancel closes channel", func(t *testing.T) { - sub := n.Subscribe() - sub.Cancel() - - // Channel should be closed - _, ok := <-sub.C - assert.False(t, ok, "channel should be closed after cancel") - }) - - t.Run("multiple cancels are safe", func(t *testing.T) { - sub := n.Subscribe() - sub.Cancel() - sub.Cancel() - sub.Cancel() - - assert.Equal(t, 0, n.SubscriberCount()) - }) - - t.Run("nil subscription cancel is safe", func(t *testing.T) { - var sub *Subscription - assert.NotPanics(t, func() { - sub.Cancel() - }) - }) - - t.Run("cancel prevents future deliveries", func(t *testing.T) { - sub := n.Subscribe() - sub.Cancel() - - evt := Event{ - Type: EventTypeHeader, - Height: 100, - Hash: "test", - Source: SourceLocal, - Timestamp: time.Now(), - } - - delivered := n.Publish(evt) - assert.False(t, delivered, "should not deliver to cancelled subscriber") - }) -} - -func TestNotifier_SubscriberCount(t *testing.T) { - logger := zerolog.Nop() - n := New(5, logger) - - assert.Equal(t, 0, n.SubscriberCount()) - - sub1 := n.Subscribe() - assert.Equal(t, 1, n.SubscriberCount()) - - sub2 := n.Subscribe() - assert.Equal(t, 2, n.SubscriberCount()) - - sub3 := n.Subscribe() - assert.Equal(t, 3, n.SubscriberCount()) - - sub1.Cancel() - assert.Equal(t, 2, n.SubscriberCount()) - - sub2.Cancel() - assert.Equal(t, 1, n.SubscriberCount()) - - sub3.Cancel() - assert.Equal(t, 0, n.SubscriberCount()) -} - -func TestEventTypes(t *testing.T) { - tests := []struct { - name string - eventType EventType - want string - }{ - { - name: "header event type", - eventType: EventTypeHeader, - want: "header", - }, - { - name: "data event type", - eventType: EventTypeData, - want: "data", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - assert.Equal(t, tt.want, string(tt.eventType)) - }) - } -} - -func TestEventSources(t *testing.T) { - tests := []struct { - name string - source EventSource - want string - }{ - { - name: "unknown source", - source: SourceUnknown, - want: "unknown", - }, - { - name: "local source", - source: SourceLocal, - want: "local", - }, - { - name: "p2p source", - source: SourceP2P, - want: "p2p", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - assert.Equal(t, tt.want, string(tt.source)) - }) - } -} - -func TestNotifier_ConcurrentOperations(t *testing.T) { - logger := zerolog.Nop() - n := New(10, logger) - - const numSubscribers = 10 - const numEvents = 100 - - var wg sync.WaitGroup - - // Start multiple subscribers - subs := make([]*Subscription, numSubscribers) - for i := 0; i < numSubscribers; i++ { - subs[i] = n.Subscribe() - } - - // Start publishers - wg.Add(1) - go func() { - defer wg.Done() - for i := 0; i < numEvents; i++ { - evt := Event{ - Type: EventTypeHeader, - Height: uint64(i), - Hash: "hash", - Source: SourceLocal, - Timestamp: time.Now(), - } - n.Publish(evt) - } - }() - - // Start consumers - for i := 0; i < numSubscribers; i++ { - wg.Add(1) - go func(sub *Subscription) { - defer wg.Done() - count := 0 - timeout := time.After(2 * time.Second) - for { - select { - case _, ok := <-sub.C: - if !ok { - return - } - count++ - if count >= numEvents { - return - } - case <-timeout: - return - } - } - }(subs[i]) - } - - // Cancel some subscribers concurrently - wg.Add(1) - go func() { - defer wg.Done() - time.Sleep(50 * time.Millisecond) - for i := 0; i < numSubscribers/2; i++ { - subs[i].Cancel() - } - }() - - wg.Wait() - - // Verify final state - assert.LessOrEqual(t, n.SubscriberCount(), numSubscribers/2) -} - -func TestNotifier_SubscriberCountRace(t *testing.T) { - logger := zerolog.Nop() - n := New(5, logger) - - var wg sync.WaitGroup - const numGoroutines = 10 - - // Concurrent Subscribe and Cancel operations - for i := 0; i < numGoroutines; i++ { - wg.Add(1) - go func() { - defer wg.Done() - sub := n.Subscribe() - _ = n.SubscriberCount() - sub.Cancel() - _ = n.SubscriberCount() - }() - } - - wg.Wait() - assert.Equal(t, 0, n.SubscriberCount()) -} - -func TestEvent_AllFields(t *testing.T) { - now := time.Now() - evt := Event{ - Type: EventTypeData, - Height: 12345, - Hash: "0xabcdef123456", - Source: SourceP2P, - Timestamp: now, - } - - assert.Equal(t, EventTypeData, evt.Type) - assert.Equal(t, uint64(12345), evt.Height) - assert.Equal(t, "0xabcdef123456", evt.Hash) - assert.Equal(t, SourceP2P, evt.Source) - assert.Equal(t, now, evt.Timestamp) -} - -func TestSubscription_CancelIdempotency(t *testing.T) { - logger := zerolog.Nop() - n := New(5, logger) - - sub := n.Subscribe() - initialCount := n.SubscriberCount() - - // Cancel multiple times - for i := 0; i < 5; i++ { - sub.Cancel() - } - - // Should only decrease count once - assert.Equal(t, initialCount-1, n.SubscriberCount()) -} - -func BenchmarkNotifier_Publish(b *testing.B) { - logger := zerolog.Nop() - n := New(100, logger) - - // Create 10 subscribers - subs := make([]*Subscription, 10) - for i := 0; i < 10; i++ { - subs[i] = n.Subscribe() - // Drain events in background - go func(s *Subscription) { - for range s.C { - } - }(subs[i]) - } - - evt := Event{ - Type: EventTypeHeader, - Height: 100, - Hash: "test-hash", - Source: SourceLocal, - Timestamp: time.Now(), - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - n.Publish(evt) - } - - for _, sub := range subs { - sub.Cancel() - } -} - -func BenchmarkNotifier_Subscribe(b *testing.B) { - logger := zerolog.Nop() - n := New(100, logger) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - sub := n.Subscribe() - sub.Cancel() - } -} diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go index b5d88e281..ba808e366 100644 --- a/pkg/sync/sync_service.go +++ b/pkg/sync/sync_service.go @@ -23,7 +23,6 @@ import ( "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/p2p" - syncnotifier "github.com/evstack/ev-node/pkg/sync/notifier" "github.com/evstack/ev-node/types" ) @@ -34,63 +33,6 @@ const ( dataSync syncType = "dataSync" ) -func eventTypeForSync(syncType syncType) syncnotifier.EventType { - switch syncType { - case headerSync: - return syncnotifier.EventTypeHeader - case dataSync: - return syncnotifier.EventTypeData - default: - return syncnotifier.EventTypeHeader - } -} - -type serviceOptions struct { - notifier *syncnotifier.Notifier -} - -// ServiceOption configures optional SyncService behaviour. -type ServiceOption func(*serviceOptions) - -// WithNotifier enables event publication for store writes. -func WithNotifier(notifier *syncnotifier.Notifier) ServiceOption { - return func(o *serviceOptions) { - o.notifier = notifier - } -} - -type publishFn[H header.Header[H]] func([]H) - -type instrumentedStore[H header.Header[H]] struct { - header.Store[H] - publish publishFn[H] -} - -func newInstrumentedStore[H header.Header[H]]( - delegate header.Store[H], - publish publishFn[H], -) header.Store[H] { - if publish == nil { - return delegate - } - return &instrumentedStore[H]{ - Store: delegate, - publish: publish, - } -} - -func (s *instrumentedStore[H]) Append(ctx context.Context, headers ...H) error { - if err := s.Store.Append(ctx, headers...); err != nil { - return err - } - - if len(headers) > 0 { - s.publish(headers) - } - - return nil -} - // TODO: when we add pruning we can remove this const ninetyNineYears = 99 * 365 * 24 * time.Hour @@ -110,13 +52,9 @@ type SyncService[H header.Header[H]] struct { sub *goheaderp2p.Subscriber[H] p2pServer *goheaderp2p.ExchangeServer[H] store *goheaderstore.Store[H] - storeView header.Store[H] syncer *goheadersync.Syncer[H] syncerStatus *SyncerStatus topicSubscription header.Subscription[H] - - notifier *syncnotifier.Notifier - eventType syncnotifier.EventType } // DataSyncService is the P2P Sync Service for blocks. @@ -132,9 +70,8 @@ func NewDataSyncService( genesis genesis.Genesis, p2p *p2p.Client, logger zerolog.Logger, - opts ...ServiceOption, ) (*DataSyncService, error) { - return newSyncService[*types.Data](store, dataSync, conf, genesis, p2p, logger, opts...) + return newSyncService[*types.Data](store, dataSync, conf, genesis, p2p, logger) } // NewHeaderSyncService returns a new HeaderSyncService. @@ -144,9 +81,8 @@ func NewHeaderSyncService( genesis genesis.Genesis, p2p *p2p.Client, logger zerolog.Logger, - opts ...ServiceOption, ) (*HeaderSyncService, error) { - return newSyncService[*types.SignedHeader](store, headerSync, conf, genesis, p2p, logger, opts...) + return newSyncService[*types.SignedHeader](store, headerSync, conf, genesis, p2p, logger) } func newSyncService[H header.Header[H]]( @@ -156,19 +92,11 @@ func newSyncService[H header.Header[H]]( genesis genesis.Genesis, p2p *p2p.Client, logger zerolog.Logger, - opts ...ServiceOption, ) (*SyncService[H], error) { if p2p == nil { return nil, errors.New("p2p client cannot be nil") } - options := serviceOptions{} - for _, opt := range opts { - if opt != nil { - opt(&options) - } - } - ss, err := goheaderstore.NewStore[H]( store, goheaderstore.WithStorePrefix(string(syncType)), @@ -178,8 +106,6 @@ func newSyncService[H header.Header[H]]( return nil, fmt.Errorf("failed to initialize the %s store: %w", syncType, err) } - eventType := eventTypeForSync(syncType) - svc := &SyncService[H]{ conf: conf, genesis: genesis, @@ -188,16 +114,6 @@ func newSyncService[H header.Header[H]]( syncType: syncType, logger: logger, syncerStatus: new(SyncerStatus), - notifier: options.notifier, - eventType: eventType, - } - - if options.notifier != nil { - svc.storeView = newInstrumentedStore[H](ss, func(headers []H) { - svc.publish(headers, syncnotifier.SourceP2P) - }) - } else { - svc.storeView = ss } return svc, nil @@ -205,7 +121,7 @@ func newSyncService[H header.Header[H]]( // Store returns the store of the SyncService func (syncService *SyncService[H]) Store() header.Store[H] { - return syncService.storeView + return syncService.store } // WriteToStoreAndBroadcast initializes store if needed and broadcasts provided header or block. @@ -265,7 +181,7 @@ func (syncService *SyncService[H]) Start(ctx context.Context) error { // create syncer, must be before initFromP2PWithRetry which calls startSyncer. if syncService.syncer, err = newSyncer( syncService.ex, - syncService.storeView, + syncService.store, syncService.sub, []goheadersync.Option{goheadersync.WithBlockTime(syncService.conf.Node.BlockTime.Duration)}, ); err != nil { @@ -319,7 +235,6 @@ func (syncService *SyncService[H]) initStore(ctx context.Context, initial H) err return err } - syncService.publish([]H{initial}, syncnotifier.SourceLocal) } return nil @@ -470,27 +385,6 @@ func (syncService *SyncService[H]) Stop(ctx context.Context) error { return err } -// Notifier exposes the event notifier instance, if configured. -func (syncService *SyncService[H]) Notifier() *syncnotifier.Notifier { - return syncService.notifier -} - -func (syncService *SyncService[H]) publish(headers []H, source syncnotifier.EventSource) { - if syncService.notifier == nil { - return - } - - for _, h := range headers { - syncService.notifier.Publish(syncnotifier.Event{ - Type: syncService.eventType, - Height: h.Height(), - Hash: h.Hash().String(), - Source: source, - Timestamp: time.Now(), - }) - } -} - // newP2PServer constructs a new ExchangeServer using the given Network as a protocolID suffix. func newP2PServer[H header.Header[H]]( host host.Host,