From aa7e8232df0b19c21d763d381bc3a669b4f50ea2 Mon Sep 17 00:00:00 2001 From: Surbhi Garg Date: Fri, 14 Nov 2025 12:02:11 +0530 Subject: [PATCH] fix: Connectivity Error Metrics --- .../spanner/SpannerGrpcStreamTracer.java | 53 +++++++++++++++++++ .../spanner/spi/v1/HeaderInterceptor.java | 34 +++++++++--- ...OpenTelemetryBuiltInMetricsTracerTest.java | 6 +++ 3 files changed, 86 insertions(+), 7 deletions(-) create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerGrpcStreamTracer.java diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerGrpcStreamTracer.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerGrpcStreamTracer.java new file mode 100644 index 0000000000..e419b17961 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerGrpcStreamTracer.java @@ -0,0 +1,53 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.spanner; + +import io.grpc.ClientStreamTracer; +import io.grpc.Metadata; +import java.util.concurrent.atomic.AtomicBoolean; + +/** Captures the event when a request is sent from the gRPC client. */ +public class SpannerGrpcStreamTracer extends ClientStreamTracer { + + private final AtomicBoolean outBoundMessageSent = new AtomicBoolean(false); + + public SpannerGrpcStreamTracer() {} + + public boolean isOutBoundMessageSent() { + return outBoundMessageSent.get(); + } + + /** An outbound message has been serialized and sent to the transport. */ + @Override + public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) { + outBoundMessageSent.set(true); + } + + public static class Factory extends ClientStreamTracer.Factory { + + SpannerGrpcStreamTracer spannerGrpcStreamTracer; + + public Factory(SpannerGrpcStreamTracer spannerGrpcStreamTracer) { + this.spannerGrpcStreamTracer = spannerGrpcStreamTracer; + } + + @Override + public ClientStreamTracer newClientStreamTracer( + ClientStreamTracer.StreamInfo info, Metadata headers) { + return spannerGrpcStreamTracer; + } + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java index 432749fd95..e62890cc63 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java @@ -28,14 +28,9 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.spanner.admin.database.v1.DatabaseName; -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.ClientInterceptor; +import io.grpc.*; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; import io.grpc.alts.AltsContextUtil; import io.opencensus.stats.MeasureMap; import io.opencensus.stats.Stats; @@ -50,6 +45,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Matcher; @@ -102,7 +98,11 @@ public ClientCall interceptCall( ApiTracer tracer = callOptions.getOption(TRACER_KEY); CompositeTracer compositeTracer = tracer instanceof CompositeTracer ? (CompositeTracer) tracer : null; - return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { + final AtomicBoolean headersReceived = new AtomicBoolean(false); + SpannerGrpcStreamTracer streamTracer = new SpannerGrpcStreamTracer(); + CallOptions newOptions = + callOptions.withStreamTracerFactory(new SpannerGrpcStreamTracer.Factory(streamTracer)); + return new SimpleForwardingClientCall(next.newCall(method, newOptions)) { @Override public void start(Listener responseListener, Metadata headers) { try { @@ -124,6 +124,7 @@ public void start(Listener responseListener, Metadata headers) { new SimpleForwardingClientCallListener(responseListener) { @Override public void onHeaders(Metadata metadata) { + headersReceived.set(true); // Check if the call uses DirectPath by inspecting the ALTS context. boolean isDirectPathUsed = AltsContextUtil.check(getAttributes()); addDirectPathUsedAttribute(compositeTracer, isDirectPathUsed); @@ -131,6 +132,25 @@ public void onHeaders(Metadata metadata) { metadata, tagContext, attributes, span, compositeTracer, isDirectPathUsed); super.onHeaders(metadata); } + + @Override + public void onClose(Status status, Metadata trailers) { + // Check if RPC was sent from gRPC client, but no response headers were received. + // This can happen in + // case of a timeout, for example. + if (streamTracer.isOutBoundMessageSent() && !headersReceived.get()) { + if (compositeTracer != null) { + compositeTracer.recordGfeHeaderMissingCount(1L); + // Disable afe_connectivity_error_count metric as AFE header is disabled in + // backend + // currently. + // if (GapicSpannerRpc.isEnableAFEServerTiming()) { + // compositeTracer.recordAfeHeaderMissingCount(1L); + // } + } + } + super.onClose(status, trailers); + } }, headers); } catch (ExecutionException executionException) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java index aeb9487e2e..8f5f91c343 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java @@ -347,6 +347,10 @@ public void testNoNetworkConnection() { // Attempt count should have a failed metric point for CreateSession. assertEquals( 1, getAggregatedValue(attemptCountMetricData, expectedAttributesCreateSessionFailed), 0); + + // Connectivity count will not increase as client did not attempt to send the request + assertFalse( + checkIfMetricExists(metricReader, BuiltInMetricsConstant.GFE_CONNECTIVITY_ERROR_NAME)); } @Test @@ -390,6 +394,8 @@ public void testNoServerTimingHeader() throws IOException, InterruptedException assertFalse(checkIfMetricExists(metricReader, BuiltInMetricsConstant.AFE_LATENCIES_NAME)); assertFalse(checkIfMetricExists(metricReader, BuiltInMetricsConstant.GFE_LATENCIES_NAME)); + assertTrue( + checkIfMetricExists(metricReader, BuiltInMetricsConstant.GFE_CONNECTIVITY_ERROR_NAME)); // Metric is disabled currently assertFalse( checkIfMetricExists(metricReader, BuiltInMetricsConstant.AFE_CONNECTIVITY_ERROR_NAME));