Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion orchestrator/stage/fetchstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ func testStoreConfig(
valueType string,
objStore dstore.Store,
) *store.Config {
conf, err := store.NewConfig(name, moduleInitialBlock, moduleHash, updatePolicy, valueType, objStore, nil)
conf, err := store.NewConfig(name, moduleInitialBlock, moduleHash, updatePolicy, valueType, objStore, nil, 0)
require.NoError(t, err)
return conf
}
Expand Down
2 changes: 2 additions & 0 deletions orchestrator/work/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
"github.com/streamingfast/substreams/reqctx"
"github.com/streamingfast/substreams/storage/store"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
otelCodes "go.opentelemetry.io/otel/codes"
Expand Down Expand Up @@ -90,6 +91,7 @@ func NewRequest(ctx context.Context, req *reqctx.RequestDetails, stageIndex int,
FoundationalStoreEndpoints: tier2ReqParams.FoundationalStoreEndpoints,
EthCallFallbackToLatestDuration: int64(reqctx.EthCallFallbackToLatestDuration(ctx)),
EthCallFallbackToNumberDuration: int64(reqctx.EthCallUseBlockNumberDuration(ctx)),
StoreSizeLimit: store.StoreSizeLimit,
}
}

Expand Down
8 changes: 4 additions & 4 deletions pb/sf/codegen/conversation/v1/conversation_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions pb/sf/substreams/intern/v2/service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pb/sf/substreams/intern/v2/service_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions pb/sf/substreams/intern/v2/service_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions pb/sf/substreams/rpc/v2/service_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pb/sf/substreams/rpc/v3/service_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pb/sf/substreams/rpc/v4/service_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions pb/sf/substreams/sink/service/v1/service_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func testConfigMap(t *testing.T, configs []testStoreConfig) store2.ConfigMap {
objStore := dstore.NewMockStore(nil)

for _, conf := range configs {
newStore, err := store2.NewConfig(conf.name, conf.initBlock, conf.name, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET, "string", objStore, nil)
newStore, err := store2.NewConfig(conf.name, conf.initBlock, conf.name, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET, "string", objStore, nil, 0)
require.NoError(t, err)
confMap[newStore.Name()] = newStore

Expand Down
2 changes: 2 additions & 0 deletions proto/sf/substreams/intern/v2/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ message ProcessRangeRequest {

int64 eth_call_fallback_to_latest_duration = 19; // if non-zero, above this duration in nanoseconds, the eth_calls will use "latest" instead of block ref. It will also automatically retry 'block not found' with 'latest'. It has precedence over eth_call_fallback_to_number_duration.
int64 eth_call_fallback_to_number_duration = 20; // if non-zero, above this duration in nanoseconds, the eth_calls will use block number instead of block hash

uint64 store_size_limit = 21; // if non-zero, overrides the default store size limit (in bytes) for stores loaded during this request
}

message ProcessRangeResponse {
Expand Down
1 change: 1 addition & 0 deletions reqctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var activeRequestsHandlerKey = contextKeyType(12)
var partialBlocksKey = contextKeyType(13)
var ethCallFallbackToLatestDuration = contextKeyType(14)
var ethCallUseBlockNumberDuration = contextKeyType(15)
var storeSizeLimitKey = contextKeyType(16)

func WithSpkg(ctx context.Context, pkg *pbsubstreams.Package) context.Context {
return context.WithValue(ctx, spkgKey, pkg)
Expand Down
20 changes: 20 additions & 0 deletions reqctx/storelimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package reqctx

import "context"

// WithStoreSizeLimit stores a per-request store size limit override in the context.
// When non-zero, this value overrides the default store.StoreSizeLimit for stores
// loaded during a tier2 request.
func WithStoreSizeLimit(ctx context.Context, limit uint64) context.Context {
return context.WithValue(ctx, storeSizeLimitKey, limit)
}

// StoreSizeLimit returns the per-request store size limit from the context, or 0
// if no override was set (meaning the default store.StoreSizeLimit should be used).
func StoreSizeLimit(ctx context.Context) uint64 {
limit, ok := ctx.Value(storeSizeLimitKey).(uint64)
if !ok {
return 0
}
return limit
}
2 changes: 1 addition & 1 deletion service/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ func (s *Tier1Service) blocks(
return fmt.Errorf("new config map: %w", err)
}

storeConfigs, err := store.NewConfigMap(cacheStore, quickSaveStore, execGraph.Stores(), execGraph.ModuleHashes(), chainFirstStreamableBlock)
storeConfigs, err := store.NewConfigMap(cacheStore, quickSaveStore, execGraph.Stores(), execGraph.ModuleHashes(), chainFirstStreamableBlock, 0)
if err != nil {
return fmt.Errorf("configuring stores: %w", err)
}
Expand Down
Loading
Loading