diff --git a/cloudbuild/zb-system-tests-cloudbuild.yaml b/cloudbuild/zb-system-tests-cloudbuild.yaml index 562eae175..26daa8ae9 100644 --- a/cloudbuild/zb-system-tests-cloudbuild.yaml +++ b/cloudbuild/zb-system-tests-cloudbuild.yaml @@ -68,7 +68,7 @@ steps: # Execute the script on the VM via SSH. # Capture the exit code to ensure cleanup happens before the build fails. set +e - gcloud compute ssh ${_VM_NAME} --zone=${_ZONE} --internal-ip --ssh-key-file=/workspace/.ssh/google_compute_engine --command="ulimit -n {_ULIMIT}; COMMIT_SHA=${COMMIT_SHA} _ZONAL_BUCKET=${_ZONAL_BUCKET} _PR_NUMBER=${_PR_NUMBER} bash run_zonal_tests.sh" + gcloud compute ssh ${_VM_NAME} --zone=${_ZONE} --internal-ip --ssh-key-file=/workspace/.ssh/google_compute_engine --command="ulimit -n ${_ULIMIT}; COMMIT_SHA=${COMMIT_SHA} _ZONAL_BUCKET=${_ZONAL_BUCKET} CROSS_REGION_BUCKET=${_CROSS_REGION_BUCKET} _PR_NUMBER=${_PR_NUMBER} bash run_zonal_tests.sh" EXIT_CODE=$? set -e diff --git a/google/cloud/storage/asyncio/async_multi_range_downloader.py b/google/cloud/storage/asyncio/async_multi_range_downloader.py index 3ee773a04..d4ba6727b 100644 --- a/google/cloud/storage/asyncio/async_multi_range_downloader.py +++ b/google/cloud/storage/asyncio/async_multi_range_downloader.py @@ -228,6 +228,8 @@ def __init__( self._read_id_to_download_ranges_id = {} self._download_ranges_id_to_pending_read_ids = {} self.persisted_size: Optional[int] = None # updated after opening the stream + self._open_retries: int = 0 + async def __aenter__(self): """Opens the underlying bidi-gRPC connection to read from the object.""" @@ -257,13 +259,18 @@ async def open( raise ValueError("Underlying bidi-gRPC stream is already open") if retry_policy is None: + def on_error_wrapper(exc): + self._open_retries += 1 + self._on_open_error(exc) + retry_policy = AsyncRetry( - predicate=_is_read_retryable, on_error=self._on_open_error + predicate=_is_read_retryable, on_error=on_error_wrapper ) else: original_on_error = retry_policy._on_error 
def combined_on_error(exc): + self._open_retries += 1 self._on_open_error(exc) if original_on_error: original_on_error(exc) diff --git a/tests/system/test_zonal.py b/tests/system/test_zonal.py index 49c0ac7da..01d9a7c01 100644 --- a/tests/system/test_zonal.py +++ b/tests/system/test_zonal.py @@ -31,6 +31,7 @@ # TODO: replace this with a fixture once zonal bucket creation / deletion # is supported in grpc client or json client client. _ZONAL_BUCKET = os.getenv("ZONAL_BUCKET") +_CROSS_REGION_BUCKET = os.getenv("CROSS_REGION_BUCKET") _BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object" @@ -82,6 +83,51 @@ def _get_equal_dist(a: int, b: int) -> tuple[int, int]: return a + step, a + 2 * step +@pytest.mark.parametrize( + "object_size", + [ + 256, # less than _chunk size + 10 * 1024 * 1024, # less than _MAX_BUFFER_SIZE_BYTES + 20 * 1024 * 1024, # greater than _MAX_BUFFER_SIZE + ], +) +def test_basic_wrd_x_region( + storage_client, + blobs_to_delete, + object_size, + event_loop, + grpc_client, +): + object_name = f"test_basic_wrd-{str(uuid.uuid4())}" + + async def _run(): + object_data = os.urandom(object_size) + object_checksum = google_crc32c.value(object_data) + + writer = AsyncAppendableObjectWriter(grpc_client, _CROSS_REGION_BUCKET, object_name) + await writer.open() + await writer.append(object_data) + object_metadata = await writer.close(finalize_on_close=True) + assert object_metadata.size == object_size + assert int(object_metadata.checksums.crc32c) == object_checksum + + buffer = BytesIO() + mrd = AsyncMultiRangeDownloader(grpc_client, _CROSS_REGION_BUCKET, object_name) + async with mrd: + assert mrd._open_retries == 1 + # (0, 0) means read the whole object + await mrd.download_ranges([(0, 0, buffer)]) + assert mrd.persisted_size == object_size + + assert buffer.getvalue() == object_data + + # Clean up; use json client (i.e. `storage_client` fixture) to delete. 
+ blobs_to_delete.append(storage_client.bucket(_CROSS_REGION_BUCKET).blob(object_name)) + del writer + gc.collect() + + event_loop.run_until_complete(_run()) + @pytest.mark.parametrize( "object_size", [