Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
7fd21ca
copied files from previous branch
daniel-sanche Aug 8, 2025
0fa28bb
create metrics client using same credentials as bt client
daniel-sanche Aug 8, 2025
33cf00e
updated aggregation values
daniel-sanche Aug 11, 2025
5a507c2
Merge branch 'csm_1_data_model' into csm_3_handlers
daniel-sanche Oct 1, 2025
e10741b
fixed init issues
daniel-sanche Oct 1, 2025
1dd87ca
added otel to setup.py
daniel-sanche Oct 1, 2025
18afdb7
added new test file
daniel-sanche Oct 1, 2025
7338c84
Merge branch 'csm_2_instrumentation' into csm_3_handlers->instrumenta…
daniel-sanche Oct 1, 2025
011fd84
Merge branch 'csm_3_handlers' into csm_3_handlers->instrumentation
daniel-sanche Oct 1, 2025
8179f97
fixed metric export
daniel-sanche Oct 1, 2025
71f2125
fixed test
daniel-sanche Oct 1, 2025
a6f16ec
Merge branch 'csm_3_handlers->instrumentation' into csm_3_handlers
daniel-sanche Oct 1, 2025
6ee0c4d
apply ordering
daniel-sanche Oct 1, 2025
666ea74
fixed time format
daniel-sanche Oct 1, 2025
2426f50
simplified export test
daniel-sanche Oct 1, 2025
de36f36
use tighter assertions
daniel-sanche Oct 1, 2025
589c221
improved test structure
daniel-sanche Oct 1, 2025
473f2bc
removed imports
daniel-sanche Oct 1, 2025
e9119e7
test other metrics
daniel-sanche Oct 2, 2025
018bcc4
remove OK status requirement
daniel-sanche Oct 2, 2025
e5958db
test with table
daniel-sanche Oct 2, 2025
69219e5
added export metrics tests
daniel-sanche Oct 2, 2025
05ae3ec
keep project id in exporter
daniel-sanche Oct 2, 2025
5ccb57c
removed sync export metrics
daniel-sanche Oct 2, 2025
65cd359
moved exported metrics test to main system test class
daniel-sanche Oct 2, 2025
88fe0b3
started otel unit tests
daniel-sanche Oct 3, 2025
f1ee799
fixed comments
daniel-sanche Oct 3, 2025
17ad4e0
gemini tests
daniel-sanche Oct 3, 2025
8ff32a5
added constant instead of magic number
daniel-sanche Oct 3, 2025
fdc609c
improved gemini tests
daniel-sanche Oct 3, 2025
facfb25
fixed lint
daniel-sanche Oct 3, 2025
de53154
added some unit tests
daniel-sanche Oct 3, 2025
c5a08c5
ran lint
daniel-sanche Oct 3, 2025
53669f7
simplified metric existence check
daniel-sanche Oct 3, 2025
c7481b8
fixed docstring
daniel-sanche Oct 3, 2025
9b2cd1c
added tests for gcp_exporter file
daniel-sanche Oct 3, 2025
2616894
fixed lint
daniel-sanche Oct 3, 2025
1fdd7ad
moved back some fixtures
daniel-sanche Oct 3, 2025
8e82647
Merge branch 'csm_2_instrumentation' into csm_3_handlers
daniel-sanche Oct 3, 2025
3906ea6
Merge branch 'csm_2_instrumentation' into csm_3_handlers
daniel-sanche Oct 3, 2025
694e042
fixed event loop error
daniel-sanche Oct 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions google/cloud/bigtable/data/_async/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@
from google.cloud.bigtable.data.row_filters import CellsRowLimitFilter
from google.cloud.bigtable.data.row_filters import RowFilterChain
from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController
from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import (
BigtableMetricsExporter,
)
from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import (
GoogleCloudMetricsHandler,
)
from google.cloud.bigtable.data._metrics import OperationType

