From 935799c3fcf55ccdf15f2fc66339be3d26189015 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ste=CC=81phane=20Duchesneau?= Date: Thu, 16 Apr 2026 15:47:52 -0400 Subject: [PATCH] pass 'SUBSTREAMS_STORE_SIZE_LIMIT' from tier1 to tier2 and override env var value on tier2 if it is set from tier1 --- orchestrator/stage/fetchstorage_test.go | 2 +- orchestrator/work/worker.go | 2 ++ .../conversation/v1/conversation_grpc.pb.go | 8 ++--- pb/sf/substreams/intern/v2/service.pb.go | 13 ++++++-- pb/sf/substreams/intern/v2/service_grpc.pb.go | 6 ++-- .../intern/v2/service_vtproto.pb.go | 33 +++++++++++++++++++ pb/sf/substreams/rpc/v2/service_grpc.pb.go | 10 +++--- pb/sf/substreams/rpc/v3/service_grpc.pb.go | 6 ++-- pb/sf/substreams/rpc/v4/service_grpc.pb.go | 6 ++-- .../sink/service/v1/service_grpc.pb.go | 20 +++++------ pipeline/pipeline_test.go | 2 +- proto/sf/substreams/intern/v2/service.proto | 2 ++ reqctx/context.go | 1 + reqctx/storelimit.go | 20 +++++++++++ service/tier1.go | 2 +- service/tier1_test.go | 1 + service/tier2.go | 6 +++- storage/store/config.go | 10 ++++-- storage/store/configmap.go | 5 ++- storage/store/init_test.go | 2 +- tools/analytics_store_stats.go | 1 + tools/check.go | 2 +- tools/decode.go | 4 +-- tools/module.go | 1 + wasm/call_test.go | 2 +- 25 files changed, 125 insertions(+), 42 deletions(-) create mode 100644 reqctx/storelimit.go diff --git a/orchestrator/stage/fetchstorage_test.go b/orchestrator/stage/fetchstorage_test.go index 2b2a35c65..96169abb8 100644 --- a/orchestrator/stage/fetchstorage_test.go +++ b/orchestrator/stage/fetchstorage_test.go @@ -569,7 +569,7 @@ func testStoreConfig( valueType string, objStore dstore.Store, ) *store.Config { - conf, err := store.NewConfig(name, moduleInitialBlock, moduleHash, updatePolicy, valueType, objStore, nil) + conf, err := store.NewConfig(name, moduleInitialBlock, moduleHash, updatePolicy, valueType, objStore, nil, 0) require.NoError(t, err) return conf } diff --git a/orchestrator/work/worker.go b/orchestrator/work/worker.go index db7a41989..579f6b109 100644 --- a/orchestrator/work/worker.go +++ b/orchestrator/work/worker.go @@ -23,6 +23,7 @@ import ( pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2" pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2" "github.com/streamingfast/substreams/reqctx" + "github.com/streamingfast/substreams/storage/store" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" otelCodes "go.opentelemetry.io/otel/codes" @@ -90,6 +91,7 @@ func NewRequest(ctx context.Context, req *reqctx.RequestDetails, stageIndex int, FoundationalStoreEndpoints: tier2ReqParams.FoundationalStoreEndpoints, EthCallFallbackToLatestDuration: int64(reqctx.EthCallFallbackToLatestDuration(ctx)), EthCallFallbackToNumberDuration: int64(reqctx.EthCallUseBlockNumberDuration(ctx)), + StoreSizeLimit: store.StoreSizeLimit, } } diff --git a/pb/sf/codegen/conversation/v1/conversation_grpc.pb.go b/pb/sf/codegen/conversation/v1/conversation_grpc.pb.go index 91cf95a6e..c32558e36 100644 --- a/pb/sf/codegen/conversation/v1/conversation_grpc.pb.go +++ b/pb/sf/codegen/conversation/v1/conversation_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.5.1 +// - protoc-gen-go-grpc v1.6.0 // - protoc (unknown) // source: sf/codegen/conversation/v1/conversation.proto @@ -78,10 +78,10 @@ type ConversationServiceServer interface { type UnimplementedConversationServiceServer struct{} func (UnimplementedConversationServiceServer) Converse(grpc.BidiStreamingServer[UserInput, SystemOutput]) error { - return status.Errorf(codes.Unimplemented, "method Converse not implemented") + return status.Error(codes.Unimplemented, "method Converse not implemented") } func (UnimplementedConversationServiceServer) Discover(context.Context, *DiscoveryRequest) (*DiscoveryResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Discover not implemented") + return nil, status.Error(codes.Unimplemented, "method Discover not implemented") } func (UnimplementedConversationServiceServer) testEmbeddedByValue() {} @@ -93,7 +93,7 @@ type UnsafeConversationServiceServer interface { } func RegisterConversationServiceServer(s grpc.ServiceRegistrar, srv ConversationServiceServer) { - // If the following call pancis, it indicates UnimplementedConversationServiceServer was + // If the following call panics, it indicates UnimplementedConversationServiceServer was // embedded by pointer and is nil. This will cause panics if an // unimplemented method is ever invoked, so we test this at initialization // time to prevent it from happening at runtime later due to I/O. diff --git a/pb/sf/substreams/intern/v2/service.pb.go b/pb/sf/substreams/intern/v2/service.pb.go index e0947623e..769441135 100644 --- a/pb/sf/substreams/intern/v2/service.pb.go +++ b/pb/sf/substreams/intern/v2/service.pb.go @@ -94,6 +94,7 @@ type ProcessRangeRequest struct { FoundationalStoreEndpoints map[string]string `protobuf:"bytes,18,rep,name=foundational_store_endpoints,json=foundationalStoreEndpoints,proto3" json:"foundational_store_endpoints,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` EthCallFallbackToLatestDuration int64 `protobuf:"varint,19,opt,name=eth_call_fallback_to_latest_duration,json=ethCallFallbackToLatestDuration,proto3" json:"eth_call_fallback_to_latest_duration,omitempty"` // if non-zero, above this duration in nanoseconds, the eth_calls will use "latest" instead of block ref. It will also automatically retry 'block not found' with 'latest'. It has precedence over eth_call_fallback_to_number_duration. EthCallFallbackToNumberDuration int64 `protobuf:"varint,20,opt,name=eth_call_fallback_to_number_duration,json=ethCallFallbackToNumberDuration,proto3" json:"eth_call_fallback_to_number_duration,omitempty"` // if non-zero, above this duration in nanoseconds, the eth_calls will use block number instead of block hash + StoreSizeLimit uint64 `protobuf:"varint,21,opt,name=store_size_limit,json=storeSizeLimit,proto3" json:"store_size_limit,omitempty"` // if non-zero, overrides the default store size limit (in bytes) for stores loaded during this request unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -255,6 +256,13 @@ func (x *ProcessRangeRequest) GetEthCallFallbackToNumberDuration() int64 { return 0 } +func (x *ProcessRangeRequest) GetStoreSizeLimit() uint64 { + if x != nil { + return x.StoreSizeLimit + } + return 0 +} + type ProcessRangeResponse struct { state protoimpl.MessageState `protogen:"open.v1"` // Types that are valid to be assigned to Type: @@ -837,7 +845,7 @@ var File_sf_substreams_intern_v2_service_proto protoreflect.FileDescriptor const file_sf_substreams_intern_v2_service_proto_rawDesc = "" + "\n" + - "%sf/substreams/intern/v2/service.proto\x12\x19sf.substreams.internal.v2\x1a\x19google/protobuf/any.proto\x1a\x1csf/substreams/v1/clock.proto\x1a\x1esf/substreams/v1/modules.proto\"\xa2\t\n" + + "%sf/substreams/intern/v2/service.proto\x12\x19sf.substreams.internal.v2\x1a\x19google/protobuf/any.proto\x1a\x1csf/substreams/v1/clock.proto\x1a\x1esf/substreams/v1/modules.proto\"\xcc\t\n" + "\x13ProcessRangeRequest\x12(\n" + "\x0estop_block_num\x18\x02 \x01(\x04B\x02\x18\x01R\fstopBlockNum\x12#\n" + "\routput_module\x18\x03 \x01(\tR\foutputModule\x123\n" + @@ -859,7 +867,8 @@ const file_sf_substreams_intern_v2_service_proto_rawDesc = "" + "\rstream_output\x18\x11 \x01(\bR\fstreamOutput\x12\x90\x01\n" + "\x1cfoundational_store_endpoints\x18\x12 \x03(\v2N.sf.substreams.internal.v2.ProcessRangeRequest.FoundationalStoreEndpointsEntryR\x1afoundationalStoreEndpoints\x12M\n" + "$eth_call_fallback_to_latest_duration\x18\x13 \x01(\x03R\x1fethCallFallbackToLatestDuration\x12M\n" + - "$eth_call_fallback_to_number_duration\x18\x14 \x01(\x03R\x1fethCallFallbackToNumberDuration\x1aG\n" + + "$eth_call_fallback_to_number_duration\x18\x14 \x01(\x03R\x1fethCallFallbackToNumberDuration\x12(\n" + + "\x10store_size_limit\x18\x15 \x01(\x04R\x0estoreSizeLimit\x1aG\n" + "\x19WasmExtensionConfigsEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\x1aM\n" + diff --git a/pb/sf/substreams/intern/v2/service_grpc.pb.go b/pb/sf/substreams/intern/v2/service_grpc.pb.go index 3557010df..4c7f366a8 100644 --- a/pb/sf/substreams/intern/v2/service_grpc.pb.go +++ b/pb/sf/substreams/intern/v2/service_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.5.1 +// - protoc-gen-go-grpc v1.6.0 // - protoc (unknown) // source: sf/substreams/intern/v2/service.proto @@ -71,7 +71,7 @@ type SubstreamsServer interface { type UnimplementedSubstreamsServer struct{} func (UnimplementedSubstreamsServer) ProcessRange(*ProcessRangeRequest, grpc.ServerStreamingServer[ProcessRangeResponse]) error { - return status.Errorf(codes.Unimplemented, "method ProcessRange not implemented") + return status.Error(codes.Unimplemented, "method ProcessRange not implemented") } func (UnimplementedSubstreamsServer) testEmbeddedByValue() {} @@ -83,7 +83,7 @@ type UnsafeSubstreamsServer interface { } func RegisterSubstreamsServer(s grpc.ServiceRegistrar, srv SubstreamsServer) { - // If the following call pancis, it indicates UnimplementedSubstreamsServer was + // If the following call panics, it indicates UnimplementedSubstreamsServer was // embedded by pointer and is nil. This will cause panics if an // unimplemented method is ever invoked, so we test this at initialization // time to prevent it from happening at runtime later due to I/O. diff --git a/pb/sf/substreams/intern/v2/service_vtproto.pb.go b/pb/sf/substreams/intern/v2/service_vtproto.pb.go index 00ee2e9aa..e8d4bba86 100644 --- a/pb/sf/substreams/intern/v2/service_vtproto.pb.go +++ b/pb/sf/substreams/intern/v2/service_vtproto.pb.go @@ -43,6 +43,7 @@ func (m *ProcessRangeRequest) CloneVT() *ProcessRangeRequest { r.StreamOutput = m.StreamOutput r.EthCallFallbackToLatestDuration = m.EthCallFallbackToLatestDuration r.EthCallFallbackToNumberDuration = m.EthCallFallbackToNumberDuration + r.StoreSizeLimit = m.StoreSizeLimit if rhs := m.WasmExtensionConfigs; rhs != nil { tmpContainer := make(map[string]string, len(rhs)) for k, v := range rhs { @@ -363,6 +364,9 @@ func (this *ProcessRangeRequest) EqualVT(that *ProcessRangeRequest) bool { if this.EthCallFallbackToNumberDuration != that.EthCallFallbackToNumberDuration { return false } + if this.StoreSizeLimit != that.StoreSizeLimit { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -769,6 +773,13 @@ func (m *ProcessRangeRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.StoreSizeLimit != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.StoreSizeLimit)) + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0xa8 + } if m.EthCallFallbackToNumberDuration != 0 { i = protohelpers.EncodeVarint(dAtA, i, uint64(m.EthCallFallbackToNumberDuration)) i-- @@ -1534,6 +1545,9 @@ func (m *ProcessRangeRequest) SizeVT() (n int) { if m.EthCallFallbackToNumberDuration != 0 { n += 2 + protohelpers.SizeOfVarint(uint64(m.EthCallFallbackToNumberDuration)) } + if m.StoreSizeLimit != 0 { + n += 2 + protohelpers.SizeOfVarint(uint64(m.StoreSizeLimit)) + } n += len(m.unknownFields) return n } @@ -2448,6 +2462,25 @@ func (m *ProcessRangeRequest) UnmarshalVT(dAtA []byte) error { break } } + case 21: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StoreSizeLimit", wireType) + } + m.StoreSizeLimit = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.StoreSizeLimit |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/pb/sf/substreams/rpc/v2/service_grpc.pb.go b/pb/sf/substreams/rpc/v2/service_grpc.pb.go index 50d06b884..55d3b02c1 100644 --- a/pb/sf/substreams/rpc/v2/service_grpc.pb.go +++ b/pb/sf/substreams/rpc/v2/service_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.5.1 +// - protoc-gen-go-grpc v1.6.0 // - protoc (unknown) // source: sf/substreams/rpc/v2/service.proto @@ -72,7 +72,7 @@ type StreamServer interface { type UnimplementedStreamServer struct{} func (UnimplementedStreamServer) Blocks(*Request, grpc.ServerStreamingServer[Response]) error { - return status.Errorf(codes.Unimplemented, "method Blocks not implemented") + return status.Error(codes.Unimplemented, "method Blocks not implemented") } func (UnimplementedStreamServer) testEmbeddedByValue() {} @@ -84,7 +84,7 @@ type UnsafeStreamServer interface { } func RegisterStreamServer(s grpc.ServiceRegistrar, srv StreamServer) { - // If the following call pancis, it indicates UnimplementedStreamServer was + // If the following call panics, it indicates UnimplementedStreamServer was // embedded by pointer and is nil. This will cause panics if an // unimplemented method is ever invoked, so we test this at initialization // time to prevent it from happening at runtime later due to I/O. @@ -166,7 +166,7 @@ type EndpointInfoServer interface { type UnimplementedEndpointInfoServer struct{} func (UnimplementedEndpointInfoServer) Info(context.Context, *v2.InfoRequest) (*v2.InfoResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Info not implemented") + return nil, status.Error(codes.Unimplemented, "method Info not implemented") } func (UnimplementedEndpointInfoServer) testEmbeddedByValue() {} @@ -178,7 +178,7 @@ type UnsafeEndpointInfoServer interface { } func RegisterEndpointInfoServer(s grpc.ServiceRegistrar, srv EndpointInfoServer) { - // If the following call pancis, it indicates UnimplementedEndpointInfoServer was + // If the following call panics, it indicates UnimplementedEndpointInfoServer was // embedded by pointer and is nil. This will cause panics if an // unimplemented method is ever invoked, so we test this at initialization // time to prevent it from happening at runtime later due to I/O. diff --git a/pb/sf/substreams/rpc/v3/service_grpc.pb.go b/pb/sf/substreams/rpc/v3/service_grpc.pb.go index 321d2bed5..e801d7c71 100644 --- a/pb/sf/substreams/rpc/v3/service_grpc.pb.go +++ b/pb/sf/substreams/rpc/v3/service_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.5.1 +// - protoc-gen-go-grpc v1.6.0 // - protoc (unknown) // source: sf/substreams/rpc/v3/service.proto @@ -84,7 +84,7 @@ type StreamServer interface { type UnimplementedStreamServer struct{} func (UnimplementedStreamServer) Blocks(*Request, grpc.ServerStreamingServer[v2.Response]) error { - return status.Errorf(codes.Unimplemented, "method Blocks not implemented") + return status.Error(codes.Unimplemented, "method Blocks not implemented") } func (UnimplementedStreamServer) testEmbeddedByValue() {} @@ -96,7 +96,7 @@ type UnsafeStreamServer interface { } func RegisterStreamServer(s grpc.ServiceRegistrar, srv StreamServer) { - // If the following call pancis, it indicates UnimplementedStreamServer was + // If the following call panics, it indicates UnimplementedStreamServer was // embedded by pointer and is nil. This will cause panics if an // unimplemented method is ever invoked, so we test this at initialization // time to prevent it from happening at runtime later due to I/O. diff --git a/pb/sf/substreams/rpc/v4/service_grpc.pb.go b/pb/sf/substreams/rpc/v4/service_grpc.pb.go index ea6bcc5e6..73b7433e2 100644 --- a/pb/sf/substreams/rpc/v4/service_grpc.pb.go +++ b/pb/sf/substreams/rpc/v4/service_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.5.1 +// - protoc-gen-go-grpc v1.6.0 // - protoc (unknown) // source: sf/substreams/rpc/v4/service.proto @@ -84,7 +84,7 @@ type StreamServer interface { type UnimplementedStreamServer struct{} func (UnimplementedStreamServer) Blocks(*v3.Request, grpc.ServerStreamingServer[Response]) error { - return status.Errorf(codes.Unimplemented, "method Blocks not implemented") + return status.Error(codes.Unimplemented, "method Blocks not implemented") } func (UnimplementedStreamServer) testEmbeddedByValue() {} @@ -96,7 +96,7 @@ type UnsafeStreamServer interface { } func RegisterStreamServer(s grpc.ServiceRegistrar, srv StreamServer) { - // If the following call pancis, it indicates UnimplementedStreamServer was + // If the following call panics, it indicates UnimplementedStreamServer was // embedded by pointer and is nil. This will cause panics if an // unimplemented method is ever invoked, so we test this at initialization // time to prevent it from happening at runtime later due to I/O. diff --git a/pb/sf/substreams/sink/service/v1/service_grpc.pb.go b/pb/sf/substreams/sink/service/v1/service_grpc.pb.go index 3c2a40116..f1dd1f8d3 100644 --- a/pb/sf/substreams/sink/service/v1/service_grpc.pb.go +++ b/pb/sf/substreams/sink/service/v1/service_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.5.1 +// - protoc-gen-go-grpc v1.6.0 // - protoc (unknown) // source: sf/substreams/sink/service/v1/service.proto @@ -153,28 +153,28 @@ type ProviderServer interface { type UnimplementedProviderServer struct{} func (UnimplementedProviderServer) Deploy(context.Context, *DeployRequest) (*DeployResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Deploy not implemented") + return nil, status.Error(codes.Unimplemented, "method Deploy not implemented") } func (UnimplementedProviderServer) Update(context.Context, *UpdateRequest) (*UpdateResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Update not implemented") + return nil, status.Error(codes.Unimplemented, "method Update not implemented") } func (UnimplementedProviderServer) Info(context.Context, *InfoRequest) (*InfoResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Info not implemented") + return nil, status.Error(codes.Unimplemented, "method Info not implemented") } func (UnimplementedProviderServer) List(context.Context, *ListRequest) (*ListResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method List not implemented") + return nil, status.Error(codes.Unimplemented, "method List not implemented") } func (UnimplementedProviderServer) Pause(context.Context, *PauseRequest) (*PauseResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Pause not implemented") + return nil, status.Error(codes.Unimplemented, "method Pause not implemented") } func (UnimplementedProviderServer) Stop(context.Context, *StopRequest) (*StopResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Stop not implemented") + return nil, status.Error(codes.Unimplemented, "method Stop not implemented") } func (UnimplementedProviderServer) Resume(context.Context, *ResumeRequest) (*ResumeResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Resume not implemented") + return nil, status.Error(codes.Unimplemented, "method Resume not implemented") } func (UnimplementedProviderServer) Remove(context.Context, *RemoveRequest) (*RemoveResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Remove not implemented") + return nil, status.Error(codes.Unimplemented, "method Remove not implemented") } func (UnimplementedProviderServer) testEmbeddedByValue() {} @@ -186,7 +186,7 @@ type UnsafeProviderServer interface { } func RegisterProviderServer(s grpc.ServiceRegistrar, srv ProviderServer) { - // If the following call pancis, it indicates UnimplementedProviderServer was + // If the following call panics, it indicates UnimplementedProviderServer was // embedded by pointer and is nil. This will cause panics if an // unimplemented method is ever invoked, so we test this at initialization // time to prevent it from happening at runtime later due to I/O. diff --git a/pipeline/pipeline_test.go b/pipeline/pipeline_test.go index 87724e576..94111a9d7 100644 --- a/pipeline/pipeline_test.go +++ b/pipeline/pipeline_test.go @@ -189,7 +189,7 @@ func testConfigMap(t *testing.T, configs []testStoreConfig) store2.ConfigMap { objStore := dstore.NewMockStore(nil) for _, conf := range configs { - newStore, err := store2.NewConfig(conf.name, conf.initBlock, conf.name, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET, "string", objStore, nil) + newStore, err := store2.NewConfig(conf.name, conf.initBlock, conf.name, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET, "string", objStore, nil, 0) require.NoError(t, err) confMap[newStore.Name()] = newStore diff --git a/proto/sf/substreams/intern/v2/service.proto b/proto/sf/substreams/intern/v2/service.proto index 35555d098..9867fb97f 100644 --- a/proto/sf/substreams/intern/v2/service.proto +++ b/proto/sf/substreams/intern/v2/service.proto @@ -51,6 +51,8 @@ message ProcessRangeRequest { int64 eth_call_fallback_to_latest_duration = 19; // if non-zero, above this duration in nanoseconds, the eth_calls will use "latest" instead of block ref. It will also automatically retry 'block not found' with 'latest'. It has precedence over eth_call_fallback_to_number_duration. int64 eth_call_fallback_to_number_duration = 20; // if non-zero, above this duration in nanoseconds, the eth_calls will use block number instead of block hash + + uint64 store_size_limit = 21; // if non-zero, overrides the default store size limit (in bytes) for stores loaded during this request } message ProcessRangeResponse { diff --git a/reqctx/context.go b/reqctx/context.go index 65bf78a30..97a398eba 100644 --- a/reqctx/context.go +++ b/reqctx/context.go @@ -35,6 +35,7 @@ var activeRequestsHandlerKey = contextKeyType(12) var partialBlocksKey = contextKeyType(13) var ethCallFallbackToLatestDuration = contextKeyType(14) var ethCallUseBlockNumberDuration = contextKeyType(15) +var storeSizeLimitKey = contextKeyType(16) func WithSpkg(ctx context.Context, pkg *pbsubstreams.Package) context.Context { return context.WithValue(ctx, spkgKey, pkg) diff --git a/reqctx/storelimit.go b/reqctx/storelimit.go new file mode 100644 index 000000000..4ecc4f9b6 --- /dev/null +++ b/reqctx/storelimit.go @@ -0,0 +1,20 @@ +package reqctx + +import "context" + +// WithStoreSizeLimit stores a per-request store size limit override in the context. +// When non-zero, this value overrides the default store.StoreSizeLimit for stores +// loaded during a tier2 request. +func WithStoreSizeLimit(ctx context.Context, limit uint64) context.Context { + return context.WithValue(ctx, storeSizeLimitKey, limit) +} + +// StoreSizeLimit returns the per-request store size limit from the context, or 0 +// if no override was set (meaning the default store.StoreSizeLimit should be used). +func StoreSizeLimit(ctx context.Context) uint64 { + limit, ok := ctx.Value(storeSizeLimitKey).(uint64) + if !ok { + return 0 + } + return limit +} diff --git a/service/tier1.go b/service/tier1.go index 8e31f98b4..7c5e471be 100644 --- a/service/tier1.go +++ b/service/tier1.go @@ -726,7 +726,7 @@ func (s *Tier1Service) blocks( return fmt.Errorf("new config map: %w", err) } - storeConfigs, err := store.NewConfigMap(cacheStore, quickSaveStore, execGraph.Stores(), execGraph.ModuleHashes(), chainFirstStreamableBlock) + storeConfigs, err := store.NewConfigMap(cacheStore, quickSaveStore, execGraph.Stores(), execGraph.ModuleHashes(), chainFirstStreamableBlock, 0) if err != nil { return fmt.Errorf("configuring stores: %w", err) } diff --git a/service/tier1_test.go b/service/tier1_test.go index c19014983..f89cfcac4 100644 --- a/service/tier1_test.go +++ b/service/tier1_test.go @@ -23,6 +23,7 @@ func createTestStoreConfig(name string, initialBlock uint64, mockStore dstore.St "string", mockStore, nil, // quickSaveStore + 0, // storeSizeLimit: use global default ) } diff --git a/service/tier2.go b/service/tier2.go index dd59e6d23..08b3a52a1 100644 --- a/service/tier2.go +++ b/service/tier2.go @@ -217,6 +217,9 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s if request.EthCallFallbackToNumberDuration > 0 { ctx = reqctx.WithEthCallUseBlockNumberDuration(ctx, time.Duration(request.EthCallFallbackToNumberDuration)) } + if request.StoreSizeLimit != 0 { + ctx = reqctx.WithStoreSizeLimit(ctx, request.StoreSizeLimit) + } stage := request.OutputModule logger := reqctx.Logger(ctx).Named("tier2").With(zap.String("output_module", stage), zap.Uint64("segment_number", request.SegmentNumber)) @@ -397,7 +400,8 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P return fmt.Errorf("new config map: %w", err) } - storeConfigs, err := store.NewConfigMap(cacheStore, nil, execGraph.Stores(), execGraph.ModuleHashes(), request.FirstStreamableBlock) + storeSizeLimit := reqctx.StoreSizeLimit(ctx) + storeConfigs, err := store.NewConfigMap(cacheStore, nil, execGraph.Stores(), execGraph.ModuleHashes(), request.FirstStreamableBlock, storeSizeLimit) if err != nil { return fmt.Errorf("configuring stores: %w", err) } diff --git a/storage/store/config.go b/storage/store/config.go index 042ab5a24..8c334240c 100644 --- a/storage/store/config.go +++ b/storage/store/config.go @@ -40,6 +40,7 @@ func NewConfig( valueType string, store dstore.Store, quickSaveStore dstore.Store, + storeSizeLimit uint64, ) (*Config, error) { subStore, err := store.SubStore(fmt.Sprintf("%s/states", moduleHash)) if err != nil { @@ -67,8 +68,13 @@ func NewConfig( moduleInitialBlock: moduleInitialBlock, moduleHash: moduleHash, appendLimit: 8_388_608, // 8MiB = 8 * 1024 * 1024, - totalSizeLimit: StoreSizeLimit, - itemSizeLimit: 10_485_760, // 10MiB + totalSizeLimit: func() uint64 { + if storeSizeLimit != 0 { + return storeSizeLimit + } + return StoreSizeLimit + }(), + itemSizeLimit: 10_485_760, // 10MiB }, nil } diff --git a/storage/store/configmap.go b/storage/store/configmap.go index dfc579ed4..4a7341953 100644 --- a/storage/store/configmap.go +++ b/storage/store/configmap.go @@ -9,7 +9,9 @@ import ( type ConfigMap map[string]*Config -func NewConfigMap(baseObjectStore, quickSaveStore dstore.Store, storeModules []*pbsubstreams.Module, moduleHashes map[string]string, firstStreamableBlock uint64) (out ConfigMap, err error) { +// NewConfigMap creates a ConfigMap for the given store modules. +// storeSizeLimit, if non-zero, overrides the default StoreSizeLimit for all stores in this map. +func NewConfigMap(baseObjectStore, quickSaveStore dstore.Store, storeModules []*pbsubstreams.Module, moduleHashes map[string]string, firstStreamableBlock uint64, storeSizeLimit uint64) (out ConfigMap, err error) { out = make(ConfigMap) for _, storeModule := range storeModules { initialBlock := max(firstStreamableBlock, storeModule.InitialBlock) @@ -21,6 +23,7 @@ func NewConfigMap(baseObjectStore, quickSaveStore dstore.Store, storeModules []* storeModule.GetKindStore().ValueType, baseObjectStore, quickSaveStore, + storeSizeLimit, ) if err != nil { return nil, fmt.Errorf("new store config for %q: %w", storeModule.Name, err) diff --git a/storage/store/init_test.go b/storage/store/init_test.go index 39c7211ac..ae2544ef2 100644 --- a/storage/store/init_test.go +++ b/storage/store/init_test.go @@ -25,7 +25,7 @@ func newTestBaseStore( appendLimit = 10 } - config, err := NewConfig("test", 0, "test.module.hash", updatePolicy, valueType, store, nil) + config, err := NewConfig("test", 0, "test.module.hash", updatePolicy, valueType, store, nil, 0) config.appendLimit = appendLimit config.totalSizeLimit = 9999 config.itemSizeLimit = 10_485_760 diff --git a/tools/analytics_store_stats.go b/tools/analytics_store_stats.go index 191104a7d..e1b1f5ec4 100644 --- a/tools/analytics_store_stats.go +++ b/tools/analytics_store_stats.go @@ -116,6 +116,7 @@ func StoreStatsE(cmd *cobra.Command, args []string) error { module.GetKind().(*pbsubstreams.Module_KindStore_).KindStore.ValueType, baseDStore, nil, + 0, ) if err != nil { zlog.Error("creating store config", zap.Error(err)) diff --git a/tools/check.go b/tools/check.go index 6b83f9e6c..bc9bb032d 100644 --- a/tools/check.go +++ b/tools/check.go @@ -63,7 +63,7 @@ func newStore(storeURL string) (*store2.FullKV, dstore.Store, error) { return nil, nil, fmt.Errorf("could not create store from %s: %w", storeURL, err) } - config, err := store2.NewConfig("", 0, "", pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_IF_NOT_EXISTS, "", remoteStore, nil) + config, err := store2.NewConfig("", 0, "", pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_IF_NOT_EXISTS, "", remoteStore, nil, 0) if err != nil { return nil, nil, err } diff --git a/tools/decode.go b/tools/decode.go index 5da24a777..2e1c377b7 100644 --- a/tools/decode.go +++ b/tools/decode.go @@ -477,7 +477,7 @@ func printStateModule( stateStore dstore.Store, protoFiles []*descriptorpb.FileDescriptorProto, ) error { - config, err := store.NewConfig(module.Name, module.InitialBlock, moduleHash, module.GetKindStore().GetUpdatePolicy(), module.GetKindStore().GetValueType(), stateStore, nil) + config, err := store.NewConfig(module.Name, module.InitialBlock, moduleHash, module.GetKindStore().GetUpdatePolicy(), module.GetKindStore().GetValueType(), stateStore, nil, 0) if err != nil { return fmt.Errorf("initializing store config module %q: %w", module.Name, err) } @@ -530,7 +530,7 @@ func searchStateModule( stateStore dstore.Store, protoFiles []*descriptorpb.FileDescriptorProto, ) error { - config, err := store.NewConfig(module.Name, module.InitialBlock, moduleHash, module.GetKindStore().GetUpdatePolicy(), module.GetKindStore().GetValueType(), stateStore, nil) + config, err := store.NewConfig(module.Name, module.InitialBlock, moduleHash, module.GetKindStore().GetUpdatePolicy(), module.GetKindStore().GetValueType(), stateStore, nil, 0) if err != nil { return fmt.Errorf("initializing store config module %q: %w", module.Name, err) } diff --git a/tools/module.go b/tools/module.go index fb203da63..b030b0177 100644 --- a/tools/module.go +++ b/tools/module.go @@ -120,6 +120,7 @@ func moduleRunE(cmd *cobra.Command, args []string) error { module.GetKindStore().ValueType, stateStore, nil, + 0, ) cli.NoError(err, "unable to create store config") diff --git a/wasm/call_test.go b/wasm/call_test.go index c6009d3a8..f9205a66d 100644 --- a/wasm/call_test.go +++ b/wasm/call_test.go @@ -482,7 +482,7 @@ func Test_CallStoreOps(t *testing.T) { func newTestCall(updatePolicy pbsubstreams.Module_KindStore_UpdatePolicy, valueType string) *Call { myStore := dstore.NewMockStore(nil) - storeConf, err := store.NewConfig("test", 0, "", updatePolicy, valueType, myStore, nil) + storeConf, err := store.NewConfig("test", 0, "", updatePolicy, valueType, myStore, nil, 0) if err != nil { panic("failed") }