diff --git a/app/app.go b/app/app.go index 7e3dee8ee..e8af98286 100644 --- a/app/app.go +++ b/app/app.go @@ -23,6 +23,8 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/net/swarm" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/automaxprocs/maxprocs" "github.com/obolnetwork/charon/app/errors" @@ -219,7 +221,33 @@ func Run(ctx context.Context, conf Config) (err error) { lockHashHex := Hex7(lock.LockHash) - p2pNode, err := wireP2P(ctx, life, conf, lock, p2pKey, lockHashHex, lock.UUID) + // Create prometheus registry early so it can be used by p2p. + // Metric and logging labels. + labels := map[string]string{ + "cluster_hash": lockHashHex, + "cluster_name": lock.Name, + "cluster_peer": "", // Set below from p2p key + "nickname": conf.Nickname, + "cluster_network": network, + "charon_version": version.Version.String(), + } + + // Derive peer ID from private key to set cluster_peer label before registering metrics + peerID, err := p2p.PeerIDFromKey(p2pKey.PubKey()) + if err != nil { + return err + } + labels["cluster_peer"] = p2p.PeerName(peerID) + + // Update cluster_peer label for Loki + log.SetLokiLabels(labels) + + promRegistry, err := promauto.NewRegistry(labels) + if err != nil { + return err + } + + p2pNode, err := wireP2P(ctx, life, conf, lock, p2pKey, lockHashHex, lock.UUID, promRegistry, labels) if err != nil { return err } @@ -244,22 +272,6 @@ func Run(ctx context.Context, conf Config) (err error) { z.Str("enr", enrRec.String()), z.Int("peers", len(lock.Operators))) - // Metric and logging labels. - labels := map[string]string{ - "cluster_hash": lockHashHex, - "cluster_name": lock.Name, - "cluster_peer": p2p.PeerName(p2pNode.ID()), - "nickname": conf.Nickname, - "cluster_network": network, - "charon_version": version.Version.String(), - } - log.SetLokiLabels(labels) - - promRegistry, err := promauto.NewRegistry(labels) - if err != nil { - return err - } - initStartupMetrics(p2p.PeerName(p2pNode.ID()), lock.Threshold, len(lock.Operators), len(lock.Validators), network) eth2Cl, subEth2Cl, err := newETH2Client(ctx, conf, life, lock, lock.ForkVersion, conf.BeaconNodeTimeout, conf.BeaconNodeSubmitTimeout) @@ -336,7 +348,7 @@ func wirePeerInfo(life *lifecycle.Manager, p2pNode host.Host, peers []peer.ID, l // wireP2P constructs the p2p tcp or udp (libp2p) nodes and registers it with the life cycle manager. func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config, - lock *cluster.Lock, p2pKey *k1.PrivateKey, lockHashHex, uuid string, + lock *cluster.Lock, p2pKey *k1.PrivateKey, lockHashHex, uuid string, promRegistry *prometheus.Registry, labels prometheus.Labels, ) (host.Host, error) { peerIDs, err := lock.PeerIDs() if err != nil { @@ -354,23 +366,32 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config, } // Start libp2p node. + bwOpt, bwReporter := p2p.WithBandwidthReporter(peerIDs) + + // Wrap registerer with cluster labels for libp2p metrics + wrappedRegisterer := prometheus.WrapRegistererWith(labels, promRegistry) + swarmOpts := []swarm.Option{p2p.WithSwarmMetrics(wrappedRegisterer)} + opts := []libp2p.Option{ - p2p.WithBandwidthReporter(peerIDs), + bwOpt, libp2p.ResourceManager(new(network.NullResourceManager)), } opts = append(opts, conf.TestConfig.LibP2POpts...) var p2pNode host.Host if featureset.Enabled(featureset.QUIC) { - p2pNode, err = p2p.NewNode(ctx, conf.P2P, p2pKey, connGater, false, p2p.NodeTypeQUIC, opts...) + p2pNode, err = p2p.NewNode(ctx, conf.P2P, p2pKey, connGater, false, p2p.NodeTypeQUIC, swarmOpts, opts...) } else { - p2pNode, err = p2p.NewNode(ctx, conf.P2P, p2pKey, connGater, false, p2p.NodeTypeTCP, opts...) + p2pNode, err = p2p.NewNode(ctx, conf.P2P, p2pKey, connGater, false, p2p.NodeTypeTCP, swarmOpts, opts...) } if err != nil { return nil, err } + // Register host with bandwidth reporter for transport protocol detection + p2p.RegisterBandwidthReporter(bwReporter, p2pNode) + if conf.TestConfig.P2PNodeCallback != nil { conf.TestConfig.P2PNodeCallback(p2pNode) } diff --git a/cmd/relay/p2p.go b/cmd/relay/p2p.go index ff2835fb6..3283eeb80 100644 --- a/cmd/relay/p2p.go +++ b/cmd/relay/p2p.go @@ -42,7 +42,7 @@ func startP2P(ctx context.Context, config Config, key *k1.PrivateKey, reporter m } } - p2pNode, err := p2p.NewNode(ctx, config.P2PConfig, key, p2p.NewOpenGater(), config.FilterPrivAddrs, p2p.NodeTypeQUIC, + p2pNode, err := p2p.NewNode(ctx, config.P2PConfig, key, p2p.NewOpenGater(), config.FilterPrivAddrs, p2p.NodeTypeQUIC, nil, libp2p.ResourceManager(new(network.NullResourceManager)), libp2p.BandwidthReporter(reporter)) if err != nil { return nil, nil, errors.Wrap(err, "new relay node") diff --git a/cmd/testpeers.go b/cmd/testpeers.go index 889770144..a9241018e 100644 --- a/cmd/testpeers.go +++ b/cmd/testpeers.go @@ -908,7 +908,7 @@ func setupP2P(ctx context.Context, privKey *k1.PrivateKey, conf p2p.Config, peer return nil, nil, err } - tcpNode, err := p2p.NewNode(ctx, conf, privKey, connGater, false, p2p.NodeTypeTCP) + tcpNode, err := p2p.NewNode(ctx, conf, privKey, connGater, false, p2p.NodeTypeTCP, nil) if err != nil { return nil, nil, err } diff --git a/cmd/testpeers_internal_test.go b/cmd/testpeers_internal_test.go index 43b262de1..136837fb0 100644 --- a/cmd/testpeers_internal_test.go +++ b/cmd/testpeers_internal_test.go @@ -539,7 +539,7 @@ func startPeer(t *testing.T, ctx context.Context, conf testPeersConfig, peerPriv connGater, err := p2p.NewConnGater([]peer.ID{hostAsPeer.ID}, relays) require.NoError(t, err) - peerTCPNode, err := p2p.NewNode(ctx, peerConf.P2P, peerPrivKey, connGater, false, p2p.NodeTypeTCP) + peerTCPNode, err := p2p.NewNode(ctx, peerConf.P2P, peerPrivKey, connGater, false, p2p.NodeTypeTCP, nil) require.NoError(t, err) t.Cleanup(func() { diff --git a/dkg/dkg.go b/dkg/dkg.go index 3a60ef59a..03f92d228 100644 --- a/dkg/dkg.go +++ b/dkg/dkg.go @@ -1282,7 +1282,7 @@ func setupP2P(ctx context.Context, key *k1.PrivateKey, conf Config, peers []p2p. return nil, nil, err } - p2pNode, err := p2p.NewNode(ctx, conf.P2P, key, connGater, false, p2p.NodeTypeTCP) + p2pNode, err := p2p.NewNode(ctx, conf.P2P, key, connGater, false, p2p.NodeTypeTCP, nil) if err != nil { return nil, nil, err } diff --git a/docs/metrics.md b/docs/metrics.md index 5ee5501f8..f9a68223a 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -87,9 +87,9 @@ when storing metrics from multiple nodes or clusters in one Prometheus instance. | `core_validatorapi_vc_user_agent` | Gauge | Gauge with label set to user agent string of requests made by VC | `user_agent` | | `p2p_peer_connection_total` | Counter | Total number of libp2p connections per peer. | `peer` | | `p2p_peer_connection_types` | Gauge | Current number of libp2p connections by peer, type (`direct` or `relay`), and protocol (`tcp`, `quic`). Note that peers may have multiple connections. | `peer, type, protocol` | -| `p2p_peer_network_receive_bytes_total` | Counter | Total number of network bytes received from the peer by protocol. | `peer, protocol` | -| `p2p_peer_network_sent_bytes_total` | Counter | Total number of network bytes sent to the peer by protocol. | `peer, protocol` | -| `p2p_peer_streams` | Gauge | Current number of libp2p streams by peer, direction (`inbound` or `outbound` or `unknown`) and protocol. | `peer, direction, protocol` | +| `p2p_peer_network_receive_bytes_total` | Counter | Total number of network bytes received from the peer by protocol and transport. Transport is based on first active connection (accurate in steady state). | `peer, protocol, transport` | +| `p2p_peer_network_sent_bytes_total` | Counter | Total number of network bytes sent to the peer by protocol and transport. Transport is based on first active connection (accurate in steady state). | `peer, protocol, transport` | +| `p2p_peer_streams` | Gauge | Current number of libp2p streams by peer, direction (`inbound` or `outbound` or `unknown`), protocol and transport. | `peer, direction, protocol, transport` | | `p2p_ping_error_total` | Counter | Total number of ping errors per peer | `peer` | | `p2p_ping_latency_secs` | Histogram | Ping latencies in seconds per peer | `peer` | | `p2p_ping_success` | Gauge | Whether the last ping was successful (1) or not (0). Can be used as proxy for connected peers | `peer` | diff --git a/p2p/metrics.go b/p2p/metrics.go index 57082eccb..0b0fca26e 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -6,9 +6,11 @@ import ( "time" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/metrics" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/net/swarm" "github.com/prometheus/client_golang/prometheus" "github.com/obolnetwork/charon/app/promauto" @@ -70,8 +72,8 @@ var ( peerStreamGauge = promauto.NewResetGaugeVec(prometheus.GaugeOpts{ Namespace: "p2p", Name: "peer_streams", - Help: "Current number of libp2p streams by peer, direction ('inbound' or 'outbound' or 'unknown') and protocol.", - }, []string{"peer", "direction", "protocol"}) + Help: "Current number of libp2p streams by peer, direction ('inbound' or 'outbound' or 'unknown'), protocol and transport.", + }, []string{"peer", "direction", "protocol", "transport"}) peerConnCounter = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "p2p", @@ -82,14 +84,14 @@ var ( networkRXCounter = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "p2p", Name: "peer_network_receive_bytes_total", - Help: "Total number of network bytes received from the peer by protocol.", - }, []string{"peer", "protocol"}) + Help: "Total number of network bytes received from the peer by protocol and transport. Transport is based on first active connection (accurate in steady state).", + }, []string{"peer", "protocol", "transport"}) networkTXCounter = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "p2p", Name: "peer_network_sent_bytes_total", - Help: "Total number of network bytes sent to the peer by protocol.", - }, []string{"peer", "protocol"}) + Help: "Total number of network bytes sent to the peer by protocol and transport. Transport is based on first active connection (accurate in steady state).", + }, []string{"peer", "protocol", "transport"}) ) func observePing(p peer.ID, d time.Duration) { @@ -102,42 +104,91 @@ func incPingError(p peer.ID) { pingSuccess.WithLabelValues(PeerName(p)).Set(0) } -var _ metrics.Reporter = bandwithReporter{} +// WithSwarmMetrics returns a libp2p swarm option that enables the built-in swarm metrics. +// The registerer parameter should be the same prometheus registry used by the application +// to ensure libp2p metrics are exposed alongside application metrics. +func WithSwarmMetrics(registerer prometheus.Registerer) swarm.Option { + // Use libp2p's built-in metrics tracer with the provided registerer + return swarm.WithMetricsTracer(swarm.NewMetricsTracer(swarm.WithRegisterer(registerer))) +} + +// BandwidthReporter is an interface for the bandwidth reporter that can be registered with a host. +type BandwidthReporter interface { + registerHost(host.Host) +} + +var _ metrics.Reporter = (*bandwithReporter)(nil) // WithBandwidthReporter returns a libp2p option that enables bandwidth reporting via prometheus. -func WithBandwidthReporter(peers []peer.ID) libp2p.Option { +// Returns both the option and the reporter instance so the host can be registered later. +func WithBandwidthReporter(peers []peer.ID) (libp2p.Option, BandwidthReporter) { peerNames := make(map[peer.ID]string) for _, p := range peers { peerNames[p] = PeerName(p) } - return libp2p.BandwidthReporter(bandwithReporter{peerNames: peerNames}) + reporter := &bandwithReporter{ + peerNames: peerNames, + } + + return libp2p.BandwidthReporter(reporter), reporter +} + +// RegisterBandwidthReporter sets the host reference on the bandwidth reporter for transport detection. +func RegisterBandwidthReporter(reporter BandwidthReporter, h host.Host) { + if reporter != nil { + reporter.registerHost(h) + } } type bandwithReporter struct { metrics.Reporter + host host.Host peerNames map[peer.ID]string } +func (r *bandwithReporter) registerHost(h host.Host) { + r.host = h +} + func (bandwithReporter) LogSentMessage(int64) {} func (bandwithReporter) LogRecvMessage(int64) {} -func (r bandwithReporter) LogSentMessageStream(bytes int64, protoID protocol.ID, peerID peer.ID) { +func (r *bandwithReporter) LogSentMessageStream(bytes int64, protoID protocol.ID, peerID peer.ID) { name, ok := r.peerNames[peerID] if !ok { return // Do not instrument relays } - networkTXCounter.WithLabelValues(name, string(protoID)).Add(float64(bytes)) + transport := r.getTransportProtocol(peerID) + networkTXCounter.WithLabelValues(name, string(protoID), transport).Add(float64(bytes)) } -func (r bandwithReporter) LogRecvMessageStream(bytes int64, protoID protocol.ID, peerID peer.ID) { +func (r *bandwithReporter) LogRecvMessageStream(bytes int64, protoID protocol.ID, peerID peer.ID) { name, ok := r.peerNames[peerID] if !ok { return // Do not instrument relays } - networkRXCounter.WithLabelValues(name, string(protoID)).Add(float64(bytes)) + transport := r.getTransportProtocol(peerID) + networkRXCounter.WithLabelValues(name, string(protoID), transport).Add(float64(bytes)) +} + +// getTransportProtocol determines the transport protocol (tcp/quic/unknown) for a peer. +// Uses first connection which is accurate in steady state (single connection per peer). +func (r *bandwithReporter) getTransportProtocol(peerID peer.ID) string { + if r.host == nil { + return protocolUnknown + } + + conns := r.host.Network().ConnsToPeer(peerID) + if len(conns) == 0 { + return protocolUnknown + } + + // In steady state, there's typically only one connection per peer. + // Use first connection's protocol. + return addrProtocol(conns[0].RemoteMultiaddr()) } diff --git a/p2p/p2p.go b/p2p/p2p.go index e67ed6838..924597a71 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -20,6 +20,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/routing" + "github.com/libp2p/go-libp2p/p2p/net/swarm" "github.com/libp2p/go-libp2p/p2p/protocol/identify" quic "github.com/libp2p/go-libp2p/p2p/transport/quic" //nolint:revive // Must be imported with alias "github.com/libp2p/go-libp2p/p2p/transport/tcp" @@ -48,7 +49,7 @@ const ( // NewNode returns a started libp2p host. func NewNode(ctx context.Context, cfg Config, key *k1.PrivateKey, connGater ConnGater, - filterPrivateAddrs bool, nodeType NodeType, opts ...libp2p.Option, + filterPrivateAddrs bool, nodeType NodeType, swarmOpts []swarm.Option, opts ...libp2p.Option, ) (host.Host, error) { setActivationThreshold() @@ -118,6 +119,8 @@ func NewNode(ctx context.Context, cfg Config, key *k1.PrivateKey, connGater Conn return filterAdvertisedAddrs(externalAddrs, internalAddrs, filterPrivateAddrs) }), transport, + // Enable libp2p swarm metrics + libp2p.SwarmOpts(swarmOpts...), } defaultOpts = append(defaultOpts, opts...) @@ -638,6 +641,7 @@ func RegisterConnectionLogger(ctx context.Context, p2pNode host.Host, peerIDs [] PeerName string Direction string Protocol string + Transport string } var ( @@ -682,11 +686,13 @@ func RegisterConnectionLogger(ctx context.Context, p2pNode host.Host, peerIDs [] } counts[cKey]++ + transport := addrProtocol(conn.RemoteMultiaddr()) for _, stream := range conn.GetStreams() { sKey := streamKey{ PeerName: PeerName(conn.RemotePeer()), Direction: stream.Stat().Direction.String(), Protocol: string(stream.Protocol()), + Transport: transport, } streams[sKey]++ } @@ -718,7 +724,7 @@ func RegisterConnectionLogger(ctx context.Context, p2pNode host.Host, peerIDs [] } for sKey, amount := range streams { - peerStreamGauge.WithLabelValues(sKey.PeerName, sKey.Direction, sKey.Protocol).Set(float64(amount)) + peerStreamGauge.WithLabelValues(sKey.PeerName, sKey.Direction, sKey.Protocol, sKey.Transport).Set(float64(amount)) } case e := <-events: // Log and instrument events. diff --git a/p2p/peer_test.go b/p2p/peer_test.go index d0a87379d..db2411d39 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -32,7 +32,7 @@ func TestNewTCPHost(t *testing.T) { privKey, err := k1.GeneratePrivateKey() require.NoError(t, err) - _, err = p2p.NewNode(context.Background(), p2p.Config{}, privKey, p2p.NewOpenGater(), false, p2p.NodeTypeTCP) + _, err = p2p.NewNode(context.Background(), p2p.Config{}, privKey, p2p.NewOpenGater(), false, p2p.NodeTypeTCP, nil) require.NoError(t, err) } @@ -40,7 +40,7 @@ func TestNewQUICHost(t *testing.T) { privKey, err := k1.GeneratePrivateKey() require.NoError(t, err) - _, err = p2p.NewNode(context.Background(), p2p.Config{}, privKey, p2p.NewOpenGater(), false, p2p.NodeTypeQUIC) + _, err = p2p.NewNode(context.Background(), p2p.Config{}, privKey, p2p.NewOpenGater(), false, p2p.NodeTypeQUIC, nil) require.NoError(t, err) }