From 0e233387d7ad9f054e394525caef3908eab9221d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 13 Apr 2026 17:25:22 +0000 Subject: [PATCH 1/4] Initial plan From 70f3a66ddec4015f46f01ab3ac753776c5911b58 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 13 Apr 2026 17:49:25 +0000 Subject: [PATCH 2/4] Add LastValidBlockTimestamp to BlockUndoSignal messages Agent-Logs-Url: https://github.com/streamingfast/substreams/sessions/946c1b7d-643c-49d1-8107-94d423ec869a Co-authored-by: maoueh <123014+maoueh@users.noreply.github.com> --- pb/sf/substreams/rpc/v2/service.pb.go | 90 ++++--- pb/sf/substreams/rpc/v2/service_grpc.pb.go | 12 +- pb/sf/substreams/rpc/v2/service_vtproto.pb.go | 224 +++++++++++++++--- pipeline/pipeline.go | 4 + pipeline/process_block.go | 15 +- pipeline/process_block_test.go | 16 +- proto/sf/substreams/rpc/v2/service.proto | 2 + 7 files changed, 279 insertions(+), 84 deletions(-) diff --git a/pb/sf/substreams/rpc/v2/service.pb.go b/pb/sf/substreams/rpc/v2/service.pb.go index ed8980918..e8d39b419 100644 --- a/pb/sf/substreams/rpc/v2/service.pb.go +++ b/pb/sf/substreams/rpc/v2/service.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.10 -// protoc (unknown) +// protoc-gen-go v1.36.11 +// protoc v3.21.12 // source: sf/substreams/rpc/v2/service.proto package pbsubstreamsrpcv2 @@ -12,6 +12,7 @@ import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" anypb "google.golang.org/protobuf/types/known/anypb" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" unsafe "unsafe" @@ -416,11 +417,12 @@ func (*Response_DebugSnapshotComplete) isResponse_Message() {} // with a block number above 'last_valid_block' has been reverted // on-chain. 
Delete that data and restart from 'last_valid_cursor' type BlockUndoSignal struct { - state protoimpl.MessageState `protogen:"open.v1"` - LastValidBlock *v1.BlockRef `protobuf:"bytes,1,opt,name=last_valid_block,json=lastValidBlock,proto3" json:"last_valid_block,omitempty"` - LastValidCursor string `protobuf:"bytes,2,opt,name=last_valid_cursor,json=lastValidCursor,proto3" json:"last_valid_cursor,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + LastValidBlock *v1.BlockRef `protobuf:"bytes,1,opt,name=last_valid_block,json=lastValidBlock,proto3" json:"last_valid_block,omitempty"` + LastValidCursor string `protobuf:"bytes,2,opt,name=last_valid_cursor,json=lastValidCursor,proto3" json:"last_valid_cursor,omitempty"` + LastValidBlockTimestamp *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=last_valid_block_timestamp,json=lastValidBlockTimestamp,proto3" json:"last_valid_block_timestamp,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *BlockUndoSignal) Reset() { @@ -467,6 +469,13 @@ func (x *BlockUndoSignal) GetLastValidCursor() string { return "" } +func (x *BlockUndoSignal) GetLastValidBlockTimestamp() *timestamppb.Timestamp { + if x != nil { + return x.LastValidBlockTimestamp + } + return nil +} + type BlockScopedData struct { state protoimpl.MessageState `protogen:"open.v1"` Output *MapModuleOutput `protobuf:"bytes,1,opt,name=output,proto3" json:"output,omitempty"` @@ -1682,7 +1691,7 @@ var File_sf_substreams_rpc_v2_service_proto protoreflect.FileDescriptor const file_sf_substreams_rpc_v2_service_proto_rawDesc = "" + "\n" + - "\"sf/substreams/rpc/v2/service.proto\x12\x14sf.substreams.rpc.v2\x1a\x19google/protobuf/any.proto\x1a\x1dsf/firehose/v2/firehose.proto\x1a\x1csf/substreams/v1/clock.proto\x1a\x1esf/substreams/v1/modules.proto\"\xf1\x04\n" + + 
"\"sf/substreams/rpc/v2/service.proto\x12\x14sf.substreams.rpc.v2\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1dsf/firehose/v2/firehose.proto\x1a\x1csf/substreams/v1/clock.proto\x1a\x1esf/substreams/v1/modules.proto\"\xf1\x04\n" + "\aRequest\x12&\n" + "\x0fstart_block_num\x18\x01 \x01(\x03R\rstartBlockNum\x12!\n" + "\fstart_cursor\x18\x02 \x01(\tR\vstartCursor\x12$\n" + @@ -1708,10 +1717,11 @@ const file_sf_substreams_rpc_v2_service_proto_rawDesc = "" + "\x13debug_snapshot_data\x18\n" + " \x01(\v2).sf.substreams.rpc.v2.InitialSnapshotDataH\x00R\x11debugSnapshotData\x12g\n" + "\x17debug_snapshot_complete\x18\v \x01(\v2-.sf.substreams.rpc.v2.InitialSnapshotCompleteH\x00R\x15debugSnapshotCompleteB\t\n" + - "\amessage\"\x83\x01\n" + + "\amessage\"\xdc\x01\n" + "\x0fBlockUndoSignal\x12D\n" + "\x10last_valid_block\x18\x01 \x01(\v2\x1a.sf.substreams.v1.BlockRefR\x0elastValidBlock\x12*\n" + - "\x11last_valid_cursor\x18\x02 \x01(\tR\x0flastValidCursor\"\xaf\x04\n" + + "\x11last_valid_cursor\x18\x02 \x01(\tR\x0flastValidCursor\x12W\n" + + "\x1alast_valid_block_timestamp\x18\x03 \x01(\v2\x1a.google.protobuf.TimestampR\x17lastValidBlockTimestamp\"\xaf\x04\n" + "\x0fBlockScopedData\x12=\n" + "\x06output\x18\x01 \x01(\v2%.sf.substreams.rpc.v2.MapModuleOutputR\x06output\x12-\n" + "\x05clock\x18\x02 \x01(\v2\x17.sf.substreams.v1.ClockR\x05clock\x12\x16\n" + @@ -1872,10 +1882,11 @@ var file_sf_substreams_rpc_v2_service_proto_goTypes = []any{ (*BlockRange)(nil), // 19: sf.substreams.rpc.v2.BlockRange (*v1.Modules)(nil), // 20: sf.substreams.v1.Modules (*v1.BlockRef)(nil), // 21: sf.substreams.v1.BlockRef - (*v1.Clock)(nil), // 22: sf.substreams.v1.Clock - (*anypb.Any)(nil), // 23: google.protobuf.Any - (*v2.InfoRequest)(nil), // 24: sf.firehose.v2.InfoRequest - (*v2.InfoResponse)(nil), // 25: sf.firehose.v2.InfoResponse + (*timestamppb.Timestamp)(nil), // 22: google.protobuf.Timestamp + (*v1.Clock)(nil), // 23: sf.substreams.v1.Clock + 
(*anypb.Any)(nil), // 24: google.protobuf.Any + (*v2.InfoRequest)(nil), // 25: sf.firehose.v2.InfoRequest + (*v2.InfoResponse)(nil), // 26: sf.firehose.v2.InfoResponse } var file_sf_substreams_rpc_v2_service_proto_depIdxs = []int32{ 20, // 0: sf.substreams.rpc.v2.Request.modules:type_name -> sf.substreams.v1.Modules @@ -1887,31 +1898,32 @@ var file_sf_substreams_rpc_v2_service_proto_depIdxs = []int32{ 7, // 6: sf.substreams.rpc.v2.Response.debug_snapshot_data:type_name -> sf.substreams.rpc.v2.InitialSnapshotData 6, // 7: sf.substreams.rpc.v2.Response.debug_snapshot_complete:type_name -> sf.substreams.rpc.v2.InitialSnapshotComplete 21, // 8: sf.substreams.rpc.v2.BlockUndoSignal.last_valid_block:type_name -> sf.substreams.v1.BlockRef - 8, // 9: sf.substreams.rpc.v2.BlockScopedData.output:type_name -> sf.substreams.rpc.v2.MapModuleOutput - 22, // 10: sf.substreams.rpc.v2.BlockScopedData.clock:type_name -> sf.substreams.v1.Clock - 8, // 11: sf.substreams.rpc.v2.BlockScopedData.debug_map_outputs:type_name -> sf.substreams.rpc.v2.MapModuleOutput - 9, // 12: sf.substreams.rpc.v2.BlockScopedData.debug_store_outputs:type_name -> sf.substreams.rpc.v2.StoreModuleOutput - 18, // 13: sf.substreams.rpc.v2.InitialSnapshotData.deltas:type_name -> sf.substreams.rpc.v2.StoreDelta - 23, // 14: sf.substreams.rpc.v2.MapModuleOutput.map_output:type_name -> google.protobuf.Any - 10, // 15: sf.substreams.rpc.v2.MapModuleOutput.debug_info:type_name -> sf.substreams.rpc.v2.OutputDebugInfo - 18, // 16: sf.substreams.rpc.v2.StoreModuleOutput.debug_store_deltas:type_name -> sf.substreams.rpc.v2.StoreDelta - 10, // 17: sf.substreams.rpc.v2.StoreModuleOutput.debug_info:type_name -> sf.substreams.rpc.v2.OutputDebugInfo - 14, // 18: sf.substreams.rpc.v2.ModulesProgress.running_jobs:type_name -> sf.substreams.rpc.v2.Job - 16, // 19: sf.substreams.rpc.v2.ModulesProgress.modules_stats:type_name -> sf.substreams.rpc.v2.ModuleStats - 15, // 20: sf.substreams.rpc.v2.ModulesProgress.stages:type_name -> 
sf.substreams.rpc.v2.Stage - 12, // 21: sf.substreams.rpc.v2.ModulesProgress.processed_bytes:type_name -> sf.substreams.rpc.v2.ProcessedBytes - 19, // 22: sf.substreams.rpc.v2.Stage.completed_ranges:type_name -> sf.substreams.rpc.v2.BlockRange - 17, // 23: sf.substreams.rpc.v2.ModuleStats.external_call_metrics:type_name -> sf.substreams.rpc.v2.ExternalCallMetric - 0, // 24: sf.substreams.rpc.v2.StoreDelta.operation:type_name -> sf.substreams.rpc.v2.StoreDelta.Operation - 1, // 25: sf.substreams.rpc.v2.Stream.Blocks:input_type -> sf.substreams.rpc.v2.Request - 24, // 26: sf.substreams.rpc.v2.EndpointInfo.Info:input_type -> sf.firehose.v2.InfoRequest - 2, // 27: sf.substreams.rpc.v2.Stream.Blocks:output_type -> sf.substreams.rpc.v2.Response - 25, // 28: sf.substreams.rpc.v2.EndpointInfo.Info:output_type -> sf.firehose.v2.InfoResponse - 27, // [27:29] is the sub-list for method output_type - 25, // [25:27] is the sub-list for method input_type - 25, // [25:25] is the sub-list for extension type_name - 25, // [25:25] is the sub-list for extension extendee - 0, // [0:25] is the sub-list for field type_name + 22, // 9: sf.substreams.rpc.v2.BlockUndoSignal.last_valid_block_timestamp:type_name -> google.protobuf.Timestamp + 8, // 10: sf.substreams.rpc.v2.BlockScopedData.output:type_name -> sf.substreams.rpc.v2.MapModuleOutput + 23, // 11: sf.substreams.rpc.v2.BlockScopedData.clock:type_name -> sf.substreams.v1.Clock + 8, // 12: sf.substreams.rpc.v2.BlockScopedData.debug_map_outputs:type_name -> sf.substreams.rpc.v2.MapModuleOutput + 9, // 13: sf.substreams.rpc.v2.BlockScopedData.debug_store_outputs:type_name -> sf.substreams.rpc.v2.StoreModuleOutput + 18, // 14: sf.substreams.rpc.v2.InitialSnapshotData.deltas:type_name -> sf.substreams.rpc.v2.StoreDelta + 24, // 15: sf.substreams.rpc.v2.MapModuleOutput.map_output:type_name -> google.protobuf.Any + 10, // 16: sf.substreams.rpc.v2.MapModuleOutput.debug_info:type_name -> sf.substreams.rpc.v2.OutputDebugInfo + 18, // 17: 
sf.substreams.rpc.v2.StoreModuleOutput.debug_store_deltas:type_name -> sf.substreams.rpc.v2.StoreDelta + 10, // 18: sf.substreams.rpc.v2.StoreModuleOutput.debug_info:type_name -> sf.substreams.rpc.v2.OutputDebugInfo + 14, // 19: sf.substreams.rpc.v2.ModulesProgress.running_jobs:type_name -> sf.substreams.rpc.v2.Job + 16, // 20: sf.substreams.rpc.v2.ModulesProgress.modules_stats:type_name -> sf.substreams.rpc.v2.ModuleStats + 15, // 21: sf.substreams.rpc.v2.ModulesProgress.stages:type_name -> sf.substreams.rpc.v2.Stage + 12, // 22: sf.substreams.rpc.v2.ModulesProgress.processed_bytes:type_name -> sf.substreams.rpc.v2.ProcessedBytes + 19, // 23: sf.substreams.rpc.v2.Stage.completed_ranges:type_name -> sf.substreams.rpc.v2.BlockRange + 17, // 24: sf.substreams.rpc.v2.ModuleStats.external_call_metrics:type_name -> sf.substreams.rpc.v2.ExternalCallMetric + 0, // 25: sf.substreams.rpc.v2.StoreDelta.operation:type_name -> sf.substreams.rpc.v2.StoreDelta.Operation + 1, // 26: sf.substreams.rpc.v2.Stream.Blocks:input_type -> sf.substreams.rpc.v2.Request + 25, // 27: sf.substreams.rpc.v2.EndpointInfo.Info:input_type -> sf.firehose.v2.InfoRequest + 2, // 28: sf.substreams.rpc.v2.Stream.Blocks:output_type -> sf.substreams.rpc.v2.Response + 26, // 29: sf.substreams.rpc.v2.EndpointInfo.Info:output_type -> sf.firehose.v2.InfoResponse + 28, // [28:30] is the sub-list for method output_type + 26, // [26:28] is the sub-list for method input_type + 26, // [26:26] is the sub-list for extension type_name + 26, // [26:26] is the sub-list for extension extendee + 0, // [0:26] is the sub-list for field type_name } func init() { file_sf_substreams_rpc_v2_service_proto_init() } diff --git a/pb/sf/substreams/rpc/v2/service_grpc.pb.go b/pb/sf/substreams/rpc/v2/service_grpc.pb.go index 50d06b884..72da01edc 100644 --- a/pb/sf/substreams/rpc/v2/service_grpc.pb.go +++ b/pb/sf/substreams/rpc/v2/service_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. 
// versions: -// - protoc-gen-go-grpc v1.5.1 -// - protoc (unknown) +// - protoc-gen-go-grpc v1.6.1 +// - protoc v3.21.12 // source: sf/substreams/rpc/v2/service.proto package pbsubstreamsrpcv2 @@ -72,7 +72,7 @@ type StreamServer interface { type UnimplementedStreamServer struct{} func (UnimplementedStreamServer) Blocks(*Request, grpc.ServerStreamingServer[Response]) error { - return status.Errorf(codes.Unimplemented, "method Blocks not implemented") + return status.Error(codes.Unimplemented, "method Blocks not implemented") } func (UnimplementedStreamServer) testEmbeddedByValue() {} @@ -84,7 +84,7 @@ type UnsafeStreamServer interface { } func RegisterStreamServer(s grpc.ServiceRegistrar, srv StreamServer) { - // If the following call pancis, it indicates UnimplementedStreamServer was + // If the following call panics, it indicates UnimplementedStreamServer was // embedded by pointer and is nil. This will cause panics if an // unimplemented method is ever invoked, so we test this at initialization // time to prevent it from happening at runtime later due to I/O. @@ -166,7 +166,7 @@ type EndpointInfoServer interface { type UnimplementedEndpointInfoServer struct{} func (UnimplementedEndpointInfoServer) Info(context.Context, *v2.InfoRequest) (*v2.InfoResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Info not implemented") + return nil, status.Error(codes.Unimplemented, "method Info not implemented") } func (UnimplementedEndpointInfoServer) testEmbeddedByValue() {} @@ -178,7 +178,7 @@ type UnsafeEndpointInfoServer interface { } func RegisterEndpointInfoServer(s grpc.ServiceRegistrar, srv EndpointInfoServer) { - // If the following call pancis, it indicates UnimplementedEndpointInfoServer was + // If the following call panics, it indicates UnimplementedEndpointInfoServer was // embedded by pointer and is nil. 
This will cause panics if an // unimplemented method is ever invoked, so we test this at initialization // time to prevent it from happening at runtime later due to I/O. diff --git a/pb/sf/substreams/rpc/v2/service_vtproto.pb.go b/pb/sf/substreams/rpc/v2/service_vtproto.pb.go index a1f287add..1e1d50a55 100644 --- a/pb/sf/substreams/rpc/v2/service_vtproto.pb.go +++ b/pb/sf/substreams/rpc/v2/service_vtproto.pb.go @@ -8,10 +8,12 @@ import ( fmt "fmt" protohelpers "github.com/planetscale/vtprotobuf/protohelpers" anypb1 "github.com/planetscale/vtprotobuf/types/known/anypb" + timestamppb1 "github.com/planetscale/vtprotobuf/types/known/timestamppb" v1 "github.com/streamingfast/substreams/pb/sf/substreams/v1" proto "google.golang.org/protobuf/proto" protoimpl "google.golang.org/protobuf/runtime/protoimpl" anypb "google.golang.org/protobuf/types/known/anypb" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" io "io" ) @@ -33,11 +35,17 @@ func (m *Request) CloneVT() *Request { r.FinalBlocksOnly = m.FinalBlocksOnly r.ProductionMode = m.ProductionMode r.OutputModule = m.OutputModule - r.Modules = m.Modules.CloneVT() r.NoopMode = m.NoopMode r.LimitProcessedBlocks = m.LimitProcessedBlocks r.ProgressMessagesIntervalMs = m.ProgressMessagesIntervalMs r.PartialBlocks = m.PartialBlocks + if rhs := m.Modules; rhs != nil { + if vtpb, ok := interface{}(rhs).(interface{ CloneVT() *v1.Modules }); ok { + r.Modules = vtpb.CloneVT() + } else { + r.Modules = proto.Clone(rhs).(*v1.Modules) + } + } if rhs := m.DebugInitialStoreSnapshotForModules; rhs != nil { tmpContainer := make([]string, len(rhs)) copy(tmpContainer, rhs) @@ -146,8 +154,15 @@ func (m *BlockUndoSignal) CloneVT() *BlockUndoSignal { return (*BlockUndoSignal)(nil) } r := new(BlockUndoSignal) - r.LastValidBlock = m.LastValidBlock.CloneVT() r.LastValidCursor = m.LastValidCursor + r.LastValidBlockTimestamp = (*timestamppb.Timestamp)((*timestamppb1.Timestamp)(m.LastValidBlockTimestamp).CloneVT()) + if rhs := 
m.LastValidBlock; rhs != nil { + if vtpb, ok := interface{}(rhs).(interface{ CloneVT() *v1.BlockRef }); ok { + r.LastValidBlock = vtpb.CloneVT() + } else { + r.LastValidBlock = proto.Clone(rhs).(*v1.BlockRef) + } + } if len(m.unknownFields) > 0 { r.unknownFields = make([]byte, len(m.unknownFields)) copy(r.unknownFields, m.unknownFields) @@ -165,11 +180,17 @@ func (m *BlockScopedData) CloneVT() *BlockScopedData { } r := new(BlockScopedData) r.Output = m.Output.CloneVT() - r.Clock = m.Clock.CloneVT() r.Cursor = m.Cursor r.FinalBlockHeight = m.FinalBlockHeight r.Attestation = m.Attestation r.IsPartial = m.IsPartial + if rhs := m.Clock; rhs != nil { + if vtpb, ok := interface{}(rhs).(interface{ CloneVT() *v1.Clock }); ok { + r.Clock = vtpb.CloneVT() + } else { + r.Clock = proto.Clone(rhs).(*v1.Clock) + } + } if rhs := m.DebugMapOutputs; rhs != nil { tmpContainer := make([]*MapModuleOutput, len(rhs)) for k, v := range rhs { @@ -593,7 +614,11 @@ func (this *Request) EqualVT(that *Request) bool { if this.OutputModule != that.OutputModule { return false } - if !this.Modules.EqualVT(that.Modules) { + if equal, ok := interface{}(this.Modules).(interface{ EqualVT(*v1.Modules) bool }); ok { + if !equal.EqualVT(that.Modules) { + return false + } + } else if !proto.Equal(this.Modules, that.Modules) { return false } if len(this.DebugInitialStoreSnapshotForModules) != len(that.DebugInitialStoreSnapshotForModules) { @@ -843,12 +868,19 @@ func (this *BlockUndoSignal) EqualVT(that *BlockUndoSignal) bool { } else if this == nil || that == nil { return false } - if !this.LastValidBlock.EqualVT(that.LastValidBlock) { + if equal, ok := interface{}(this.LastValidBlock).(interface{ EqualVT(*v1.BlockRef) bool }); ok { + if !equal.EqualVT(that.LastValidBlock) { + return false + } + } else if !proto.Equal(this.LastValidBlock, that.LastValidBlock) { return false } if this.LastValidCursor != that.LastValidCursor { return false } + if 
!(*timestamppb1.Timestamp)(this.LastValidBlockTimestamp).EqualVT((*timestamppb1.Timestamp)(that.LastValidBlockTimestamp)) { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -868,7 +900,11 @@ func (this *BlockScopedData) EqualVT(that *BlockScopedData) bool { if !this.Output.EqualVT(that.Output) { return false } - if !this.Clock.EqualVT(that.Clock) { + if equal, ok := interface{}(this.Clock).(interface{ EqualVT(*v1.Clock) bool }); ok { + if !equal.EqualVT(that.Clock) { + return false + } + } else if !proto.Equal(this.Clock, that.Clock) { return false } if this.Cursor != that.Cursor { @@ -1562,12 +1598,24 @@ func (m *Request) MarshalToSizedBufferVT(dAtA []byte) (int, error) { } } if m.Modules != nil { - size, err := m.Modules.MarshalToSizedBufferVT(dAtA[:i]) - if err != nil { - return 0, err + if vtmsg, ok := interface{}(m.Modules).(interface { + MarshalToSizedBufferVT([]byte) (int, error) + }); ok { + size, err := vtmsg.MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + } else { + encoded, err := proto.Marshal(m.Modules) + if err != nil { + return 0, err + } + i -= len(encoded) + copy(dAtA[i:], encoded) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(encoded))) } - i -= size - i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) i-- dAtA[i] = 0x3a } @@ -1823,6 +1871,16 @@ func (m *BlockUndoSignal) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.LastValidBlockTimestamp != nil { + size, err := (*timestamppb1.Timestamp)(m.LastValidBlockTimestamp).MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0x1a + } if len(m.LastValidCursor) > 0 { i -= len(m.LastValidCursor) copy(dAtA[i:], m.LastValidCursor) @@ -1831,12 +1889,24 @@ func (m *BlockUndoSignal) 
MarshalToSizedBufferVT(dAtA []byte) (int, error) { dAtA[i] = 0x12 } if m.LastValidBlock != nil { - size, err := m.LastValidBlock.MarshalToSizedBufferVT(dAtA[:i]) - if err != nil { - return 0, err + if vtmsg, ok := interface{}(m.LastValidBlock).(interface { + MarshalToSizedBufferVT([]byte) (int, error) + }); ok { + size, err := vtmsg.MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + } else { + encoded, err := proto.Marshal(m.LastValidBlock) + if err != nil { + return 0, err + } + i -= len(encoded) + copy(dAtA[i:], encoded) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(encoded))) } - i -= size - i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) i-- dAtA[i] = 0xa } @@ -1942,12 +2012,24 @@ func (m *BlockScopedData) MarshalToSizedBufferVT(dAtA []byte) (int, error) { dAtA[i] = 0x1a } if m.Clock != nil { - size, err := m.Clock.MarshalToSizedBufferVT(dAtA[:i]) - if err != nil { - return 0, err + if vtmsg, ok := interface{}(m.Clock).(interface { + MarshalToSizedBufferVT([]byte) (int, error) + }); ok { + size, err := vtmsg.MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + } else { + encoded, err := proto.Marshal(m.Clock) + if err != nil { + return 0, err + } + i -= len(encoded) + copy(dAtA[i:], encoded) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(encoded))) } - i -= size - i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) i-- dAtA[i] = 0x12 } @@ -2933,7 +3015,13 @@ func (m *Request) SizeVT() (n int) { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } if m.Modules != nil { - l = m.Modules.SizeVT() + if size, ok := interface{}(m.Modules).(interface { + SizeVT() int + }); ok { + l = size.SizeVT() + } else { + l = proto.Size(m.Modules) + } n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } if len(m.DebugInitialStoreSnapshotForModules) > 0 { @@ -3068,13 +3156,23 @@ func (m 
*BlockUndoSignal) SizeVT() (n int) { var l int _ = l if m.LastValidBlock != nil { - l = m.LastValidBlock.SizeVT() + if size, ok := interface{}(m.LastValidBlock).(interface { + SizeVT() int + }); ok { + l = size.SizeVT() + } else { + l = proto.Size(m.LastValidBlock) + } n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } l = len(m.LastValidCursor) if l > 0 { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } + if m.LastValidBlockTimestamp != nil { + l = (*timestamppb1.Timestamp)(m.LastValidBlockTimestamp).SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } n += len(m.unknownFields) return n } @@ -3090,7 +3188,13 @@ func (m *BlockScopedData) SizeVT() (n int) { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } if m.Clock != nil { - l = m.Clock.SizeVT() + if size, ok := interface{}(m.Clock).(interface { + SizeVT() int + }); ok { + l = size.SizeVT() + } else { + l = proto.Size(m.Clock) + } n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } l = len(m.Cursor) @@ -3721,8 +3825,16 @@ func (m *Request) UnmarshalVT(dAtA []byte) error { if m.Modules == nil { m.Modules = &v1.Modules{} } - if err := m.Modules.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { - return err + if unmarshal, ok := interface{}(m.Modules).(interface { + UnmarshalVT([]byte) error + }); ok { + if err := unmarshal.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + } else { + if err := proto.Unmarshal(dAtA[iNdEx:postIndex], m.Modules); err != nil { + return err + } } iNdEx = postIndex case 10: @@ -4288,8 +4400,16 @@ func (m *BlockUndoSignal) UnmarshalVT(dAtA []byte) error { if m.LastValidBlock == nil { m.LastValidBlock = &v1.BlockRef{} } - if err := m.LastValidBlock.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { - return err + if unmarshal, ok := interface{}(m.LastValidBlock).(interface { + UnmarshalVT([]byte) error + }); ok { + if err := unmarshal.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + } else { + if err := 
proto.Unmarshal(dAtA[iNdEx:postIndex], m.LastValidBlock); err != nil { + return err + } } iNdEx = postIndex case 2: @@ -4324,6 +4444,42 @@ func (m *BlockUndoSignal) UnmarshalVT(dAtA []byte) error { } m.LastValidCursor = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field LastValidBlockTimestamp", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.LastValidBlockTimestamp == nil { + m.LastValidBlockTimestamp = &timestamppb.Timestamp{} + } + if err := (*timestamppb1.Timestamp)(m.LastValidBlockTimestamp).UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -4443,8 +4599,16 @@ func (m *BlockScopedData) UnmarshalVT(dAtA []byte) error { if m.Clock == nil { m.Clock = &v1.Clock{} } - if err := m.Clock.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { - return err + if unmarshal, ok := interface{}(m.Clock).(interface { + UnmarshalVT([]byte) error + }); ok { + if err := unmarshal.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + } else { + if err := proto.Unmarshal(dAtA[iNdEx:postIndex], m.Clock); err != nil { + return err + } } iNdEx = postIndex case 3: diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 086a4ba37..a41217b30 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -37,6 +37,7 @@ import ( "go.opentelemetry.io/otel" "go.uber.org/zap" "google.golang.org/protobuf/types/known/anypb" + 
"google.golang.org/protobuf/types/known/timestamppb" ) // moduleKey combines binary index and type to handle cases where the same binary is used with different runtime extensions (wasm-bindgen-shims) @@ -123,6 +124,8 @@ type Pipeline struct { forkHandler *ForkHandler insideReorgUpTo bstream.BlockRef + reversibleBlockTimestamps map[string]*timestamppb.Timestamp + execOutputCache *cache.Engine blockType string @@ -178,6 +181,7 @@ func New( execoutStorage: execoutStorage, forkHandler: NewForkHandler(), blockStepMap: make(map[bstream.StepType]uint64), + reversibleBlockTimestamps: make(map[string]*timestamppb.Timestamp), startTime: time.Now(), executionTimeout: executionTimeout, workerPoolFactory: workerPoolFactory, diff --git a/pipeline/process_block.go b/pipeline/process_block.go index 3ae782dd2..d6d751d78 100644 --- a/pipeline/process_block.go +++ b/pipeline/process_block.go @@ -224,6 +224,7 @@ func (p *Pipeline) processBlock( func (p *Pipeline) handleStepStalled(clock *pbsubstreams.Clock) error { p.execOutputCache.HandleStalled(clock) p.forkHandler.removeReversibleOutput(clock.Id) + delete(p.reversibleBlockTimestamps, clock.Id) return nil } @@ -250,6 +251,7 @@ func (p *Pipeline) handleUndo(clock *pbsubstreams.Clock, cursor *bstream.Cursor, if err := p.forkHandler.handleUndo(clock); err != nil { return fmt.Errorf("reverting outputs: %w", err) } + delete(p.reversibleBlockTimestamps, clock.Id) if bstream.EqualsBlockRefs(p.insideReorgUpTo, reorgJunctionBlock) { return nil @@ -264,6 +266,7 @@ func (p *Pipeline) handleUndo(clock *pbsubstreams.Clock, cursor *bstream.Cursor, } targetClock := blockRefToPB(reorgJunctionBlock) + lastValidBlockTimestamp := p.reversibleBlockTimestamps[reorgJunctionBlock.ID()] p.lastProcessedBlockRef = reorgJunctionBlock p.lastCursor = targetCursor @@ -271,8 +274,9 @@ func (p *Pipeline) handleUndo(clock *pbsubstreams.Clock, cursor *bstream.Cursor, &pbsubstreamsrpc.Response{ Message: &pbsubstreamsrpc.Response_BlockUndoSignal{ BlockUndoSignal: 
&pbsubstreamsrpc.BlockUndoSignal{ - LastValidBlock: targetClock, - LastValidCursor: normalizedOpaqueCursor(*targetCursor), + LastValidBlock: targetClock, + LastValidCursor: normalizedOpaqueCursor(*targetCursor), + LastValidBlockTimestamp: lastValidBlockTimestamp, }, }, }) @@ -326,6 +330,7 @@ func (p *Pipeline) handleStepFinal(clock *pbsubstreams.Clock) error { return fmt.Errorf("exec output cache: handle final: %w", err) } p.forkHandler.removeReversibleOutput(clock.Id) + delete(p.reversibleBlockTimestamps, clock.Id) return nil } @@ -382,6 +387,9 @@ func (p *Pipeline) handleStepPartial(ctx context.Context, clock *pbsubstreams.Cl // allow an 'undo' on this 'last' partial block to undo the whole thing -- must be run AFTER the executeModules() // important because we will lose all information on the partialProcessingState p.forkHandler.joinReversibleOutputs(clock, p.partialProcessingState.processedPartials) + if clock.Timestamp != nil { + p.reversibleBlockTimestamps[clock.Id] = clock.Timestamp + } p.partialProcessingState = nil p.previousLastPartialBlock = bstream.NewBlockRef(clock.Id, clock.Number) p.lastProcessedBlockRef = p.previousLastPartialBlock // acts as new @@ -435,6 +443,9 @@ func (p *Pipeline) handleStepNew(ctx context.Context, clock *pbsubstreams.Clock, } p.insideReorgUpTo = nil + if !isFinalBlock && clock.Timestamp != nil { + p.reversibleBlockTimestamps[clock.Id] = clock.Timestamp + } reqDetails := reqctx.Details(ctx) if p.respFunc != nil { diff --git a/pipeline/process_block_test.go b/pipeline/process_block_test.go index 5f5c76bc9..2bd65157d 100644 --- a/pipeline/process_block_test.go +++ b/pipeline/process_block_test.go @@ -18,6 +18,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/timestamppb" ) func TestHandleStepPartial(t *testing.T) { @@ -479,13 +480,14 @@ func setupTestPipeline(t *testing.T, executors [][]exec.ModuleExecutor) *Pipelin ctx := 
setupTestContext(t, "test_module", 200) pipe := &Pipeline{ - ctx: ctx, - StagedModuleExecutors: executors, - forkHandler: NewForkHandler(), - stores: &Stores{logger: zap.NewNop()}, - moduleNameToStage: make(map[string]int), - blockStepMap: make(map[bstream.StepType]uint64), - executionTimeout: 10000000000, // 10 seconds timeout for tests + ctx: ctx, + StagedModuleExecutors: executors, + forkHandler: NewForkHandler(), + stores: &Stores{logger: zap.NewNop()}, + moduleNameToStage: make(map[string]int), + blockStepMap: make(map[bstream.StepType]uint64), + reversibleBlockTimestamps: make(map[string]*timestamppb.Timestamp), + executionTimeout: 10000000000, // 10 seconds timeout for tests } // Build module name to stage mapping diff --git a/proto/sf/substreams/rpc/v2/service.proto b/proto/sf/substreams/rpc/v2/service.proto index fbaa383f2..bcf871ac4 100644 --- a/proto/sf/substreams/rpc/v2/service.proto +++ b/proto/sf/substreams/rpc/v2/service.proto @@ -3,6 +3,7 @@ syntax = "proto3"; package sf.substreams.rpc.v2; import "google/protobuf/any.proto"; +import "google/protobuf/timestamp.proto"; import "sf/firehose/v2/firehose.proto"; import "sf/substreams/v1/clock.proto"; import "sf/substreams/v1/modules.proto"; @@ -101,6 +102,7 @@ message Response { message BlockUndoSignal { sf.substreams.v1.BlockRef last_valid_block = 1; string last_valid_cursor = 2; + google.protobuf.Timestamp last_valid_block_timestamp = 3; } message BlockScopedData { From ccf7dcaa7493b63c0ea277b0e9c4ad8f1ccd4a01 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 13 Apr 2026 17:52:46 +0000 Subject: [PATCH 3/4] Add debug log when reorg junction block timestamp is missing Agent-Logs-Url: https://github.com/streamingfast/substreams/sessions/946c1b7d-643c-49d1-8107-94d423ec869a Co-authored-by: maoueh <123014+maoueh@users.noreply.github.com> --- pipeline/process_block.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pipeline/process_block.go 
b/pipeline/process_block.go index d6d751d78..c3db22a01 100644 --- a/pipeline/process_block.go +++ b/pipeline/process_block.go @@ -267,6 +267,9 @@ func (p *Pipeline) handleUndo(clock *pbsubstreams.Clock, cursor *bstream.Cursor, targetClock := blockRefToPB(reorgJunctionBlock) lastValidBlockTimestamp := p.reversibleBlockTimestamps[reorgJunctionBlock.ID()] + if lastValidBlockTimestamp == nil { + reqctx.Logger(p.ctx).Debug("no timestamp found for reorg junction block, last_valid_block_timestamp will be unset in undo signal", zap.Stringer("reorg_junction_block", reorgJunctionBlock)) + } p.lastProcessedBlockRef = reorgJunctionBlock p.lastCursor = targetCursor From e6c862e994a8d3fac8da61ab67f2855acb4ee166 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 14 Apr 2026 17:37:40 +0000 Subject: [PATCH 4/4] Address review feedback: always assign timestamps, use Warn, use ok-idiom for missing entry Agent-Logs-Url: https://github.com/streamingfast/substreams/sessions/d82a5fdd-2db3-4ee7-8413-d2f5a11eaec4 Co-authored-by: maoueh <123014+maoueh@users.noreply.github.com> --- pipeline/process_block.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/pipeline/process_block.go b/pipeline/process_block.go index c3db22a01..ba4ca088b 100644 --- a/pipeline/process_block.go +++ b/pipeline/process_block.go @@ -266,9 +266,9 @@ func (p *Pipeline) handleUndo(clock *pbsubstreams.Clock, cursor *bstream.Cursor, } targetClock := blockRefToPB(reorgJunctionBlock) - lastValidBlockTimestamp := p.reversibleBlockTimestamps[reorgJunctionBlock.ID()] - if lastValidBlockTimestamp == nil { - reqctx.Logger(p.ctx).Debug("no timestamp found for reorg junction block, last_valid_block_timestamp will be unset in undo signal", zap.Stringer("reorg_junction_block", reorgJunctionBlock)) + lastValidBlockTimestamp, found := p.reversibleBlockTimestamps[reorgJunctionBlock.ID()] + if !found { + reqctx.Logger(p.ctx).Warn("no timestamp 
found for reorg junction block, last_valid_block_timestamp will be unset in undo signal", zap.Stringer("reorg_junction_block", reorgJunctionBlock)) } p.lastProcessedBlockRef = reorgJunctionBlock @@ -390,9 +390,7 @@ func (p *Pipeline) handleStepPartial(ctx context.Context, clock *pbsubstreams.Cl // allow an 'undo' on this 'last' partial block to undo the whole thing -- must be run AFTER the executeModules() // important because we will lose all information on the partialProcessingState p.forkHandler.joinReversibleOutputs(clock, p.partialProcessingState.processedPartials) - if clock.Timestamp != nil { - p.reversibleBlockTimestamps[clock.Id] = clock.Timestamp - } + p.reversibleBlockTimestamps[clock.Id] = clock.Timestamp p.partialProcessingState = nil p.previousLastPartialBlock = bstream.NewBlockRef(clock.Id, clock.Number) p.lastProcessedBlockRef = p.previousLastPartialBlock // acts as new @@ -446,7 +444,7 @@ func (p *Pipeline) handleStepNew(ctx context.Context, clock *pbsubstreams.Clock, } p.insideReorgUpTo = nil - if !isFinalBlock && clock.Timestamp != nil { + if !isFinalBlock { p.reversibleBlockTimestamps[clock.Id] = clock.Timestamp } reqDetails := reqctx.Details(ctx)