Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 42 additions & 21 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/automaxprocs/maxprocs"

"github.com/obolnetwork/charon/app/errors"
Expand Down Expand Up @@ -219,7 +221,33 @@ func Run(ctx context.Context, conf Config) (err error) {

lockHashHex := Hex7(lock.LockHash)

p2pNode, err := wireP2P(ctx, life, conf, lock, p2pKey, lockHashHex, lock.UUID)
// Create prometheus registry early so it can be used by p2p.
// Metric and logging labels.
labels := map[string]string{
"cluster_hash": lockHashHex,
"cluster_name": lock.Name,
"cluster_peer": "", // Set below from p2p key
"nickname": conf.Nickname,
"cluster_network": network,
"charon_version": version.Version.String(),
}

// Derive peer ID from private key to set cluster_peer label before registering metrics
peerID, err := p2p.PeerIDFromKey(p2pKey.PubKey())
if err != nil {
return err
}
labels["cluster_peer"] = p2p.PeerName(peerID)

// Update cluster_peer label for Loki
log.SetLokiLabels(labels)

promRegistry, err := promauto.NewRegistry(labels)
if err != nil {
return err
}

p2pNode, err := wireP2P(ctx, life, conf, lock, p2pKey, lockHashHex, lock.UUID, promRegistry, labels)
if err != nil {
return err
}
Expand All @@ -244,22 +272,6 @@ func Run(ctx context.Context, conf Config) (err error) {
z.Str("enr", enrRec.String()),
z.Int("peers", len(lock.Operators)))

// Metric and logging labels.
labels := map[string]string{
"cluster_hash": lockHashHex,
"cluster_name": lock.Name,
"cluster_peer": p2p.PeerName(p2pNode.ID()),
"nickname": conf.Nickname,
"cluster_network": network,
"charon_version": version.Version.String(),
}
log.SetLokiLabels(labels)

promRegistry, err := promauto.NewRegistry(labels)
if err != nil {
return err
}

initStartupMetrics(p2p.PeerName(p2pNode.ID()), lock.Threshold, len(lock.Operators), len(lock.Validators), network)

eth2Cl, subEth2Cl, err := newETH2Client(ctx, conf, life, lock, lock.ForkVersion, conf.BeaconNodeTimeout, conf.BeaconNodeSubmitTimeout)
Expand Down Expand Up @@ -336,7 +348,7 @@ func wirePeerInfo(life *lifecycle.Manager, p2pNode host.Host, peers []peer.ID, l

// wireP2P constructs the libp2p node (TCP or QUIC) and registers it with the life cycle manager.
func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config,
lock *cluster.Lock, p2pKey *k1.PrivateKey, lockHashHex, uuid string,
lock *cluster.Lock, p2pKey *k1.PrivateKey, lockHashHex, uuid string, promRegistry *prometheus.Registry, labels prometheus.Labels,
) (host.Host, error) {
peerIDs, err := lock.PeerIDs()
if err != nil {
Expand All @@ -354,23 +366,32 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config,
}

// Start libp2p node.
bwOpt, bwReporter := p2p.WithBandwidthReporter(peerIDs)

// Wrap registerer with cluster labels for libp2p metrics
wrappedRegisterer := prometheus.WrapRegistererWith(labels, promRegistry)
swarmOpts := []swarm.Option{p2p.WithSwarmMetrics(wrappedRegisterer)}

opts := []libp2p.Option{
p2p.WithBandwidthReporter(peerIDs),
bwOpt,
libp2p.ResourceManager(new(network.NullResourceManager)),
}
opts = append(opts, conf.TestConfig.LibP2POpts...)

var p2pNode host.Host
if featureset.Enabled(featureset.QUIC) {
p2pNode, err = p2p.NewNode(ctx, conf.P2P, p2pKey, connGater, false, p2p.NodeTypeQUIC, opts...)
p2pNode, err = p2p.NewNode(ctx, conf.P2P, p2pKey, connGater, false, p2p.NodeTypeQUIC, swarmOpts, opts...)
} else {
p2pNode, err = p2p.NewNode(ctx, conf.P2P, p2pKey, connGater, false, p2p.NodeTypeTCP, opts...)
p2pNode, err = p2p.NewNode(ctx, conf.P2P, p2pKey, connGater, false, p2p.NodeTypeTCP, swarmOpts, opts...)
}

if err != nil {
return nil, err
}

// Register host with bandwidth reporter for transport protocol detection
p2p.RegisterBandwidthReporter(bwReporter, p2pNode)

if conf.TestConfig.P2PNodeCallback != nil {
conf.TestConfig.P2PNodeCallback(p2pNode)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/relay/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func startP2P(ctx context.Context, config Config, key *k1.PrivateKey, reporter m
}
}

p2pNode, err := p2p.NewNode(ctx, config.P2PConfig, key, p2p.NewOpenGater(), config.FilterPrivAddrs, p2p.NodeTypeQUIC,
p2pNode, err := p2p.NewNode(ctx, config.P2PConfig, key, p2p.NewOpenGater(), config.FilterPrivAddrs, p2p.NodeTypeQUIC, nil,
libp2p.ResourceManager(new(network.NullResourceManager)), libp2p.BandwidthReporter(reporter))
if err != nil {
return nil, nil, errors.Wrap(err, "new relay node")
Expand Down
2 changes: 1 addition & 1 deletion cmd/testpeers.go
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,7 @@ func setupP2P(ctx context.Context, privKey *k1.PrivateKey, conf p2p.Config, peer
return nil, nil, err
}

tcpNode, err := p2p.NewNode(ctx, conf, privKey, connGater, false, p2p.NodeTypeTCP)
tcpNode, err := p2p.NewNode(ctx, conf, privKey, connGater, false, p2p.NodeTypeTCP, nil)
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/testpeers_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func startPeer(t *testing.T, ctx context.Context, conf testPeersConfig, peerPriv
connGater, err := p2p.NewConnGater([]peer.ID{hostAsPeer.ID}, relays)
require.NoError(t, err)

peerTCPNode, err := p2p.NewNode(ctx, peerConf.P2P, peerPrivKey, connGater, false, p2p.NodeTypeTCP)
peerTCPNode, err := p2p.NewNode(ctx, peerConf.P2P, peerPrivKey, connGater, false, p2p.NodeTypeTCP, nil)
require.NoError(t, err)

t.Cleanup(func() {
Expand Down
2 changes: 1 addition & 1 deletion dkg/dkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -1282,7 +1282,7 @@ func setupP2P(ctx context.Context, key *k1.PrivateKey, conf Config, peers []p2p.
return nil, nil, err
}

p2pNode, err := p2p.NewNode(ctx, conf.P2P, key, connGater, false, p2p.NodeTypeTCP)
p2pNode, err := p2p.NewNode(ctx, conf.P2P, key, connGater, false, p2p.NodeTypeTCP, nil)
if err != nil {
return nil, nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ when storing metrics from multiple nodes or clusters in one Prometheus instance.
| `core_validatorapi_vc_user_agent` | Gauge | Gauge with label set to user agent string of requests made by VC | `user_agent` |
| `p2p_peer_connection_total` | Counter | Total number of libp2p connections per peer. | `peer` |
| `p2p_peer_connection_types` | Gauge | Current number of libp2p connections by peer, type (`direct` or `relay`), and protocol (`tcp`, `quic`). Note that peers may have multiple connections. | `peer, type, protocol` |
| `p2p_peer_network_receive_bytes_total` | Counter | Total number of network bytes received from the peer by protocol. | `peer, protocol` |
| `p2p_peer_network_sent_bytes_total` | Counter | Total number of network bytes sent to the peer by protocol. | `peer, protocol` |
| `p2p_peer_streams` | Gauge | Current number of libp2p streams by peer, direction (`inbound` or `outbound` or `unknown`) and protocol. | `peer, direction, protocol` |
| `p2p_peer_network_receive_bytes_total` | Counter | Total number of network bytes received from the peer by protocol and transport. Transport is based on first active connection (accurate in steady state). | `peer, protocol, transport` |
| `p2p_peer_network_sent_bytes_total` | Counter | Total number of network bytes sent to the peer by protocol and transport. Transport is based on first active connection (accurate in steady state). | `peer, protocol, transport` |
| `p2p_peer_streams` | Gauge | Current number of libp2p streams by peer, direction (`inbound` or `outbound` or `unknown`), protocol and transport. | `peer, direction, protocol, transport` |
| `p2p_ping_error_total` | Counter | Total number of ping errors per peer | `peer` |
| `p2p_ping_latency_secs` | Histogram | Ping latencies in seconds per peer | `peer` |
| `p2p_ping_success` | Gauge | Whether the last ping was successful (1) or not (0). Can be used as proxy for connected peers | `peer` |
Expand Down
77 changes: 64 additions & 13 deletions p2p/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/prometheus/client_golang/prometheus"

"github.com/obolnetwork/charon/app/promauto"
Expand Down Expand Up @@ -70,8 +72,8 @@ var (
peerStreamGauge = promauto.NewResetGaugeVec(prometheus.GaugeOpts{
Namespace: "p2p",
Name: "peer_streams",
Help: "Current number of libp2p streams by peer, direction ('inbound' or 'outbound' or 'unknown') and protocol.",
}, []string{"peer", "direction", "protocol"})
Help: "Current number of libp2p streams by peer, direction ('inbound' or 'outbound' or 'unknown'), protocol and transport.",
}, []string{"peer", "direction", "protocol", "transport"})

peerConnCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "p2p",
Expand All @@ -82,14 +84,14 @@ var (
networkRXCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "p2p",
Name: "peer_network_receive_bytes_total",
Help: "Total number of network bytes received from the peer by protocol.",
}, []string{"peer", "protocol"})
Help: "Total number of network bytes received from the peer by protocol and transport. Transport is based on first active connection (accurate in steady state).",
}, []string{"peer", "protocol", "transport"})

networkTXCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "p2p",
Name: "peer_network_sent_bytes_total",
Help: "Total number of network bytes sent to the peer by protocol.",
}, []string{"peer", "protocol"})
Help: "Total number of network bytes sent to the peer by protocol and transport. Transport is based on first active connection (accurate in steady state).",
}, []string{"peer", "protocol", "transport"})
)