from google.cloud.bigtable.data._cross_sync import CrossSync
Expand Down Expand Up @@ -242,6 +248,12 @@ def __init__(
"configured the universe domain explicitly, `googleapis.com` "
"is the default."
)
# create a metrics exporter using the same client configuration
self._gcp_metrics_exporter = BigtableMetricsExporter(
project_id=self.project,
credentials=credentials,
client_options=client_options,
)
self._is_closed = CrossSync.Event()
self.transport = cast(TransportType, self._gapic_client.transport)
# keep track of active instances to for warmup on channel refresh
Expand Down Expand Up @@ -970,8 +982,17 @@ def __init__(
self.default_retryable_errors: Sequence[type[Exception]] = (
default_retryable_errors or ()
)

self._metrics = BigtableClientSideMetricsController()
self._metrics = BigtableClientSideMetricsController(
handlers=[
GoogleCloudMetricsHandler(
exporter=client._gcp_metrics_exporter,
instance_id=instance_id,
table_id=table_id,
app_profile_id=app_profile_id,
client_version=client._client_version(),
)
]
)

try:
self._register_instance_future = CrossSync.create_task(
Expand Down
10 changes: 10 additions & 0 deletions google/cloud/bigtable/data/_metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from google.cloud.bigtable.data._metrics.handlers.opentelemetry import (
OpenTelemetryMetricsHandler,
)
from google.cloud.bigtable.data._metrics.handlers.gcp_exporter import (
GoogleCloudMetricsHandler,
)
from google.cloud.bigtable.data._metrics.handlers._stdout import _StdoutMetricsHandler
from google.cloud.bigtable.data._metrics.metrics_controller import (
BigtableClientSideMetricsController,
)
Expand All @@ -23,6 +30,9 @@

__all__ = (
"BigtableClientSideMetricsController",
"OpenTelemetryMetricsHandler",
"GoogleCloudMetricsHandler",
"_StdoutMetricsHandler",
"OperationType",
"ActiveOperationMetric",
"ActiveAttemptMetric",
Expand Down
272 changes: 272 additions & 0 deletions google/cloud/bigtable/data/_metrics/handlers/gcp_exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import time

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics import view
from opentelemetry.sdk.metrics.export import (
HistogramDataPoint,
MetricExporter,
MetricExportResult,
MetricsData,
NumberDataPoint,
PeriodicExportingMetricReader,
)
from google.protobuf.timestamp_pb2 import Timestamp
from google.api.distribution_pb2 import Distribution
from google.api.metric_pb2 import Metric as GMetric
from google.api.monitored_resource_pb2 import MonitoredResource
from google.api.metric_pb2 import MetricDescriptor
from google.api_core import gapic_v1
from google.cloud.monitoring_v3 import (
CreateTimeSeriesRequest,
MetricServiceClient,
Point,
TimeInterval,
TimeSeries,
TypedValue,
)

from google.cloud.bigtable.data._metrics.handlers.opentelemetry import (
OpenTelemetryMetricsHandler,
)
from google.cloud.bigtable.data._metrics.handlers.opentelemetry import (
_OpenTelemetryInstruments,
)


# OpenTelemetry views describing how each Bigtable client instrument is
# aggregated before export. All latency instruments share one explicit-bucket
# histogram layout (bounds in milliseconds); count instruments use a plain sum.
# avoid reformatting into individual lines
# fmt: off
MILLIS_AGGREGATION = view.ExplicitBucketHistogramAggregation(
    [
        0, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40,
        50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650,
        800, 1_000, 2_000, 5_000, 10_000, 20_000, 50_000, 100_000,
        200_000, 400_000, 800_000, 1_600_000, 3_200_000
    ]
)
# fmt: on
COUNT_AGGREGATION = view.SumAggregation()
INSTRUMENT_NAMES = (
    "operation_latencies",
    "first_response_latencies",
    "attempt_latencies",
    "retry_count",
    "server_latencies",
    "connectivity_error_count",
    "application_latencies",
    "throttling_latencies",
)


def _view_for(instrument_name: str) -> view.View:
    """Build the View for one instrument: histogram for latencies, sum otherwise."""
    if instrument_name.endswith("latencies"):
        aggregation = MILLIS_AGGREGATION
    else:
        aggregation = COUNT_AGGREGATION
    return view.View(
        instrument_name=instrument_name,
        name=instrument_name,
        aggregation=aggregation,
    )


VIEW_LIST = [_view_for(n) for n in INSTRUMENT_NAMES]


class GoogleCloudMetricsHandler(OpenTelemetryMetricsHandler):
    """
    Maintains an internal set of OpenTelemetry metrics for the Bigtable client library,
    and periodically exports them to Google Cloud Monitoring.

    The OpenTelemetry metrics that are tracked are as follows:
      - operation_latencies: latency of each client method call, over all of its attempts.
      - first_response_latencies: latency of receiving the first row in a ReadRows operation.
      - attempt_latencies: latency of each client attempt RPC.
      - retry_count: Number of additional RPCs sent after the initial attempt.
      - server_latencies: latency recorded on the server side for each attempt.
      - connectivity_error_count: number of attempts that failed to reach Google's network.
      - application_latencies: the time spent waiting for the application to process the next response.
      - throttling_latencies: latency introduced by waiting when there are too many outstanding requests in a bulk operation.

    Args:
        exporter: The exporter object used to write metrics to Cloud Monitoring.
            Should correspond 1:1 with a bigtable client, and share auth configuration
        export_interval: The interval (in seconds) at which to export metrics to Cloud Monitoring.
        *args: configuration positional arguments passed down to super class
        **kwargs: configuration keyword arguments passed down to super class
    """

    def __init__(self, exporter, *args, export_interval=60, **kwargs):
        # the reader drives the exporter on a fixed schedule
        reader = PeriodicExportingMetricReader(
            exporter, export_interval_millis=export_interval * 1000
        )
        # a private MeterProvider keeps these instruments and views isolated
        # from any global OpenTelemetry configuration the application may use
        self.meter_provider = MeterProvider(metric_readers=[reader], views=VIEW_LIST)
        instruments = _OpenTelemetryInstruments(meter_provider=self.meter_provider)
        super().__init__(*args, instruments=instruments, **kwargs)

    def close(self):
        """Stop the periodic export loop and shut down the meter provider."""
        self.meter_provider.shutdown()


class BigtableMetricsExporter(MetricExporter):
    """
    OpenTelemetry Exporter implementation for sending metrics to Google Cloud Monitoring.

    We must use a custom exporter because the public one doesn't support writing to internal
    metrics like `bigtable.googleapis.com/internal/client/`

    Each GoogleCloudMetricsHandler will maintain its own exporter instance associated with the
    project_id it is configured with.

    Args:
        project_id: GCP project id to associate metrics with
        *client_args: positional arguments passed through to MetricServiceClient
        **client_kwargs: keyword arguments passed through to MetricServiceClient
    """

    def __init__(self, project_id: str, *client_args, **client_kwargs):
        super().__init__()
        self.client = MetricServiceClient(*client_args, **client_kwargs)
        self.prefix = "bigtable.googleapis.com/internal/client"
        self.project_id = project_id

    def export(
        self, metrics_data: MetricsData, timeout_millis: float = 10_000, **kwargs
    ) -> MetricExportResult:
        """
        Write a set of metrics to Cloud Monitoring.
        This method is called by the OpenTelemetry SDK

        Args:
            metrics_data: metrics in OpenTelemetry format to convert and write
            timeout_millis: overall deadline for the export, in milliseconds
        Returns:
            SUCCESS if all series were written, FAILURE if any write raised
        """
        deadline = time.time() + (timeout_millis / 1000)
        all_series: list[TimeSeries] = []
        # process each metric from OTel format into Cloud Monitoring format
        for resource_metric in metrics_data.resource_metrics:
            for scope_metric in resource_metric.scope_metrics:
                for metric in scope_metric.metrics:
                    for data_point in metric.data.data_points:
                        # skip data points without attributes: the monitored
                        # resource labels are derived from them
                        if data_point.attributes:
                            all_series.append(
                                self._datapoint_to_series(metric, data_point)
                            )
        # send all metrics to Cloud Monitoring
        try:
            self._batch_write(all_series, deadline)
            return MetricExportResult.SUCCESS
        except Exception:
            # report failure through the OTel result enum instead of raising,
            # as expected by PeriodicExportingMetricReader
            return MetricExportResult.FAILURE

    def _datapoint_to_series(self, metric, data_point) -> TimeSeries:
        """
        Convert one OpenTelemetry data point into a Cloud Monitoring TimeSeries.

        Attributes prefixed with `resource_` populate the MonitoredResource
        labels; all remaining attributes become metric labels.

        Args:
            metric: the OTel metric the data point belongs to (provides name and unit)
            data_point: a NumberDataPoint or HistogramDataPoint with non-empty attributes
        """
        monitored_resource = MonitoredResource(
            type="bigtable_client_raw",
            labels={
                "project_id": self.project_id,
                "instance": data_point.attributes["resource_instance"],
                "cluster": data_point.attributes["resource_cluster"],
                "table": data_point.attributes["resource_table"],
                "zone": data_point.attributes["resource_zone"],
            },
        )
        return TimeSeries(
            resource=monitored_resource,
            # all Bigtable client metrics accumulate over the process lifetime
            metric_kind=MetricDescriptor.MetricKind.CUMULATIVE,
            points=[self._to_point(data_point)],
            metric=GMetric(
                type=f"{self.prefix}/{metric.name}",
                labels={
                    k: v
                    for k, v in data_point.attributes.items()
                    if not k.startswith("resource_")
                },
            ),
            unit=metric.unit,
        )

    def _batch_write(
        self, series: list[TimeSeries], deadline=None, max_batch_size=200
    ) -> None:
        """
        Adapted from CloudMonitoringMetricsExporter
        https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/3668dfe7ce3b80dd01f42af72428de957b58b316/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py#L82

        Args:
            series: list of TimeSeries to write. Will be split into batches if necessary
            deadline: designates the time.time() at which to stop writing. If None, uses API default
            max_batch_size: maximum number of time series to write at once.
                Cloud Monitoring allows up to 200 per request
        """
        write_ind = 0
        while write_ind < len(series):
            # find time left for next batch; a deadline already in the past
            # yields a non-positive timeout, which the API rejects immediately
            timeout = deadline - time.time() if deadline else gapic_v1.method.DEFAULT
            # write next batch
            self.client.create_service_time_series(
                CreateTimeSeriesRequest(
                    name=f"projects/{self.project_id}",
                    time_series=series[write_ind : write_ind + max_batch_size],
                ),
                timeout=timeout,
            )
            write_ind += max_batch_size

    @staticmethod
    def _to_point(data_point: NumberDataPoint | HistogramDataPoint) -> Point:
        """
        Convert an OTel data point into a Cloud Monitoring Point.

        Adapted from CloudMonitoringMetricsExporter
        https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/3668dfe7ce3b80dd01f42af72428de957b58b316/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py#L82
        """
        if isinstance(data_point, HistogramDataPoint):
            # guard against division by zero for empty histograms
            mean = data_point.sum / data_point.count if data_point.count else 0.0
            point_value = TypedValue(
                distribution_value=Distribution(
                    count=data_point.count,
                    mean=mean,
                    bucket_counts=data_point.bucket_counts,
                    bucket_options=Distribution.BucketOptions(
                        explicit_buckets=Distribution.BucketOptions.Explicit(
                            bounds=data_point.explicit_bounds,
                        )
                    ),
                )
            )
        else:
            if isinstance(data_point.value, int):
                point_value = TypedValue(int64_value=data_point.value)
            else:
                point_value = TypedValue(double_value=data_point.value)
        start_time = Timestamp()
        start_time.FromNanoseconds(data_point.start_time_unix_nano)
        end_time = Timestamp()
        end_time.FromNanoseconds(data_point.time_unix_nano)
        interval = TimeInterval(start_time=start_time, end_time=end_time)
        return Point(interval=interval, value=point_value)

    def shutdown(self, timeout_millis: float = 30_000, **kwargs):
        """
        No-op: this exporter holds no background resources of its own.

        Adapted from CloudMonitoringMetricsExporter
        https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/3668dfe7ce3b80dd01f42af72428de957b58b316/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py#L82
        """
        pass

    def force_flush(self, timeout_millis: float = 10_000):
        """
        No-op: writes happen synchronously in export(), so there is nothing to flush.

        Adapted from CloudMonitoringMetricsExporter
        https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/3668dfe7ce3b80dd01f42af72428de957b58b316/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py#L82
        """
        return True
Loading
Loading