Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
StringType,
StructType,
)
from pyiceberg.utils.config import Config

UNASSIGNED_SEQ = -1
DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024
Expand Down Expand Up @@ -891,13 +892,16 @@ def __hash__(self) -> int:
return hash(self.manifest_path)


# Global cache for ManifestFile objects, keyed by manifest_path.
# This deduplicates ManifestFile objects across manifest lists, which commonly
# share manifests after append operations.
# NOTE(review): `_manifest_cache` is assigned here with a fixed size and again
# below with the configured size; this looks like the removed and added lines
# of a rendered diff shown together -- confirm which assignment is intended.
_manifest_cache: LRUCache[str, ManifestFile] = LRUCache(maxsize=128)

# Fallback cache size, used when the configured value below is falsy.
_DEFAULT_MANIFEST_CACHE_SIZE = 128
# Read "manifest-cache-size" from the configuration. Because of the `or`, any
# falsy result (presumably None when unset -- TODO confirm; also 0) falls back
# to the default size.
_manifest_cache_size = Config().get_int("manifest-cache-size") or _DEFAULT_MANIFEST_CACHE_SIZE
# Lock for thread-safe cache access
_manifest_cache_lock = threading.RLock()
# Cache sized from the configured (or default) value computed above.
_manifest_cache: LRUCache[str, ManifestFile] = LRUCache(maxsize=_manifest_cache_size)


def clear_manifest_cache() -> None:
    """Remove every entry from the module-level manifest cache.

    Only the contents are dropped; the cache object itself stays bound to
    ``_manifest_cache``. The clear runs while ``_manifest_cache_lock`` is held.
    """
    _manifest_cache_lock.acquire()
    try:
        _manifest_cache.clear()
    finally:
        # Always hand the lock back, even if clearing raises.
        _manifest_cache_lock.release()


def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]:
Expand Down
19 changes: 12 additions & 7 deletions tests/benchmark/test_memory_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@
import pyarrow as pa
import pytest

from pyiceberg import manifest as manifest_module
from pyiceberg.catalog.memory import InMemoryCatalog
from pyiceberg.manifest import _manifest_cache
from pyiceberg.manifest import clear_manifest_cache


def generate_test_dataframe() -> pa.Table:
Expand Down Expand Up @@ -64,7 +65,7 @@ def memory_catalog(tmp_path_factory: pytest.TempPathFactory) -> InMemoryCatalog:
@pytest.fixture(autouse=True)
def clear_caches() -> None:
"""Clear caches before each test."""
# NOTE(review): both the direct `_manifest_cache.clear()` call and the
# `clear_manifest_cache()` helper appear here; this looks like the removed
# and added lines of a rendered diff -- confirm only one is intended.
_manifest_cache.clear()
clear_manifest_cache()
# Run a garbage-collection pass once the cache has been emptied.
gc.collect()


Expand Down Expand Up @@ -95,7 +96,8 @@ def test_manifest_cache_memory_growth(memory_catalog: InMemoryCatalog) -> None:
# Sample memory at intervals
if (i + 1) % 10 == 0:
current, _ = tracemalloc.get_traced_memory()
cache_size = len(_manifest_cache)
cache = manifest_module._manifest_cache
cache_size = len(cache) if cache is not None else 0

memory_samples.append((i + 1, current, cache_size))
print(f" Iteration {i + 1}: Memory={current / 1024:.1f} KB, Cache entries={cache_size}")
Expand Down Expand Up @@ -150,13 +152,14 @@ def test_memory_after_gc_with_cache_cleared(memory_catalog: InMemoryCatalog) ->

gc.collect()
before_clear_memory, _ = tracemalloc.get_traced_memory()
cache_size_before = len(_manifest_cache)
cache = manifest_module._manifest_cache
cache_size_before = len(cache) if cache is not None else 0
print(f" Memory before clear: {before_clear_memory / 1024:.1f} KB")
print(f" Cache size: {cache_size_before}")

# Phase 2: Clear cache and GC
print("\nPhase 2: Clearing cache and running GC...")
_manifest_cache.clear()
clear_manifest_cache()
gc.collect()
gc.collect() # Multiple GC passes for thorough cleanup

Expand Down Expand Up @@ -192,6 +195,7 @@ def test_manifest_cache_deduplication_efficiency() -> None:
ManifestEntry,
ManifestEntryStatus,
_manifests,
clear_manifest_cache,
write_manifest,
write_manifest_list,
)
Expand Down Expand Up @@ -245,7 +249,7 @@ def test_manifest_cache_deduplication_efficiency() -> None:
num_lists = 10
print(f"Creating {num_lists} manifest lists with overlapping manifests...")

_manifest_cache.clear()
clear_manifest_cache()

for i in range(num_lists):
list_path = f"{tmp_dir}/manifest-list_{i}.avro"
Expand All @@ -265,7 +269,8 @@ def test_manifest_cache_deduplication_efficiency() -> None:
_manifests(io, list_path)

# Analyze cache efficiency
cache_entries = len(_manifest_cache)
cache = manifest_module._manifest_cache
cache_entries = len(cache) if cache is not None else 0
# List i contains manifests 0..i, so only the first num_lists manifests are actually used
manifests_actually_used = num_lists