func observePing(p peer.ID, d time.Duration) {
Expand All @@ -102,42 +104,91 @@ func incPingError(p peer.ID) {
pingSuccess.WithLabelValues(PeerName(p)).Set(0)
}

var _ metrics.Reporter = bandwithReporter{}
// WithSwarmMetrics builds a swarm option that attaches libp2p's built-in swarm
// metrics tracer to the node.
//
// The tracer is created with swarm.NewMetricsTracer and configured with
// registerer through swarm.WithRegisterer. Pass the same prometheus registry the
// application uses so the libp2p metrics are exposed alongside application metrics.
func WithSwarmMetrics(registerer prometheus.Registerer) swarm.Option {
	tracer := swarm.NewMetricsTracer(swarm.WithRegisterer(registerer))

	return swarm.WithMetricsTracer(tracer)
}

// BandwidthReporter is the handle for a bandwidth reporter that can have a host
// attached to it after the host has been constructed (see RegisterBandwidthReporter).
//
// Its only method is unexported, so it can only be implemented inside this package.
type BandwidthReporter interface {
// registerHost hands the reporter the host it belongs to.
registerHost(host.Host)
}

var _ metrics.Reporter = (*bandwithReporter)(nil)

// WithBandwidthReporter returns a libp2p option that enables bandwidth reporting via prometheus.
func WithBandwidthReporter(peers []peer.ID) libp2p.Option {
// Returns both the option and the reporter instance so the host can be registered later.
func WithBandwidthReporter(peers []peer.ID) (libp2p.Option, BandwidthReporter) {
peerNames := make(map[peer.ID]string)
for _, p := range peers {
peerNames[p] = PeerName(p)
}

return libp2p.BandwidthReporter(bandwithReporter{peerNames: peerNames})
reporter := &bandwithReporter{
peerNames: peerNames,
}

return libp2p.BandwidthReporter(reporter), reporter
}

// RegisterBandwidthReporter passes h to reporter so the reporter can use the host
// for transport detection. A nil reporter is ignored.
func RegisterBandwidthReporter(reporter BandwidthReporter, h host.Host) {
	if reporter == nil {
		return
	}

	reporter.registerHost(h)
}

// bandwithReporter is a metrics.Reporter that feeds the per-peer network byte
// counters (networkTXCounter / networkRXCounter), labelled by peer name, stream
// protocol and transport.
type bandwithReporter struct {
// Embedded so the type satisfies metrics.Reporter without declaring every method.
// NOTE(review): nothing visible here assigns this field, so any method that is not
// overridden on bandwithReporter would be invoked on a nil interface — confirm.
metrics.Reporter

// host is nil until registerHost is called; getTransportProtocol returns
// protocolUnknown while it is nil.
// NOTE(review): written by registerHost and read by getTransportProtocol with no
// synchronization visible here — confirm the host is registered before the
// reporter sees traffic.
host host.Host
// peerNames maps each instrumented peer ID to its metric label; traffic for peers
// missing from the map is not recorded.
peerNames map[peer.ID]string
}

// registerHost stores h as the host that getTransportProtocol queries for the
// connections to a peer.
func (r *bandwithReporter) registerHost(h host.Host) {
r.host = h
}

// LogSentMessage is a no-op: sent bytes are only counted per stream, in LogSentMessageStream.
func (bandwithReporter) LogSentMessage(int64) {}

// LogRecvMessage is a no-op: received bytes are only counted per stream, in LogRecvMessageStream.
func (bandwithReporter) LogRecvMessage(int64) {}

func (r bandwithReporter) LogSentMessageStream(bytes int64, protoID protocol.ID, peerID peer.ID) {
func (r *bandwithReporter) LogSentMessageStream(bytes int64, protoID protocol.ID, peerID peer.ID) {
name, ok := r.peerNames[peerID]
if !ok {
return // Do not instrument relays
}

networkTXCounter.WithLabelValues(name, string(protoID)).Add(float64(bytes))
transport := r.getTransportProtocol(peerID)
networkTXCounter.WithLabelValues(name, string(protoID), transport).Add(float64(bytes))
}

func (r bandwithReporter) LogRecvMessageStream(bytes int64, protoID protocol.ID, peerID peer.ID) {
func (r *bandwithReporter) LogRecvMessageStream(bytes int64, protoID protocol.ID, peerID peer.ID) {
name, ok := r.peerNames[peerID]
if !ok {
return // Do not instrument relays
}

networkRXCounter.WithLabelValues(name, string(protoID)).Add(float64(bytes))
transport := r.getTransportProtocol(peerID)
networkRXCounter.WithLabelValues(name, string(protoID), transport).Add(float64(bytes))
}

// getTransportProtocol returns the transport label for peerID, derived from the
// remote multiaddr of the first connection the host currently holds to that peer.
//
// It returns protocolUnknown when no host has been registered yet or when there is
// no connection to the peer. Only the first connection is inspected, so the label
// is accurate when there is a single connection per peer (the steady state) and
// may not reflect the connection actually carrying the bytes when there are several.
func (r *bandwithReporter) getTransportProtocol(peerID peer.ID) string {
	if r.host != nil {
		if conns := r.host.Network().ConnsToPeer(peerID); len(conns) > 0 {
			return addrProtocol(conns[0].RemoteMultiaddr())
		}
	}

	return protocolUnknown
}
10 changes: 8 additions & 2 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
quic "github.com/libp2p/go-libp2p/p2p/transport/quic" //nolint:revive // Must be imported with alias
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
Expand Down Expand Up @@ -48,7 +49,7 @@ const (

// NewNode returns a started libp2p host.
func NewNode(ctx context.Context, cfg Config, key *k1.PrivateKey, connGater ConnGater,
filterPrivateAddrs bool, nodeType NodeType, opts ...libp2p.Option,
filterPrivateAddrs bool, nodeType NodeType, swarmOpts []swarm.Option, opts ...libp2p.Option,
) (host.Host, error) {
setActivationThreshold()

Expand Down Expand Up @@ -118,6 +119,8 @@ func NewNode(ctx context.Context, cfg Config, key *k1.PrivateKey, connGater Conn
return filterAdvertisedAddrs(externalAddrs, internalAddrs, filterPrivateAddrs)
}),
transport,
// Enable libp2p swarm metrics
libp2p.SwarmOpts(swarmOpts...),
}

defaultOpts = append(defaultOpts, opts...)
Expand Down Expand Up @@ -638,6 +641,7 @@ func RegisterConnectionLogger(ctx context.Context, p2pNode host.Host, peerIDs []
PeerName string
Direction string
Protocol string
Transport string
}

var (
Expand Down Expand Up @@ -682,11 +686,13 @@ func RegisterConnectionLogger(ctx context.Context, p2pNode host.Host, peerIDs []
}
counts[cKey]++

transport := addrProtocol(conn.RemoteMultiaddr())
for _, stream := range conn.GetStreams() {
sKey := streamKey{
PeerName: PeerName(conn.RemotePeer()),
Direction: stream.Stat().Direction.String(),
Protocol: string(stream.Protocol()),
Transport: transport,
}
streams[sKey]++
}
Expand Down Expand Up @@ -718,7 +724,7 @@ func RegisterConnectionLogger(ctx context.Context, p2pNode host.Host, peerIDs []
}

for sKey, amount := range streams {
peerStreamGauge.WithLabelValues(sKey.PeerName, sKey.Direction, sKey.Protocol).Set(float64(amount))
peerStreamGauge.WithLabelValues(sKey.PeerName, sKey.Direction, sKey.Protocol, sKey.Transport).Set(float64(amount))
}
case e := <-events:
// Log and instrument events.
Expand Down
4 changes: 2 additions & 2 deletions p2p/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ func TestNewTCPHost(t *testing.T) {
privKey, err := k1.GeneratePrivateKey()
require.NoError(t, err)

_, err = p2p.NewNode(context.Background(), p2p.Config{}, privKey, p2p.NewOpenGater(), false, p2p.NodeTypeTCP)
_, err = p2p.NewNode(context.Background(), p2p.Config{}, privKey, p2p.NewOpenGater(), false, p2p.NodeTypeTCP, nil)
require.NoError(t, err)
}

func TestNewQUICHost(t *testing.T) {
privKey, err := k1.GeneratePrivateKey()
require.NoError(t, err)

_, err = p2p.NewNode(context.Background(), p2p.Config{}, privKey, p2p.NewOpenGater(), false, p2p.NodeTypeQUIC)
_, err = p2p.NewNode(context.Background(), p2p.Config{}, privKey, p2p.NewOpenGater(), false, p2p.NodeTypeQUIC, nil)
require.NoError(t, err)
}

Expand Down
Loading