From 4f6bd141c64161077949a7da3b7698141691fce7 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Sat, 21 Feb 2026 09:14:37 +0000 Subject: [PATCH 1/3] feat: Add system test for cross-region buckets Adds a new system test, test_basic_wrd_x_region, to verify functionality with cross-region GCS buckets. Also updates the Cloud Build configuration to pass the necessary _CROSS_REGION_BUCKET environment variable to the test environment. --- cloudbuild/zb-system-tests-cloudbuild.yaml | 2 +- tests/system/test_zonal.py | 46 ++++++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/cloudbuild/zb-system-tests-cloudbuild.yaml b/cloudbuild/zb-system-tests-cloudbuild.yaml index 562eae175..7cedf867a 100644 --- a/cloudbuild/zb-system-tests-cloudbuild.yaml +++ b/cloudbuild/zb-system-tests-cloudbuild.yaml @@ -68,7 +68,7 @@ steps: # Execute the script on the VM via SSH. # Capture the exit code to ensure cleanup happens before the build fails. set +e - gcloud compute ssh ${_VM_NAME} --zone=${_ZONE} --internal-ip --ssh-key-file=/workspace/.ssh/google_compute_engine --command="ulimit -n {_ULIMIT}; COMMIT_SHA=${COMMIT_SHA} _ZONAL_BUCKET=${_ZONAL_BUCKET} _PR_NUMBER=${_PR_NUMBER} bash run_zonal_tests.sh" + gcloud compute ssh ${_VM_NAME} --zone=${_ZONE} --internal-ip --ssh-key-file=/workspace/.ssh/google_compute_engine --command="ulimit -n {_ULIMIT}; COMMIT_SHA=${COMMIT_SHA} _ZONAL_BUCKET=${_ZONAL_BUCKET} _CROSS_REGION_BUCKET=${_CROSS_REGION_BUCKET} _PR_NUMBER=${_PR_NUMBER} bash run_zonal_tests.sh" EXIT_CODE=$? set -e diff --git a/tests/system/test_zonal.py b/tests/system/test_zonal.py index 49c0ac7da..79ecdd015 100644 --- a/tests/system/test_zonal.py +++ b/tests/system/test_zonal.py @@ -31,6 +31,7 @@ # TODO: replace this with a fixture once zonal bucket creation / deletion # is supported in grpc client or json client client. 
_ZONAL_BUCKET = os.getenv("ZONAL_BUCKET") +_CROSS_REGION_BUCKET = os.getenv("CROSS_REGION_BUCKET") _BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object" @@ -82,6 +83,51 @@ def _get_equal_dist(a: int, b: int) -> tuple[int, int]: return a + step, a + 2 * step +@pytest.mark.parametrize( + "object_size", + [ + 256, # less than _chunk size + 10 * 1024 * 1024, # less than _MAX_BUFFER_SIZE_BYTES + 20 * 1024 * 1024, # greater than _MAX_BUFFER_SIZE_BYTES + ], +) +def test_basic_wrd_x_region( + storage_client, + blobs_to_delete, + object_size, + event_loop, + grpc_client, +): + object_name = f"test_basic_wrd-{str(uuid.uuid4())}" + + async def _run(): + object_data = os.urandom(object_size) + object_checksum = google_crc32c.value(object_data) + + writer = AsyncAppendableObjectWriter(grpc_client, _CROSS_REGION_BUCKET, object_name) + await writer.open() + await writer.append(object_data) + object_metadata = await writer.close(finalize_on_close=True) + assert object_metadata.size == object_size + assert int(object_metadata.checksums.crc32c) == object_checksum + + buffer = BytesIO() + async with AsyncMultiRangeDownloader( + grpc_client, _CROSS_REGION_BUCKET, object_name + ) as mrd: + # (0, 0) means read the whole object + await mrd.download_ranges([(0, 0, buffer)]) + assert mrd.persisted_size == object_size + + assert buffer.getvalue() == object_data + + # Clean up; use json client (i.e. `storage_client` fixture) to delete. 
+ blobs_to_delete.append(storage_client.bucket(_CROSS_REGION_BUCKET).blob(object_name)) + del writer + gc.collect() + + event_loop.run_until_complete(_run()) + @pytest.mark.parametrize( "object_size", [ From ad3387cb6f3e12c7753b48d788ab1f6feb7b6a57 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Sat, 21 Feb 2026 10:00:48 +0000 Subject: [PATCH 2/3] fix: Correct environment variable name in Cloud Build config --- cloudbuild/zb-system-tests-cloudbuild.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudbuild/zb-system-tests-cloudbuild.yaml b/cloudbuild/zb-system-tests-cloudbuild.yaml index 7cedf867a..26daa8ae9 100644 --- a/cloudbuild/zb-system-tests-cloudbuild.yaml +++ b/cloudbuild/zb-system-tests-cloudbuild.yaml @@ -68,7 +68,7 @@ steps: # Execute the script on the VM via SSH. # Capture the exit code to ensure cleanup happens before the build fails. set +e - gcloud compute ssh ${_VM_NAME} --zone=${_ZONE} --internal-ip --ssh-key-file=/workspace/.ssh/google_compute_engine --command="ulimit -n {_ULIMIT}; COMMIT_SHA=${COMMIT_SHA} _ZONAL_BUCKET=${_ZONAL_BUCKET} _CROSS_REGION_BUCKET=${_CROSS_REGION_BUCKET} _PR_NUMBER=${_PR_NUMBER} bash run_zonal_tests.sh" + gcloud compute ssh ${_VM_NAME} --zone=${_ZONE} --internal-ip --ssh-key-file=/workspace/.ssh/google_compute_engine --command="ulimit -n {_ULIMIT}; COMMIT_SHA=${COMMIT_SHA} _ZONAL_BUCKET=${_ZONAL_BUCKET} CROSS_REGION_BUCKET=${_CROSS_REGION_BUCKET} _PR_NUMBER=${_PR_NUMBER} bash run_zonal_tests.sh" EXIT_CODE=$? set -e From 05f48646e0492130b9b9d897eb0d195762c55a03 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Sat, 21 Feb 2026 12:14:08 +0000 Subject: [PATCH 3/3] feat: Make open_retries private and track it in downloader Renames open_retries to _open_retries in AsyncMultiRangeDownloader to encapsulate the retry counter. Updates test_zonal.py to verify the private attribute. 
--- .../storage/asyncio/async_multi_range_downloader.py | 9 ++++++++- tests/system/test_zonal.py | 6 +++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/asyncio/async_multi_range_downloader.py b/google/cloud/storage/asyncio/async_multi_range_downloader.py index 3ee773a04..d4ba6727b 100644 --- a/google/cloud/storage/asyncio/async_multi_range_downloader.py +++ b/google/cloud/storage/asyncio/async_multi_range_downloader.py @@ -228,6 +228,8 @@ def __init__( self._read_id_to_download_ranges_id = {} self._download_ranges_id_to_pending_read_ids = {} self.persisted_size: Optional[int] = None # updated after opening the stream + self._open_retries: int = 0 + async def __aenter__(self): """Opens the underlying bidi-gRPC connection to read from the object.""" @@ -257,13 +259,18 @@ async def open( raise ValueError("Underlying bidi-gRPC stream is already open") if retry_policy is None: + def on_error_wrapper(exc): + self._open_retries += 1 + self._on_open_error(exc) + retry_policy = AsyncRetry( - predicate=_is_read_retryable, on_error=self._on_open_error + predicate=_is_read_retryable, on_error=on_error_wrapper ) else: original_on_error = retry_policy._on_error def combined_on_error(exc): + self._open_retries += 1 self._on_open_error(exc) if original_on_error: original_on_error(exc) diff --git a/tests/system/test_zonal.py b/tests/system/test_zonal.py index 79ecdd015..01d9a7c01 100644 --- a/tests/system/test_zonal.py +++ b/tests/system/test_zonal.py @@ -112,9 +112,9 @@ async def _run(): assert int(object_metadata.checksums.crc32c) == object_checksum buffer = BytesIO() - async with AsyncMultiRangeDownloader( - grpc_client, _CROSS_REGION_BUCKET, object_name - ) as mrd: + mrd = AsyncMultiRangeDownloader(grpc_client, _CROSS_REGION_BUCKET, object_name) + async with mrd: + assert mrd._open_retries == 1 # (0, 0) means read the whole object await mrd.download_ranges([(0, 0, buffer)]) assert mrd.persisted_size == object_size