Expand Down
84 changes: 75 additions & 9 deletions tests/utils/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import fastavro
import pytest

import pyiceberg.manifest as manifest_module
from pyiceberg.avro.codecs import AvroCompressionCodec
from pyiceberg.io import load_file_io
from pyiceberg.io.pyarrow import PyArrowFileIO
Expand All @@ -32,8 +33,8 @@
ManifestEntryStatus,
ManifestFile,
PartitionFieldSummary,
_manifest_cache,
_manifests,
clear_manifest_cache,
read_manifest_list,
write_manifest,
write_manifest_list,
Expand All @@ -46,9 +47,8 @@


@pytest.fixture(autouse=True)
# NOTE(review): two `def` headers follow this single decorator; this looks like
# the old fixture (`clear_global_manifests_cache`) and its replacement
# (`reset_global_manifests_cache`) from a rendered diff -- confirm which one
# is the live definition.
def clear_global_manifests_cache() -> None:
# Clear the global cache before each test
_manifest_cache.clear()
def reset_global_manifests_cache() -> None:
# Empty the manifest cache by calling clear_manifest_cache().
clear_manifest_cache()


def _verify_metadata_with_fastavro(avro_file: str, expected_metadata: dict[str, str]) -> None:
Expand Down Expand Up @@ -804,9 +804,9 @@ def test_manifest_cache_deduplicates_manifest_files() -> None:

# Verify cache size - should only have 3 unique ManifestFile objects
# instead of 1 + 2 + 3 = 6 objects as with the old approach
assert len(_manifest_cache) == 3, (
f"Cache should contain exactly 3 unique ManifestFile objects, but has {len(_manifest_cache)}"
)
cache = manifest_module._manifest_cache
assert cache is not None, "Manifest cache should be enabled for this test"
assert len(cache) == 3, f"Cache should contain exactly 3 unique ManifestFile objects, but has {len(cache)}"


def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None:
Expand Down Expand Up @@ -879,9 +879,11 @@ def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None:
# With the new approach, we should have exactly N objects

# Verify cache has exactly N unique entries
assert len(_manifest_cache) == num_manifests, (
cache = manifest_module._manifest_cache
assert cache is not None, "Manifest cache should be enabled for this test"
assert len(cache) == num_manifests, (
f"Cache should contain exactly {num_manifests} ManifestFile objects, "
f"but has {len(_manifest_cache)}. "
f"but has {len(cache)}. "
f"Old approach would have {num_manifests * (num_manifests + 1) // 2} objects."
)

Expand Down Expand Up @@ -932,3 +934,67 @@ def test_manifest_writer_tell(format_version: TableVersion) -> None:
after_entry_bytes = writer.tell()

assert after_entry_bytes > initial_bytes, "Bytes should increase after adding entry"


def test_clear_manifest_cache() -> None:
"""Test that clear_manifest_cache() clears cache entries while keeping cache enabled."""
# NOTE(review): indentation was lost in this rendering, so where each `with`
# block ends (e.g. whether `writer.to_manifest_file()` runs inside or after
# the writer context) cannot be read from here -- confirm against the file.
io = PyArrowFileIO()

with TemporaryDirectory() as tmp_dir:
# Single required integer column and an unpartitioned spec.
schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True))
spec = UNPARTITIONED_PARTITION_SPEC

# Create a manifest file
manifest_path = f"{tmp_dir}/manifest.avro"
with write_manifest(
format_version=2,
spec=spec,
schema=schema,
output_file=io.new_output(manifest_path),
snapshot_id=1,
avro_compression="zstandard",
) as writer:
# The data file path is only recorded in the entry; this test never
# writes a file at that path.
data_file = DataFile.from_args(
content=DataFileContent.DATA,
file_path=f"{tmp_dir}/data.parquet",
file_format=FileFormat.PARQUET,
partition=Record(),
record_count=100,
file_size_in_bytes=1000,
)
writer.add_entry(
ManifestEntry.from_args(
status=ManifestEntryStatus.ADDED,
snapshot_id=1,
data_file=data_file,
)
)
manifest_file = writer.to_manifest_file()

# Create a manifest list
list_path = f"{tmp_dir}/manifest-list.avro"
with write_manifest_list(
format_version=2,
output_file=io.new_output(list_path),
snapshot_id=1,
parent_snapshot_id=None,
sequence_number=1,
avro_compression="zstandard",
) as list_writer:
list_writer.add_manifests([manifest_file])

# Populate the cache
# Reading the manifest list through _manifests is what the assertions below
# expect to have filled the cache.
_manifests(io, list_path)

# Verify cache has entries
# The cache is looked up as a module attribute at this point rather than via
# an imported name (presumably so a rebound cache would be seen -- TODO confirm).
cache = manifest_module._manifest_cache
assert cache is not None, "Cache should be enabled"
assert len(cache) > 0, "Cache should have entries after reading manifests"

# Clear the cache
clear_manifest_cache()

# Verify cache is empty but still enabled
# Re-read the module attribute after clearing instead of reusing `cache`.
cache_after = manifest_module._manifest_cache
assert cache_after is not None, "Cache should still be enabled after clear"
assert len(cache_after) == 0, "Cache should be empty after clear"