Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions integration/nwo/fabricx/extensions/scv2/ext.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (e *Extension) CheckTopology() {

func (e *Extension) GenerateArtifacts() {
generateQSExtension(e.network)
generateNSExtension(e.network)
}

func (e *Extension) PostRun(load bool) {
Expand Down
84 changes: 84 additions & 0 deletions integration/nwo/fabricx/extensions/scv2/notificationservice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package scv2

import (
"bytes"
"errors"
"fmt"
"html/template"
"io"
"time"

api2 "github.com/hyperledger-labs/fabric-smart-client/integration/nwo/api"
"github.com/hyperledger-labs/fabric-smart-client/integration/nwo/fabricx/network"
"github.com/hyperledger-labs/fabric-smart-client/integration/nwo/fsc"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils"
"github.com/hyperledger-labs/fabric-smart-client/platform/fabricx/core/finality"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/grpc"
)

// generateNSExtensions adds the committers notification service information to the config
func generateNSExtension(n *network.Network) {
context := n.Context

fscTop, ok := context.TopologyByName("fsc").(*fsc.Topology)
if !ok {
utils.Must(errors.New("cannot get fsc topo instance"))
}

// TODO set correct values
notificationServiceHost := "localhost"
notificationServicePort := 5417

// TODO: most of this logic should go somewhere

config := finality.Config{
RequestTimeout: 10 * time.Second,
Endpoints: []finality.Endpoint{
{
Address: fmt.Sprintf("%s:%v", notificationServiceHost, notificationServicePort),
ConnectionTimeout: grpc.DefaultConnectionTimeout,
TLSEnabled: false,
TLSRootCertFile: n.CACertsBundlePath(),
},
},
}

t, err := template.New("view_extension").Funcs(template.FuncMap{
"NetworkName": func() string { return n.Topology().Name() },
"RequestTimeout": func() time.Duration { return config.RequestTimeout },
"Endpoints": func() []finality.Endpoint { return config.Endpoints },
}).Parse(nsExtensionTemplate)
utils.Must(err)

extension := bytes.NewBuffer([]byte{})
err = t.Execute(io.MultiWriter(extension), nil)
utils.Must(err)

for _, fscNode := range fscTop.Nodes {
// TODO: find the correct SC instance to connect ...

logger.Infof(">>> %v", fscNode)
for _, uniqueName := range fscNode.ReplicaUniqueNames() {
context.AddExtension(uniqueName, api2.FabricExtension, extension.String())
}
}
}

const nsExtensionTemplate = `
fabric:
{{ NetworkName }}:
notificationService:
requestTimeout: {{ RequestTimeout }}
endpoints:{{- range Endpoints }}
- address: {{ .Address }}
connectionTimeout: {{ .ConnectionTimeout }}
tlsEnabled: {{ .TLSEnabled }}
tlsRootCertFile: {{ .TLSRootCertFile }}
{{- end }}
`
101 changes: 101 additions & 0 deletions platform/fabricx/core/committer/notify/notify_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package notify

import (
"fmt"
"testing"
"time"

"github.com/hyperledger-labs/fabric-smart-client/platform/fabricx/core/vault/queryservice"
"github.com/hyperledger/fabric-x-committer/api/protonotify"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/durationpb"
)

func TestNotificationService(t *testing.T) {

table := []struct {
name string
cfg map[string]any
checks func(t *testing.T, client *grpc.ClientConn, err error)
}{
{
name: "connect",
cfg: map[string]any{
"queryService.queryTimeout": 10 * time.Second,
"queryService.Endpoints": []any{
map[string]any{
"address": "localhost:5411",
"connectionTimeout": 0 * time.Second,
},
},
},
checks: func(t *testing.T, client *grpc.ClientConn, err error) {
t.Helper()
require.NotNil(t, client)
require.NoError(t, err)

nf := protonotify.NewNotifierClient(client)

notifyStream, err := nf.OpenNotificationStream(t.Context())
require.NoError(t, err)

txIDs := []string{"1"}
err = notifyStream.Send(&protonotify.NotificationRequest{
TxStatusRequest: &protonotify.TxStatusRequest{
TxIds: txIDs,
},
Timeout: durationpb.New(3 * time.Minute),
})
require.NoError(t, err)

var actual []*protonotify.TxStatusEvent
require.EventuallyWithT(t, func(ct *assert.CollectT) {
res, err := notifyStream.Recv()
require.NoError(t, err)
require.NotNil(t, res)
require.Nil(t, res.TimeoutTxIds)
actual = append(actual, res.TxStatusEvents...)
//test.RequireProtoElementsMatch(ct, expected, actual)
}, 15*time.Second, 50*time.Millisecond)
},
},
}

for _, tc := range table {
t.Run(fmt.Sprintf("grpcClient %v", tc.name), func(t *testing.T) {
t.Parallel()
cs := newConfigService(tc.cfg)
c, err := queryservice.NewConfig(cs)
require.NoError(t, err)
client, err := queryservice.GrpcClient(c)
require.NoError(t, err)
tc.checks(t, client, err)
})
}

}

type configService struct {
V *viper.Viper
}

func newConfigService(c map[string]any) *configService {
v := viper.New()
for k, val := range c {
v.Set(k, val)
}
return &configService{V: v}
}

func (c configService) UnmarshalKey(key string, rawVal interface{}) error {
return c.V.UnmarshalKey(key, rawVal)
}
44 changes: 44 additions & 0 deletions platform/fabricx/core/finality/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package finality

import (
"fmt"
"time"
)

const DefaultRequestTimeout = 30 * time.Second

type Config struct {
Endpoints []Endpoint `yaml:"endpoints,omitempty"`
RequestTimeout time.Duration `yaml:"requestTimeout,omitempty"`
}

type Endpoint struct {
Address string `yaml:"address,omitempty"`
ConnectionTimeout time.Duration `yaml:"connectionTimeout,omitempty"`
TLSEnabled bool `yaml:"tlsEnabled,omitempty"`
TLSRootCertFile string `yaml:"tlsRootCertFile,omitempty"`
TLSServerNameOverride string `yaml:"tlsServerNameOverride,omitempty"`
}

type ConfigService interface {
UnmarshalKey(key string, rawVal interface{}) error
}

func NewConfig(configService ConfigService) (*Config, error) {
config := &Config{
RequestTimeout: DefaultRequestTimeout,
}

err := configService.UnmarshalKey("notificationService", &config)
if err != nil {
return config, fmt.Errorf("cannot get notify service config: %w", err)
}

return config, nil
}
71 changes: 71 additions & 0 deletions platform/fabricx/core/finality/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package finality

import (
"errors"
"fmt"
"os"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

var ErrInvalidAddress = fmt.Errorf("empty address")

func GrpcClient(c *Config) (*grpc.ClientConn, error) {
// no endpoints in config
if len(c.Endpoints) != 1 {
return nil, fmt.Errorf("we need a single endpoint")
}

// currently we only support connections to a single query service
endpoint := c.Endpoints[0]

// check endpoint address
if len(endpoint.Address) == 0 {
return nil, ErrInvalidAddress
}

var opts []grpc.DialOption
opts = append(opts, WithConnectionTime(endpoint.ConnectionTimeout))
opts = append(opts, WithTLS(endpoint))

return grpc.NewClient(endpoint.Address, opts...)
}

func WithTLS(endpoint Endpoint) grpc.DialOption {
if !endpoint.TLSEnabled {
return grpc.WithTransportCredentials(insecure.NewCredentials())
}

if _, err := os.Stat(endpoint.TLSRootCertFile); errors.Is(err, os.ErrNotExist) {
if err != nil {
panic(err)
}
}

creds, err := credentials.NewClientTLSFromFile(endpoint.TLSRootCertFile, endpoint.TLSServerNameOverride)
if err != nil {
panic(err)
}

return grpc.WithTransportCredentials(creds)
}

func WithConnectionTime(timeout time.Duration) grpc.DialOption {
if timeout <= 0 {
timeout = DefaultRequestTimeout
}
return grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.DefaultConfig,
MinConnectTimeout: timeout,
})
}
Loading
Loading