Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions rest-api/site-agent/pkg/components/managers/coregrpc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package coregrpc

import (
"context"
"errors"
"time"

Expand Down Expand Up @@ -50,8 +51,15 @@ func makeGrpcClientMetrics() client.Metrics {
return metrics
}

func (m *grpcClientMetrics) RecordRpcResponse(method, code string, duration time.Duration) {
ManagerAccess.Data.EB.Log.Debug().Msgf("method=%s, code=%s, duration=%v", method, code, duration)
func (m *grpcClientMetrics) RecordRpcResponse(ctx context.Context, method, code string, duration time.Duration) {
event := ManagerAccess.Data.EB.Log.Debug()
// Tag the log with the RPC's trace id so an operator can thread this call back to the
// workflow that issued it. Omitted when the call carries no span (TraceIDFromContext is
// panic-safe and returns "" in that case).
if traceID := client.TraceIDFromContext(ctx); traceID != "" {
event = event.Str("trace_id", traceID)
}
event.Msgf("method=%s, code=%s, duration=%v", method, code, duration)
m.responseLatency.WithLabelValues(method, code).Observe(duration.Seconds())
}

Expand Down
12 changes: 10 additions & 2 deletions rest-api/site-agent/pkg/components/managers/flowgrpc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package flowgrpc

import (
"context"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -55,8 +56,15 @@ func makeGrpcClientMetrics() client.Metrics {
return metrics
}

func (m *grpcClientMetrics) RecordRpcResponse(method, code string, duration time.Duration) {
ManagerAccess.Data.EB.Log.Debug().Msgf("method=%s, code=%s, duration=%v", method, code, duration)
func (m *grpcClientMetrics) RecordRpcResponse(ctx context.Context, method, code string, duration time.Duration) {
event := ManagerAccess.Data.EB.Log.Debug()
// Tag the log with the RPC's trace id so an operator can thread this call back to the
// workflow that issued it. Omitted when the call carries no span (TraceIDFromContext is
// panic-safe and returns "" in that case).
if traceID := client.TraceIDFromContext(ctx); traceID != "" {
event = event.Str("trace_id", traceID)
}
event.Msgf("method=%s, code=%s, duration=%v", method, code, duration)
m.responseLatency.WithLabelValues(method, code).Observe(duration.Seconds())
}

Expand Down
21 changes: 17 additions & 4 deletions rest-api/site-workflow/pkg/grpc/client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,36 @@ import (
"context"
"time"

"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// Metrics interface that defines call-back functions for RPC metrics
type Metrics interface {
// RecordRpcResponse call-back method that includes rpc method, response code, and duration
RecordRpcResponse(method, code string, duration time.Duration)
// RecordRpcResponse call-back method that includes rpc method, response code, and duration.
// ctx carries the per-RPC OpenTelemetry span (the gRPC client is otelgrpc-instrumented), so
// implementations can log its trace id to correlate the call back to the workflow that issued it.
RecordRpcResponse(ctx context.Context, method, code string, duration time.Duration)
}

// TraceIDFromContext returns the hex-encoded OpenTelemetry trace id carried by ctx, or "" when ctx
// has no valid span context. It never panics, so callers may use it unconditionally regardless of
// whether the RPC originated inside a traced workflow.
func TraceIDFromContext(ctx context.Context) string {
if sc := trace.SpanContextFromContext(ctx); sc.HasTraceID() {
return sc.TraceID().String()
}
return ""
}

func newGrpcUnaryMetricsInterceptor(metrics Metrics) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req interface{}, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
var code codes.Code

defer func(startTime time.Time) {
metrics.RecordRpcResponse(method, normalizeRPCCode(code), time.Since(startTime))
metrics.RecordRpcResponse(ctx, method, normalizeRPCCode(code), time.Since(startTime))
}(time.Now())

err := invoker(ctx, method, req, reply, cc, opts...)
Expand All @@ -37,7 +50,7 @@ func newGrpcStreamMetricsInterceptor(metrics Metrics) grpc.StreamClientIntercept
var code codes.Code

defer func(startTime time.Time) {
metrics.RecordRpcResponse(method, normalizeRPCCode(code), time.Since(startTime))
metrics.RecordRpcResponse(ctx, method, normalizeRPCCode(code), time.Since(startTime))
}(time.Now())

s, err := streamer(ctx, desc, cc, method, opts...)
Expand Down
48 changes: 48 additions & 0 deletions rest-api/site-workflow/pkg/grpc/client/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package client

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/trace"
)

func TestTraceIDFromContext(t *testing.T) {
traceID := trace.TraceID{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10}

tcs := []struct {
descr string
ctx context.Context
want string
}{
{
descr: "no span context returns empty",
ctx: context.Background(),
want: "",
},
{
descr: "valid span context returns hex-encoded trace id",
ctx: trace.ContextWithSpanContext(context.Background(), trace.NewSpanContext(trace.SpanContextConfig{
TraceID: traceID,
})),
want: "0102030405060708090a0b0c0d0e0f10",
},
{
descr: "all-zero trace id is treated as absent",
ctx: trace.ContextWithSpanContext(context.Background(), trace.NewSpanContext(trace.SpanContextConfig{
TraceID: trace.TraceID{},
})),
want: "",
},
}

for _, tc := range tcs {
t.Run(tc.descr, func(t *testing.T) {
assert.Equal(t, tc.want, TraceIDFromContext(tc.ctx))
})
}
}
Loading