Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
89 commits
Select commit Hold shift + click to select a range
0d93889
added stdout handler to test
daniel-sanche Aug 4, 2025
a580fa2
instrumented check_and_mutate
daniel-sanche Aug 4, 2025
bc13b46
added instrumentation to read_modify_write
daniel-sanche Aug 4, 2025
6c3be46
added instrumentation to sample_row_keys
daniel-sanche Aug 4, 2025
ca38615
instrumented mutate_row
daniel-sanche Aug 4, 2025
eb82ae9
added instrumentation to mutate_rows
daniel-sanche Aug 6, 2025
0ef3372
Merge branch 'csm_1_data_model' into csm_2_instrumentation
daniel-sanche Aug 6, 2025
0f8067c
Merge branch 'csm_1_data_model' into csm_2_instrumentation
daniel-sanche Aug 6, 2025
996acf2
instrumented read_rows
daniel-sanche Aug 7, 2025
3eae4aa
fixed lint and mypy
daniel-sanche Aug 7, 2025
4684a72
use default backoff instance
daniel-sanche Aug 7, 2025
98e4006
remove async with for metrics
daniel-sanche Aug 7, 2025
fa865cb
followed same operation management pattern for read_rows as mutate_rows
daniel-sanche Aug 7, 2025
9fd4c56
Merge branch 'csm_1_data_model' into csm_2_instrumentation
daniel-sanche Aug 7, 2025
ff7e681
fixed lint
daniel-sanche Aug 7, 2025
8049be5
Merge branch 'csm_1_data_model' into csm_2_instrumentation
daniel-sanche Aug 7, 2025
40fcbe8
Merge branch 'csm_1_data_model' into csm_2_instrumentation
daniel-sanche Aug 8, 2025
410cfb8
fixed mypy
daniel-sanche Aug 8, 2025
2adec5a
fixed errors
daniel-sanche Aug 8, 2025
14d252b
fixed operation end for read_rows
daniel-sanche Aug 8, 2025
ab30b02
fixed lint
daniel-sanche Aug 8, 2025
bb55c46
made merge_rows into an instance method
daniel-sanche Aug 8, 2025
a4746f0
added new file for system tests
daniel-sanche Sep 15, 2025
417faf0
got tests to run
daniel-sanche Sep 15, 2025
4c8ffe5
added test for read_modify_write
daniel-sanche Sep 15, 2025
542dfb9
added test for check_and_mutate
daniel-sanche Sep 15, 2025
b815674
added test for sample_row_keys
daniel-sanche Sep 15, 2025
f84151f
pass down metadata in other rpcs
daniel-sanche Sep 15, 2025
b57ab24
added stubs for other rpcs
daniel-sanche Sep 15, 2025
a35a6b6
added start of system tests for metrics
daniel-sanche Sep 15, 2025
f0b8983
Merge branch 'csm_1_data_model' into csm_2_instrumentation
daniel-sanche Sep 15, 2025
55ff4d1
fixed read_rows
daniel-sanche Sep 15, 2025
614c32a
refactored system tests
daniel-sanche Sep 15, 2025
d3f8dbf
moved event loop back into test files
daniel-sanche Sep 15, 2025
2a93a28
implemented full success tests for rpcs
daniel-sanche Sep 15, 2025
3fb4f13
added failure tests for checK_and_mutate
daniel-sanche Sep 16, 2025
66d3254
added unauthenticated error test
daniel-sanche Sep 16, 2025
eb8a229
added stubs
daniel-sanche Sep 16, 2025
761cf9c
added tests for sample_row_keys
daniel-sanche Sep 16, 2025
c33b813
sped up test
daniel-sanche Sep 16, 2025
661d6eb
added read_modify_write tests
daniel-sanche Sep 16, 2025
3935744
added operation logic to retry factory
daniel-sanche Sep 16, 2025
0a9af2d
added read_rows tests
daniel-sanche Sep 16, 2025
b52c871
added read_row and read_rows_sharded tests
daniel-sanche Sep 17, 2025
759d198
added bulk_mutate_row tests
daniel-sanche Sep 17, 2025
18fba8a
added batcher tests
daniel-sanche Sep 17, 2025
c107118
added mutate_row tests
daniel-sanche Sep 17, 2025
2a42b90
fixed read_rows_sharded test
daniel-sanche Sep 17, 2025
fb84b4b
refacotred tracked exception factory
daniel-sanche Sep 17, 2025
c560589
fixed lint
daniel-sanche Sep 17, 2025
0f4ee8d
use cancelled error to test non-retryable methods
daniel-sanche Sep 18, 2025
2683b50
added aclose test to read_rows_stream
daniel-sanche Sep 18, 2025
685b62a
changed test names
daniel-sanche Sep 18, 2025
2336e64
fixed warnings
daniel-sanche Sep 18, 2025
e239d56
adding sync tests
daniel-sanche Sep 19, 2025
1a54a69
capture status for unary failed attempts
daniel-sanche Sep 19, 2025
a23d920
added test for streaming retries
daniel-sanche Sep 19, 2025
b33458f
added test
daniel-sanche Sep 20, 2025
3b146f5
fixed tracked flow control
daniel-sanche Sep 20, 2025
b91da1c
updated mutate_rows test
daniel-sanche Sep 22, 2025
64ce0a4
improved retry instrumentation
daniel-sanche Sep 22, 2025
7eb50d8
added trackers to data model
daniel-sanche Sep 22, 2025
c1baf81
fixed bug in application blocking time
daniel-sanche Sep 23, 2025
9d63963
record last attempt in exception factory
daniel-sanche Sep 23, 2025
d3f9d05
simplified helper
daniel-sanche Sep 23, 2025
1d26452
added metric system tests; solved issues
daniel-sanche Sep 23, 2025
d1b74fc
swapped out custom metadata with contextvar
daniel-sanche Sep 25, 2025
8707d40
simplified interceptor
daniel-sanche Sep 25, 2025
e9877fd
removed cancel from spec
daniel-sanche Sep 25, 2025
f92d66b
fixed tests
daniel-sanche Sep 25, 2025
d9de44d
removed dead pointer
daniel-sanche Sep 25, 2025
253284f
replace custom metadata with contextvars
daniel-sanche Sep 25, 2025
2e0e402
updated sync files
daniel-sanche Sep 25, 2025
88644b2
fixed lint
daniel-sanche Sep 26, 2025
0340fce
fixed tests
daniel-sanche Sep 26, 2025
6798ea2
fixed more tests
daniel-sanche Sep 30, 2025
c96dd25
removed unneeded kwargs
daniel-sanche Sep 30, 2025
7eb83a8
fixed unit tests
daniel-sanche Sep 30, 2025
61f8b85
ran blacken
daniel-sanche Sep 30, 2025
ee72ae9
fixed tests
daniel-sanche Sep 30, 2025
dd7453b
removed unneeded kwargs
daniel-sanche Sep 30, 2025
911a299
fixed lint
daniel-sanche Sep 30, 2025
231f38b
Merge branch 'csm_1_data_model' into csm_2_instrumentation
daniel-sanche Sep 30, 2025
abaf5b2
fixed flakes
daniel-sanche Oct 2, 2025
1832824
generated sync
daniel-sanche Oct 3, 2025
6bbac87
moved all fixtures into one place
daniel-sanche Oct 3, 2025
4fd97e1
Merge branch 'csm_1_data_model' into csm_2_instrumentation
daniel-sanche Oct 3, 2025
2bde30d
fixed event loop error
daniel-sanche Oct 3, 2025
08f5ce4
remvoed docstring
daniel-sanche Oct 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 39 additions & 30 deletions google/cloud/bigtable/data/_async/_mutate_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

