diff --git a/authmailbox/client.go b/authmailbox/client.go index 64d74bf70..854ae725d 100644 --- a/authmailbox/client.go +++ b/authmailbox/client.go @@ -202,9 +202,6 @@ func (c *Client) connectAndAuthenticate(ctx context.Context, msgChan chan<- *ReceivedMessages, acctKey keychain.KeyDescriptor, filter MessageFilter) (*receiveSubscription, error) { - var receiverKey [33]byte - copy(receiverKey[:], acctKey.PubKey.SerializeCompressed()) - // Before we can expect to receive any updates, we need to perform the // 3-way authentication handshake. sub := newReceiveSubscription(c.cfg, msgChan, acctKey, filter, c.client) diff --git a/authmailbox/client_test.go b/authmailbox/client_test.go index 88f9d793e..5aec694b7 100644 --- a/authmailbox/client_test.go +++ b/authmailbox/client_test.go @@ -157,7 +157,9 @@ func TestServerClientAuthAndRestart(t *testing.T) { // We also add a multi-subscription to the same two keys, so we can make // sure we can receive messages from multiple clients at once. - multiSub := NewMultiSubscription(*clientCfg) + multiSub := NewMultiSubscription(MultiSubscriptionConfig{ + BaseClientConfig: *clientCfg, + }) err := multiSub.Subscribe( ctx, url.URL{Host: clientCfg.ServerAddress}, clientKey1, filter, ) diff --git a/authmailbox/multi_subscription.go b/authmailbox/multi_subscription.go index e224ed2bd..8f6cdc867 100644 --- a/authmailbox/multi_subscription.go +++ b/authmailbox/multi_subscription.go @@ -5,12 +5,20 @@ import ( "fmt" "net/url" "sync" + "time" "github.com/lightninglabs/taproot-assets/asset" + "github.com/lightninglabs/taproot-assets/fn" lfn "github.com/lightningnetwork/lnd/fn/v2" "github.com/lightningnetwork/lnd/keychain" ) +const ( + // DefaultTimeout is the default timeout we use for RPC and database + // operations. + DefaultTimeout = 30 * time.Second +) + // clientSubscriptions holds the subscriptions and cancel functions for a // specific mailbox client. type clientSubscriptions struct { @@ -26,18 +34,105 @@ type clientSubscriptions struct { cancels map[asset.SerializedKey]context.CancelFunc } +// clientRegistry is a thread-safe registry for managing mailbox clients. +// It encapsulates the clients map and provides a safe API for accessing +// and modifying client subscriptions. +type clientRegistry struct { + sync.RWMutex + + // clients holds the active mailbox clients, keyed by their server URL. + clients map[url.URL]*clientSubscriptions +} + +// newClientRegistry creates a new client registry instance. +func newClientRegistry() *clientRegistry { + return &clientRegistry{ + clients: make(map[url.URL]*clientSubscriptions), + } +} + +// Get retrieves an existing client or creates a new one if it doesn't +// exist. It returns the client and a boolean indicating whether the client +// was newly created. +func (r *clientRegistry) Get(serverURL url.URL, + cfgCopy ClientConfig) (*clientSubscriptions, bool, error) { + + r.Lock() + defer r.Unlock() + + client, ok := r.clients[serverURL] + if ok { + return client, false, nil + } + + // Create a new client connection. + cfgCopy.ServerAddress = serverURL.Host + mboxClient := NewClient(&cfgCopy) + + client = &clientSubscriptions{ + client: mboxClient, + subscriptions: make( + map[asset.SerializedKey]ReceiveSubscription, + ), + cancels: make( + map[asset.SerializedKey]context.CancelFunc, + ), + } + r.clients[serverURL] = client + + return client, true, nil +} + +// RemoveClient removes a client from the registry. +func (r *clientRegistry) RemoveClient(serverURL url.URL) { + r.Lock() + defer r.Unlock() + + delete(r.clients, serverURL) +} + +// AddSubscription adds a subscription and its cancel function to a client. If +// the client does not exist, an error is returned. +func (r *clientRegistry) AddSubscription(serverURL url.URL, + key asset.SerializedKey, subscription ReceiveSubscription, + cancel context.CancelFunc) error { + + r.Lock() + defer r.Unlock() + + client, ok := r.clients[serverURL] + if !ok { + return fmt.Errorf("no client found for %s", serverURL.String()) + } + + client.subscriptions[key] = subscription + client.cancels[key] = cancel + + return nil +} + +// ForEach executes a function for each client in the registry. The function +// receives a copy of the client subscriptions to avoid holding the lock +// during potentially long operations. +func (r *clientRegistry) ForEach(fn func(*clientSubscriptions)) { + r.RLock() + defer r.RUnlock() + + for _, client := range r.clients { + fn(client) + } +} + // MultiSubscription is a subscription manager that can handle multiple mailbox // clients, allowing subscriptions to different accounts across different // mailbox servers. It manages subscriptions and message queues for each client // and provides a unified interface for receiving messages. type MultiSubscription struct { - // baseClientConfig holds the basic configuration for the mailbox - // clients. All fields except the ServerAddress are used to create - // new mailbox clients when needed. - baseClientConfig ClientConfig + // cfg holds the configuration for the MultiSubscription instance. + cfg MultiSubscriptionConfig - // clients holds the active mailbox clients, keyed by their server URL. - clients map[url.URL]*clientSubscriptions + // registry manages the active mailbox clients in a thread-safe manner. + registry *clientRegistry // msgQueue is the concurrent queue that holds received messages from // all subscriptions across all clients. This allows for a unified @@ -45,63 +140,123 @@ type MultiSubscription struct { // subscribed account, regardless of which mailbox server it belongs to. msgQueue *lfn.ConcurrentQueue[*ReceivedMessages] - sync.RWMutex + // ContextGuard provides a wait group and main quit channel that can be + // used to create guarded contexts. + *fn.ContextGuard +} + +// MultiSubscriptionConfig holds the configuration parameters for creating a +// MultiSubscription instance. +type MultiSubscriptionConfig struct { + // baseClientConfig holds the basic configuration for the mailbox + // clients. All fields except the ServerAddress are used to create + // new mailbox clients when needed. + BaseClientConfig ClientConfig + + // FallbackMboxURLs are fallback proof courier AuthMailbox services. + FallbackMboxURLs []url.URL } // NewMultiSubscription creates a new MultiSubscription instance. -func NewMultiSubscription(baseClientConfig ClientConfig) *MultiSubscription { +func NewMultiSubscription(cfg MultiSubscriptionConfig) *MultiSubscription { queue := lfn.NewConcurrentQueue[*ReceivedMessages](lfn.DefaultQueueSize) queue.Start() return &MultiSubscription{ - baseClientConfig: baseClientConfig, - clients: make(map[url.URL]*clientSubscriptions), - msgQueue: queue, + cfg: cfg, + registry: newClientRegistry(), + msgQueue: queue, + ContextGuard: &fn.ContextGuard{ + DefaultTimeout: DefaultTimeout, + Quit: make(chan struct{}), + }, } } -// Subscribe adds a new subscription for the specified client URL and receiver -// key. It starts a new mailbox client if one does not already exist for the -// given URL. The subscription will receive messages that match the provided -// filter and will send them to the shared message queue. -func (m *MultiSubscription) Subscribe(ctx context.Context, serverURL url.URL, - receiverKey keychain.KeyDescriptor, filter MessageFilter) error { +// Subscribe adds a subscription for the given client URL and receiver key. +// It launches a goroutine to asynchronously establish any fallback +// subscriptions. +func (m *MultiSubscription) Subscribe(ctx context.Context, + primaryServerURL url.URL, receiverKey keychain.KeyDescriptor, + filter MessageFilter) error { - // We hold the mutex for access to common resources. - m.Lock() - cfgCopy := m.baseClientConfig - client, ok := m.clients[serverURL] + // Attempt to subscribe to all fallback mailbox servers in parallel and + // in a non-blocking manner. + m.Goroutine(func() error { + errMap, err := fn.ParSliceErrCollect( + ctx, m.cfg.FallbackMboxURLs, + func(ctx context.Context, serverURL url.URL) error { + return m.establishSubscription( + ctx, serverURL, receiverKey, filter, + ) + }, + ) + if err != nil { + return fmt.Errorf("parallel subscription attempt "+ + "failed: %w", err) + } - // If this is the first time we're seeing a server URL, we first create - // a network connection to the mailbox server. - if !ok { - cfgCopy.ServerAddress = serverURL.Host - - mboxClient := NewClient(&cfgCopy) - client = &clientSubscriptions{ - client: mboxClient, - subscriptions: make( - map[asset.SerializedKey]ReceiveSubscription, - ), - cancels: make( - map[asset.SerializedKey]context.CancelFunc, - ), + for idx, subErr := range errMap { + serverURL := m.cfg.FallbackMboxURLs[idx] + + log.ErrorS(ctx, "Subscription to fallback server "+ + "failed", subErr, "server_addr", + serverURL.String()) } - m.clients[serverURL] = client - err := mboxClient.Start() + return nil + }, func(err error) { + log.ErrorS(ctx, "Fallback server subscription goroutine "+ + "exited with error", err) + }) + + // Subscribe to the primary mailbox server in a blocking manner. This + // ensures that we have at least one active subscription before + // returning. + err := m.establishSubscription( + ctx, primaryServerURL, receiverKey, filter, + ) + if err != nil { + return fmt.Errorf("primary server subscription failed: %w", err) + } + + return nil +} + +// establishSubscription synchronously subscribes to a server. +// It creates a mailbox client for the URL if none exists. +// The subscription routes messages matching the filter to the shared queue. +func (m *MultiSubscription) establishSubscription(ctx context.Context, + serverURL url.URL, receiverKey keychain.KeyDescriptor, + filter MessageFilter) error { + + // Get or create a client for the given server URL. This call is + // thread-safe and will handle locking internally. + cfgCopy := m.cfg.BaseClientConfig + client, isNewClient, err := m.registry.Get(serverURL, cfgCopy) + if err != nil { + return err + } + + // Start the mailbox client if it's not already started. This is safe to + // do without holding any locks since the client itself manages its own + // state. + if isNewClient { + log.Debugf("Starting new mailbox client for %s", + serverURL.String()) + + err = client.client.Start() if err != nil { - m.Unlock() - return fmt.Errorf("unable to create mailbox client: %w", + // Remove the client from the map if we failed to start + // it. + m.registry.RemoveClient(serverURL) + return fmt.Errorf("unable to start mailbox client: %w", err) } } - // We release the lock here again, because StartAccountSubscription - // might block for a while, and we don't want to hold the lock - // unnecessarily long. - m.Unlock() - + // Start the subscription. We don't hold any locks during this call + // since StartAccountSubscription might block for a while. ctx, cancel := context.WithCancel(ctx) subscription, err := client.client.StartAccountSubscription( ctx, m.msgQueue.ChanIn(), receiverKey, filter, @@ -112,13 +267,15 @@ func (m *MultiSubscription) Subscribe(ctx context.Context, serverURL url.URL, err) } - // We hold the lock again to safely add the subscription and cancel - // function to the client's maps. - m.Lock() + // Add the subscription and cancel function to the client's maps. + // This is thread-safe and handled internally by the registry. key := asset.ToSerialized(receiverKey.PubKey) - client.subscriptions[key] = subscription - client.cancels[key] = cancel - m.Unlock() + err = m.registry.AddSubscription(serverURL, key, subscription, cancel) + if err != nil { + cancel() + return fmt.Errorf("unable to add subscription to registry: %w", + err) + } return nil } @@ -138,11 +295,10 @@ func (m *MultiSubscription) Stop() error { log.Info("Stopping all mailbox clients and subscriptions...") - m.RLock() - defer m.RUnlock() - var lastErr error - for _, client := range m.clients { + + // Iterate through all clients in a thread-safe manner and stop them. + m.registry.ForEach(func(client *clientSubscriptions) { for _, cancel := range client.cancels { cancel() } @@ -160,7 +316,7 @@ func (m *MultiSubscription) Stop() error { log.Errorf("Error stopping client: %v", err) lastErr = err } - } + }) return lastErr } diff --git a/proof/courier.go b/proof/courier.go index 7e463fa25..ab74cb475 100644 --- a/proof/courier.go +++ b/proof/courier.go @@ -267,6 +267,24 @@ func ValidateCourierAddress(addr *url.URL) error { } } +// ParseMboxCourierAddress parses and validates that the given address is a +// valid authmailbox+universerpc courier address. +func ParseMboxCourierAddress(addrStr string) (url.URL, error) { + var zero url.URL + + addr, err := ParseCourierAddress(addrStr) + if err != nil { + return zero, err + } + + if addr.Scheme != AuthMailboxUniRpcCourierType { + return zero, fmt.Errorf("expected authmailbox+universerpc "+ + "scheme, got %s", addr.Scheme) + } + + return *addr, nil +} + // ProofMailbox represents an abstract store-and-forward mailbox that can be // used to send/receive proofs. type ProofMailbox interface { @@ -1202,6 +1220,29 @@ type UniverseRpcCourierCfg struct { // a courier service to handle our outgoing request during a connection // attempt, or when delivering or retrieving a proof. ServiceRequestTimeout time.Duration `long:"servicerequestimeout" description:"The maximum duration we'll wait for a courier service to handle our outgoing request during a connection attempt, or when delivering or retrieving a proof."` + + // FallbackMboxURLStrings specifies additional proof courier + // AuthMailbox services using the authmailbox+universerpc:// scheme. The + // daemon subscribes to receive events from each fallback service in + // addition to any proof courier URL specified in a Taproot Asset + // address. Repeat the flag to add multiple entries. + FallbackMboxURLStrings []string `long:"fallbackauthmailboxurl" description:"Fallback proof courier AuthMailbox service URLs using the authmailbox+universerpc:// scheme. The daemon subscribes to receive events from each fallback service in addition to any proof courier URL specified in a Taproot Asset address. Repeat to add multiple."` +} + +// Validate validates the universe RPC courier configuration. +func (c *UniverseRpcCourierCfg) Validate() error { + // Check that all fallback auth mailbox URLs are valid. + for idx := range c.FallbackMboxURLStrings { + urlStr := c.FallbackMboxURLStrings[idx] + + _, err := ParseMboxCourierAddress(urlStr) + if err != nil { + return fmt.Errorf("invalid fallback authmailbox "+ + "proof courier address: %w", err) + } + } + + return nil } // UniverseRpcCourier is a universe RPC proof courier service handle. It diff --git a/proof/courier_test.go b/proof/courier_test.go index ce014a803..f3dbd59b7 100644 --- a/proof/courier_test.go +++ b/proof/courier_test.go @@ -146,3 +146,61 @@ func TestCheckUniverseRpcCourierConnection(t *testing.T) { }) } } + +// TestParseCourierAddress tests the ParseCourierAddress function with various +// inputs. +func TestParseCourierAddress(t *testing.T) { + tests := []struct { + name string + addr string + expectErr string + }{ + { + name: "valid hashmail address", + addr: "hashmail://example.com:443", + }, + { + name: "valid universerpc address", + addr: "universerpc://example.com:10029", + }, + { + name: "valid authmailbox+universerpc address", + addr: "authmailbox+universerpc://example.com:10029", + }, + { + name: "valid mockcourier address", + addr: "mockcourier://example.com:8080", + }, + { + name: "invalid address no scheme", + addr: "example.com:443", + expectErr: "proof courier URI address port unspecified", + }, + { + name: "invalid address no port", + addr: "hashmail://example.com", + expectErr: "proof courier URI address port unspecified", + }, + { + name: "unsupported scheme", + addr: "http://example.com:443", + expectErr: "unknown courier address protocol", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + parsedURL, err := ParseCourierAddress(tt.addr) + + if tt.expectErr != "" { + require.Error(t, err) + require.ErrorContains(t, err, tt.expectErr) + require.Nil(t, parsedURL) + return + } + + require.NoError(t, err) + require.NotNil(t, parsedURL) + }) + } +} diff --git a/sample-tapd.conf b/sample-tapd.conf index 862184396..82c4f47f3 100644 --- a/sample-tapd.conf +++ b/sample-tapd.conf @@ -76,6 +76,12 @@ ; a proof. ; universerpccourier.servicerequestimeout=5s +; Fallback proof courier AuthMailbox services using the +; authmailbox+universerpc:// scheme. The daemon subscribes to receive events +; from each fallback service in addition to any proof courier URL specified in a +; Taproot Asset address. Repeat the flag to add multiple entries. +; universerpccourier.fallbackauthmailboxurl= + ; Network to run on (mainnet, regtest, testnet, testnet4, simnet, signet) ; network=testnet diff --git a/tapcfg/config.go b/tapcfg/config.go index 76f5adb00..0a985c793 100644 --- a/tapcfg/config.go +++ b/tapcfg/config.go @@ -963,6 +963,12 @@ func ValidateConfig(cfg Config, cfgLogger btclog.Logger) (*Config, error) { "range of 0.00 to 1.00") } + err = cfg.UniverseRpcCourier.Validate() + if err != nil { + return nil, fmt.Errorf("invalid universe rpc courier config: "+ + "%w", err) + } + // All good, return the sanitized result. return &cfg, nil } diff --git a/tapcfg/server.go b/tapcfg/server.go index ef4b51ba7..df5b183c2 100644 --- a/tapcfg/server.go +++ b/tapcfg/server.go @@ -6,6 +6,7 @@ import ( "database/sql" "encoding/binary" "fmt" + "net/url" "github.com/btcsuite/btclog/v2" "github.com/davecgh/go-spew/spew" @@ -348,6 +349,21 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, } } + // Parse fallback authmailbox proof courier addresses into URLs. + fallbackMboxUrlStrings := cfg.UniverseRpcCourier.FallbackMboxURLStrings + fallbackMboxURLs := make([]url.URL, 0, len(fallbackMboxUrlStrings)) + for idx := range fallbackMboxUrlStrings { + urlStr := fallbackMboxUrlStrings[idx] + + parsedURL, err := proof.ParseMboxCourierAddress(urlStr) + if err != nil { + return nil, fmt.Errorf("unable to parse fallback "+ + "authmailbox URL: %w", err) + } + + fallbackMboxURLs = append(fallbackMboxURLs, parsedURL) + } + reOrgWatcher := tapgarden.NewReOrgWatcher(&tapgarden.ReOrgWatcherConfig{ ChainBridge: chainBridge, GroupVerifier: groupVerifier, @@ -722,6 +738,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, ErrChan: mainErrChan, ProofCourierDispatcher: proofCourierDispatcher, MboxBackoffCfg: cfg.UniverseRpcCourier.BackoffCfg, + FallbackMboxURLs: fallbackMboxURLs, ProofRetrievalDelay: cfg.CustodianProofRetrievalDelay, ProofWatcher: reOrgWatcher, IgnoreChecker: ignoreCheckerOpt, diff --git a/tapgarden/custodian.go b/tapgarden/custodian.go index 146819770..23908054e 100644 --- a/tapgarden/custodian.go +++ b/tapgarden/custodian.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "net/url" "strings" "sync" "sync/atomic" @@ -144,6 +145,9 @@ type CustodianConfig struct { // for testing. MboxInsecure bool + // FallbackMboxURLs are fallback proof courier AuthMailbox services. + FallbackMboxURLs []url.URL + // ProofRetrievalDelay is the time duration the custodian waits having // identified an asset transfer on-chain and before retrieving the // corresponding proof via the proof courier service. @@ -231,12 +235,16 @@ func NewCustodian(cfg *CustodianConfig) *Custodian { statusEventsSubs: statusEventsSubs, events: make(map[wire.OutPoint]*address.Event), mboxSubscriptions: mbox.NewMultiSubscription( - mbox.ClientConfig{ - Insecure: cfg.MboxInsecure, - SkipTlsVerify: !cfg.MboxInsecure, - Signer: cfg.Signer, - MinBackoff: backoffCfg.InitialBackoff, - MaxBackoff: backoffCfg.MaxBackoff, + mbox.MultiSubscriptionConfig{ + BaseClientConfig: mbox.ClientConfig{ + Insecure: cfg.MboxInsecure, + SkipTlsVerify: !cfg.MboxInsecure, + + Signer: cfg.Signer, + MinBackoff: backoffCfg.InitialBackoff, + MaxBackoff: backoffCfg.MaxBackoff, + }, + FallbackMboxURLs: cfg.FallbackMboxURLs, }, ), ContextGuard: &fn.ContextGuard{