From 65f9131d42e0b0d73f3ca148f2ea1753fb5d599b Mon Sep 17 00:00:00 2001 From: Marcus Brandenburger Date: Tue, 7 Oct 2025 11:07:21 +0200 Subject: [PATCH 1/2] Initial impl for committer's notification service Signed-off-by: Marcus Brandenburger --- .../core/committer/notify/notify_test.go | 94 ++++++++++++ platform/fabricx/core/finality/config.go | 38 +++++ platform/fabricx/core/finality/grpc.go | 71 +++++++++ .../fabricx/core/finality/listenermanager.go | 57 ++++--- platform/fabricx/core/finality/nlm.go | 141 ++++++++++++++++++ 5 files changed, 372 insertions(+), 29 deletions(-) create mode 100644 platform/fabricx/core/committer/notify/notify_test.go create mode 100644 platform/fabricx/core/finality/config.go create mode 100644 platform/fabricx/core/finality/grpc.go create mode 100644 platform/fabricx/core/finality/nlm.go diff --git a/platform/fabricx/core/committer/notify/notify_test.go b/platform/fabricx/core/committer/notify/notify_test.go new file mode 100644 index 000000000..f6b4a134d --- /dev/null +++ b/platform/fabricx/core/committer/notify/notify_test.go @@ -0,0 +1,94 @@ +package notify + +import ( + "fmt" + "testing" + "time" + + "github.com/hyperledger-labs/fabric-smart-client/platform/fabricx/core/vault/queryservice" + "github.com/hyperledger/fabric-x-committer/api/protonotify" + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/durationpb" +) + +func TestNotificationService(t *testing.T) { + + table := []struct { + name string + cfg map[string]any + checks func(t *testing.T, client *grpc.ClientConn, err error) + }{ + { + name: "connect", + cfg: map[string]any{ + "queryService.queryTimeout": 10 * time.Second, + "queryService.Endpoints": []any{ + map[string]any{ + "address": "localhost:5411", + "connectionTimeout": 0 * time.Second, + }, + }, + }, + checks: func(t *testing.T, client *grpc.ClientConn, err error) { + t.Helper() + require.NotNil(t, client) + require.NoError(t, err) + + nf := protonotify.NewNotifierClient(client) + + notifyStream, err := nf.OpenNotificationStream(t.Context()) + require.NoError(t, err) + + txIDs := make([]string, 1) + err = notifyStream.Send(&protonotify.NotificationRequest{ + TxStatusRequest: &protonotify.TxStatusRequest{ + TxIds: txIDs, + }, + Timeout: durationpb.New(3 * time.Minute), + }) + require.NoError(t, err) + + var actual []*protonotify.TxStatusEvent + require.EventuallyWithT(t, func(ct *assert.CollectT) { + res, err := notifyStream.Recv() + require.NoError(t, err) + require.NotNil(t, res) + require.Nil(t, res.TimeoutTxIds) + actual = append(actual, res.TxStatusEvents...) + //test.RequireProtoElementsMatch(ct, expected, actual) + }, 15*time.Second, 50*time.Millisecond) + }, + }, + } + + for _, tc := range table { + t.Run(fmt.Sprintf("grpcClient %v", tc.name), func(t *testing.T) { + t.Parallel() + cs := newConfigService(tc.cfg) + c, err := queryservice.NewConfig(cs) + client, err := queryservice.GrpcClient(c) + require.NoError(t, err) + tc.checks(t, client, err) + }) + } + +} + +type configService struct { + V *viper.Viper +} + +func newConfigService(c map[string]any) *configService { + v := viper.New() + for k, val := range c { + v.Set(k, val) + } + return &configService{V: v} +} + +func (c configService) UnmarshalKey(key string, rawVal interface{}) error { + return c.V.UnmarshalKey(key, rawVal) +} diff --git a/platform/fabricx/core/finality/config.go b/platform/fabricx/core/finality/config.go new file mode 100644 index 000000000..402ccf555 --- /dev/null +++ b/platform/fabricx/core/finality/config.go @@ -0,0 +1,38 @@ +package finality + +import ( + "fmt" + "time" +) + +const DefaultRequestTimeout = 30 * time.Second + +type Config struct { + Endpoints []Endpoint `yaml:"endpoints,omitempty"` + RequestTimeout time.Duration `yaml:"queryTimeout,omitempty"` +} + +type Endpoint struct { + Address string `yaml:"address,omitempty"` + ConnectionTimeout time.Duration `yaml:"connectionTimeout,omitempty"` + TLSEnabled bool `yaml:"tlsEnabled,omitempty"` + TLSRootCertFile string `yaml:"tlsRootCertFile,omitempty"` + TLSServerNameOverride string `yaml:"tlsServerNameOverride,omitempty"` +} + +type ConfigService interface { + UnmarshalKey(key string, rawVal interface{}) error +} + +func NewConfig(configService ConfigService) (*Config, error) { + config := &Config{ + RequestTimeout: DefaultRequestTimeout, + } + + err := configService.UnmarshalKey("notify", &config) + if err != nil { + return config, fmt.Errorf("cannot get notify service config: %w", err) + } + + return config, nil +} diff --git a/platform/fabricx/core/finality/grpc.go b/platform/fabricx/core/finality/grpc.go new file mode 100644 index 000000000..99a37ff98 --- /dev/null +++ b/platform/fabricx/core/finality/grpc.go @@ -0,0 +1,71 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package finality + +import ( + "errors" + "fmt" + "os" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" +) + +var ErrInvalidAddress = fmt.Errorf("empty address") + +func GrpcClient(c *Config) (*grpc.ClientConn, error) { + // no endpoints in config + if len(c.Endpoints) != 1 { + return nil, fmt.Errorf("we need a single query service endpoint") + } + + // currently we only support connections to a single query service + endpoint := c.Endpoints[0] + + // check endpoint address + if len(endpoint.Address) == 0 { + return nil, ErrInvalidAddress + } + + var opts []grpc.DialOption + opts = append(opts, WithConnectionTime(endpoint.ConnectionTimeout)) + opts = append(opts, WithTLS(endpoint)) + + return grpc.NewClient(endpoint.Address, opts...) +} + +func WithTLS(endpoint Endpoint) grpc.DialOption { + if !endpoint.TLSEnabled { + return grpc.WithTransportCredentials(insecure.NewCredentials()) + } + + if _, err := os.Stat(endpoint.TLSRootCertFile); errors.Is(err, os.ErrNotExist) { + if err != nil { + panic(err) + } + } + + creds, err := credentials.NewClientTLSFromFile(endpoint.TLSRootCertFile, endpoint.TLSServerNameOverride) + if err != nil { + panic(err) + } + + return grpc.WithTransportCredentials(creds) +} + +func WithConnectionTime(timeout time.Duration) grpc.DialOption { + if timeout <= 0 { + timeout = DefaultRequestTimeout + } + return grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoff.DefaultConfig, + MinConnectTimeout: timeout, + }) +} diff --git a/platform/fabricx/core/finality/listenermanager.go b/platform/fabricx/core/finality/listenermanager.go index 7d1febe0a..3f629f184 100644 --- a/platform/fabricx/core/finality/listenermanager.go +++ b/platform/fabricx/core/finality/listenermanager.go @@ -7,16 +7,15 @@ SPDX-License-Identifier: Apache-2.0 package finality import ( + "context" "reflect" "github.com/hyperledger-labs/fabric-smart-client/pkg/utils/errors" "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver" - "github.com/hyperledger-labs/fabric-smart-client/platform/common/services/logging" "github.com/hyperledger-labs/fabric-smart-client/platform/fabric" "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/driver/config" - "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/events" - "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/finality" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services" + "github.com/hyperledger/fabric-x-committer/api/protonotify" ) type ListenerManager interface { @@ -47,42 +46,42 @@ func (p *listenerManagerProvider) NewManager(network, channel string) (ListenerM return nil, err } - ch, err := nw.Channel(channel) - if err != nil { - return nil, err - } + //ch, err := nw.Channel(channel) + //if err != nil { + // return nil, err + //} cfg, err := p.configProvider.GetConfig(nw.Name()) if err != nil { return nil, err } - switch cfg.GetString("network.finality.type") { - case "delivery": - return finality.NewDeliveryFLM(logging.MustGetLogger("delivery-flm"), events.DeliveryListenerManagerConfig{ - MapperParallelism: cfg.GetInt("network.finality.delivery.mapperParallelism"), - BlockProcessParallelism: cfg.GetInt("network.finality.delivery.blockProcessParallelism"), - ListenerTimeout: cfg.GetDuration("network.finality.delivery.listenerTimeout"), - LRUSize: cfg.GetInt("network.finality.delivery.lruSize"), - LRUBuffer: cfg.GetInt("network.finality.delivery.lruBuffer"), - }, nw.Name(), ch) - case "committer": - case "": - return &committerListenerManager{committer: ch.Committer()}, nil - } - panic("unknown finality type") + return newNotifi(cfg) } -type committerListenerManager struct { - committer *fabric.Committer -} +func newNotifi(cfg config.ConfigService) (*notificationListenerManager, error) { -func (m *committerListenerManager) AddFinalityListener(txID driver.TxID, listener fabric.FinalityListener) error { - return m.committer.AddFinalityListener(txID, listener) -} + c, err := NewConfig(cfg) + if err != nil { + return nil, err + } + + cc, err := GrpcClient(c) + if err != nil { + return nil, err + } -func (m *committerListenerManager) RemoveFinalityListener(txID string, listener fabric.FinalityListener) error { - return m.committer.RemoveFinalityListener(txID, listener) + notifyClient := protonotify.NewNotifierClient(cc) + notifyStream, err := notifyClient.OpenNotificationStream(context.TODO()) + if err != nil { + return nil, err + } + return ¬ificationListenerManager{ + notifyStream: notifyStream, + requestQueue: make(chan *protonotify.NotificationRequest, 1), + responseQueue: make(chan *protonotify.NotificationResponse, 1), + handlers: make(map[string][]fabric.FinalityListener), + }, nil } func GetListenerManager(sp services.Provider, network, channel string) (ListenerManager, error) { diff --git a/platform/fabricx/core/finality/nlm.go b/platform/fabricx/core/finality/nlm.go new file mode 100644 index 000000000..2bae800ae --- /dev/null +++ b/platform/fabricx/core/finality/nlm.go @@ -0,0 +1,141 @@ +package finality + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver" + "github.com/hyperledger-labs/fabric-smart-client/platform/common/services/logging" + "github.com/hyperledger-labs/fabric-smart-client/platform/fabric" + "github.com/hyperledger/fabric-x-committer/api/protonotify" + "golang.org/x/sync/errgroup" + "google.golang.org/protobuf/types/known/durationpb" +) + +var logger = logging.MustGetLogger() + +type notificationListenerManager struct { + notifyStream protonotify.Notifier_OpenNotificationStreamClient + requestQueue chan *protonotify.NotificationRequest + responseQueue chan *protonotify.NotificationResponse + + handlers map[driver.TxID][]fabric.FinalityListener + lock sync.RWMutex +} + +func (n *notificationListenerManager) Listen(ctx context.Context) error { + g, gCtx := errgroup.WithContext(ctx) + + g.Go(func() error { + var resp *protonotify.NotificationResponse + + select { + case <-gCtx.Done(): + case resp = <-n.responseQueue: + } + + // TODO: + _ = resp.GetTimeoutTxIds() + + for _, r := range resp.TxStatusEvents { + txID := r.GetTxId() + //status := r.GetStatusWithHeight().GetCode() + + n.lock.Lock() + handlers, ok := n.handlers[txID] + if !ok { + // nobody registered + n.lock.Unlock() + continue + } + // delete + delete(n.handlers, txID) + n.lock.Unlock() + + // now it is time to call the handlers + logger.Warnf("calling handlers for txID=%v", txID) + for _, h := range handlers { + // TODO fix status + h.OnStatus(gCtx, txID, 1, "TODO") + } + } + return nil + }) + + return g.Wait() +} + +func (n *notificationListenerManager) OpenNotificationStream(stream protonotify.Notifier_OpenNotificationStreamClient) error { + g, gCtx := errgroup.WithContext(stream.Context()) + + g.Go(func() error { + for gCtx.Err() == nil { + res, err := stream.Recv() + if err != nil { + return err + } + select { + case <-gCtx.Done(): + return gCtx.Err() + case n.responseQueue <- res: + } + } + return gCtx.Err() + }) + + g.Go(func() error { + for gCtx.Err() == nil { + var req *protonotify.NotificationRequest + select { + case <-gCtx.Done(): + case req = <-n.requestQueue: + } + + if err := stream.Send(req); err != nil { + return err + } + } + return gCtx.Err() + }) + + return g.Wait() +} + +func (n *notificationListenerManager) AddFinalityListener(txID driver.TxID, listener fabric.FinalityListener) error { + if listener == nil { + return errors.New("listener nil") + } + + n.lock.Lock() + defer n.lock.Unlock() + + handlers := n.handlers[txID] + n.handlers[txID] = append(handlers, listener) + + if len(handlers) > 1 { + logger.Warnf("callback for txID=%v already exists", txID) + + // there is someone already requested a notification for this txID + return nil + } + + logger.Warnf("register new callback for txID=%v", txID) + + // this is our first listener registered for the given txID + txIDs := []string{txID} + n.requestQueue <- &protonotify.NotificationRequest{ + TxStatusRequest: &protonotify.TxStatusRequest{ + TxIds: txIDs, + }, + Timeout: durationpb.New(3 * time.Minute), + } + + return nil +} + +func (n *notificationListenerManager) RemoveFinalityListener(txID string, listener fabric.FinalityListener) error { + // TODO: implement me + return nil +} From 910bc93ff33a31285fcbd06d2d3c7b5ef67a3abb Mon Sep 17 00:00:00 2001 From: Marcus Brandenburger Date: Thu, 23 Oct 2025 14:16:47 +0200 Subject: [PATCH 2/2] fixup! Initial impl for committer's notification service Signed-off-by: Marcus Brandenburger --- .../nwo/fabricx/extensions/scv2/ext.go | 1 + .../extensions/scv2/notificationservice.go | 84 +++++++++++ .../core/committer/notify/notify_test.go | 9 +- platform/fabricx/core/finality/config.go | 10 +- platform/fabricx/core/finality/grpc.go | 2 +- .../fabricx/core/finality/listenermanager.go | 29 ++-- platform/fabricx/core/finality/nlm.go | 134 +++++++++++------- 7 files changed, 195 insertions(+), 74 deletions(-) create mode 100644 integration/nwo/fabricx/extensions/scv2/notificationservice.go diff --git a/integration/nwo/fabricx/extensions/scv2/ext.go b/integration/nwo/fabricx/extensions/scv2/ext.go index 78fe6b9ea..70a292f36 100644 --- a/integration/nwo/fabricx/extensions/scv2/ext.go +++ b/integration/nwo/fabricx/extensions/scv2/ext.go @@ -59,6 +59,7 @@ func (e *Extension) CheckTopology() { func (e *Extension) GenerateArtifacts() { generateQSExtension(e.network) + generateNSExtension(e.network) } func (e *Extension) PostRun(load bool) { diff --git a/integration/nwo/fabricx/extensions/scv2/notificationservice.go b/integration/nwo/fabricx/extensions/scv2/notificationservice.go new file mode 100644 index 000000000..cf15aafc8 --- /dev/null +++ b/integration/nwo/fabricx/extensions/scv2/notificationservice.go @@ -0,0 +1,84 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package scv2 + +import ( + "bytes" + "errors" + "fmt" + "html/template" + "io" + "time" + + api2 "github.com/hyperledger-labs/fabric-smart-client/integration/nwo/api" + "github.com/hyperledger-labs/fabric-smart-client/integration/nwo/fabricx/network" + "github.com/hyperledger-labs/fabric-smart-client/integration/nwo/fsc" + "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils" + "github.com/hyperledger-labs/fabric-smart-client/platform/fabricx/core/finality" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/grpc" +) + +// generateNSExtensions adds the committers notification service information to the config +func generateNSExtension(n *network.Network) { + context := n.Context + + fscTop, ok := context.TopologyByName("fsc").(*fsc.Topology) + if !ok { + utils.Must(errors.New("cannot get fsc topo instance")) + } + + // TODO set correct values + notificationServiceHost := "localhost" + notificationServicePort := 5417 + + // TODO: most of this logic should go somewhere + + config := finality.Config{ + RequestTimeout: 10 * time.Second, + Endpoints: []finality.Endpoint{ + { + Address: fmt.Sprintf("%s:%v", notificationServiceHost, notificationServicePort), + ConnectionTimeout: grpc.DefaultConnectionTimeout, + TLSEnabled: false, + TLSRootCertFile: n.CACertsBundlePath(), + }, + }, + } + + t, err := template.New("view_extension").Funcs(template.FuncMap{ + "NetworkName": func() string { return n.Topology().Name() }, + "RequestTimeout": func() time.Duration { return config.RequestTimeout }, + "Endpoints": func() []finality.Endpoint { return config.Endpoints }, + }).Parse(nsExtensionTemplate) + utils.Must(err) + + extension := bytes.NewBuffer([]byte{}) + err = t.Execute(io.MultiWriter(extension), nil) + utils.Must(err) + + for _, fscNode := range fscTop.Nodes { + // TODO: find the correct SC instance to connect ... + + logger.Infof(">>> %v", fscNode) + for _, uniqueName := range fscNode.ReplicaUniqueNames() { + context.AddExtension(uniqueName, api2.FabricExtension, extension.String()) + } + } +} + +const nsExtensionTemplate = ` +fabric: + {{ NetworkName }}: + notificationService: + requestTimeout: {{ RequestTimeout }} + endpoints:{{- range Endpoints }} + - address: {{ .Address }} + connectionTimeout: {{ .ConnectionTimeout }} + tlsEnabled: {{ .TLSEnabled }} + tlsRootCertFile: {{ .TLSRootCertFile }} + {{- end }} +` diff --git a/platform/fabricx/core/committer/notify/notify_test.go b/platform/fabricx/core/committer/notify/notify_test.go index f6b4a134d..829260151 100644 --- a/platform/fabricx/core/committer/notify/notify_test.go +++ b/platform/fabricx/core/committer/notify/notify_test.go @@ -1,3 +1,9 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + package notify import ( @@ -42,7 +48,7 @@ func TestNotificationService(t *testing.T) { notifyStream, err := nf.OpenNotificationStream(t.Context()) require.NoError(t, err) - txIDs := make([]string, 1) + txIDs := []string{"1"} err = notifyStream.Send(&protonotify.NotificationRequest{ TxStatusRequest: &protonotify.TxStatusRequest{ TxIds: txIDs, @@ -69,6 +75,7 @@ func TestNotificationService(t *testing.T) { t.Parallel() cs := newConfigService(tc.cfg) c, err := queryservice.NewConfig(cs) + require.NoError(t, err) client, err := queryservice.GrpcClient(c) require.NoError(t, err) tc.checks(t, client, err) diff --git a/platform/fabricx/core/finality/config.go b/platform/fabricx/core/finality/config.go index 402ccf555..6035c3054 100644 --- a/platform/fabricx/core/finality/config.go +++ b/platform/fabricx/core/finality/config.go @@ -1,3 +1,9 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + package finality import ( @@ -9,7 +15,7 @@ const DefaultRequestTimeout = 30 * time.Second type Config struct { Endpoints []Endpoint `yaml:"endpoints,omitempty"` - RequestTimeout time.Duration `yaml:"queryTimeout,omitempty"` + RequestTimeout time.Duration `yaml:"requestTimeout,omitempty"` } type Endpoint struct { @@ -29,7 +35,7 @@ func NewConfig(configService ConfigService) (*Config, error) { RequestTimeout: DefaultRequestTimeout, } - err := configService.UnmarshalKey("notify", &config) + err := configService.UnmarshalKey("notificationService", &config) if err != nil { return config, fmt.Errorf("cannot get notify service config: %w", err) } diff --git a/platform/fabricx/core/finality/grpc.go b/platform/fabricx/core/finality/grpc.go index 99a37ff98..72ae50cec 100644 --- a/platform/fabricx/core/finality/grpc.go +++ b/platform/fabricx/core/finality/grpc.go @@ -23,7 +23,7 @@ var ErrInvalidAddress = fmt.Errorf("empty address") func GrpcClient(c *Config) (*grpc.ClientConn, error) { // no endpoints in config if len(c.Endpoints) != 1 { - return nil, fmt.Errorf("we need a single query service endpoint") + return nil, fmt.Errorf("we need a single endpoint") } // currently we only support connections to a single query service diff --git a/platform/fabricx/core/finality/listenermanager.go b/platform/fabricx/core/finality/listenermanager.go index 3f629f184..d138c3493 100644 --- a/platform/fabricx/core/finality/listenermanager.go +++ b/platform/fabricx/core/finality/listenermanager.go @@ -20,12 +20,12 @@ import ( type ListenerManager interface { AddFinalityListener(txID driver.TxID, listener fabric.FinalityListener) error - RemoveFinalityListener(txID driver.TxID, listener fabric.FinalityListener) error + Listen(ctx context.Context) error } type ListenerManagerProvider interface { - NewManager(network, channel string) (ListenerManager, error) + NewManager(ctx context.Context, network, channel string) (ListenerManager, error) } func NewListenerManagerProvider(fnsp *fabric.NetworkServiceProvider, configProvider config.Provider) ListenerManagerProvider { @@ -40,27 +40,21 @@ type listenerManagerProvider struct { configProvider config.Provider } -func (p *listenerManagerProvider) NewManager(network, channel string) (ListenerManager, error) { +func (p *listenerManagerProvider) NewManager(ctx context.Context, network, channel string) (ListenerManager, error) { nw, err := p.fnsp.FabricNetworkService(network) if err != nil { return nil, err } - //ch, err := nw.Channel(channel) - //if err != nil { - // return nil, err - //} - cfg, err := p.configProvider.GetConfig(nw.Name()) if err != nil { return nil, err } - return newNotifi(cfg) + return newNotifi(ctx, cfg) } -func newNotifi(cfg config.ConfigService) (*notificationListenerManager, error) { - +func newNotifi(ctx context.Context, cfg config.ConfigService) (*notificationListenerManager, error) { c, err := NewConfig(cfg) if err != nil { return nil, err @@ -72,22 +66,25 @@ func newNotifi(cfg config.ConfigService) (*notificationListenerManager, error) { } notifyClient := protonotify.NewNotifierClient(cc) - notifyStream, err := notifyClient.OpenNotificationStream(context.TODO()) + notifyStream, err := notifyClient.OpenNotificationStream(ctx) if err != nil { return nil, err } - return ¬ificationListenerManager{ + + nlm := ¬ificationListenerManager{ notifyStream: notifyStream, requestQueue: make(chan *protonotify.NotificationRequest, 1), responseQueue: make(chan *protonotify.NotificationResponse, 1), handlers: make(map[string][]fabric.FinalityListener), - }, nil + } + + return nlm, nil } -func GetListenerManager(sp services.Provider, network, channel string) (ListenerManager, error) { +func GetListenerManager(ctx context.Context, sp services.Provider, network, channel string) (ListenerManager, error) { lmp, err := sp.GetService(reflect.TypeOf((*ListenerManagerProvider)(nil))) if err != nil { return nil, errors.Wrapf(err, "could not find provider") } - return lmp.(ListenerManagerProvider).NewManager(network, channel) + return lmp.(ListenerManagerProvider).NewManager(ctx, network, channel) } diff --git a/platform/fabricx/core/finality/nlm.go b/platform/fabricx/core/finality/nlm.go index 2bae800ae..9278c8008 100644 --- a/platform/fabricx/core/finality/nlm.go +++ b/platform/fabricx/core/finality/nlm.go @@ -1,3 +1,9 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + package finality import ( @@ -9,6 +15,8 @@ import ( "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver" "github.com/hyperledger-labs/fabric-smart-client/platform/common/services/logging" "github.com/hyperledger-labs/fabric-smart-client/platform/fabric" + fdriver "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/driver" + "github.com/hyperledger/fabric-x-committer/api/protoblocktx" "github.com/hyperledger/fabric-x-committer/api/protonotify" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/types/known/durationpb" @@ -25,84 +33,104 @@ type notificationListenerManager struct { lock sync.RWMutex } -func (n *notificationListenerManager) Listen(ctx context.Context) error { - g, gCtx := errgroup.WithContext(ctx) +func (n *notificationListenerManager) Listen(_ context.Context) error { + g, gCtx := errgroup.WithContext(n.notifyStream.Context()) + // spawn stream receiver g.Go(func() error { - var resp *protonotify.NotificationResponse - - select { - case <-gCtx.Done(): - case resp = <-n.responseQueue: - } - - // TODO: - _ = resp.GetTimeoutTxIds() - - for _, r := range resp.TxStatusEvents { - txID := r.GetTxId() - //status := r.GetStatusWithHeight().GetCode() - - n.lock.Lock() - handlers, ok := n.handlers[txID] - if !ok { - // nobody registered - n.lock.Unlock() - continue + for { + res, err := n.notifyStream.Recv() + if err != nil { + return err } - // delete - delete(n.handlers, txID) - n.lock.Unlock() - // now it is time to call the handlers - logger.Warnf("calling handlers for txID=%v", txID) - for _, h := range handlers { - // TODO fix status - h.OnStatus(gCtx, txID, 1, "TODO") + select { + case <-gCtx.Done(): + return gCtx.Err() + case n.responseQueue <- res: } } - return nil }) - return g.Wait() -} - -func (n *notificationListenerManager) OpenNotificationStream(stream protonotify.Notifier_OpenNotificationStreamClient) error { - g, gCtx := errgroup.WithContext(stream.Context()) - + // spawn stream sender g.Go(func() error { - for gCtx.Err() == nil { - res, err := stream.Recv() - if err != nil { - return err - } + var req *protonotify.NotificationRequest + for { select { case <-gCtx.Done(): return gCtx.Err() - case n.responseQueue <- res: + case req = <-n.requestQueue: + } + + if err := n.notifyStream.Send(req); err != nil { + return err } } - return gCtx.Err() }) + // spawn notification dispatcher g.Go(func() error { - for gCtx.Err() == nil { - var req *protonotify.NotificationRequest + var resp *protonotify.NotificationResponse + for { select { case <-gCtx.Done(): - case req = <-n.requestQueue: + return gCtx.Err() + case resp = <-n.responseQueue: } - if err := stream.Send(req); err != nil { - return err + res := parseResponse(resp) + + n.lock.Lock() + for txID, v := range res { + handlers, ok := n.handlers[txID] + if !ok { + // nobody registered + continue + } + // delete + delete(n.handlers, txID) + + // now it is time to call the handlers + for _, h := range handlers { + h.OnStatus(gCtx, txID, v, "") + } } + n.lock.Unlock() } - return gCtx.Err() }) return g.Wait() } +func parseResponse(resp *protonotify.NotificationResponse) map[string]int { + res := make(map[string]int) + + // first parse all timeouts + for _, txID := range resp.GetTimeoutTxIds() { + res[txID] = fdriver.Unknown + } + + var s int + // next we parse the status events + for _, r := range resp.GetTxStatusEvents() { + txID := r.GetTxId() + status := r.GetStatusWithHeight() + + switch status.GetCode() { + case protoblocktx.Status_COMMITTED: + s = fdriver.Valid + case protoblocktx.Status_NOT_VALIDATED: + s = fdriver.Unknown + default: + s = fdriver.Invalid + } + + res[txID] = s + } + + return res +} + func (n *notificationListenerManager) AddFinalityListener(txID driver.TxID, listener fabric.FinalityListener) error { if listener == nil { return errors.New("listener nil") @@ -116,20 +144,18 @@ func (n *notificationListenerManager) AddFinalityListener(txID driver.TxID, list if len(handlers) > 1 { logger.Warnf("callback for txID=%v already exists", txID) - // there is someone already requested a notification for this txID return nil } - logger.Warnf("register new callback for txID=%v", txID) - // this is our first listener registered for the given txID txIDs := []string{txID} n.requestQueue <- &protonotify.NotificationRequest{ TxStatusRequest: &protonotify.TxStatusRequest{ TxIds: txIDs, }, - Timeout: durationpb.New(3 * time.Minute), + // TODO: set a proper timeout + Timeout: durationpb.New(10 * time.Second), } return nil