if TYPE_CHECKING:
from google.cloud.bigtable.data.mutations import RowMutationEntry
from google.cloud.bigtable.data._metrics import ActiveOperationMetric

if CrossSync.is_async:
from google.cloud.bigtable_v2.services.bigtable.async_client import (
Expand Down Expand Up @@ -68,6 +69,8 @@ class _MutateRowsOperationAsync:
operation_timeout: the timeout to use for the entire operation, in seconds.
attempt_timeout: the timeout to use for each mutate_rows attempt, in seconds.
If not specified, the request will run until operation_timeout is reached.
metric: the metric object representing the active operation
retryable_exceptions: a list of exceptions that should be retried
"""

@CrossSync.convert
Expand All @@ -78,6 +81,7 @@ def __init__(
mutation_entries: list["RowMutationEntry"],
operation_timeout: float,
attempt_timeout: float | None,
metric: ActiveOperationMetric,
retryable_exceptions: Sequence[type[Exception]] = (),
):
# check that mutations are within limits
Expand All @@ -97,13 +101,13 @@ def __init__(
# Entry level errors
bt_exceptions._MutateRowsIncomplete,
)
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
self._operation = lambda: CrossSync.retry_target(
self._run_attempt,
self.is_retryable,
sleep_generator,
metric.backoff_generator,
operation_timeout,
exception_factory=_retry_exception_factory,
exception_factory=metric.track_terminal_error(_retry_exception_factory),
on_error=metric.track_retryable_error,
)
# initialize state
self.timeout_generator = _attempt_timeout_generator(
Expand All @@ -112,6 +116,8 @@ def __init__(
self.mutations = [_EntryWithProto(m, m._to_pb()) for m in mutation_entries]
self.remaining_indices = list(range(len(self.mutations)))
self.errors: dict[int, list[Exception]] = {}
# set up metrics
self._operation_metric = metric

@CrossSync.convert
async def start(self):
Expand All @@ -121,34 +127,35 @@ async def start(self):
Raises:
MutationsExceptionGroup: if any mutations failed
"""
try:
# trigger mutate_rows
await self._operation()
except Exception as exc:
# exceptions raised by retryable are added to the list of exceptions for all unfinalized mutations
incomplete_indices = self.remaining_indices.copy()
for idx in incomplete_indices:
self._handle_entry_error(idx, exc)
finally:
# raise exception detailing incomplete mutations
all_errors: list[Exception] = []
for idx, exc_list in self.errors.items():
if len(exc_list) == 0:
raise core_exceptions.ClientError(
f"Mutation {idx} failed with no associated errors"
with self._operation_metric:
try:
# trigger mutate_rows
await self._operation()
except Exception as exc:
# exceptions raised by retryable are added to the list of exceptions for all unfinalized mutations
incomplete_indices = self.remaining_indices.copy()
for idx in incomplete_indices:
self._handle_entry_error(idx, exc)
finally:
# raise exception detailing incomplete mutations
all_errors: list[Exception] = []
for idx, exc_list in self.errors.items():
if len(exc_list) == 0:
raise core_exceptions.ClientError(
f"Mutation {idx} failed with no associated errors"
)
elif len(exc_list) == 1:
cause_exc = exc_list[0]
else:
cause_exc = bt_exceptions.RetryExceptionGroup(exc_list)
entry = self.mutations[idx].entry
all_errors.append(
bt_exceptions.FailedMutationEntryError(idx, entry, cause_exc)
)
if all_errors:
raise bt_exceptions.MutationsExceptionGroup(
all_errors, len(self.mutations)
)
elif len(exc_list) == 1:
cause_exc = exc_list[0]
else:
cause_exc = bt_exceptions.RetryExceptionGroup(exc_list)
entry = self.mutations[idx].entry
all_errors.append(
bt_exceptions.FailedMutationEntryError(idx, entry, cause_exc)
)
if all_errors:
raise bt_exceptions.MutationsExceptionGroup(
all_errors, len(self.mutations)
)

@CrossSync.convert
async def _run_attempt(self):
Expand All @@ -160,6 +167,8 @@ async def _run_attempt(self):
retry after the attempt is complete
GoogleAPICallError: if the gapic rpc fails
"""
# register attempt start
self._operation_metric.start_attempt()
request_entries = [self.mutations[idx].proto for idx in self.remaining_indices]
# track mutations in this request that have not been finalized yet
active_request_indices = {
Expand Down
Loading
Loading