diff --git a/app/app.go b/app/app.go
index 7e3dee8ee..b2a67be39 100644
--- a/app/app.go
+++ b/app/app.go
@@ -480,16 +480,22 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
 	valCache := eth2wrap.NewValidatorCache(eth2Cl, eth2Pubkeys)
 	eth2Cl.SetValidatorCache(valCache.GetByHead)
 
-	firstValCacheRefresh := true
+	firstCacheRefresh := true
 	refreshedBySlot := true
 
+	// Set up the duties cache, refreshing it every epoch.
+	dutiesCache := eth2wrap.NewDutiesCache(eth2Cl, []eth2p0.ValidatorIndex{})
+	eth2Cl.SetDutiesCache(dutiesCache.ProposerDutiesCache, dutiesCache.AttesterDutiesCache, dutiesCache.SyncCommDutiesCache)
+
+	sseListener.SubscribeChainReorgEvent(dutiesCache.InvalidateCache)
+
 	var fvcrLock sync.RWMutex
 
 	shouldUpdateCache := func(slot core.Slot, lock *sync.RWMutex) bool {
 		lock.RLock()
 		defer lock.RUnlock()
 
-		if !slot.FirstInEpoch() && !firstValCacheRefresh && refreshedBySlot {
+		if !slot.FirstInEpoch() && !firstCacheRefresh && refreshedBySlot {
 			return false
 		}
 
@@ -504,7 +510,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
 		fvcrLock.Lock()
 		defer fvcrLock.Unlock()
 
-		ctx = log.WithCtx(ctx, z.Bool("first_refresh", firstValCacheRefresh))
+		ctx = log.WithCtx(ctx, z.Bool("first_refresh", firstCacheRefresh))
 
 		log.Info(ctx, "Refreshing validator cache")
 
@@ -518,14 +524,16 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
 
 		valCache.Trim()
 
-		_, _, refresh, err := valCache.GetBySlot(ctx, slotToFetch)
+		activeValidators, _, refresh, err := valCache.GetBySlot(ctx, slotToFetch)
 		if err != nil {
 			log.Error(ctx, "Failed to refresh validator cache", err)
 			return err
 		}
 
+		dutiesCache.UpdateCacheIndices(ctx, activeValidators.Indices())
+
 		refreshedBySlot = refresh
-		firstValCacheRefresh = false
+		firstCacheRefresh = false
 
 		return nil
 	})
diff --git a/app/eth2wrap/cache.go b/app/eth2wrap/cache.go
new file mode 100644
index 000000000..1c44634f7
--- /dev/null
+++ b/app/eth2wrap/cache.go
@@ -0,0 +1,427 @@
+// Copyright © 2022-2026 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1
+
+package eth2wrap
+
+import (
+	"context"
+	"maps"
+	"slices"
+	"strconv"
+	"sync"
+
+	eth2api "github.com/attestantio/go-eth2-client/api"
+	eth2v1 "github.com/attestantio/go-eth2-client/api/v1"
+	eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"
+
+	"github.com/obolnetwork/charon/app/errors"
+)
+
+// dutiesCacheTrimThreshold is the number of epochs behind the current epoch for which duties are retained.
+const dutiesCacheTrimThreshold = 6
+
+// ActiveValidators is a map of active validator indices to pubkeys.
+type ActiveValidators map[eth2p0.ValidatorIndex]eth2p0.BLSPubKey
+
+// CompleteValidators represents the complete response of the beacon node validators endpoint.
+type CompleteValidators map[eth2p0.ValidatorIndex]*eth2v1.Validator
+
+// Pubkeys returns a list of active validator pubkeys.
+func (m ActiveValidators) Pubkeys() []eth2p0.BLSPubKey {
+	var pubkeys []eth2p0.BLSPubKey
+	for _, pubkey := range m {
+		pubkeys = append(pubkeys, pubkey)
+	}
+
+	return pubkeys
+}
+
+// Indices returns a list of active validator indices.
+func (m ActiveValidators) Indices() []eth2p0.ValidatorIndex {
+	var indices []eth2p0.ValidatorIndex
+	for index := range m {
+		indices = append(indices, index)
+	}
+
+	return indices
+}
+
+// CachedValidatorsProvider is the interface for providing current epoch's cached active validator
+// identity information.
+type CachedValidatorsProvider interface { + ActiveValidators(context.Context) (ActiveValidators, error) + CompleteValidators(context.Context) (CompleteValidators, error) +} + +// NewValidatorCache creates a new validator cache. +func NewValidatorCache(eth2Cl Client, pubkeys []eth2p0.BLSPubKey) *ValidatorCache { + return &ValidatorCache{ + eth2Cl: eth2Cl, + pubkeys: pubkeys, + } +} + +// ValidatorCache caches active validators. +type ValidatorCache struct { + eth2Cl Client + pubkeys []eth2p0.BLSPubKey + + mu sync.RWMutex + active ActiveValidators + complete CompleteValidators +} + +// Trim trims the cache. +// This should be called on epoch boundary. +func (c *ValidatorCache) Trim() { + c.mu.Lock() + defer c.mu.Unlock() + + c.active = nil + c.complete = nil +} + +// activeCached returns the cached active validators and true if they are available. +func (c *ValidatorCache) activeCached() (ActiveValidators, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + + return c.active, c.active != nil +} + +// cached returns the cached complete validators and true if they are available. +func (c *ValidatorCache) cached() (CompleteValidators, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + + return c.complete, c.complete != nil +} + +// GetByHead returns the cached active validators, cached complete Validators response, or fetches them if not available populating the cache. +func (c *ValidatorCache) GetByHead(ctx context.Context) (ActiveValidators, CompleteValidators, error) { + completeCached, completeOk := c.cached() + activeCached, activeOk := c.activeCached() + + if completeOk && activeOk { + return activeCached, completeCached, nil + } + + // This code is only ever invoked by scheduler's slot ticking method. + // It's fine locking this way. + c.mu.Lock() + defer c.mu.Unlock() + + opts := ð2api.ValidatorsOpts{ + State: "head", + PubKeys: c.pubkeys, + } + + eth2Resp, err := c.eth2Cl.Validators(ctx, opts) + if err != nil { + return nil, nil, err + } + + vals := eth2Resp.Data + + resp := make(ActiveValidators) + + for _, val := range vals { + if val == nil || val.Validator == nil { + return nil, nil, errors.New("validator data is nil") + } + + if !val.Status.IsActive() { + continue + } + + resp[val.Index] = val.Validator.PublicKey + } + + c.active = resp + c.complete = eth2Resp.Data + + return resp, eth2Resp.Data, nil +} + +// GetBySlot fetches active and complete validator by slot populating the cache. +// If it fails to fetch by slot, it falls back to head state and retries to fetch by slot next slot. 
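+// The returned bool reports whether validators were fetched by the requested slot (true) or via
+// the head-state fallback (false).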
+func (c *ValidatorCache) GetBySlot(ctx context.Context, slot uint64) (ActiveValidators, CompleteValidators, bool, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	refreshedBySlot := true
+
+	opts := &eth2api.ValidatorsOpts{
+		State:   strconv.FormatUint(slot, 10),
+		PubKeys: c.pubkeys,
+	}
+
+	eth2Resp, err := c.eth2Cl.Validators(ctx, opts)
+	if err != nil {
+		// Failed to fetch by slot, fall back to head state
+		refreshedBySlot = false
+		opts.State = "head"
+
+		eth2Resp, err = c.eth2Cl.Validators(ctx, opts)
+		if err != nil {
+			return nil, nil, refreshedBySlot, err
+		}
+	}
+
+	complete := eth2Resp.Data
+
+	active := make(ActiveValidators)
+
+	for _, val := range complete {
+		if val == nil || val.Validator == nil {
+			return nil, nil, refreshedBySlot, errors.New("validator data is nil")
+		}
+
+		if !val.Status.IsActive() {
+			continue
+		}
+
+		active[val.Index] = val.Validator.PublicKey
+	}
+
+	c.active = active
+	c.complete = complete
+
+	return active, complete, refreshedBySlot, nil
+}
+
+// ProposerDuties is a map of proposer duties per epoch.
+type ProposerDuties map[eth2p0.Epoch][]*eth2v1.ProposerDuty
+
+// AttesterDuties is a map of attester duties per epoch.
+type AttesterDuties map[eth2p0.Epoch][]*eth2v1.AttesterDuty
+
+// SyncDuties is a map of sync committee duties per epoch.
+type SyncDuties map[eth2p0.Epoch][]*eth2v1.SyncCommitteeDuty
+
+// CachedDutiesProvider is the interface for providing cached per-epoch validator duties.
+type CachedDutiesProvider interface {
+	UpdateCacheIndices(context.Context, []eth2p0.ValidatorIndex)
+
+	ProposerDutiesCache(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error)
+	AttesterDutiesCache(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error)
+	SyncCommDutiesCache(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error)
+}
+
+// NewDutiesCache creates a new duties cache.
+func NewDutiesCache(eth2Cl Client, validatorIndices []eth2p0.ValidatorIndex) *DutiesCache {
+	return &DutiesCache{
+		eth2Cl:           eth2Cl,
+		validatorIndices: validatorIndices,
+
+		proposerDuties: make(ProposerDuties),
+		attesterDuties: make(AttesterDuties),
+		syncDuties:     make(SyncDuties),
+	}
+}
+
+// DutiesCache caches proposer, attester and sync committee duties per epoch.
+type DutiesCache struct {
+	eth2Cl           Client
+	validatorIndices []eth2p0.ValidatorIndex
+
+	mu             sync.RWMutex
+	proposerDuties ProposerDuties
+	attesterDuties AttesterDuties
+	syncDuties     SyncDuties
+}
+
+// Trim removes cached duties more than dutiesCacheTrimThreshold epochs older than the given epoch.
+// This should be called on epoch boundary.
+func (c *DutiesCache) Trim(epoch eth2p0.Epoch) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if epoch < dutiesCacheTrimThreshold {
+		return
+	}
+
+	proposerDutiesEpochs := slices.Collect(maps.Keys(c.proposerDuties))
+	for _, e := range proposerDutiesEpochs {
+		if e < epoch-dutiesCacheTrimThreshold {
+			delete(c.proposerDuties, e)
+		}
+	}
+
+	attesterDutiesEpochs := slices.Collect(maps.Keys(c.attesterDuties))
+	for _, e := range attesterDutiesEpochs {
+		if e < epoch-dutiesCacheTrimThreshold {
+			delete(c.attesterDuties, e)
+		}
+	}
+
+	syncDutiesEpochs := slices.Collect(maps.Keys(c.syncDuties))
+	for _, e := range syncDutiesEpochs {
+		if e < epoch-dutiesCacheTrimThreshold {
+			delete(c.syncDuties, e)
+		}
+	}
+}
+
+// UpdateCacheIndices updates the validator indices to be queried.
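+// It does not evict duties already cached; eviction is handled by Trim and InvalidateCache.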
+func (c *DutiesCache) UpdateCacheIndices(_ context.Context, indices []eth2p0.ValidatorIndex) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	c.validatorIndices = indices
+}
+
+// InvalidateCache handles a chain reorg by invalidating all cached duties from the given epoch onwards.
+func (c *DutiesCache) InvalidateCache(_ context.Context, epoch eth2p0.Epoch) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	for e := range c.proposerDuties {
+		if e >= epoch {
+			delete(c.proposerDuties, e)
+		}
+	}
+
+	for e := range c.attesterDuties {
+		if e >= epoch {
+			delete(c.attesterDuties, e)
+		}
+	}
+
+	for e := range c.syncDuties {
+		if e >= epoch {
+			delete(c.syncDuties, e)
+		}
+	}
+}
+
+// ProposerDutiesCache returns the cached proposer duties, or fetches them if not available populating the cache.
+func (c *DutiesCache) ProposerDutiesCache(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error) {
+	duties, ok := c.cachedProposerDuties(epoch)
+
+	if ok {
+		return duties, nil
+	}
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	opts := &eth2api.ProposerDutiesOpts{
+		Epoch:   epoch,
+		Indices: vidxs,
+	}
+
+	eth2Resp, err := c.eth2Cl.ProposerDuties(ctx, opts)
+	if err != nil {
+		return nil, err
+	}
+
+	proposerDutiesCurrEpoch := []*eth2v1.ProposerDuty{}
+
+	for _, duty := range eth2Resp.Data {
+		if duty == nil {
+			return nil, errors.New("proposer duty data is nil")
+		}
+
+		proposerDutiesCurrEpoch = append(proposerDutiesCurrEpoch, duty)
+	}
+
+	c.proposerDuties[epoch] = proposerDutiesCurrEpoch
+
+	return proposerDutiesCurrEpoch, nil
+}
+
+// AttesterDutiesCache returns the cached attester duties, or fetches them if not available populating the cache.
+func (c *DutiesCache) AttesterDutiesCache(ctx context.Context, epoch eth2p0.Epoch, vidx []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error) {
+	duties, ok := c.cachedAttesterDuties(epoch)
+
+	if ok {
+		return duties, nil
+	}
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	opts := &eth2api.AttesterDutiesOpts{
+		Epoch:   epoch,
+		Indices: vidx,
+	}
+
+	eth2Resp, err := c.eth2Cl.AttesterDuties(ctx, opts)
+	if err != nil {
+		return nil, err
+	}
+
+	c.attesterDuties[epoch] = eth2Resp.Data
+
+	return eth2Resp.Data, nil
+}
+
+// SyncCommDutiesCache returns the cached sync duties, or fetches them if not available populating the cache.
+func (c *DutiesCache) SyncCommDutiesCache(ctx context.Context, epoch eth2p0.Epoch, vidx []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error) {
+	duties, ok := c.cachedSyncDuties(epoch)
+
+	if ok {
+		return duties, nil
+	}
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	opts := &eth2api.SyncCommitteeDutiesOpts{
+		Epoch:   epoch,
+		Indices: vidx,
+	}
+
+	eth2Resp, err := c.eth2Cl.SyncCommitteeDuties(ctx, opts)
+	if err != nil {
+		return nil, err
+	}
+
+	syncDutiesCurrEpoch := []*eth2v1.SyncCommitteeDuty{}
+
+	for _, duty := range eth2Resp.Data {
+		if duty == nil {
+			return nil, errors.New("sync duty data is nil")
+		}
+
+		syncDutiesCurrEpoch = append(syncDutiesCurrEpoch, duty)
+	}
+
+	c.syncDuties[epoch] = syncDutiesCurrEpoch
+
+	return syncDutiesCurrEpoch, nil
+}
+
+// cachedProposerDuties returns the cached proposer duties and true if they are available.
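+// It takes the read lock itself, so it must not be called while the write lock is held.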
+func (c *DutiesCache) cachedProposerDuties(epoch eth2p0.Epoch) ([]*eth2v1.ProposerDuty, bool) {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	duties, ok := c.proposerDuties[epoch]
+
+	return duties, ok
+}
+
+// cachedAttesterDuties returns the cached attester duties and true if they are available.
+func (c *DutiesCache) cachedAttesterDuties(epoch eth2p0.Epoch) ([]*eth2v1.AttesterDuty, bool) {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	duties, ok := c.attesterDuties[epoch]
+
+	return duties, ok
+}
+
+// cachedSyncDuties returns the cached sync duties and true if they are available.
+func (c *DutiesCache) cachedSyncDuties(epoch eth2p0.Epoch) ([]*eth2v1.SyncCommitteeDuty, bool) {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	duties, ok := c.syncDuties[epoch]
+
+	return duties, ok
+}
diff --git a/app/eth2wrap/valcache_test.go b/app/eth2wrap/cache_test.go
similarity index 79%
rename from app/eth2wrap/valcache_test.go
rename to app/eth2wrap/cache_test.go
index 7c04f6073..30750abe2 100644
--- a/app/eth2wrap/valcache_test.go
+++ b/app/eth2wrap/cache_test.go
@@ -4,7 +4,9 @@ package eth2wrap_test
 
 import (
 	"context"
+	"maps"
 	"math/rand"
+	"slices"
 	"testing"
 
 	eth2api "github.com/attestantio/go-eth2-client/api"
@@ -237,3 +239,58 @@ func TestGetBySlot(t *testing.T) {
 		require.False(t, refreshedBySlot)
 	})
 }
+
+func TestDutiesCache(t *testing.T) {
+	numValidators := 64
+	// Create a set of validators.
+	valSet := testutil.RandomValidatorSet(t, numValidators)
+
+	proposerDutiesCalled := false
+
+	// Create a mock client.
+	eth2Cl, err := beaconmock.New(t.Context(), beaconmock.WithValidatorSet(valSet))
+	require.NoError(t, err)
+
+	eth2Cl.ProposerDutiesFunc = func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error) {
+		// Use 3 random validators from the set.
+		vidxs := slices.Collect(maps.Keys(valSet))
+		resp := []*eth2v1.ProposerDuty{}
+
+		for it := range 3 {
+			validator := *valSet[vidxs[rand.Intn(numValidators)]]
+			resp = append(resp, &eth2v1.ProposerDuty{
+				PubKey:         validator.Validator.PublicKey,
+				ValidatorIndex: validator.Index,
+				Slot:           eth2p0.Slot(it),
+			})
+		}
+
+		proposerDutiesCalled = true
+
+		return resp, nil
+	}
+
+	// Create a duties cache.
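+	// The cache is seeded with all validator indices in the set.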
+	dutiesCache := eth2wrap.NewDutiesCache(eth2Cl, slices.Collect(maps.Keys(valSet)))
+	ctx := t.Context()
+
+	// First call should populate the cache.
+	_, err = dutiesCache.ProposerDutiesCache(ctx, 0, slices.Collect(maps.Keys(valSet)))
+	require.NoError(t, err)
+	require.True(t, proposerDutiesCalled)
+
+	// Second call should use the cache.
+	proposerDutiesCalled = false
+	_, err = dutiesCache.ProposerDutiesCache(ctx, 0, slices.Collect(maps.Keys(valSet)))
+	require.NoError(t, err)
+	require.False(t, proposerDutiesCalled)
+
+	// Trim the cache; epoch 0 is more than dutiesCacheTrimThreshold epochs behind epoch 7, so it is evicted.
+	dutiesCache.Trim(7)
+
+	// Third call should re-populate the cache.
+	_, err = dutiesCache.ProposerDutiesCache(ctx, 0, slices.Collect(maps.Keys(valSet)))
+	require.NoError(t, err)
+	require.True(t, proposerDutiesCalled)
+}
diff --git a/app/eth2wrap/eth2wrap_gen.go b/app/eth2wrap/eth2wrap_gen.go
index a72176f9c..6e5719405 100644
--- a/app/eth2wrap/eth2wrap_gen.go
+++ b/app/eth2wrap/eth2wrap_gen.go
@@ -12,9 +12,11 @@ import (
 	eth2client "github.com/attestantio/go-eth2-client"
 	"github.com/attestantio/go-eth2-client/api"
 	apiv1 "github.com/attestantio/go-eth2-client/api/v1"
+	eth2v1 "github.com/attestantio/go-eth2-client/api/v1"
 	"github.com/attestantio/go-eth2-client/spec"
 	"github.com/attestantio/go-eth2-client/spec/altair"
 	"github.com/attestantio/go-eth2-client/spec/phase0"
+	eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"
 )
 
 // Client defines all go-eth2-client interfaces used in charon.
@@ -23,13 +25,18 @@ type Client interface {
 	CachedValidatorsProvider
 	SetValidatorCache(func(context.Context) (ActiveValidators, CompleteValidators, error))
 
+	CachedDutiesProvider
+	SetDutiesCache(
+		func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error),
+		func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error),
+		func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error),
+	)
 	SetForkVersion(forkVersion [4]byte)
 
 	ClientForAddress(addr string) Client
 
 	// Address returns the address of the beacon node.
 	Address() string
-	// Headers returns custom headers to include in requests to the beacon node.
Headers() map[string]string eth2client.AggregateAttestationProvider diff --git a/app/eth2wrap/genwrap/genwrap.go b/app/eth2wrap/genwrap/genwrap.go index 25a18af52..20910857f 100644 --- a/app/eth2wrap/genwrap/genwrap.go +++ b/app/eth2wrap/genwrap/genwrap.go @@ -51,8 +51,13 @@ type Client interface { CachedValidatorsProvider SetValidatorCache(func(context.Context) (ActiveValidators, CompleteValidators, error)) + CachedDutiesProvider + SetDutiesCache(func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error), func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error), func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error)) - SetForkVersion(forkVersion [4]byte) + SetForkVersion(forkVersion [4]byte) + + Address() string + Headers() map[string]string {{range .Providers}} eth2client.{{.}} {{end -}} diff --git a/app/eth2wrap/httpwrap.go b/app/eth2wrap/httpwrap.go index 535a8872f..9c75e3439 100644 --- a/app/eth2wrap/httpwrap.go +++ b/app/eth2wrap/httpwrap.go @@ -48,11 +48,15 @@ func newHTTPAdapter(ethSvc *eth2http.Service, address string, headers map[string type httpAdapter struct { *eth2http.Service - address string - headers map[string]string - timeout time.Duration - valCacheMu sync.RWMutex - valCache func(context.Context) (ActiveValidators, CompleteValidators, error) + address string + headers map[string]string + timeout time.Duration + cacheMu sync.RWMutex + valCache func(context.Context) (ActiveValidators, CompleteValidators, error) + proposerDutiesCache func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*apiv1.ProposerDuty, error) + attesterDutiesCache func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*apiv1.AttesterDuty, error) + syncCommDutiesCache func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*apiv1.SyncCommitteeDuty, error) + forkVersion [4]byte } @@ -61,14 +65,14 @@ func (h *httpAdapter) SetForkVersion(forkVersion [4]byte) { } func (h *httpAdapter) SetValidatorCache(valCache func(context.Context) (ActiveValidators, CompleteValidators, error)) { - h.valCacheMu.Lock() + h.cacheMu.Lock() h.valCache = valCache - h.valCacheMu.Unlock() + h.cacheMu.Unlock() } func (h *httpAdapter) ActiveValidators(ctx context.Context) (ActiveValidators, error) { - h.valCacheMu.RLock() - defer h.valCacheMu.RUnlock() + h.cacheMu.RLock() + defer h.cacheMu.RUnlock() if h.valCache == nil { return nil, errors.New("no active validator cache") @@ -80,8 +84,8 @@ func (h *httpAdapter) ActiveValidators(ctx context.Context) (ActiveValidators, e } func (h *httpAdapter) CompleteValidators(ctx context.Context) (CompleteValidators, error) { - h.valCacheMu.RLock() - defer h.valCacheMu.RUnlock() + h.cacheMu.RLock() + defer h.cacheMu.RUnlock() if h.valCache == nil { return nil, errors.New("no active validator cache") @@ -92,6 +96,55 @@ func (h *httpAdapter) CompleteValidators(ctx context.Context) (CompleteValidator return complete, err } +func (h *httpAdapter) SetDutiesCache( + proposerDutiesCache func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*apiv1.ProposerDuty, error), + attesterDutiesCache func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*apiv1.AttesterDuty, error), + syncCommDutiesCache func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*apiv1.SyncCommitteeDuty, error), +) { + h.cacheMu.Lock() + h.proposerDutiesCache = proposerDutiesCache + h.attesterDutiesCache = attesterDutiesCache + h.syncCommDutiesCache = 
syncCommDutiesCache + h.cacheMu.Unlock() +} + +func (h *httpAdapter) ProposerDutiesCache(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*apiv1.ProposerDuty, error) { + h.cacheMu.RLock() + defer h.cacheMu.RUnlock() + + if h.proposerDutiesCache == nil { + return nil, errors.New("no active proposer duties cache") + } + + return h.proposerDutiesCache(ctx, epoch, vidxs) +} + +func (h *httpAdapter) AttesterDutiesCache(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*apiv1.AttesterDuty, error) { + h.cacheMu.RLock() + defer h.cacheMu.RUnlock() + + if h.attesterDutiesCache == nil { + return nil, errors.New("no active attester duties cache") + } + + return h.attesterDutiesCache(ctx, epoch, vidxs) +} + +func (h *httpAdapter) SyncCommDutiesCache(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*apiv1.SyncCommitteeDuty, error) { + h.cacheMu.RLock() + defer h.cacheMu.RUnlock() + + if h.syncCommDutiesCache == nil { + return nil, errors.New("no active sync duties cache") + } + + return h.syncCommDutiesCache(ctx, epoch, vidxs) +} + +func (*httpAdapter) UpdateCacheIndices(context.Context, []eth2p0.ValidatorIndex) { + // No-op +} + // Validators returns the validators as requested in opts. // If the amount of validators requested is greater than 200, exponentially increase the timeout: on crowded testnets // this HTTP call takes a long time. diff --git a/app/eth2wrap/lazy.go b/app/eth2wrap/lazy.go index 65c82a3f9..da6d5c929 100644 --- a/app/eth2wrap/lazy.go +++ b/app/eth2wrap/lazy.go @@ -6,6 +6,9 @@ import ( "context" "sync" "time" + + eth2v1 "github.com/attestantio/go-eth2-client/api/v1" + eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" ) //go:generate mockery --name=Client --output=mocks --outpkg=mocks --case=underscore @@ -32,9 +35,12 @@ type lazy struct { providerMu sync.Mutex provider func(context.Context) (Client, error) - clientMu sync.RWMutex - client Client - valCache func(context.Context) (ActiveValidators, CompleteValidators, error) + clientMu sync.RWMutex + client Client + valCache func(context.Context) (ActiveValidators, CompleteValidators, error) + proposerDutiesCache func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error) + attesterDutiesCache func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error) + syncCommDutiesCache func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error) } // getClient returns the client and true if it is available. 
@@ -184,3 +190,55 @@ func (l *lazy) SetValidatorCache(valCache func(context.Context) (ActiveValidator cl.SetValidatorCache(valCache) } } + +func (l *lazy) SetDutiesCache( + proposerDutiesCache func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error), + attesterDutiesCache func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error), + syncCommDutiesCache func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error), +) { + l.clientMu.Lock() + l.proposerDutiesCache = proposerDutiesCache + l.attesterDutiesCache = attesterDutiesCache + l.syncCommDutiesCache = syncCommDutiesCache + l.clientMu.Unlock() + + if cl, ok := l.getClient(); ok { + cl.SetDutiesCache(l.proposerDutiesCache, l.attesterDutiesCache, l.syncCommDutiesCache) + } +} + +func (l *lazy) ProposerDutiesCache(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error) { + cl, err := l.getOrCreateClient(ctx) + if err != nil { + return nil, err + } + + return cl.ProposerDutiesCache(ctx, epoch, vidxs) +} + +func (l *lazy) AttesterDutiesCache(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error) { + cl, err := l.getOrCreateClient(ctx) + if err != nil { + return nil, err + } + + return cl.AttesterDutiesCache(ctx, epoch, vidxs) +} + +func (l *lazy) SyncCommDutiesCache(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error) { + cl, err := l.getOrCreateClient(ctx) + if err != nil { + return nil, err + } + + return cl.SyncCommDutiesCache(ctx, epoch, vidxs) +} + +func (l *lazy) UpdateCacheIndices(ctx context.Context, idxs []eth2p0.ValidatorIndex) { + cl, err := l.getOrCreateClient(ctx) + if err != nil { + return + } + + cl.UpdateCacheIndices(ctx, idxs) +} diff --git a/app/eth2wrap/lazy_test.go b/app/eth2wrap/lazy_test.go index 536ca8c98..1bda89ef0 100644 --- a/app/eth2wrap/lazy_test.go +++ b/app/eth2wrap/lazy_test.go @@ -7,6 +7,8 @@ import ( "net/http" "testing" + eth2v1 "github.com/attestantio/go-eth2-client/api/v1" + eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -98,3 +100,74 @@ func TestLazy_ClientForAddress(t *testing.T) { result := l.ClientForAddress("http://test:5051") require.NotNil(t, result) } + +func TestLazy_SetDutiesCache(t *testing.T) { + proposerDutiesCache := func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error) { + return nil, nil + } + attesterDutiesCache := func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error) { + return nil, nil + } + syncDutiesCache := func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error) { + return nil, nil + } + + client := mocks.NewClient(t) + client.On("SetDutiesCache", mock.Anything, mock.Anything, mock.Anything).Once() + + l := eth2wrap.NewLazyForT(client) + l.SetDutiesCache(proposerDutiesCache, attesterDutiesCache, syncDutiesCache) +} + +func TestLazy_ProposerDutiesCache(t *testing.T) { + ctx := context.Background() + proposerDuties := make([]*eth2v1.ProposerDuty, 0) + + client := mocks.NewClient(t) + client.On("ProposerDutiesCache", ctx, eth2p0.Epoch(0), []eth2p0.ValidatorIndex{}).Return(proposerDuties, nil).Once() + + l := eth2wrap.NewLazyForT(client) + + proposerDuties2, err := l.ProposerDutiesCache(ctx, 0, []eth2p0.ValidatorIndex{}) + 
require.NoError(t, err) + require.Equal(t, proposerDuties, proposerDuties2) +} + +func TestLazy_AttesterDutiesCache(t *testing.T) { + ctx := context.Background() + attesterDuties := make([]*eth2v1.AttesterDuty, 0) + + client := mocks.NewClient(t) + client.On("AttesterDutiesCache", ctx, eth2p0.Epoch(0), []eth2p0.ValidatorIndex{}).Return(attesterDuties, nil).Once() + + l := eth2wrap.NewLazyForT(client) + + attesterDuties2, err := l.AttesterDutiesCache(ctx, 0, []eth2p0.ValidatorIndex{}) + require.NoError(t, err) + require.Equal(t, attesterDuties, attesterDuties2) +} + +func TestLazy_SyncDutiesCache(t *testing.T) { + ctx := context.Background() + syncDuties := make([]*eth2v1.SyncCommitteeDuty, 0) + + client := mocks.NewClient(t) + client.On("SyncCommDutiesCache", ctx, eth2p0.Epoch(0), []eth2p0.ValidatorIndex{}).Return(syncDuties, nil).Once() + + l := eth2wrap.NewLazyForT(client) + + syncDuties2, err := l.SyncCommDutiesCache(ctx, 0, []eth2p0.ValidatorIndex{}) + require.NoError(t, err) + require.Equal(t, syncDuties, syncDuties2) +} + +func TestLazy_UpdateCacheIndices(t *testing.T) { + ctx := context.Background() + + client := mocks.NewClient(t) + client.On("UpdateCacheIndices", ctx, []eth2p0.ValidatorIndex{}).Return().Once() + + l := eth2wrap.NewLazyForT(client) + + l.UpdateCacheIndices(ctx, []eth2p0.ValidatorIndex{}) +} diff --git a/app/eth2wrap/mocks/client.go b/app/eth2wrap/mocks/client.go index a4d218724..b9fd61986 100644 --- a/app/eth2wrap/mocks/client.go +++ b/app/eth2wrap/mocks/client.go @@ -1,6 +1,6 @@ // Copyright © 2022-2026 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1 -// Code generated by mockery v2.53.3. DO NOT EDIT. +// Code generated by mockery v2.53.5. DO NOT EDIT. package mocks @@ -9,10 +9,11 @@ import ( altair "github.com/attestantio/go-eth2-client/spec/altair" context "context" - http "net/http" eth2wrap "github.com/obolnetwork/charon/app/eth2wrap" + http "net/http" + mock "github.com/stretchr/testify/mock" phase0 "github.com/attestantio/go-eth2-client/spec/phase0" @@ -97,26 +98,6 @@ func (_m *Client) Address() string { return r0 } -// Headers provides a mock function with given fields: -func (_m *Client) Headers() map[string]string { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for Headers") - } - - var r0 map[string]string - if rf, ok := ret.Get(0).(func() map[string]string); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(map[string]string) - } - } - - return r0 -} - // AggregateAttestation provides a mock function with given fields: ctx, opts func (_m *Client) AggregateAttestation(ctx context.Context, opts *api.AggregateAttestationOpts) (*api.Response[*spec.VersionedAttestation], error) { ret := _m.Called(ctx, opts) @@ -207,6 +188,36 @@ func (_m *Client) AttesterDuties(ctx context.Context, opts *api.AttesterDutiesOp return r0, r1 } +// AttesterDutiesCache provides a mock function with given fields: _a0, _a1, _a2 +func (_m *Client) AttesterDutiesCache(_a0 context.Context, _a1 phase0.Epoch, _a2 []phase0.ValidatorIndex) ([]*v1.AttesterDuty, error) { + ret := _m.Called(_a0, _a1, _a2) + + if len(ret) == 0 { + panic("no return value specified for AttesterDutiesCache") + } + + var r0 []*v1.AttesterDuty + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, phase0.Epoch, []phase0.ValidatorIndex) ([]*v1.AttesterDuty, error)); ok { + return rf(_a0, _a1, _a2) + } + if rf, ok := ret.Get(0).(func(context.Context, phase0.Epoch, []phase0.ValidatorIndex) []*v1.AttesterDuty); ok { + r0 = rf(_a0, 
_a1, _a2) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*v1.AttesterDuty) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, phase0.Epoch, []phase0.ValidatorIndex) error); ok { + r1 = rf(_a0, _a1, _a2) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // BeaconBlockAttestations provides a mock function with given fields: ctx, opts func (_m *Client) BeaconBlockAttestations(ctx context.Context, opts *api.BeaconBlockAttestationsOpts) (*api.Response[[]*spec.VersionedAttestation], error) { ret := _m.Called(ctx, opts) @@ -327,9 +338,9 @@ func (_m *Client) BeaconCommittees(ctx context.Context, opts *api.BeaconCommitte return r0, r1 } -// CompleteValidators provides a mock function with given fields: ctx -func (_m *Client) CompleteValidators(ctx context.Context) (eth2wrap.CompleteValidators, error) { - ret := _m.Called(ctx) +// CompleteValidators provides a mock function with given fields: _a0 +func (_m *Client) CompleteValidators(_a0 context.Context) (eth2wrap.CompleteValidators, error) { + ret := _m.Called(_a0) if len(ret) == 0 { panic("no return value specified for CompleteValidators") @@ -338,10 +349,10 @@ func (_m *Client) CompleteValidators(ctx context.Context) (eth2wrap.CompleteVali var r0 eth2wrap.CompleteValidators var r1 error if rf, ok := ret.Get(0).(func(context.Context) (eth2wrap.CompleteValidators, error)); ok { - return rf(ctx) + return rf(_a0) } if rf, ok := ret.Get(0).(func(context.Context) eth2wrap.CompleteValidators); ok { - r0 = rf(ctx) + r0 = rf(_a0) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(eth2wrap.CompleteValidators) @@ -349,7 +360,7 @@ func (_m *Client) CompleteValidators(ctx context.Context) (eth2wrap.CompleteVali } if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) + r1 = rf(_a0) } else { r1 = ret.Error(1) } @@ -537,6 +548,26 @@ func (_m *Client) GenesisDomain(ctx context.Context, domainType phase0.DomainTyp return r0, r1 } +// Headers provides a mock function with no fields +func (_m *Client) Headers() map[string]string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Headers") + } + + var r0 map[string]string + if rf, ok := ret.Get(0).(func() map[string]string); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]string) + } + } + + return r0 +} + // IsActive provides a mock function with no fields func (_m *Client) IsActive() bool { ret := _m.Called() @@ -591,36 +622,6 @@ func (_m *Client) Name() string { return r0 } -// Proxy provides a mock function with given fields: ctx, req -func (_m *Client) Proxy(ctx context.Context, req *http.Request) (*http.Response, error) { - ret := _m.Called(ctx, req) - - if len(ret) == 0 { - panic("no return value specified for Proxy") - } - - var r0 *http.Response - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *http.Request) (*http.Response, error)); ok { - return rf(ctx, req) - } - if rf, ok := ret.Get(0).(func(context.Context, *http.Request) *http.Response); ok { - r0 = rf(ctx, req) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*http.Response) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, *http.Request) error); ok { - r1 = rf(ctx, req) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // NodePeerCount provides a mock function with given fields: ctx, opts func (_m *Client) NodePeerCount(ctx context.Context, opts *api.NodePeerCountOpts) (*api.Response[*v1.PeerCount], error) { ret := _m.Called(ctx, opts) @@ -771,6 +772,75 @@ func (_m *Client) ProposerDuties(ctx 
context.Context, opts *api.ProposerDutiesOp return r0, r1 } +// ProposerDutiesCache provides a mock function with given fields: _a0, _a1, _a2 +func (_m *Client) ProposerDutiesCache(_a0 context.Context, _a1 phase0.Epoch, _a2 []phase0.ValidatorIndex) ([]*v1.ProposerDuty, error) { + ret := _m.Called(_a0, _a1, _a2) + + if len(ret) == 0 { + panic("no return value specified for ProposerDutiesCache") + } + + var r0 []*v1.ProposerDuty + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, phase0.Epoch, []phase0.ValidatorIndex) ([]*v1.ProposerDuty, error)); ok { + return rf(_a0, _a1, _a2) + } + if rf, ok := ret.Get(0).(func(context.Context, phase0.Epoch, []phase0.ValidatorIndex) []*v1.ProposerDuty); ok { + r0 = rf(_a0, _a1, _a2) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*v1.ProposerDuty) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, phase0.Epoch, []phase0.ValidatorIndex) error); ok { + r1 = rf(_a0, _a1, _a2) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Proxy provides a mock function with given fields: ctx, req +func (_m *Client) Proxy(ctx context.Context, req *http.Request) (*http.Response, error) { + ret := _m.Called(ctx, req) + + if len(ret) == 0 { + panic("no return value specified for Proxy") + } + + var r0 *http.Response + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *http.Request) (*http.Response, error)); ok { + return rf(ctx, req) + } + if rf, ok := ret.Get(0).(func(context.Context, *http.Request) *http.Response); ok { + r0 = rf(ctx, req) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*http.Response) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *http.Request) error); ok { + r1 = rf(ctx, req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// SetDutiesCache provides a mock function with given fields: _a0, _a1, _a2 +func (_m *Client) SetDutiesCache( + _a0 func(context.Context, phase0.Epoch, []phase0.ValidatorIndex) ([]*v1.ProposerDuty, error), + _a1 func(context.Context, phase0.Epoch, []phase0.ValidatorIndex) ([]*v1.AttesterDuty, error), + _a2 func(context.Context, phase0.Epoch, []phase0.ValidatorIndex) ([]*v1.SyncCommitteeDuty, error), +) { + _m.Called(_a0, _a1, _a2) +} + // SetForkVersion provides a mock function with given fields: forkVersion func (_m *Client) SetForkVersion(forkVersion [4]byte) { _m.Called(forkVersion) @@ -1185,6 +1255,41 @@ func (_m *Client) SyncCommitteeSelections(ctx context.Context, opts *api.SyncCom return r0, r1 } +// SyncCommDutiesCache provides a mock function with given fields: _a0, _a1, _a2 +func (_m *Client) SyncCommDutiesCache(_a0 context.Context, _a1 phase0.Epoch, _a2 []phase0.ValidatorIndex) ([]*v1.SyncCommitteeDuty, error) { + ret := _m.Called(_a0, _a1, _a2) + + if len(ret) == 0 { + panic("no return value specified for SyncCommDutiesCache") + } + + var r0 []*v1.SyncCommitteeDuty + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, phase0.Epoch, []phase0.ValidatorIndex) ([]*v1.SyncCommitteeDuty, error)); ok { + return rf(_a0, _a1, _a2) + } + if rf, ok := ret.Get(0).(func(context.Context, phase0.Epoch, []phase0.ValidatorIndex) []*v1.SyncCommitteeDuty); ok { + r0 = rf(_a0, _a1, _a2) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*v1.SyncCommitteeDuty) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, phase0.Epoch, []phase0.ValidatorIndex) error); ok { + r1 = rf(_a0, _a1, _a2) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// UpdateCacheIndices provides a mock function with given fields: _a0, _a1 +func (_m *Client) 
UpdateCacheIndices(_a0 context.Context, _a1 []phase0.ValidatorIndex) { + _m.Called(_a0, _a1) +} + // Validators provides a mock function with given fields: ctx, opts func (_m *Client) Validators(ctx context.Context, opts *api.ValidatorsOpts) (*api.Response[map[phase0.ValidatorIndex]*v1.Validator], error) { ret := _m.Called(ctx, opts) diff --git a/app/eth2wrap/multi.go b/app/eth2wrap/multi.go index 08bd13b21..efe30eeca 100644 --- a/app/eth2wrap/multi.go +++ b/app/eth2wrap/multi.go @@ -8,6 +8,9 @@ import ( "io" "net/http" + eth2v1 "github.com/attestantio/go-eth2-client/api/v1" + eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/obolnetwork/charon/app/errors" ) @@ -168,6 +171,82 @@ func (m multi) CompleteValidators(ctx context.Context) (CompleteValidators, erro return res0, err } +func (m multi) SetDutiesCache( + proposerDutiesCache func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error), + attesterDutiesCache func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error), + syncCommDutiesCache func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error), +) { + for _, cl := range m.clients { + cl.SetDutiesCache(proposerDutiesCache, attesterDutiesCache, syncCommDutiesCache) + } +} + +func (m multi) ProposerDutiesCache(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error) { + const label = "proposer_duties_by_epoch" + // No latency since this is a cached endpoint. + + defer incRequest(label) + + res0, err := provide(ctx, m.clients, m.fallbacks, + func(ctx context.Context, args provideArgs) ([]*eth2v1.ProposerDuty, error) { + return args.client.ProposerDutiesCache(ctx, epoch, vidxs) + }, + nil, nil, + ) + if err != nil { + incError(label) + err = wrapError(ctx, err, label) + } + + return res0, err +} + +func (m multi) AttesterDutiesCache(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error) { + const label = "attester_duties_by_epoch" + // No latency since this is a cached endpoint. + + defer incRequest(label) + + res0, err := provide(ctx, m.clients, m.fallbacks, + func(ctx context.Context, args provideArgs) ([]*eth2v1.AttesterDuty, error) { + return args.client.AttesterDutiesCache(ctx, epoch, vidxs) + }, + nil, nil, + ) + if err != nil { + incError(label) + err = wrapError(ctx, err, label) + } + + return res0, err +} + +func (m multi) SyncCommDutiesCache(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error) { + const label = "sync_duties_by_epoch" + // No latency since this is a cached endpoint. 
+ + defer incRequest(label) + + res0, err := provide(ctx, m.clients, m.fallbacks, + func(ctx context.Context, args provideArgs) ([]*eth2v1.SyncCommitteeDuty, error) { + return args.client.SyncCommDutiesCache(ctx, epoch, vidxs) + }, + nil, nil, + ) + if err != nil { + incError(label) + err = wrapError(ctx, err, label) + } + + return res0, err +} + +func (m multi) UpdateCacheIndices(ctx context.Context, idxs []eth2p0.ValidatorIndex) { + for _, cl := range m.clients { + cl.UpdateCacheIndices(ctx, idxs) + } +} + func (m multi) Proxy(ctx context.Context, req *http.Request) (*http.Response, error) { // Duplicate the request body so each backend gets an independent reader // req.Clone(ctx) does NOT clone the body reader diff --git a/app/eth2wrap/multi_test.go b/app/eth2wrap/multi_test.go index 4a68a20af..da0513374 100644 --- a/app/eth2wrap/multi_test.go +++ b/app/eth2wrap/multi_test.go @@ -10,6 +10,8 @@ import ( "strings" "testing" + eth2v1 "github.com/attestantio/go-eth2-client/api/v1" + eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -90,6 +92,77 @@ func TestMulti_SetValidatorCache(t *testing.T) { m.SetValidatorCache(valCache) } +func TestMulti_SetDutiesCache(t *testing.T) { + proposerDutiesCache := func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error) { + return nil, nil + } + attesterDutiesCache := func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error) { + return nil, nil + } + syncDutiesCache := func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error) { + return nil, nil + } + + client := mocks.NewClient(t) + client.On("SetDutiesCache", mock.Anything, mock.Anything, mock.Anything).Once() + + m := eth2wrap.NewMultiForT([]eth2wrap.Client{client}, nil) + m.SetDutiesCache(proposerDutiesCache, attesterDutiesCache, syncDutiesCache) +} + +func TestMulti_ProposerDutiesCache(t *testing.T) { + ctx := context.Background() + proposerDuties := make([]*eth2v1.ProposerDuty, 0) + + client := mocks.NewClient(t) + client.On("ProposerDutiesCache", ctx, eth2p0.Epoch(0), []eth2p0.ValidatorIndex{}).Return(proposerDuties, nil).Once() + + m := eth2wrap.NewMultiForT([]eth2wrap.Client{client}, nil) + + proposerDuties2, err := m.ProposerDutiesCache(ctx, 0, []eth2p0.ValidatorIndex{}) + require.NoError(t, err) + require.Equal(t, proposerDuties, proposerDuties2) +} + +func TestMulti_AttesterDutiesCache(t *testing.T) { + ctx := context.Background() + attesterDuties := make([]*eth2v1.AttesterDuty, 0) + + client := mocks.NewClient(t) + client.On("AttesterDutiesCache", ctx, eth2p0.Epoch(0), []eth2p0.ValidatorIndex{}).Return(attesterDuties, nil).Once() + + m := eth2wrap.NewMultiForT([]eth2wrap.Client{client}, nil) + + attesterDuties2, err := m.AttesterDutiesCache(ctx, 0, []eth2p0.ValidatorIndex{}) + require.NoError(t, err) + require.Equal(t, attesterDuties, attesterDuties2) +} + +func TestMulti_SyncDutiesCache(t *testing.T) { + ctx := context.Background() + syncDuties := make([]*eth2v1.SyncCommitteeDuty, 0) + + client := mocks.NewClient(t) + client.On("SyncCommDutiesCache", ctx, eth2p0.Epoch(0), []eth2p0.ValidatorIndex{}).Return(syncDuties, nil).Once() + + m := eth2wrap.NewMultiForT([]eth2wrap.Client{client}, nil) + + syncDuties2, err := m.SyncCommDutiesCache(ctx, 0, []eth2p0.ValidatorIndex{}) + require.NoError(t, err) + require.Equal(t, syncDuties, syncDuties2) +} + +func TestMulti_UpdateCacheIndices(t *testing.T) { + ctx 
:= context.Background() + + client := mocks.NewClient(t) + client.On("UpdateCacheIndices", ctx, []eth2p0.ValidatorIndex{}).Return().Once() + + m := eth2wrap.NewMultiForT([]eth2wrap.Client{client}, nil) + + m.UpdateCacheIndices(ctx, []eth2p0.ValidatorIndex{}) +} + func TestMulti_Proxy(t *testing.T) { client := mocks.NewClient(t) client.On("Proxy", mock.Anything, mock.Anything).Return(nil, nil).Once() diff --git a/app/eth2wrap/synthproposer.go b/app/eth2wrap/synthproposer.go index ccb888418..292273f58 100644 --- a/app/eth2wrap/synthproposer.go +++ b/app/eth2wrap/synthproposer.go @@ -38,6 +38,7 @@ const ( type synthProposerEth2Provider interface { CachedValidatorsProvider + CachedDutiesProvider eth2client.SpecProvider eth2client.ProposerDutiesProvider } @@ -83,6 +84,16 @@ func (h *synthWrapper) ProposerDuties(ctx context.Context, opts *eth2api.Propose return wrapResponse(duties), nil } +// ProposerDutiesCache wraps ProposerDuties. We are not using cache for synthproposer. +func (h *synthWrapper) ProposerDutiesCache(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error) { + eth2Resp, err := h.ProposerDuties(ctx, ð2api.ProposerDutiesOpts{Epoch: epoch, Indices: vidxs}) + if err != nil { + return nil, err + } + + return eth2Resp.Data, nil +} + func (h *synthWrapper) SubmitProposalPreparations(ctx context.Context, preparations []*eth2v1.ProposalPreparation) error { h.setFeeRecipients(preparations) diff --git a/app/eth2wrap/synthproposer_test.go b/app/eth2wrap/synthproposer_test.go index 98b8f56c5..9defef6cf 100644 --- a/app/eth2wrap/synthproposer_test.go +++ b/app/eth2wrap/synthproposer_test.go @@ -252,10 +252,34 @@ func TestSynthProposer(t *testing.T) { }, }, nil } - cached := bmock.CachedValidatorsFunc + cachedValidators := bmock.CachedValidatorsFunc bmock.CachedValidatorsFunc = func(ctx context.Context) (eth2wrap.ActiveValidators, eth2wrap.CompleteValidators, error) { activeVals++ - return cached(ctx) + return cachedValidators(ctx) + } + bmock.CachedProposerDutiesFunc = func(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error) { + duties, err := bmock.ProposerDuties(ctx, ð2api.ProposerDutiesOpts{Epoch: epoch, Indices: vidxs}) + if err != nil { + return nil, err + } + + return duties.Data, nil + } + bmock.CachedAttesterDutiesFunc = func(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error) { + duties, err := bmock.AttesterDuties(ctx, ð2api.AttesterDutiesOpts{Epoch: epoch, Indices: vidxs}) + if err != nil { + return nil, err + } + + return duties.Data, nil + } + bmock.CachedSyncCommDutiesFunc = func(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error) { + duties, err := bmock.SyncCommitteeDuties(ctx, ð2api.SyncCommitteeDutiesOpts{Epoch: epoch, Indices: vidxs}) + if err != nil { + return nil, err + } + + return duties.Data, nil } bmock.SignedBeaconBlockFunc = func(ctx context.Context, blockID string) (*eth2spec.VersionedSignedBeaconBlock, error) { resp := test.versionedSignedBlock diff --git a/app/eth2wrap/valcache.go b/app/eth2wrap/valcache.go deleted file mode 100644 index 1968d0c17..000000000 --- a/app/eth2wrap/valcache.go +++ /dev/null @@ -1,185 +0,0 @@ -// Copyright © 2022-2026 Obol Labs Inc. 
Licensed under the terms of a Business Source License 1.1 - -package eth2wrap - -import ( - "context" - "strconv" - "sync" - - eth2api "github.com/attestantio/go-eth2-client/api" - eth2v1 "github.com/attestantio/go-eth2-client/api/v1" - eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" - - "github.com/obolnetwork/charon/app/errors" -) - -// ActiveValidators is a map of active validator indices to pubkeys. -type ActiveValidators map[eth2p0.ValidatorIndex]eth2p0.BLSPubKey - -// CompleteValidators represents the complete response of the beacon node validators endpoint. -type CompleteValidators map[eth2p0.ValidatorIndex]*eth2v1.Validator - -// Pubkeys returns a list of active validator pubkeys. -func (m ActiveValidators) Pubkeys() []eth2p0.BLSPubKey { - var pubkeys []eth2p0.BLSPubKey - for _, pubkey := range m { - pubkeys = append(pubkeys, pubkey) - } - - return pubkeys -} - -// Indices returns a list of active validator indices. -func (m ActiveValidators) Indices() []eth2p0.ValidatorIndex { - var indices []eth2p0.ValidatorIndex - for index := range m { - indices = append(indices, index) - } - - return indices -} - -// CachedValidatorsProvider is the interface for providing current epoch's cached active validator -// identity information. -type CachedValidatorsProvider interface { - ActiveValidators(context.Context) (ActiveValidators, error) - CompleteValidators(ctx context.Context) (CompleteValidators, error) -} - -// NewValidatorCache creates a new validator cache. -func NewValidatorCache(eth2Cl Client, pubkeys []eth2p0.BLSPubKey) *ValidatorCache { - return &ValidatorCache{ - eth2Cl: eth2Cl, - pubkeys: pubkeys, - } -} - -// ValidatorCache caches active validators. -type ValidatorCache struct { - eth2Cl Client - pubkeys []eth2p0.BLSPubKey - - mu sync.RWMutex - active ActiveValidators - complete CompleteValidators -} - -// Trim trims the cache. -// This should be called on epoch boundary. -func (c *ValidatorCache) Trim() { - c.mu.Lock() - defer c.mu.Unlock() - - c.active = nil - c.complete = nil -} - -// activeCached returns the cached active validators and true if they are available. -func (c *ValidatorCache) activeCached() (ActiveValidators, bool) { - c.mu.RLock() - defer c.mu.RUnlock() - - return c.active, c.active != nil -} - -// cached returns the cached complete validators and true if they are available. -func (c *ValidatorCache) cached() (CompleteValidators, bool) { - c.mu.RLock() - defer c.mu.RUnlock() - - return c.complete, c.complete != nil -} - -// GetByHead returns the cached active validators, cached complete Validators response, or fetches them if not available populating the cache. -func (c *ValidatorCache) GetByHead(ctx context.Context) (ActiveValidators, CompleteValidators, error) { - completeCached, completeOk := c.cached() - activeCached, activeOk := c.activeCached() - - if completeOk && activeOk { - return activeCached, completeCached, nil - } - - // This code is only ever invoked by scheduler's slot ticking method. - // It's fine locking this way. 
- c.mu.Lock() - defer c.mu.Unlock() - - opts := ð2api.ValidatorsOpts{ - State: "head", - PubKeys: c.pubkeys, - } - - eth2Resp, err := c.eth2Cl.Validators(ctx, opts) - if err != nil { - return nil, nil, err - } - - vals := eth2Resp.Data - - resp := make(ActiveValidators) - - for _, val := range vals { - if val == nil || val.Validator == nil { - return nil, nil, errors.New("validator data is nil") - } - - if !val.Status.IsActive() { - continue - } - - resp[val.Index] = val.Validator.PublicKey - } - - c.active = resp - c.complete = eth2Resp.Data - - return resp, eth2Resp.Data, nil -} - -// GetBySlot fetches active and complete validator by slot populating the cache. -// If it fails to fetch by slot, it falls back to head state and retries to fetch by slot next slot. -func (c *ValidatorCache) GetBySlot(ctx context.Context, slot uint64) (ActiveValidators, CompleteValidators, bool, error) { - c.mu.Lock() - defer c.mu.Unlock() - - refreshedBySlot := true - - opts := ð2api.ValidatorsOpts{ - State: strconv.FormatUint(slot, 10), - PubKeys: c.pubkeys, - } - - eth2Resp, err := c.eth2Cl.Validators(ctx, opts) - if err != nil { - // Failed to fetch by slot, fall back to head state - refreshedBySlot = false - opts.State = "head" - - eth2Resp, err = c.eth2Cl.Validators(ctx, opts) - if err != nil { - return nil, nil, refreshedBySlot, err - } - } - - complete := eth2Resp.Data - - active := make(ActiveValidators) - - for _, val := range complete { - if val == nil || val.Validator == nil { - return nil, nil, refreshedBySlot, errors.New("validator data is nil") - } - - if !val.Status.IsActive() { - continue - } - - active[val.Index] = val.Validator.PublicKey - } - - c.active = active - c.complete = complete - - return active, complete, refreshedBySlot, nil -} diff --git a/app/featureset/config.go b/app/featureset/config.go index 7ca5f8ac8..ae8504bc6 100644 --- a/app/featureset/config.go +++ b/app/featureset/config.go @@ -123,6 +123,7 @@ func EnableForT(t *testing.T, feature Feature) { t.Cleanup(func() { initMu.Lock() defer initMu.Unlock() + state[feature] = cache }) @@ -141,6 +142,7 @@ func DisableForT(t *testing.T, feature Feature) { t.Cleanup(func() { initMu.Lock() defer initMu.Unlock() + state[feature] = cache }) diff --git a/app/sse/listener.go b/app/sse/listener.go index 4438a4c48..3e50d2e3b 100644 --- a/app/sse/listener.go +++ b/app/sse/listener.go @@ -20,8 +20,10 @@ import ( "github.com/obolnetwork/charon/eth2util" ) -type ChainReorgEventHandlerFunc func(ctx context.Context, epoch eth2p0.Epoch) -type BlockEventHandlerFunc func(ctx context.Context, slot eth2p0.Slot, bnAddr string) +type ( + ChainReorgEventHandlerFunc func(ctx context.Context, epoch eth2p0.Epoch) + BlockEventHandlerFunc func(ctx context.Context, slot eth2p0.Slot, bnAddr string) +) type Listener interface { SubscribeChainReorgEvent(ChainReorgEventHandlerFunc) @@ -313,6 +315,7 @@ func (p *listener) storeBlockGossipTime(slot uint64, addr string, timestamp time if p.blockGossipTimes[slot] == nil { p.blockGossipTimes[slot] = make(map[string]time.Time) } + p.blockGossipTimes[slot][addr] = timestamp } @@ -340,6 +343,7 @@ func (p *listener) recordBlockProcessingTime(slot uint64, addr string, headTimes // Clean up this entry as it's no longer needed delete(addrMap, addr) + if len(addrMap) == 0 { delete(p.blockGossipTimes, slot) } diff --git a/app/sse/listener_internal_test.go b/app/sse/listener_internal_test.go index 05f2dc0e3..77caacff5 100644 --- a/app/sse/listener_internal_test.go +++ b/app/sse/listener_internal_test.go @@ -288,7 +288,7 @@ func 
TestBlockProcessingTimeCleanup(t *testing.T) { // After processing slot 150, entries older than (150 - 32) = 118 are removed // Remaining entries: odd slots from 119-149 (never processed) = 16 entries // Even slots are immediately deleted after processing - require.Equal(t, 16, len(l.blockGossipTimes)) + require.Len(t, l.blockGossipTimes, 16) // Verify recent unprocessed entries are still there (odd slots from end) addrMap, found := l.blockGossipTimes[149] diff --git a/app/vmock.go b/app/vmock.go index 7dd4511d7..2122d3cdf 100644 --- a/app/vmock.go +++ b/app/vmock.go @@ -85,9 +85,13 @@ func newVMockEth2Provider(conf Config, pubshares []eth2p0.BLSPubKey) func() (eth } cached = eth2wrap.AdaptEth2HTTP(eth2Http, nil, timeout) + valCache := eth2wrap.NewValidatorCache(cached, pubshares) cached.SetValidatorCache(valCache.GetByHead) + dutiesCache := eth2wrap.NewDutiesCache(cached, []eth2p0.ValidatorIndex{}) + cached.SetDutiesCache(dutiesCache.ProposerDutiesCache, dutiesCache.AttesterDutiesCache, dutiesCache.SyncCommDutiesCache) + break } diff --git a/core/fetcher/fetcher.go b/core/fetcher/fetcher.go index f46103c7f..5a7f2222c 100644 --- a/core/fetcher/fetcher.go +++ b/core/fetcher/fetcher.go @@ -93,8 +93,10 @@ func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDef // Check if attestation data was already fetched early and cached if cached, ok := f.attDataCache.Load(duty.Slot); ok { f.attDataCache.Delete(duty.Slot) + if data, valid := cached.(core.UnsignedDataSet); valid { unsignedSet = data + log.Debug(ctx, "Using early-fetched attestation data from cache", z.U64("slot", duty.Slot)) } else { log.Warn(ctx, "Cached attestation data has invalid type, re-fetching", err, z.U64("slot", duty.Slot)) diff --git a/core/fetcher/fetcher_test.go b/core/fetcher/fetcher_test.go index 437fa0bf5..9564cc0ba 100644 --- a/core/fetcher/fetcher_test.go +++ b/core/fetcher/fetcher_test.go @@ -672,6 +672,7 @@ func TestFetchOnly(t *testing.T) { // FetchOnly should not trigger subscribers subscriberCalled := false + fetch.Subscribe(func(ctx context.Context, resDuty core.Duty, resDataSet core.UnsignedDataSet) error { subscriberCalled = true return nil @@ -730,6 +731,7 @@ func TestFetchOnly(t *testing.T) { fetch.Subscribe(func(ctx context.Context, resDuty core.Duty, resDataSet core.UnsignedDataSet) error { require.Equal(t, duty, resDuty) require.Len(t, resDataSet, 2) + return nil }) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 44cda91f5..f27c2f6cf 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -181,6 +181,7 @@ func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot, bnAd Slot: uint64(slot), Type: core.DutyAttester, } + defSet, ok := s.getDutyDefinitionSet(duty) if !ok { // Nothing for this duty @@ -317,6 +318,7 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) { if !s.waitForBlockEventOrTimeout(dutyCtx, slot) { return // context cancelled } + s.eventTriggeredAttestations.Store(slot.Slot, true) } else if !delaySlotOffset(dutyCtx, slot, duty, s.delayFunc) { return // context cancelled @@ -376,11 +378,13 @@ func (s *Scheduler) waitForBlockEventOrTimeout(ctx context.Context, slot core.Sl log.Warn(ctx, "Slot offset not found for attester duty, proceeding immediately", nil, z.U64("slot", slot.Slot)) return true } + offset := fn(slot.SlotDuration) // Add 300ms delay only if FetchAttOnBlockWithDelay is enabled if featureset.Enabled(featureset.FetchAttOnBlockWithDelay) { offset += 300 * time.Millisecond } 
diff --git a/core/fetcher/fetcher_test.go b/core/fetcher/fetcher_test.go
index 437fa0bf5..9564cc0ba 100644
--- a/core/fetcher/fetcher_test.go
+++ b/core/fetcher/fetcher_test.go
@@ -672,6 +672,7 @@ func TestFetchOnly(t *testing.T) {
 	// FetchOnly should not trigger subscribers
 	subscriberCalled := false
+
 	fetch.Subscribe(func(ctx context.Context, resDuty core.Duty, resDataSet core.UnsignedDataSet) error {
 		subscriberCalled = true
 		return nil
@@ -730,6 +731,7 @@
 	fetch.Subscribe(func(ctx context.Context, resDuty core.Duty, resDataSet core.UnsignedDataSet) error {
 		require.Equal(t, duty, resDuty)
 		require.Len(t, resDataSet, 2)
+
 		return nil
 	})
diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go
index 44cda91f5..f27c2f6cf 100644
--- a/core/scheduler/scheduler.go
+++ b/core/scheduler/scheduler.go
@@ -181,6 +181,7 @@ func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot, bnAd
 		Slot: uint64(slot),
 		Type: core.DutyAttester,
 	}
+
 	defSet, ok := s.getDutyDefinitionSet(duty)
 	if !ok {
 		// Nothing for this duty
@@ -317,6 +318,7 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) {
 			if !s.waitForBlockEventOrTimeout(dutyCtx, slot) {
 				return // context cancelled
 			}
+
 			s.eventTriggeredAttestations.Store(slot.Slot, true)
 		} else if !delaySlotOffset(dutyCtx, slot, duty, s.delayFunc) {
 			return // context cancelled
@@ -376,11 +378,13 @@ func (s *Scheduler) waitForBlockEventOrTimeout(ctx context.Context, slot core.Sl
 		log.Warn(ctx, "Slot offset not found for attester duty, proceeding immediately", nil, z.U64("slot", slot.Slot))
 		return true
 	}
+
 	offset := fn(slot.SlotDuration)
 	// Add 300ms delay only if FetchAttOnBlockWithDelay is enabled
 	if featureset.Enabled(featureset.FetchAttOnBlockWithDelay) {
 		offset += 300 * time.Millisecond
 	}
+
 	fallbackDeadline := slot.Time.Add(offset)

 	select {
@@ -397,6 +401,7 @@ func (s *Scheduler) waitForBlockEventOrTimeout(ctx context.Context, slot core.Sl
 				z.U64("slot", slot.Slot))
 		}
 	}
+
 	return true
 	}
 }
@@ -443,17 +448,12 @@ func (s *Scheduler) resolveDuties(ctx context.Context, slot core.Slot) error {

 // resolveAttDuties resolves attester duties for the given validators.
 func (s *Scheduler) resolveAttDuties(ctx context.Context, slot core.Slot, vals validators) error {
-	opts := &eth2api.AttesterDutiesOpts{
-		Epoch:   eth2p0.Epoch(slot.Epoch()),
-		Indices: vals.Indexes(),
-	}
-
-	eth2Resp, err := s.eth2Cl.AttesterDuties(ctx, opts)
+	eth2Resp, err := s.eth2Cl.AttesterDutiesCache(ctx, eth2p0.Epoch(slot.Epoch()), vals.Indexes())
 	if err != nil {
 		return err
 	}

-	attDuties := eth2Resp.Data
+	attDuties := eth2Resp

 	// Check if any of the attester duties returned are nil.
 	for _, duty := range attDuties {
@@ -524,17 +524,12 @@ func (s *Scheduler) resolveAttDuties(ctx context.Context, slot core.Slot, vals v

 // resolveProDuties resolves proposer duties for the given validators.
 func (s *Scheduler) resolveProDuties(ctx context.Context, slot core.Slot, vals validators) error {
-	opts := &eth2api.ProposerDutiesOpts{
-		Epoch:   eth2p0.Epoch(slot.Epoch()),
-		Indices: vals.Indexes(),
-	}
-
-	eth2Resp, err := s.eth2Cl.ProposerDuties(ctx, opts)
+	eth2Resp, err := s.eth2Cl.ProposerDutiesCache(ctx, eth2p0.Epoch(slot.Epoch()), vals.Indexes())
 	if err != nil {
 		return err
 	}

-	proDuties := eth2Resp.Data
+	proDuties := eth2Resp

 	// Check if any of the proposer duties returned are nil.
 	for _, duty := range proDuties {
@@ -578,17 +573,12 @@ func (s *Scheduler) resolveProDuties(ctx context.Context, slot core.Slot, vals v

 // resolveSyncCommDuties resolves sync committee duties for the validators in the given slot's epoch, caching the results.
 func (s *Scheduler) resolveSyncCommDuties(ctx context.Context, slot core.Slot, vals validators) error {
-	opts := &eth2api.SyncCommitteeDutiesOpts{
-		Epoch:   eth2p0.Epoch(slot.Epoch()),
-		Indices: vals.Indexes(),
-	}
-
-	eth2Resp, err := s.eth2Cl.SyncCommitteeDuties(ctx, opts)
+	eth2Resp, err := s.eth2Cl.SyncCommDutiesCache(ctx, eth2p0.Epoch(slot.Epoch()), vals.Indexes())
 	if err != nil {
 		return err
 	}

-	duties := eth2Resp.Data
+	duties := eth2Resp

 	// Check if any of the sync committee duties returned are nil.
 	for _, duty := range duties {
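All three resolve functions now go through the duties cache, so within an epoch only the first call per duty type reaches the beacon node. A toy sketch of the epoch-keyed lookup shape this relies on (a hypothetical, simplified cache; the real DutiesCache also handles reorg invalidation, index updates and trimming):

package main

import (
	"fmt"
	"sync"
)

// epochCache is a toy epoch-keyed cache: one fetch per epoch, later calls are served from memory.
type epochCache struct {
	mu    sync.Mutex
	data  map[uint64][]string // epoch -> duties (stand-in for []*eth2v1.AttesterDuty)
	fetch func(epoch uint64) []string
}

func (c *epochCache) get(epoch uint64) []string {
	c.mu.Lock()
	defer c.mu.Unlock()

	if duties, ok := c.data[epoch]; ok {
		return duties // cache hit: no beacon node round trip
	}

	duties := c.fetch(epoch) // cache miss: fetch once and remember
	c.data[epoch] = duties

	return duties
}

func main() {
	calls := 0
	c := &epochCache{
		data: map[uint64][]string{},
		fetch: func(epoch uint64) []string {
			calls++
			return []string{fmt.Sprintf("duty@epoch%d", epoch)}
		},
	}

	c.get(10)
	c.get(10)                                // served from cache
	fmt.Println("beacon node calls:", calls) // 1
}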
@@ -767,6 +757,7 @@ func (s *Scheduler) trimDuties(epoch uint64) {

 // trimEventTriggeredAttestations removes old slot entries from eventTriggeredAttestations.
 func (s *Scheduler) trimEventTriggeredAttestations(epoch uint64) {
 	ctx := context.Background()
+
 	_, slotsPerEpoch, err := eth2wrap.FetchSlotsConfig(ctx, s.eth2Cl)
 	if err != nil {
 		log.Warn(ctx, "Failed to fetch slots config for trimming event triggered attestations", err, z.U64("epoch", epoch))
@@ -774,14 +765,17 @@ func (s *Scheduler) trimEventTriggeredAttestations(epoch uint64) {
 	}

 	minSlotToKeep := (epoch + 1) * slotsPerEpoch // first slot of next epoch
+
 	s.eventTriggeredAttestations.Range(func(key, _ any) bool {
 		slot, ok := key.(uint64)
 		if !ok {
 			return true // continue iteration
 		}
+
 		if slot < minSlotToKeep {
 			s.eventTriggeredAttestations.Delete(slot)
 		}
+
 		return true // continue iteration
 	})
 }
diff --git a/core/tracker/inclusion.go b/core/tracker/inclusion.go
index f74e0d505..5f2ddd8ef 100644
--- a/core/tracker/inclusion.go
+++ b/core/tracker/inclusion.go
@@ -636,18 +636,13 @@ func (a *InclusionChecker) Run(ctx context.Context) {
 			attesterDuties = []*eth2v1.AttesterDuty{}
 		} else {
 			// TODO: This can be optimised by not calling attester duties on every slot for small clusters, where there are <32 validators per cluster.
-			opts := &eth2api.AttesterDutiesOpts{
-				Epoch:   epoch,
-				Indices: indices,
-			}
-
-			resp, err := a.eth2Cl.AttesterDuties(ctx, opts)
+			resp, err := a.eth2Cl.AttesterDutiesCache(ctx, epoch, indices)
 			if err != nil {
 				log.Warn(ctx, "Failed to fetch attester duties for epoch", err, z.U64("epoch", uint64(epoch)), z.Any("indices", indices))

 				attesterDuties = []*eth2v1.AttesterDuty{}
 			} else {
-				attesterDuties = resp.Data
+				attesterDuties = resp
 			}
 		}
diff --git a/core/validatorapi/validatorapi.go b/core/validatorapi/validatorapi.go
index 1cf8df598..9a6cd5799 100644
--- a/core/validatorapi/validatorapi.go
+++ b/core/validatorapi/validatorapi.go
@@ -1115,29 +1115,32 @@ func (c Component) ProposerDuties(ctx context.Context, opts *eth2api.ProposerDut
 	span.SetAttributes(attribute.Int64("epoch", int64(opts.Epoch)))
 	defer span.End()

-	eth2Resp, err := c.eth2Cl.ProposerDuties(ctx, opts)
+	cachedResp, err := c.eth2Cl.ProposerDutiesCache(ctx, opts.Epoch, opts.Indices)
 	if err != nil {
 		return nil, err
 	}

-	duties := eth2Resp.Data
+	// Replace root public keys with public shares.
+	// Duties are copied into a new slice, as otherwise the cached duties would be modified.
+	dutiesShareKey := []*eth2v1.ProposerDuty{}

-	// Replace root public keys with public shares
-	for i := range len(duties) {
-		if duties[i] == nil {
-			return nil, errors.New("proposer duty cannot be nil")
-		}
+	for _, d := range cachedResp {
+		if d == nil {
+			return nil, errors.New("proposer duty cannot be nil")
+		}
+
+		duty := *d

-		pubshare, ok := c.getPubShareFunc(duties[i].PubKey)
+		pubshare, ok := c.getPubShareFunc(duty.PubKey)
 		if !ok {
-			// Ignore unknown validators since ProposerDuties returns ALL proposers for the epoch if validatorIndices is empty.
-			continue
+			return nil, errors.New("pubshare not found")
 		}

-		duties[i].PubKey = pubshare
+		duty.PubKey = pubshare
+		dutiesShareKey = append(dutiesShareKey, &duty)
 	}

-	return wrapResponseWithMetadata(duties, eth2Resp.Metadata), nil
+	return wrapResponse(dutiesShareKey), nil
 }
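The copy here matters: the cached response hands out shared pointers, and mutating them in place would rewrite the cached entry for every subsequent caller. A standalone illustration of the hazard and the copy-on-read fix (toy duty type, not the real eth2v1 structs):

package main

import "fmt"

type duty struct{ pubkey string }

func main() {
	cached := []*duty{{pubkey: "root-key"}}

	// Buggy: mutating through the cached pointer corrupts the cache for all callers.
	// cached[0].pubkey = "share-key"

	// Fix: copy the value, mutate the copy, and return pointers to the copies.
	out := make([]*duty, 0, len(cached))
	for _, d := range cached {
		cp := *d
		cp.pubkey = "share-key"
		out = append(out, &cp)
	}

	fmt.Println(cached[0].pubkey, out[0].pubkey) // root-key share-key
}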
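Note one behavioural consequence of the swap from wrapResponseWithMetadata to wrapResponse below: upstream response metadata (e.g. dependent roots) is no longer propagated to callers. The helper shapes are presumably along these lines (a hedged sketch; the real helpers live in the validatorapi package and wrap eth2api.Response):

package main

import "fmt"

// response mirrors the shape of eth2api.Response[T]: data plus optional metadata.
type response[T any] struct {
	Data     T
	Metadata map[string]any
}

// wrapResponse wraps bare data; any upstream metadata is dropped.
func wrapResponse[T any](data T) *response[T] {
	return &response[T]{Data: data}
}

func main() {
	r := wrapResponse([]string{"duty"})
	fmt.Println(r.Data, r.Metadata == nil) // [duty] true
}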
 
func (c Component) AttesterDuties(ctx context.Context, opts *eth2api.AttesterDutiesOpts) (*eth2api.Response[[]*eth2v1.AttesterDuty], error) {
@@ -1148,51 +1147,56 @@
 	span.SetAttributes(attribute.Int64("epoch", int64(opts.Epoch)))
 	defer span.End()

-	eth2Resp, err := c.eth2Cl.AttesterDuties(ctx, opts)
+	cachedResp, err := c.eth2Cl.AttesterDutiesCache(ctx, opts.Epoch, opts.Indices)
 	if err != nil {
 		return nil, err
 	}

-	duties := eth2Resp.Data
-
+	duties := []*eth2v1.AttesterDuty{}
 	// Replace root public keys with public shares.
-	for i := range len(duties) {
-		if duties[i] == nil {
+	for _, d := range cachedResp {
+		if d == nil {
 			return nil, errors.New("attester duty cannot be nil")
 		}

-		pubshare, ok := c.getPubShareFunc(duties[i].PubKey)
+		duty := *d
+
+		pubshare, ok := c.getPubShareFunc(duty.PubKey)
 		if !ok {
 			return nil, errors.New("pubshare not found")
 		}

-		duties[i].PubKey = pubshare
+		duty.PubKey = pubshare
+		duties = append(duties, &duty)
 	}

-	return wrapResponseWithMetadata(duties, eth2Resp.Metadata), nil
+	return wrapResponse(duties), nil
 }

 // SyncCommitteeDuties obtains sync committee duties. If validatorIndices is nil it will return all duties for the given epoch.
 func (c Component) SyncCommitteeDuties(ctx context.Context, opts *eth2api.SyncCommitteeDutiesOpts) (*eth2api.Response[[]*eth2v1.SyncCommitteeDuty], error) {
-	eth2Resp, err := c.eth2Cl.SyncCommitteeDuties(ctx, opts)
+	cachedResp, err := c.eth2Cl.SyncCommDutiesCache(ctx, opts.Epoch, opts.Indices)
 	if err != nil {
 		return nil, err
 	}

-	duties := eth2Resp.Data
+	duties := []*eth2v1.SyncCommitteeDuty{}

 	// Replace root public keys with public shares.
-	for i := range len(duties) {
-		if duties[i] == nil {
+	for _, d := range cachedResp {
+		if d == nil {
 			return nil, errors.New("sync committee duty cannot be nil")
 		}

-		pubshare, ok := c.getPubShareFunc(duties[i].PubKey)
+		duty := *d
+
+		pubshare, ok := c.getPubShareFunc(duty.PubKey)
 		if !ok {
 			return nil, errors.New("pubshare not found")
 		}

-		duties[i].PubKey = pubshare
+		duty.PubKey = pubshare
+		duties = append(duties, &duty)
 	}

 	return wrapResponse(duties), nil
diff --git a/core/validatorapi/validatorapi_test.go b/core/validatorapi/validatorapi_test.go
index 679466239..f74f60443 100644
--- a/core/validatorapi/validatorapi_test.go
+++ b/core/validatorapi/validatorapi_test.go
@@ -1782,6 +1782,16 @@ func TestComponent_Duties(t *testing.T) {
 		}}, nil
 	}

+	bmock.CachedProposerDutiesFunc = func(ctx context.Context, epoch eth2p0.Epoch, indices []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error) {
+		require.Equal(t, epoch, eth2p0.Epoch(epch))
+		require.Equal(t, []eth2p0.ValidatorIndex{eth2p0.ValidatorIndex(vIdx)}, indices)
+
+		return []*eth2v1.ProposerDuty{{
+			PubKey:         eth2Pubkey,
+			ValidatorIndex: vIdx,
+		}}, nil
+	}
+
 	// Construct the validator api component
 	vapi, err := validatorapi.NewComponent(bmock, allPubSharesByKey, shareIdx, nil, false, 30000000, nil)
 	require.NoError(t, err)
@@ -1809,6 +1819,16 @@ func TestComponent_Duties(t *testing.T) {
 		}}, nil
 	}

+	bmock.CachedAttesterDutiesFunc = func(_ context.Context, epoch eth2p0.Epoch, indices []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error) {
+		require.Equal(t, epoch, eth2p0.Epoch(epch))
+		require.Equal(t, []eth2p0.ValidatorIndex{eth2p0.ValidatorIndex(vIdx)}, indices)
+
+		return []*eth2v1.AttesterDuty{{
+			PubKey:         eth2Pubkey,
+			ValidatorIndex: vIdx,
+		}}, nil
+	}
+
 	// Construct the validator api component
 	vapi, err := validatorapi.NewComponent(bmock, allPubSharesByKey, shareIdx, nil, false, 30000000, nil)
 	require.NoError(t, err)
@@ -1836,6 +1856,16 @@ func TestComponent_Duties(t *testing.T) {
 		}}, nil
 	}

+	bmock.CachedSyncCommDutiesFunc = func(ctx context.Context, epoch eth2p0.Epoch, indices []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error) {
+		require.Equal(t, epoch, eth2p0.Epoch(epch))
+		require.Equal(t, []eth2p0.ValidatorIndex{eth2p0.ValidatorIndex(vIdx)}, indices)
+
+		return []*eth2v1.SyncCommitteeDuty{{
+			PubKey:         eth2Pubkey,
+			ValidatorIndex: vIdx,
+		}}, nil
+	}
+
 	// Construct the validator api component
 	vapi, err := validatorapi.NewComponent(bmock, allPubSharesByKey, shareIdx, nil, false, 30000000, nil)
 	require.NoError(t, err)
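These stubs work because the beacon mock exposes behaviour as swappable function fields: each client method just delegates to its Func field, so a test overrides exactly the calls it cares about. A minimal standalone sketch of the pattern (toy Mock, not the real one):

package main

import (
	"context"
	"fmt"
)

// Mock exposes behaviour as swappable function fields, so tests override only what they need.
type Mock struct {
	AttesterDutiesCacheFunc func(ctx context.Context, epoch uint64) ([]string, error)
}

func (m Mock) AttesterDutiesCache(ctx context.Context, epoch uint64) ([]string, error) {
	return m.AttesterDutiesCacheFunc(ctx, epoch) // method just delegates to the field
}

func main() {
	m := Mock{
		AttesterDutiesCacheFunc: func(_ context.Context, epoch uint64) ([]string, error) {
			return []string{fmt.Sprintf("duty@%d", epoch)}, nil // per-test stub
		},
	}

	duties, err := m.AttesterDutiesCache(context.Background(), 7)
	fmt.Println(duties, err)
}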
diff --git a/testutil/beaconmock/beaconmock.go b/testutil/beaconmock/beaconmock.go
index 7cb176f1a..1afa8015a 100644
--- a/testutil/beaconmock/beaconmock.go
+++ b/testutil/beaconmock/beaconmock.go
@@ -188,9 +188,11 @@ type Mock struct {
 	IsActiveFunc func() bool
 	IsSyncedFunc func() bool
+	UpdateCacheIndicesFunc func(context.Context, []eth2p0.ValidatorIndex)
 	CachedValidatorsFunc func(ctx context.Context) (eth2wrap.ActiveValidators, eth2wrap.CompleteValidators, error)
 	AttestationDataFunc func(context.Context, eth2p0.Slot, eth2p0.CommitteeIndex) (*eth2p0.AttestationData, error)
 	AttesterDutiesFunc func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error)
+	CachedAttesterDutiesFunc func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error)
 	BlockFunc func(ctx context.Context, stateID string) (*eth2spec.VersionedSignedBeaconBlock, error)
 	BeaconBlockAttestationsFunc func(context.Context, *eth2api.BeaconBlockAttestationsOpts) ([]*eth2spec.VersionedAttestation, error)
 	BeaconCommitteesFunc func(ctx context.Context, opts *eth2api.BeaconCommitteesOpts) ([]*eth2v1.BeaconCommittee, error)
@@ -198,6 +200,7 @@ type Mock struct {
 	ProposalFunc func(ctx context.Context, opts *eth2api.ProposalOpts) (*eth2api.VersionedProposal, error)
 	SignedBeaconBlockFunc func(ctx context.Context, blockID string) (*eth2spec.VersionedSignedBeaconBlock, error)
 	ProposerDutiesFunc func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error)
+	CachedProposerDutiesFunc func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error)
 	SubmitAttestationsFunc func(context.Context, *eth2api.SubmitAttestationsOpts) error
 	SubmitProposalFunc func(context.Context, *eth2api.SubmitProposalOpts) error
 	SubmitBlindedProposalFunc func(context.Context, *eth2api.SubmitBlindedProposalOpts) error
@@ -214,6 +217,7 @@ type Mock struct {
 	AggregateAttestationFunc func(ctx context.Context, slot eth2p0.Slot, attestationDataRoot eth2p0.Root) (*eth2spec.VersionedAttestation, error)
 	SubmitAggregateAttestationsFunc func(ctx context.Context, aggregateAndProofs *eth2api.SubmitAggregateAttestationsOpts) error
 	SyncCommitteeDutiesFunc func(ctx context.Context, epoch eth2p0.Epoch, validatorIndices []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error)
+	CachedSyncCommDutiesFunc func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error)
 	SubmitSyncCommitteeMessagesFunc func(ctx context.Context, messages []*altair.SyncCommitteeMessage) error
 	SubmitSyncCommitteeContributionsFunc func(ctx context.Context, contributionAndProofs []*altair.SignedContributionAndProof) error
 	SyncCommitteeContributionFunc func(ctx context.Context, slot eth2p0.Slot, subcommitteeIndex uint64, beaconBlockRoot eth2p0.Root) (*altair.SyncCommitteeContribution, error)
@@ -224,6 +228,10 @@
 	ProxyFunc func(context.Context, *http.Request) (*http.Response, error)
 }

+func (m Mock) UpdateCacheIndices(ctx context.Context, idxs []eth2p0.ValidatorIndex) {
+	m.UpdateCacheIndicesFunc(ctx, idxs)
+}
+
 func (m Mock) AggregateAttestation(ctx context.Context, opts *eth2api.AggregateAttestationOpts) (*eth2api.Response[*eth2spec.VersionedAttestation], error) {
 	aggAtt, err := m.AggregateAttestationFunc(ctx, opts.Slot, opts.AttestationDataRoot)
 	if err != nil {
@@ -251,6 +259,10 @@ func (m Mock) AttesterDuties(ctx context.Context, opts *eth2api.AttesterDutiesOp
 	return wrapResponseWithMetadata(duties), nil
 }

+func (m Mock) AttesterDutiesCache(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error) {
+	return m.CachedAttesterDutiesFunc(ctx, epoch, vidxs)
+}
+
 func (m Mock) Proposal(ctx context.Context, opts *eth2api.ProposalOpts) (*eth2api.Response[*eth2api.VersionedProposal], error) {
 	block, err := m.ProposalFunc(ctx, opts)
 	if err != nil {
@@ -295,6 +307,10 @@ func (m Mock) ProposerDuties(ctx context.Context, opts *eth2api.ProposerDutiesOp
 	return wrapResponseWithMetadata(duties), nil
 }

+func (m Mock) ProposerDutiesCache(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error) {
+	return m.CachedProposerDutiesFunc(ctx, epoch, vidxs)
+}
+
 func (m Mock) SignedBeaconBlock(ctx context.Context, opts *eth2api.SignedBeaconBlockOpts) (*eth2api.Response[*eth2spec.VersionedSignedBeaconBlock], error) {
 	block, err := m.SignedBeaconBlockFunc(ctx, opts.Block)
 	if err != nil {
@@ -322,6 +338,10 @@ func (m Mock) SyncCommitteeDuties(ctx context.Context, opts *eth2api.SyncCommitt
 	return wrapResponse(duties), nil
 }

+func (m Mock) SyncCommDutiesCache(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error) {
+	return m.CachedSyncCommDutiesFunc(ctx, epoch, vidxs)
+}
+
 func (m Mock) Validators(ctx context.Context, opts *eth2api.ValidatorsOpts) (*eth2api.Response[map[eth2p0.ValidatorIndex]*eth2v1.Validator], error) {
 	vals, err := m.ValidatorsFunc(ctx, opts)
 	if err != nil {
@@ -335,6 +355,14 @@ func (Mock) SetValidatorCache(func(context.Context) (eth2wrap.ActiveValidators,
 	// Ignore this, only rely on WithValidator functional option.
 }

+func (Mock) SetDutiesCache(
+	func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error),
+	func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error),
+	func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error),
+) {
+	// Ignore this, only rely on duties functional option.
+}
+
 func (m Mock) ActiveValidators(ctx context.Context) (eth2wrap.ActiveValidators, error) {
 	active, _, err := m.CachedValidatorsFunc(ctx)
 	return active, err
diff --git a/testutil/beaconmock/beaconmock_fuzz.go b/testutil/beaconmock/beaconmock_fuzz.go
index 510a88398..de959081e 100644
--- a/testutil/beaconmock/beaconmock_fuzz.go
+++ b/testutil/beaconmock/beaconmock_fuzz.go
@@ -98,6 +98,9 @@ func WithBeaconMockFuzzer() Option {
 		return duties, nil
 	}

+	mock.CachedAttesterDutiesFunc = func(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error) {
+		return mock.AttesterDutiesFunc(ctx, epoch, vidxs)
+	}
 	mock.ProposerDutiesFunc = func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error) {
 		var duties []*eth2v1.ProposerDuty

@@ -105,6 +108,12 @@ func WithBeaconMockFuzzer() Option {
 		return duties, nil
 	}

+	mock.CachedProposerDutiesFunc = func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error) {
+		var duties []*eth2v1.ProposerDuty
+		fuzz.New().Fuzz(&duties)
+
+		return duties, nil
+	}
 	mock.AttestationDataFunc = func(context.Context, eth2p0.Slot, eth2p0.CommitteeIndex) (*eth2p0.AttestationData, error) {
 		var attData *eth2p0.AttestationData

@@ -173,6 +182,9 @@ func WithBeaconMockFuzzer() Option {
 		return duties, nil
 	}

+	mock.CachedSyncCommDutiesFunc = func(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error) {
+		return mock.SyncCommitteeDutiesFunc(ctx, epoch, vidxs)
+	}
 	mock.SyncCommitteeContributionFunc = func(context.Context, eth2p0.Slot, uint64, eth2p0.Root) (*altair.SyncCommitteeContribution, error) {
 		var contribution *altair.SyncCommitteeContribution
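Note the asymmetry in the fuzzer wiring above: CachedAttesterDutiesFunc and CachedSyncCommDutiesFunc delegate to their base funcs, while CachedProposerDutiesFunc draws fresh fuzz independently, so the cached and uncached proposer paths return different data. If identical behaviour on both paths is intended, the delegating form would be (a hypothetical fragment in the style of the surrounding file, not part of this diff):

	// Hypothetical: delegate the cached variant so both paths share one fuzzed implementation.
	mock.CachedProposerDutiesFunc = func(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error) {
		return mock.ProposerDutiesFunc(ctx, epoch, vidxs)
	}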
diff --git a/testutil/beaconmock/options.go b/testutil/beaconmock/options.go
index 10c8d0304..96b9f785a 100644
--- a/testutil/beaconmock/options.go
+++ b/testutil/beaconmock/options.go
@@ -390,6 +390,9 @@ func WithDeterministicAttesterDuties(factor int) Option {
 			return resp, nil
 		}

+		mock.CachedAttesterDutiesFunc = func(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error) {
+			return mock.AttesterDutiesFunc(ctx, epoch, vidxs)
+		}
 	}
 }

@@ -438,6 +441,9 @@ func WithDeterministicProposerDuties(factor int) Option {
 			return resp, nil
 		}

+		mock.CachedProposerDutiesFunc = func(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error) {
+			return mock.ProposerDutiesFunc(ctx, epoch, vidxs)
+		}
 	}
 }

@@ -447,6 +453,9 @@ func WithNoProposerDuties() Option {
 		mock.ProposerDutiesFunc = func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error) {
 			return nil, nil
 		}
+		mock.CachedProposerDutiesFunc = func(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error) {
+			return mock.ProposerDutiesFunc(ctx, epoch, vidxs)
+		}
 	}
 }

@@ -456,6 +465,9 @@ func WithNoAttesterDuties() Option {
 		mock.AttesterDutiesFunc = func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error) {
 			return nil, nil
 		}
+		mock.CachedAttesterDutiesFunc = func(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error) {
+			return mock.AttesterDutiesFunc(ctx, epoch, vidxs)
+		}
 	}
 }

@@ -465,6 +477,9 @@ func WithNoSyncCommitteeDuties() Option {
 		mock.SyncCommitteeDutiesFunc = func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error) {
 			return nil, nil
 		}
+		mock.CachedSyncCommDutiesFunc = func(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error) {
+			return mock.SyncCommitteeDutiesFunc(ctx, epoch, vidxs)
+		}
 	}
 }

@@ -506,6 +521,9 @@ func WithDeterministicSyncCommDuties(n, k int) Option {
 		return resp, nil
 	}

+	mock.CachedSyncCommDutiesFunc = func(ctx context.Context, epoch eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error) {
+		return mock.SyncCommitteeDutiesFunc(ctx, epoch, vidxs)
+	}
 	mock.overrides = append(mock.overrides, staticOverride{
 		Endpoint: "/eth/v1/config/spec",
@@ -586,9 +604,15 @@ func defaultMock(httpMock HTTPMock, httpServer *http.Server, clock clockwork.Clo
 		ProposerDutiesFunc: func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error) {
 			return []*eth2v1.ProposerDuty{}, nil
 		},
+		CachedProposerDutiesFunc: func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error) {
+			return []*eth2v1.ProposerDuty{}, nil
+		},
 		AttesterDutiesFunc: func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error) {
 			return []*eth2v1.AttesterDuty{}, nil
 		},
+		CachedAttesterDutiesFunc: func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error) {
+			return []*eth2v1.AttesterDuty{}, nil
+		},
 		BeaconBlockAttestationsFunc: func(context.Context, *eth2api.BeaconBlockAttestationsOpts) ([]*eth2spec.VersionedAttestation, error) {
 			return []*eth2spec.VersionedAttestation{}, nil
 		},
@@ -686,6 +710,9 @@ func defaultMock(httpMock HTTPMock, httpServer *http.Server, clock clockwork.Clo
 		SyncCommitteeDutiesFunc: func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error) {
 			return []*eth2v1.SyncCommitteeDuty{}, nil
 		},
+		CachedSyncCommDutiesFunc: func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error) {
+			return []*eth2v1.SyncCommitteeDuty{}, nil
+		},
 		SyncCommitteeSelectionsFunc: func(_ context.Context, opts *eth2api.SyncCommitteeSelectionsOpts) ([]*eth2v1.SyncCommitteeSelection, error) {
 			return opts.Selections, nil
 		},
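With every duties override mirrored onto its cached counterpart, tests see consistent behaviour regardless of which path the code under test takes. A sketch of a typical composition (hedged: the beaconmock.New(opts ...Option) constructor and option names are assumed from their use elsewhere in the repo):

package example

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/obolnetwork/charon/testutil/beaconmock"
)

func TestNoDutiesMock(t *testing.T) {
	// Assumed wiring: a mock with no proposer/attester duties; the options in
	// this diff also stub the cached variants, so both call paths agree.
	bmock, err := beaconmock.New(beaconmock.WithNoProposerDuties(), beaconmock.WithNoAttesterDuties())
	require.NoError(t, err)

	duties, err := bmock.AttesterDutiesCache(context.Background(), 1, nil)
	require.NoError(t, err)
	require.Empty(t, duties)
}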
"github.com/obolnetwork/charon/app/errors" "github.com/obolnetwork/charon/app/log" @@ -68,54 +66,6 @@ func (a *asserter) await(ctx context.Context, t *testing.T, expect int) error { return nil } -// externalIP returns the hosts external IP. -// Copied from https://stackoverflow.com/questions/23558425/how-do-i-get-the-local-ip-address-in-go. -func externalIP(t *testing.T) string { - t.Helper() - - ifaces, err := net.Interfaces() - require.NoError(t, err) - - for _, iface := range ifaces { - if iface.Flags&net.FlagUp == 0 { - continue // interface down - } - - if iface.Flags&net.FlagLoopback != 0 { - continue // loopback interface - } - - addrs, err := iface.Addrs() - require.NoError(t, err) - - for _, addr := range addrs { - var ip net.IP - - switch v := addr.(type) { - case *net.IPNet: - ip = v.IP - case *net.IPAddr: - ip = v.IP - } - - if ip == nil || ip.IsLoopback() { - continue - } - - ip = ip.To4() - if ip == nil { - continue // not an ipv4 address - } - - return ip.String() - } - } - - t.Fatal("no network?") - - return "" -} - func peerCtx(ctx context.Context, idx int) context.Context { return log.WithCtx(ctx, z.Int("peer_index", idx)) } diff --git a/testutil/integration/simnet_test.go b/testutil/integration/simnet_test.go index 52abeea60..4d71a6f4c 100644 --- a/testutil/integration/simnet_test.go +++ b/testutil/integration/simnet_test.go @@ -4,14 +4,7 @@ package integration_test import ( "context" - "fmt" "math/rand" - "net/http" - "os" - "os/exec" - "path" - "strconv" - "strings" "sync" "testing" "time" @@ -28,7 +21,6 @@ import ( "github.com/obolnetwork/charon/cluster" "github.com/obolnetwork/charon/core" "github.com/obolnetwork/charon/core/parsigex" - "github.com/obolnetwork/charon/eth2util/keystore" "github.com/obolnetwork/charon/p2p" "github.com/obolnetwork/charon/tbls" "github.com/obolnetwork/charon/testutil" @@ -42,7 +34,6 @@ type vcType int const ( vcUnknown vcType = 0 vcVmock vcType = 1 - vcTeku vcType = 2 ) //go:generate go test . -integration -v -run=TestSimnetDuties @@ -55,7 +46,6 @@ func TestSimnetDuties(t *testing.T) { scheduledType core.DutyType duties []core.DutyType builderAPI bool - tekuRegistration bool pregenRegistration bool exit bool vcType vcType @@ -66,59 +56,18 @@ func TestSimnetDuties(t *testing.T) { duties: []core.DutyType{core.DutyPrepareAggregator, core.DutyAttester, core.DutyAggregator}, vcType: vcVmock, }, - // TODO(kalo): Teku tests fail with invalid signature since removal of v1 endpoints. - // TODO(kalo): Assess if it is still the case, once pre-electra is implemented for v2 endpoints. - // { - // name: "attester with teku", - // scheduledType: core.DutyAttester, - // duties: []core.DutyType{core.DutyAttester}, // Teku does not support beacon committee selection - // vcType: vcTeku, - // }, { name: "proposer with mock VCs", scheduledType: core.DutyProposer, duties: []core.DutyType{core.DutyProposer, core.DutyRandao}, vcType: vcVmock, }, - // TODO(kalo): Teku tests fail with invalid signature since removal of v1 endpoints. - // TODO(kalo): Assess if it is still the case, once pre-electra is implemented for v2 endpoints. - // { - // name: "proposer with teku", - // scheduledType: core.DutyProposer, - // duties: []core.DutyType{core.DutyProposer, core.DutyRandao}, - // vcType: vcTeku, - // }, - // TODO(kalo): Teku tests fail with invalid signature since removal of v1 endpoints. - // TODO(kalo): Assess if it is still the case, once pre-electra is implemented for v2 endpoints. 
-		// {
-		// 	name:             "builder registration with teku",
-		// 	duties:           []core.DutyType{core.DutyBuilderRegistration},
-		// 	tekuRegistration: true,
-		// 	builderAPI:       true,
-		// 	vcType:           vcTeku,
-		// },
 		{
 			name:          "sync committee with mock VCs",
 			scheduledType: core.DutySyncMessage,
 			duties:        []core.DutyType{core.DutyPrepareSyncContribution, core.DutySyncMessage, core.DutySyncContribution},
 			vcType:        vcVmock,
 		},
-		// TODO(kalo): Teku tests fail with invalid signature since removal of v1 endpoints.
-		// TODO(kalo): Assess if it is still the case, once pre-electra is implemented for v2 endpoints.
-		// {
-		// 	name:          "sync committee with teku",
-		// 	scheduledType: core.DutySyncMessage,
-		// 	duties:        []core.DutyType{core.DutySyncMessage}, // Teku doesn't support sync committee selection.
-		// 	vcType:        vcTeku,
-		// },
-		// TODO(kalo): Teku tests fail with invalid signature since removal of v1 endpoints.
-		// TODO(kalo): Assess if it is still the case, once pre-electra is implemented for v2 endpoints.
-		// {
-		// 	name:   "voluntary exit with teku",
-		// 	duties: []core.DutyType{core.DutyExit},
-		// 	exit:   true,
-		// 	vcType: vcTeku,
-		// },
 		// TODO(andrei): Need a redesign due to how builder registration is handled now.
 		// {
 		// 	name:   "builder registration with mock VCs",
@@ -146,15 +95,10 @@ func TestSimnetDuties(t *testing.T) {
 			t.Logf("Running test: %v", t.Name())

 			args := newSimnetArgs(t)
-			args.TekuRegistration = test.tekuRegistration
 			args.BuilderAPI = test.builderAPI
 			args.VoluntaryExit = test.exit

 			switch test.vcType {
-			case vcTeku:
-				for i := range args.N {
-					args = startTeku(t, args, i)
-				}
 			case vcVmock:
 				args.VMocks = true
 			case vcUnknown:
@@ -197,7 +141,6 @@ type simnetArgs struct {
 	Lock               cluster.Lock
 	ErrChan            chan error
 	BuilderAPI         bool
-	TekuRegistration   bool
 	SyntheticProposals bool
 	VoluntaryExit      bool
 }
@@ -311,7 +254,6 @@ func testSimnet(t *testing.T, args simnetArgs, expect *simnetExpect) {
 	ctx, cancel := context.WithCancel(context.Background())

 	relayAddr := relay.StartRelay(ctx, t)
-	// NOTE: We can add support for in-memory transport to QBFT.
 	parSigExFunc := parsigex.NewMemExFunc(args.N)

 	type simResult struct {
@@ -464,134 +406,3 @@ func testSimnet(t *testing.T, args simnetArgs, expect *simnetExpect) {
 	testutil.SkipIfBindErr(t, err)
 	testutil.RequireNoError(t, err)
 }
-
-type tekuCmd []string
-
-var (
-	tekuVC tekuCmd = []string{
-		"validator-client",
-		"--network=auto",
-		"--log-destination=console",
-		"--validators-external-signer-slashing-protection-enabled=true",
-		"--validators-proposer-default-fee-recipient=0x000000000000000000000000000000000000dead",
-		"--Xattestations-v2-apis-enabled=true",
-	}
-	tekuExit tekuCmd = []string{
-		"voluntary-exit",
-		"--confirmation-enabled=false",
-		"--epoch=1",
-	}
-)
-
-// startTeku starts a teku validator client for the provided node and returns updated args.
-// See https://docs.teku.consensys.net/en/latest/Reference/CLI/CLI-Syntax/.
-func startTeku(t *testing.T, args simnetArgs, node int) simnetArgs {
-	t.Helper()
-
-	cmd := tekuVC
-	if args.VoluntaryExit {
-		cmd = tekuExit
-	}
-
-	tempDir := t.TempDir()
-	// Support specifying a custom base directory for docker mounts (required if running colima on macOS).
-	if dir, ok := os.LookupEnv("TEST_DOCKER_DIR"); ok {
-		var err error
-
-		tempDir, err = os.MkdirTemp(dir, "") //nolint: usetesting // support custom base directory
-		require.NoError(t, err)
-	}
-
-	// Write private share keystore and password
-	err := keystore.StoreKeysInsecure([]tbls.PrivateKey{args.SimnetKeys[node]}, tempDir, keystore.ConfirmInsecureKeys)
-	require.NoError(t, err)
-	err = os.WriteFile(path.Join(tempDir, "keystore-simnet-0.txt"), []byte("simnet"), 0o644)
-	require.NoError(t, err)
-
-	// Change VAPI bind address to host external IP
-	args.VAPIAddrs[node] = strings.Replace(args.VAPIAddrs[node], "127.0.0.1", externalIP(t), 1)
-
-	var tekuArgs []string
-
-	tekuArgs = append(tekuArgs, cmd...)
-	tekuArgs = append(tekuArgs,
-		"--validator-keys=/keys:/keys",
-		"--beacon-node-api-endpoint=http://"+args.VAPIAddrs[node],
-	)
-
-	if args.TekuRegistration {
-		tekuArgs = append(tekuArgs,
-			"--validators-proposer-config-refresh-enabled=true",
-			fmt.Sprintf("--validators-proposer-config=http://%s/teku_proposer_config", args.VAPIAddrs[node]),
-		)
-	}
-
-	if args.BuilderAPI {
-		tekuArgs = append(tekuArgs,
-			"--validators-proposer-blinded-blocks-enabled=true",
-		)
-	}
-
-	// Configure docker
-	name := strconv.FormatInt(time.Now().UnixNano(), 10)
-	dockerArgs := []string{
-		"run",
-		"--rm",
-		"--name=" + name,
-		fmt.Sprintf("--volume=%s:/keys", tempDir),
-		"--user=root", // Root required to read volume files in GitHub actions.
-		"consensys/teku:25.4.1",
-	}
-	dockerArgs = append(dockerArgs, tekuArgs...)
-	t.Logf("docker args: %v", dockerArgs)
-
-	// Start teku
-	ctx, cancel := context.WithCancel(context.Background())
-
-	go func() {
-		// wait for beaconmock to be available
-		tout := time.After(10 * time.Second)
-
-		bnOnline := false
-		for !bnOnline {
-			select {
-			case <-tout:
-				args.ErrChan <- errors.New("beaconmock wasn't available after 10s")
-				return
-			default:
-				_, err := http.Get("http://" + args.VAPIAddrs[node] + "/up")
-				if err != nil {
-					t.Logf("beaconmock not available yet...")
-					time.Sleep(500 * time.Millisecond)
-
-					continue
-				}
-
-				bnOnline = true
-
-				t.Logf("beaconmock online, starting up teku")
-			}
-		}
-
-		c := exec.CommandContext(ctx, "docker", dockerArgs...)
-		c.Stdout = os.Stdout
-		c.Stderr = os.Stderr
-
-		err = c.Run()
-		if err == nil || ctx.Err() != nil {
-			// Expected shutdown
-			return
-		}
-
-		args.ErrChan <- errors.Wrap(err, "docker command failed (see logging)")
-	}()
-
-	// Kill the container when done (context cancel is not enough for some reason).
-	testutil.EnsureCleanup(t, func() {
-		cancel()
-		t.Log("stopping teku docker container", name)
-		_ = exec.Command("docker", "kill", name).Run()
-	})
-
-	return args
-}