From 436cf2e2d3a63285557b1c91a67d5e8e6eb94e0e Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Wed, 4 Jun 2025 23:31:13 -0600 Subject: [PATCH 1/3] chore(x-goog-spanner-request-id): set span attribute x_goog_spanner_request_id This sets the span attribute x_goog_spanner_request_id so as to enable correlation with distributed tracing with OpenTelemetry. Updates #3537 --- .../cloud/spanner/XGoogSpannerRequestId.java | 4 ++++ .../spanner/spi/v1/HeaderInterceptor.java | 5 +++++ .../cloud/spanner/OpenTelemetrySpanTest.java | 18 +++++++++++++++++- 3 files changed, 26 insertions(+), 1 deletion(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java index 7c2006b64ae..3b15f974fe5 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java @@ -198,4 +198,8 @@ XGoogSpannerRequestId withNthClientId(long replacementClientId) { return XGoogSpannerRequestId.of( replacementClientId, this.nthChannelId, this.nthRequest, this.attempt); } + + public static String getRequestIdFromMetadata(Metadata md) { + return md.get(REQUEST_HEADER_KEY); + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java index 01aa28d8d93..6b82978e9e8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java @@ -28,6 +28,7 @@ import com.google.cloud.spanner.CompositeTracer; import com.google.cloud.spanner.SpannerExceptionFactory; import com.google.cloud.spanner.SpannerRpcMetrics; +import com.google.cloud.spanner.XGoogSpannerRequestId; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.spanner.admin.database.v1.DatabaseName; @@ -174,6 +175,10 @@ private void processHeader( } if (span != null) { span.setAttribute("gfe_latency", String.valueOf(gfeLatency)); + String reqId = XGoogSpannerRequestId.getRequestIdFromMetadata(metadata); + if (reqId != null) { + span.setAttribute("x_goog_spanner_request_id", reqId); + } } } else { measureMap.put(SPANNER_GFE_HEADER_MISSING_COUNT, 1L).record(tagContext); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java index 90e66526f14..4ab20e35416 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java @@ -18,6 +18,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -748,6 +749,17 @@ public void testTransactionRunnerWithRetryOnBeginTransaction() { beginTransactionSpan.toString(), beginTransactionSpan.getEvents().stream() .anyMatch(event -> event.getName().equals("Starting RPC retry 1"))); + verifyAtLeast1SpanHasXGoogSpannerRequestIdAttribute(finishedSpans); + } + + private void verifyAtLeast1SpanHasXGoogSpannerRequestIdAttribute(List finishedSpans) { + AttributeKey attributeKey = AttributeKey.stringKey("x_goog_spanner_request_id"); + SpanData matchedSpan = + finishedSpans.stream() + .filter(span -> !span.getAttributes().get(attributeKey).isEmpty()) + .findAny() + .orElseThrow(IllegalStateException::new); + assertNotNull(matchedSpan); } @Test @@ -798,6 +810,7 @@ public void testSingleUseRetryOnExecuteStreamingSql() { executeStreamingQuery.toString(), executeStreamingQuery.getEvents().stream() .anyMatch(event -> event.getName().contains("Stream broken. Safe to retry"))); + verifyAtLeast1SpanHasXGoogSpannerRequestIdAttribute(finishedSpans); } @Test @@ -845,6 +858,7 @@ public void testRetryOnExecuteSql() { executeSqlSpan.toString(), executeSqlSpan.getEvents().stream() .anyMatch(event -> event.getName().equals("Starting RPC retry 1"))); + verifyAtLeast1SpanHasXGoogSpannerRequestIdAttribute(finishedSpans); } @Test @@ -866,12 +880,14 @@ public void testTableAttributes() { } return null; }); + List finishedSpans = spanExporter.getFinishedSpanItems(); SpanData spanData = - spanExporter.getFinishedSpanItems().stream() + finishedSpans.stream() .filter(x -> x.getName().equals("CloudSpannerOperation.ExecuteStreamingRead")) .findFirst() .get(); verifyTableAttributes(spanData); + verifyAtLeast1SpanHasXGoogSpannerRequestIdAttribute(finishedSpans); } private void waitForFinishedSpans(int numExpectedSpans) { From 625c7ddd6575afe56bd9f23b13eccf4a03f02602 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Wed, 11 Jun 2025 11:11:10 -0400 Subject: [PATCH 2/3] Simply matcher and usert assertTrue --- .../com/google/cloud/spanner/OpenTelemetrySpanTest.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java index 4ab20e35416..55681d35708 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java @@ -18,7 +18,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -754,12 +753,8 @@ public void testTransactionRunnerWithRetryOnBeginTransaction() { private void verifyAtLeast1SpanHasXGoogSpannerRequestIdAttribute(List finishedSpans) { AttributeKey attributeKey = AttributeKey.stringKey("x_goog_spanner_request_id"); - SpanData matchedSpan = - finishedSpans.stream() - .filter(span -> !span.getAttributes().get(attributeKey).isEmpty()) - .findAny() - .orElseThrow(IllegalStateException::new); - assertNotNull(matchedSpan); + assertTrue( + finishedSpans.stream().anyMatch(span -> !span.getAttributes().get(attributeKey).isEmpty())); } @Test From 9ce07990e2d0030528b1127237eb4c1eef68291e Mon Sep 17 00:00:00 2001 From: "Harold D. Turyasingura" Date: Thu, 31 Jul 2025 23:53:10 -0400 Subject: [PATCH 3/3] enabled tests, sort requestIds before asserting monotonicity --- .../cloud/spanner/DatabaseClientImplTest.java | 26 +++++------------- .../spanner/XGoogSpannerRequestIdTest.java | 27 +++++++++---------- 2 files changed, 19 insertions(+), 34 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index fa8b5c982fa..a8b2bcf365f 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -1417,8 +1417,7 @@ public void testWriteAtLeastOnceAborted() { List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); assertEquals(2, commitRequests.size()); - // TODO(@odeke-em): Enable in later PR. - // xGoogReqIdInterceptor.assertIntegrity(); + xGoogReqIdInterceptor.assertIntegrity(); } @Test @@ -2920,9 +2919,7 @@ public void testPartitionedDmlDoesNotTimeout() { "google.spanner.v1.Spanner/ExecuteStreamingSql", new XGoogSpannerRequestId(NON_DETERMINISTIC, channelId, 6, 1)), }; - if (false) { // TODO(@odeke-em): enable in next PRs. - xGoogReqIdInterceptor.checkExpectedStreamingXGoogRequestIds(wantStreamingValues); - } + xGoogReqIdInterceptor.checkExpectedStreamingXGoogRequestIds(wantStreamingValues); XGoogSpannerRequestIdTest.MethodAndRequestId[] wantUnaryValues = { XGoogSpannerRequestIdTest.ofMethodAndRequestId( @@ -2935,9 +2932,7 @@ public void testPartitionedDmlDoesNotTimeout() { "google.spanner.v1.Spanner/ExecuteSql", new XGoogSpannerRequestId(NON_DETERMINISTIC, channelId, 8, 1)), }; - if (false) { // TODO(@odeke-em): enable in next PRs. - xGoogReqIdInterceptor.checkExpectedUnaryXGoogRequestIdsAsSuffixes(wantUnaryValues); - } + xGoogReqIdInterceptor.checkExpectedUnaryXGoogRequestIdsAsSuffixes(wantUnaryValues); } } @@ -3034,9 +3029,7 @@ public void testPartitionedDmlWithHigherTimeout() { new XGoogSpannerRequestId(NON_DETERMINISTIC, channelId, 6, 1)), }; - if (false) { // TODO(@odeke-em): enable in next PRs. - xGoogReqIdInterceptor.checkExpectedStreamingXGoogRequestIds(wantStreamingValues); - } + xGoogReqIdInterceptor.checkExpectedStreamingXGoogRequestIds(wantStreamingValues); XGoogSpannerRequestIdTest.MethodAndRequestId[] wantUnaryValues = { XGoogSpannerRequestIdTest.ofMethodAndRequestId( @@ -3049,9 +3042,7 @@ public void testPartitionedDmlWithHigherTimeout() { "google.spanner.v1.Spanner/ExecuteSql", new XGoogSpannerRequestId(NON_DETERMINISTIC, channelId, 8, 1)), }; - if (false) { // TODO(@odeke-em): enable in next PRs. - xGoogReqIdInterceptor.checkExpectedUnaryXGoogRequestIdsAsSuffixes(wantUnaryValues); - } + xGoogReqIdInterceptor.checkExpectedUnaryXGoogRequestIdsAsSuffixes(wantUnaryValues); } } @@ -5304,8 +5295,7 @@ public void testSelectHasXGoogRequestIdHeader() { assertEquals(1L, resultSet.getLong(0)); assertFalse(resultSet.next()); } finally { - // TODO(@odeke-em): Enable in later PR. - // xGoogReqIdInterceptor.assertIntegrity(); + xGoogReqIdInterceptor.assertIntegrity(); } } @@ -5394,9 +5384,7 @@ public void testSessionPoolExhaustedError_containsStackTraces() { "google.spanner.v1.Spanner/CreateSession", new XGoogSpannerRequestId(NON_DETERMINISTIC, 0, 1, 1)), }; - if (false) { // TODO(@odeke-em): enable in next PRs. - xGoogReqIdInterceptor.checkExpectedUnaryXGoogRequestIdsAsSuffixes(wantUnaryValues); - } + xGoogReqIdInterceptor.checkExpectedUnaryXGoogRequestIdsAsSuffixes(wantUnaryValues); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java index 719b94593bf..26b5885b6a7 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java @@ -73,11 +73,9 @@ public static class ServerHeaderEnforcer implements ServerInterceptor { private Set checkMethods; ServerHeaderEnforcer(Set checkMethods) { - this.gotValues = new CopyOnWriteArrayList(); - this.unaryResults = - new ConcurrentHashMap>(); - this.streamingResults = - new ConcurrentHashMap>(); + this.gotValues = new CopyOnWriteArrayList<>(); + this.unaryResults = new ConcurrentHashMap<>(); + this.streamingResults = new ConcurrentHashMap<>(); this.checkMethods = checkMethods; } @@ -128,17 +126,16 @@ public String[] accumulatedValues() { } public void assertIntegrity() { - this.unaryResults.forEach( - (String method, CopyOnWriteArrayList values) -> { - assertMonotonicityOfIds(method, values); - }); - this.streamingResults.forEach( - (String method, CopyOnWriteArrayList values) -> { - assertMonotonicityOfIds(method, values); - }); + this.unaryResults.forEach(this::assertMonotonicityOfIds); + this.streamingResults.forEach(this::assertMonotonicityOfIds); } private void assertMonotonicityOfIds(String prefix, List reqIds) { + reqIds.sort( + (id1, id2) -> { + if (id1.equals(id2)) return 0; + return id1.isGreaterThan(id2) ? 1 : -1; + }); int size = reqIds.size(); List violations = new ArrayList<>(); @@ -161,7 +158,7 @@ private void assertMonotonicityOfIds(String prefix, List } public MethodAndRequestId[] accumulatedUnaryValues() { - List accumulated = new ArrayList(); + List accumulated = new ArrayList<>(); this.unaryResults.forEach( (String method, CopyOnWriteArrayList values) -> { for (int i = 0; i < values.size(); i++) { @@ -172,7 +169,7 @@ public MethodAndRequestId[] accumulatedUnaryValues() { } public MethodAndRequestId[] accumulatedStreamingValues() { - List accumulated = new ArrayList(); + List accumulated = new ArrayList<>(); this.streamingResults.forEach( (String method, CopyOnWriteArrayList values) -> { for (int i = 0; i < values.size(); i++) {