diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py index c808cb52a..444e8d030 100644 --- a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py @@ -97,15 +97,29 @@ def __init__( :param object_name: The name of the GCS Appendable Object to be written. :type generation: int - :param generation: (Optional) If present, selects a specific revision of - that object. - If None, a new object is created. - If None and Object already exists then it'll will be - overwritten. + :param generation: (Optional) If present, creates a writer for that + specific revision of that object. Use this to append data to an + existing Appendable Object. + + Setting to ``0`` makes the `writer.open()` succeed only if + object doesn't exist in the bucket (useful for not accidentally + overwriting existing objects). + + Warning: If `None`, a new object is created. If an object with the + same name already exists, it will be overwritten the moment + `writer.open()` is called. :type write_handle: bytes - :param write_handle: (Optional) An existing handle for writing the object. - If provided, opening the bidi-gRPC connection will be faster. + :param write_handle: (Optional) A handle for writing the object. + If provided, opening the bidi-gRPC connection will be faster. + + :type writer_options: dict + :param writer_options: (Optional) A dictionary of writer options. + Supported options: + - "FLUSH_INTERVAL_BYTES": int + The number of bytes to append before "persisting" data in GCS + servers. Default is `_DEFAULT_FLUSH_INTERVAL_BYTES`. + Must be a multiple of `_MAX_CHUNK_SIZE_BYTES`. 
""" raise_if_no_fast_crc32c() self.client = client @@ -133,7 +147,6 @@ def __init__( self.flush_interval = writer_options.get( "FLUSH_INTERVAL_BYTES", _DEFAULT_FLUSH_INTERVAL_BYTES ) - # TODO: add test case for this. if self.flush_interval < _MAX_CHUNK_SIZE_BYTES: raise exceptions.OutOfRange( f"flush_interval must be >= {_MAX_CHUNK_SIZE_BYTES} , but provided {self.flush_interval}" ) @@ -346,6 +359,11 @@ async def finalize(self) -> _storage_v2.Object: self.offset = None return self.object_resource + @property + def is_stream_open(self) -> bool: + return self._is_stream_open + + # helper methods. async def append_from_string(self, data: str): """ diff --git a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py index a0ebaa498..1454803ad 100644 --- a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py @@ -47,8 +47,17 @@ class _AsyncWriteObjectStream(_AsyncAbstractObjectStream): :param object_name: The name of the GCS ``Appendable Object`` to be write. :type generation_number: int - :param generation_number: (Optional) If present, selects a specific revision of - this object. If None, a new object is created. + :param generation_number: (Optional) If present, creates a writer for that + specific revision of that object. Use this to append data to an + existing Appendable Object. + + Setting to ``0`` makes the `writer.open()` succeed only if + object doesn't exist in the bucket (useful for not accidentally + overwriting existing objects). + + Warning: If `None`, a new object is created. If an object with the + same name already exists, it will be overwritten the moment + `writer.open()` is called. :type write_handle: bytes :param write_handle: (Optional) An existing handle for writing the object. 
@@ -101,13 +110,16 @@ async def open(self) -> None: # Create a new object or overwrite existing one if generation_number # is None. This makes it consistent with GCS JSON API behavior. # Created object type would be Appendable Object. - if self.generation_number is None: + # if `generation_number` == 0 new object will be created only if there + # isn't any existing object. + if self.generation_number is None or self.generation_number == 0: self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest( write_object_spec=_storage_v2.WriteObjectSpec( resource=_storage_v2.Object( name=self.object_name, bucket=self._full_bucket_name ), appendable=True, + if_generation_match=self.generation_number, ), ) else: @@ -118,7 +130,6 @@ async def open(self) -> None: generation=self.generation_number, ), ) - self.socket_like_rpc = AsyncBidiRpc( self.rpc, initial_request=self.first_bidi_write_req, metadata=self.metadata ) diff --git a/tests/system/test_zonal.py b/tests/system/test_zonal.py index 2ade654ad..b5942365a 100644 --- a/tests/system/test_zonal.py +++ b/tests/system/test_zonal.py @@ -18,6 +18,7 @@ from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( AsyncMultiRangeDownloader, ) +from google.api_core.exceptions import FailedPrecondition pytestmark = pytest.mark.skipif( @@ -360,3 +361,74 @@ async def test_append_flushes_and_state_lookup(storage_client, blobs_to_delete): await mrd.close() content = buffer.getvalue() assert content == full_data + +@pytest.mark.asyncio +async def test_open_with_generation_zero(storage_client, blobs_to_delete): + """Tests that using `generation=0` fails if the object already exists. + + This test verifies that: + 1. An object can be created using `AsyncAppendableObjectWriter` with `generation=0`. + 2. Attempting to create the same object again with `generation=0` raises a + `FailedPrecondition` error with a 400 status code, because the + precondition (object must not exist) is not met. 
+ """ + object_name = f"test_append_with_generation-{uuid.uuid4()}" + grpc_client = AsyncGrpcClient().grpc_client + writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name, generation=0) + + # Empty object is created. + await writer.open() + assert writer.is_stream_open + + await writer.close() + assert not writer.is_stream_open + + + with pytest.raises(FailedPrecondition) as exc_info: + writer = AsyncAppendableObjectWriter( + grpc_client, _ZONAL_BUCKET, object_name, generation=0 + ) + await writer.open() + assert exc_info.value.code == 400 + + # cleanup + del writer + gc.collect() + + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + +@pytest.mark.asyncio +async def test_open_existing_object_with_gen_None_overrides_existing(storage_client, blobs_to_delete): + """ + Test that a new writer created with `generation=None` overwrites the existing object. + """ + object_name = f"test_append_with_generation-{uuid.uuid4()}" + + grpc_client = AsyncGrpcClient().grpc_client + writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name, generation=0) + + # Empty object is created. 
+ await writer.open() + assert writer.is_stream_open + old_gen = writer.generation + + + await writer.close() + assert not writer.is_stream_open + + + + new_writer = AsyncAppendableObjectWriter( + grpc_client, _ZONAL_BUCKET, object_name, generation=None + ) + await new_writer.open() + assert new_writer.generation != old_gen + + # assert exc_info.value.code == 400 + + # cleanup + del writer + del new_writer + gc.collect() + + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) \ No newline at end of file diff --git a/tests/unit/asyncio/test_async_appendable_object_writer.py b/tests/unit/asyncio/test_async_appendable_object_writer.py index 07f7047d8..c19622238 100644 --- a/tests/unit/asyncio/test_async_appendable_object_writer.py +++ b/tests/unit/asyncio/test_async_appendable_object_writer.py @@ -55,7 +55,7 @@ def test_init(mock_write_object_stream, mock_client): assert writer.object_name == OBJECT assert writer.generation is None assert writer.write_handle is None - assert not writer._is_stream_open + assert not writer.is_stream_open assert writer.offset is None assert writer.persisted_size is None assert writer.bytes_appended_since_last_flush == 0 @@ -225,7 +225,7 @@ async def test_open_appendable_object_writer(mock_write_object_stream, mock_clie # Assert mock_stream.open.assert_awaited_once() - assert writer._is_stream_open + assert writer.is_stream_open assert writer.generation == GENERATION assert writer.write_handle == WRITE_HANDLE assert writer.persisted_size == 0 @@ -255,7 +255,7 @@ async def test_open_appendable_object_writer_existing_object( # Assert mock_stream.open.assert_awaited_once() - assert writer._is_stream_open + assert writer.is_stream_open assert writer.generation == GENERATION assert writer.write_handle == WRITE_HANDLE assert writer.persisted_size == PERSISTED_SIZE @@ -379,7 +379,7 @@ async def test_close(mock_write_object_stream, mock_client): mock_stream.close.assert_awaited_once() assert writer.offset is None assert 
persisted_size == 1024 - assert not writer._is_stream_open + assert not writer.is_stream_open @pytest.mark.asyncio @@ -415,7 +415,7 @@ async def test_finalize_on_close(mock_write_object_stream, mock_client): # Assert mock_stream.close.assert_awaited_once() - assert not writer._is_stream_open + assert not writer.is_stream_open assert writer.offset is None assert writer.object_resource == mock_resource assert writer.persisted_size == 2048 @@ -448,7 +448,7 @@ async def test_finalize(mock_write_object_stream, mock_client): assert writer.object_resource == mock_resource assert writer.persisted_size == 123 assert gcs_object == mock_resource - assert writer._is_stream_open is False + assert not writer.is_stream_open assert writer.offset is None diff --git a/tests/unit/asyncio/test_async_write_object_stream.py b/tests/unit/asyncio/test_async_write_object_stream.py index 63b5495bd..f0d90543f 100644 --- a/tests/unit/asyncio/test_async_write_object_stream.py +++ b/tests/unit/asyncio/test_async_write_object_stream.py @@ -148,6 +148,42 @@ async def test_open_for_new_object(mock_async_bidi_rpc, mock_client): assert stream.persisted_size == 0 +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_open_for_new_object_with_generation_zero(mock_async_bidi_rpc, mock_client): + """Test opening a stream for a new object.""" + # Arrange + socket_like_rpc = mock.AsyncMock() + mock_async_bidi_rpc.return_value = socket_like_rpc + socket_like_rpc.open = mock.AsyncMock() + + mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse) + mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) + mock_response.resource.generation = GENERATION + mock_response.resource.size = 0 + mock_response.write_handle = WRITE_HANDLE + socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response) + + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT, generation_number=0) + + # Act + await 
stream.open() + + # Assert + mock_async_bidi_rpc.assert_called_once() + _, call_kwargs = mock_async_bidi_rpc.call_args + initial_request = call_kwargs["initial_request"] + assert initial_request.write_object_spec.if_generation_match == 0 + assert stream._is_stream_open + socket_like_rpc.open.assert_called_once() + socket_like_rpc.recv.assert_called_once() + assert stream.generation_number == GENERATION + assert stream.write_handle == WRITE_HANDLE + assert stream.persisted_size == 0 + + @pytest.mark.asyncio @mock.patch( "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"