Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions client/mesh_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (

var (
// errInvalidBondIndex is the sentinel for an "invalid bond index" failure.
// NOTE(review): presumably this is what postBond matches on to retry after
// the offending bond is removed — the return site is outside this hunk; confirm.
errInvalidBondIndex = errors.New("invalid bond index")
// errUnauthorized is returned by postBondInternal when the peer's error
// response is of type Error_Unauthorized. postBond returns it as-is instead
// of retrying, and the connection manager's run loop exits when it sees it.
errUnauthorized = errors.New("unauthorized")
)

// meshConnection represents a connection to a mesh peer.
Expand Down Expand Up @@ -134,7 +135,20 @@ func (m *meshConnection) broadcast(ctx context.Context, topic string, data []byt
Topic: topic,
Data: data,
}
return codec.WriteLengthPrefixedMessage(s, req)
if err := codec.WriteLengthPrefixedMessage(s, req); err != nil {
return fmt.Errorf("failed to send publish request: %w", err)
}

resp := &protocolsPb.Response{}
if err := codec.ReadLengthPrefixedMessage(s, resp); err != nil {
return fmt.Errorf("failed to read publish response: %w", err)
}

if respErr := resp.GetError(); respErr != nil {
return fmt.Errorf("publish failed: %s", respErr.GetMessage())
}

return nil
}

// unsubscribe unsubscribes from the provided topic.
Expand Down Expand Up @@ -176,7 +190,7 @@ func (m *meshConnection) postBondInternal(ctx context.Context, req *protocolsPb.

switch v := resp.Response.(type) {
case *protocolsPb.Response_Error:
switch v.Error.GetError().(type) {
switch rErr := v.Error.GetError().(type) {
case *protocolsPb.Error_PostBondError:
bondErr := v.Error.GetPostBondError()
err := m.bondInfo.RemoveBondAtIndex(bondErr.InvalidBondIndex)
Expand All @@ -191,8 +205,11 @@ func (m *meshConnection) postBondInternal(ctx context.Context, req *protocolsPb.
errMsg := v.Error.GetMessage()
return fmt.Errorf("%s: %v", hostID, errMsg)

case *protocolsPb.Error_Unauthorized:
return errUnauthorized

default:
return fmt.Errorf("%s: unknown error type %T", hostID, v)
return fmt.Errorf("%s: unknown error type %T", hostID, rErr)
}

case *protocolsPb.Response_PostBondResponse:
Expand All @@ -208,7 +225,7 @@ func (m *meshConnection) postBondInternal(ctx context.Context, req *protocolsPb.
// postBond posts the connection's bond, retrying on invalid bond index errors. This needs to be
// called before the connection's push stream is established.
func (m *meshConnection) postBond(ctx context.Context) error {
hostID := m.host.ID()
hostID := m.host.ID().ShortString()
for range maxPostBondRetries {
req, err := bond.PostBondReqFromBondInfo(m.bondInfo)
if err != nil {
Expand All @@ -225,6 +242,9 @@ func (m *meshConnection) postBond(ctx context.Context) error {
// Retry if an invalid bond index error is returned.
continue

case errors.Is(err, errUnauthorized):
return err

case errors.Is(err, context.DeadlineExceeded), errors.Is(err, context.Canceled):
return fmt.Errorf("%s: post bond retry cancelled due to context: %w", hostID, err)

Expand Down
19 changes: 18 additions & 1 deletion client/mesh_connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"context"
"errors"
"math/rand/v2"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -188,6 +189,13 @@ func (m *meshConnectionManager) connectToAvailableNode(ctx context.Context) (mes

result := m.connectToNode(ctx, peerID)
if result.connectErr != nil {
if errors.Is(result.connectErr, errUnauthorized) {
m.log.Infof("%s: unauthorized", peerID.ShortString())
errCh := make(chan error, 1)
errCh <- result.connectErr
return nil, errCh, false
}

m.log.Errorf("Connection to %s failed: %v", peerID.ShortString(), result.connectErr)
continue
}
Expand Down Expand Up @@ -220,6 +228,10 @@ func (m *meshConnectionManager) run(ctx context.Context) {
m.setPrimaryConnection(conn)
runErrCh = errCh
} else {
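// No connection established (no available nodes, or the peer reported unauthorized): forward any returned error channel to the run loop, then schedule a retry with backoff.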
if errCh != nil {
runErrCh = errCh
}
reconnectTimer.Reset(backoff)
backoff *= 2
if backoff > maxReconnectDelay {
Expand All @@ -237,9 +249,14 @@ func (m *meshConnectionManager) run(ctx context.Context) {
refreshTimer.Reset(meshNodesRefreshInterval)
case <-reconnectTimer.C:
attemptConnect()
case <-runErrCh:
case err := <-runErrCh:
m.setPrimaryConnection(nil)
runErrCh = nil
if errors.Is(err, errUnauthorized) {
return
}

// Primary connection failed, immediately try alternatives
attemptConnect()
}
}
Expand Down
4 changes: 2 additions & 2 deletions client/mesh_connection_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ func TestMeshConnectionManagerFailover(t *testing.T) {
return false
}
return c.remotePeerID() == expected
}, 2*time.Second, 10*time.Millisecond, "primary connection not set to %s", expected)
}, 5*time.Second, 10*time.Millisecond, "primary connection not set to %s", expected)
}

waitForPrimary(node1ID)

requireEventually(t, func() bool {
addrs := h.Peerstore().Addrs(node2ID)
return len(addrs) > 0
}, 2*time.Second, 10*time.Millisecond, "peerstore missing addresses for node 2")
}, 5*time.Second, 10*time.Millisecond, "peerstore missing addresses for node 2")

node1Available = false
conn1.fail(errors.New("node-1 down"))
Expand Down
7 changes: 7 additions & 0 deletions client/mesh_connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ func (h *meshConnHarness) setupDefaultHandlers(t *testing.T) {
t.Fatalf("Publish read error: %v", err)
}
h.publishReceived <- &msg

resp := &protocolsPb.Response{
Response: &protocolsPb.Response_Success{Success: &protocolsPb.Success{}},
}
if err := codec.WriteLengthPrefixedMessage(s, resp); err != nil {
t.Fatalf("Failed to send publish response: %v", err)
}
})

h.tatankaHost.SetStreamHandler(protocols.ClientSubscribeProtocol, func(s network.Stream) {
Expand Down
Loading