Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,29 @@ def __init__(
:param object_name: The name of the GCS Appendable Object to be written.

:type generation: int
:param generation: (Optional) If present, selects a specific revision of
that object.
If None, a new object is created.
If None and Object already exists then it'll will be
overwritten.
:param generation: (Optional) If present, creates a writer for that
specific revision of that object. Use this to append data to an
existing Appendable Object.

Setting to ``0`` makes the `writer.open()` succeed only if
object doesn't exist in the bucket (useful for not accidentally
overwriting existing objects).

Warning: If `None`, a new object is created. If an object with the
same name already exists, it will be overwritten the moment
`writer.open()` is called.

:type write_handle: bytes
:param write_handle: (Optional) An existing handle for writing the object.
If provided, opening the bidi-gRPC connection will be faster.
:param write_handle: (Optional) An existing handle for writing the object.
If provided, opening the bidi-gRPC connection will be faster.

:type writer_options: dict
:param writer_options: (Optional) A dictionary of writer options.
Supported options:
- "FLUSH_INTERVAL_BYTES": int
The number of bytes to append before "persisting" data in GCS
servers. Default is `_DEFAULT_FLUSH_INTERVAL_BYTES`.
Must be a multiple of `_MAX_CHUNK_SIZE_BYTES`.
"""
raise_if_no_fast_crc32c()
self.client = client
Expand Down Expand Up @@ -133,7 +147,6 @@ def __init__(
self.flush_interval = writer_options.get(
"FLUSH_INTERVAL_BYTES", _DEFAULT_FLUSH_INTERVAL_BYTES
)
# TODO: add test case for this.
if self.flush_interval < _MAX_CHUNK_SIZE_BYTES:
raise exceptions.OutOfRange(
f"flush_interval must be >= {_MAX_CHUNK_SIZE_BYTES} , but provided {self.flush_interval}"
Expand Down Expand Up @@ -346,6 +359,11 @@ async def finalize(self) -> _storage_v2.Object:
self.offset = None
return self.object_resource

@property
def is_stream_open(self) -> bool:
    """bool: Whether the underlying bidi write stream is currently open.

    Read-only view over the private ``_is_stream_open`` flag; avoids
    callers poking at the private attribute directly.
    """
    return self._is_stream_open


# helper methods.
async def append_from_string(self, data: str):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,17 @@ class _AsyncWriteObjectStream(_AsyncAbstractObjectStream):
:param object_name: The name of the GCS ``Appendable Object`` to be written.

:type generation_number: int
:param generation_number: (Optional) If present, selects a specific revision of
this object. If None, a new object is created.
:param generation_number: (Optional) If present, creates a writer for that
specific revision of that object. Use this to append data to an
existing Appendable Object.

Setting to ``0`` makes the `writer.open()` succeed only if
object doesn't exist in the bucket (useful for not accidentally
overwriting existing objects).

Warning: If `None`, a new object is created. If an object with the
same name already exists, it will be overwritten the moment
`writer.open()` is called.

:type write_handle: bytes
:param write_handle: (Optional) An existing handle for writing the object.
Expand Down Expand Up @@ -101,13 +110,16 @@ async def open(self) -> None:
# Create a new object or overwrite existing one if generation_number
# is None. This makes it consistent with GCS JSON API behavior.
# Created object type would be Appendable Object.
if self.generation_number is None:
# if `generation_number` == 0 new object will be created only if there
# isn't any existing object.
if self.generation_number is None or self.generation_number == 0:
self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest(
write_object_spec=_storage_v2.WriteObjectSpec(
resource=_storage_v2.Object(
name=self.object_name, bucket=self._full_bucket_name
),
appendable=True,
if_generation_match=self.generation_number,
),
)
else:
Expand All @@ -118,7 +130,6 @@ async def open(self) -> None:
generation=self.generation_number,
),
)

self.socket_like_rpc = AsyncBidiRpc(
self.rpc, initial_request=self.first_bidi_write_req, metadata=self.metadata
)
Expand Down
72 changes: 72 additions & 0 deletions tests/system/test_zonal.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
AsyncMultiRangeDownloader,
)
from google.api_core.exceptions import FailedPrecondition


pytestmark = pytest.mark.skipif(
Expand Down Expand Up @@ -360,3 +361,74 @@ async def test_append_flushes_and_state_lookup(storage_client, blobs_to_delete):
await mrd.close()
content = buffer.getvalue()
assert content == full_data

@pytest.mark.asyncio
async def test_open_with_generation_zero(storage_client, blobs_to_delete):
    """Tests that using `generation=0` fails if the object already exists.

    This test verifies that:
    1. An object can be created using `AsyncAppendableObjectWriter` with
       `generation=0`.
    2. Attempting to create the same object again with `generation=0` raises
       a `FailedPrecondition` error (code 400), because the precondition
       (object must not exist) is not met.
    """
    object_name = f"test_append_with_generation-{uuid.uuid4()}"
    grpc_client = AsyncGrpcClient().grpc_client
    writer = AsyncAppendableObjectWriter(
        grpc_client, _ZONAL_BUCKET, object_name, generation=0
    )

    # Empty object is created; generation=0 succeeds because the object
    # does not exist yet.
    await writer.open()
    assert writer.is_stream_open

    await writer.close()
    assert not writer.is_stream_open

    # Re-opening with generation=0 must fail: the object now exists, so the
    # if_generation_match=0 precondition is violated.
    with pytest.raises(FailedPrecondition) as exc_info:
        writer = AsyncAppendableObjectWriter(
            grpc_client, _ZONAL_BUCKET, object_name, generation=0
        )
        await writer.open()
    assert exc_info.value.code == 400

    # cleanup: drop writer references so underlying gRPC resources are freed.
    del writer
    gc.collect()

    blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))

@pytest.mark.asyncio
async def test_open_existing_object_with_gen_None_overrides_existing(
    storage_client, blobs_to_delete
):
    """Test that a writer opened with `generation=None` overwrites the
    existing object, producing a new generation.
    """
    object_name = f"test_append_with_generation-{uuid.uuid4()}"

    grpc_client = AsyncGrpcClient().grpc_client
    # generation=0 guarantees we are creating a brand-new object here.
    writer = AsyncAppendableObjectWriter(
        grpc_client, _ZONAL_BUCKET, object_name, generation=0
    )

    # Empty object is created.
    await writer.open()
    assert writer.is_stream_open
    old_gen = writer.generation

    await writer.close()
    assert not writer.is_stream_open

    # generation=None overwrites the existing object the moment open() is
    # called, yielding a new generation number.
    new_writer = AsyncAppendableObjectWriter(
        grpc_client, _ZONAL_BUCKET, object_name, generation=None
    )
    await new_writer.open()
    assert new_writer.generation != old_gen
    # Close the stream so the test does not leak an open bidi connection.
    await new_writer.close()

    # cleanup: drop references so underlying gRPC resources are freed.
    del writer
    del new_writer
    gc.collect()

    blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
12 changes: 6 additions & 6 deletions tests/unit/asyncio/test_async_appendable_object_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def test_init(mock_write_object_stream, mock_client):
assert writer.object_name == OBJECT
assert writer.generation is None
assert writer.write_handle is None
assert not writer._is_stream_open
assert not writer.is_stream_open
assert writer.offset is None
assert writer.persisted_size is None
assert writer.bytes_appended_since_last_flush == 0
Expand Down Expand Up @@ -225,7 +225,7 @@ async def test_open_appendable_object_writer(mock_write_object_stream, mock_clie

# Assert
mock_stream.open.assert_awaited_once()
assert writer._is_stream_open
assert writer.is_stream_open
assert writer.generation == GENERATION
assert writer.write_handle == WRITE_HANDLE
assert writer.persisted_size == 0
Expand Down Expand Up @@ -255,7 +255,7 @@ async def test_open_appendable_object_writer_existing_object(

# Assert
mock_stream.open.assert_awaited_once()
assert writer._is_stream_open
assert writer.is_stream_open
assert writer.generation == GENERATION
assert writer.write_handle == WRITE_HANDLE
assert writer.persisted_size == PERSISTED_SIZE
Expand Down Expand Up @@ -379,7 +379,7 @@ async def test_close(mock_write_object_stream, mock_client):
mock_stream.close.assert_awaited_once()
assert writer.offset is None
assert persisted_size == 1024
assert not writer._is_stream_open
assert not writer.is_stream_open


@pytest.mark.asyncio
Expand Down Expand Up @@ -415,7 +415,7 @@ async def test_finalize_on_close(mock_write_object_stream, mock_client):

# Assert
mock_stream.close.assert_awaited_once()
assert not writer._is_stream_open
assert not writer.is_stream_open
assert writer.offset is None
assert writer.object_resource == mock_resource
assert writer.persisted_size == 2048
Expand Down Expand Up @@ -448,7 +448,7 @@ async def test_finalize(mock_write_object_stream, mock_client):
assert writer.object_resource == mock_resource
assert writer.persisted_size == 123
assert gcs_object == mock_resource
assert writer._is_stream_open is False
assert not writer.is_stream_open
assert writer.offset is None


Expand Down
36 changes: 36 additions & 0 deletions tests/unit/asyncio/test_async_write_object_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,42 @@ async def test_open_for_new_object(mock_async_bidi_rpc, mock_client):
assert stream.persisted_size == 0


@pytest.mark.asyncio
@mock.patch(
    "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
)
async def test_open_for_new_object_with_generation_zero(mock_async_bidi_rpc, mock_client):
    """Test that opening a stream with ``generation_number=0`` sends a
    ``write_object_spec`` whose ``if_generation_match`` is 0, so creation
    only succeeds when the object does not already exist.
    """
    # Arrange
    socket_like_rpc = mock.AsyncMock()
    mock_async_bidi_rpc.return_value = socket_like_rpc
    socket_like_rpc.open = mock.AsyncMock()

    # First server response: a fresh object resource plus a write handle.
    mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
    mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
    mock_response.resource.generation = GENERATION
    mock_response.resource.size = 0
    mock_response.write_handle = WRITE_HANDLE
    socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)

    stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT, generation_number=0)

    # Act
    await stream.open()

    # Assert
    mock_async_bidi_rpc.assert_called_once()
    _, call_kwargs = mock_async_bidi_rpc.call_args
    initial_request = call_kwargs["initial_request"]
    # The precondition must be carried in the initial request.
    assert initial_request.write_object_spec.if_generation_match == 0
    assert stream._is_stream_open
    socket_like_rpc.open.assert_called_once()
    socket_like_rpc.recv.assert_called_once()
    assert stream.generation_number == GENERATION
    assert stream.write_handle == WRITE_HANDLE
    assert stream.persisted_size == 0


@pytest.mark.asyncio
@mock.patch(
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
Expand Down