From fb15808922e8aa8c9546d1b167570ce2a4174bdd Mon Sep 17 00:00:00 2001 From: Omar Refai Date: Mon, 29 Jun 2026 20:01:44 -0700 Subject: [PATCH] feat(site-agent): log trace id on gRPC client responses The gRPC client metrics callback logged only method, code, and duration, so an operator reading e.g. a DeadlineExceeded line could not tell which workflow issued the call. The Flow and Core gRPC clients are already otelgrpc-instrumented, so every RPC carries a span; thread the call context into Metrics.RecordRpcResponse and tag the log with the span's trace id when present, letting operators thread together all logs for a single workflow. - Add ctx to client.Metrics.RecordRpcResponse and pass it from the unary and stream interceptors. - Add client.TraceIDFromContext, a panic-safe helper returning the hex-encoded trace id (or "" when absent), covered by a unit test. - Log trace_id via the existing zerolog style in the coregrpc and flowgrpc implementations; the existing message text is unchanged. Fixes #2650 Signed-off-by: Omar Refai --- .../components/managers/coregrpc/metrics.go | 12 ++++- .../components/managers/flowgrpc/metrics.go | 12 ++++- .../site-workflow/pkg/grpc/client/metrics.go | 21 ++++++-- .../pkg/grpc/client/metrics_test.go | 48 +++++++++++++++++++ 4 files changed, 85 insertions(+), 8 deletions(-) create mode 100644 rest-api/site-workflow/pkg/grpc/client/metrics_test.go diff --git a/rest-api/site-agent/pkg/components/managers/coregrpc/metrics.go b/rest-api/site-agent/pkg/components/managers/coregrpc/metrics.go index 462479f490..8aefce447a 100644 --- a/rest-api/site-agent/pkg/components/managers/coregrpc/metrics.go +++ b/rest-api/site-agent/pkg/components/managers/coregrpc/metrics.go @@ -4,6 +4,7 @@ package coregrpc import ( + "context" "errors" "time" @@ -50,8 +51,15 @@ func makeGrpcClientMetrics() client.Metrics { return metrics } -func (m *grpcClientMetrics) RecordRpcResponse(method, code string, duration time.Duration) { - ManagerAccess.Data.EB.Log.Debug().Msgf("method=%s, code=%s, duration=%v", method, code, duration) +func (m *grpcClientMetrics) RecordRpcResponse(ctx context.Context, method, code string, duration time.Duration) { + event := ManagerAccess.Data.EB.Log.Debug() + // Tag the log with the RPC's trace id so an operator can thread this call back to the + // workflow that issued it. Omitted when the call carries no span (TraceIDFromContext is + // panic-safe and returns "" in that case). + if traceID := client.TraceIDFromContext(ctx); traceID != "" { + event = event.Str("trace_id", traceID) + } + event.Msgf("method=%s, code=%s, duration=%v", method, code, duration) m.responseLatency.WithLabelValues(method, code).Observe(duration.Seconds()) } diff --git a/rest-api/site-agent/pkg/components/managers/flowgrpc/metrics.go b/rest-api/site-agent/pkg/components/managers/flowgrpc/metrics.go index 08d3ff5d85..31bc0d2a48 100644 --- a/rest-api/site-agent/pkg/components/managers/flowgrpc/metrics.go +++ b/rest-api/site-agent/pkg/components/managers/flowgrpc/metrics.go @@ -4,6 +4,7 @@ package flowgrpc import ( + "context" "errors" "fmt" "time" @@ -55,8 +56,15 @@ func makeGrpcClientMetrics() client.Metrics { return metrics } -func (m *grpcClientMetrics) RecordRpcResponse(method, code string, duration time.Duration) { - ManagerAccess.Data.EB.Log.Debug().Msgf("method=%s, code=%s, duration=%v", method, code, duration) +func (m *grpcClientMetrics) RecordRpcResponse(ctx context.Context, method, code string, duration time.Duration) { + event := ManagerAccess.Data.EB.Log.Debug() + // Tag the log with the RPC's trace id so an operator can thread this call back to the + // workflow that issued it. Omitted when the call carries no span (TraceIDFromContext is + // panic-safe and returns "" in that case). + if traceID := client.TraceIDFromContext(ctx); traceID != "" { + event = event.Str("trace_id", traceID) + } + event.Msgf("method=%s, code=%s, duration=%v", method, code, duration) m.responseLatency.WithLabelValues(method, code).Observe(duration.Seconds()) } diff --git a/rest-api/site-workflow/pkg/grpc/client/metrics.go b/rest-api/site-workflow/pkg/grpc/client/metrics.go index bb9d7dce47..fa2978bc92 100644 --- a/rest-api/site-workflow/pkg/grpc/client/metrics.go +++ b/rest-api/site-workflow/pkg/grpc/client/metrics.go @@ -7,6 +7,7 @@ import ( "context" "time" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -14,8 +15,20 @@ import ( // Metrics interface that defines call-back functions for RPC metrics type Metrics interface { - // RecordRpcResponse call-back method that includes rpc method, response code, and duration - RecordRpcResponse(method, code string, duration time.Duration) + // RecordRpcResponse call-back method that includes rpc method, response code, and duration. + // ctx carries the per-RPC OpenTelemetry span (the gRPC client is otelgrpc-instrumented), so + // implementations can log its trace id to correlate the call back to the workflow that issued it. + RecordRpcResponse(ctx context.Context, method, code string, duration time.Duration) +} + +// TraceIDFromContext returns the hex-encoded OpenTelemetry trace id carried by ctx, or "" when ctx +// has no valid span context. It never panics, so callers may use it unconditionally regardless of +// whether the RPC originated inside a traced workflow. +func TraceIDFromContext(ctx context.Context) string { + if sc := trace.SpanContextFromContext(ctx); sc.HasTraceID() { + return sc.TraceID().String() + } + return "" } func newGrpcUnaryMetricsInterceptor(metrics Metrics) grpc.UnaryClientInterceptor { @@ -23,7 +36,7 @@ func newGrpcUnaryMetricsInterceptor(metrics Metrics) grpc.UnaryClientInterceptor var code codes.Code defer func(startTime time.Time) { - metrics.RecordRpcResponse(method, normalizeRPCCode(code), time.Since(startTime)) + metrics.RecordRpcResponse(ctx, method, normalizeRPCCode(code), time.Since(startTime)) }(time.Now()) err := invoker(ctx, method, req, reply, cc, opts...) @@ -37,7 +50,7 @@ func newGrpcStreamMetricsInterceptor(metrics Metrics) grpc.StreamClientIntercept var code codes.Code defer func(startTime time.Time) { - metrics.RecordRpcResponse(method, normalizeRPCCode(code), time.Since(startTime)) + metrics.RecordRpcResponse(ctx, method, normalizeRPCCode(code), time.Since(startTime)) }(time.Now()) s, err := streamer(ctx, desc, cc, method, opts...) diff --git a/rest-api/site-workflow/pkg/grpc/client/metrics_test.go b/rest-api/site-workflow/pkg/grpc/client/metrics_test.go new file mode 100644 index 0000000000..8f8907e034 --- /dev/null +++ b/rest-api/site-workflow/pkg/grpc/client/metrics_test.go @@ -0,0 +1,48 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package client + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/trace" +) + +func TestTraceIDFromContext(t *testing.T) { + traceID := trace.TraceID{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10} + + tcs := []struct { + descr string + ctx context.Context + want string + }{ + { + descr: "no span context returns empty", + ctx: context.Background(), + want: "", + }, + { + descr: "valid span context returns hex-encoded trace id", + ctx: trace.ContextWithSpanContext(context.Background(), trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: traceID, + })), + want: "0102030405060708090a0b0c0d0e0f10", + }, + { + descr: "all-zero trace id is treated as absent", + ctx: trace.ContextWithSpanContext(context.Background(), trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: trace.TraceID{}, + })), + want: "", + }, + } + + for _, tc := range tcs { + t.Run(tc.descr, func(t *testing.T) { + assert.Equal(t, tc.want, TraceIDFromContext(tc.ctx)) + }) + } +}