-
Notifications
You must be signed in to change notification settings - Fork 514
Add PuffinWriter for writing deletion vectors #3474
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
755793c
9b10a4f
c90ad38
842d6a5
9524618
e23a67d
72ebba8
4ecfd18
eb81422
a6d2f31
a979e55
be344a6
16ce42c
2cee6a3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -14,19 +14,26 @@ | |||||||
| # KIND, either express or implied. See the License for the | ||||||||
| # specific language governing permissions and limitations | ||||||||
| # under the License. | ||||||||
| import io | ||||||||
| import math | ||||||||
| import zlib | ||||||||
| from collections.abc import Iterable | ||||||||
| from typing import TYPE_CHECKING | ||||||||
|
|
||||||||
| from pyroaring import BitMap, FrozenBitMap | ||||||||
|
|
||||||||
| from pyiceberg.table.puffin import PuffinFile | ||||||||
| from pyiceberg.table.puffin import PuffinBlob, PuffinBlobMetadata, PuffinFile | ||||||||
|
|
||||||||
| if TYPE_CHECKING: | ||||||||
| import pyarrow as pa | ||||||||
|
|
||||||||
| EMPTY_BITMAP = FrozenBitMap() | ||||||||
| MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1 | ||||||||
| PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file" | ||||||||
| DELETION_VECTOR_MAGIC = b"\xd1\xd3\x39\x64" | ||||||||
| # Reserved field id of the row position (_pos) metadata column, referenced by | ||||||||
| # deletion-vector-v1 blob metadata (Java: MetadataColumns.ROW_POSITION) | ||||||||
| ROW_POSITION_FIELD_ID = 2147483645 | ||||||||
|
|
||||||||
|
|
||||||||
| class DeletionVector: | ||||||||
|
|
@@ -37,6 +44,21 @@ def __init__(self, referenced_data_file: str, bitmaps: list[BitMap]) -> None: | |||||||
| self.referenced_data_file = referenced_data_file | ||||||||
| self._bitmaps = bitmaps | ||||||||
|
|
||||||||
| @classmethod | ||||||||
| def from_positions(cls, referenced_data_file: str, positions: Iterable[int]) -> "DeletionVector": | ||||||||
| bitmaps_by_key: dict[int, BitMap] = {} | ||||||||
| for position in positions: | ||||||||
| if position < 0: | ||||||||
| raise ValueError(f"Invalid position: {position}, positions must be non-negative") | ||||||||
| bitmaps_by_key.setdefault(position >> 32, BitMap()).add(position & 0xFFFFFFFF) | ||||||||
|
|
||||||||
| if not bitmaps_by_key: | ||||||||
| raise ValueError("Deletion vector must contain at least one position") | ||||||||
|
|
||||||||
| # Materialize a list indexed by key, padding gaps with the empty bitmap (mirrors _deserialize_bitmap) | ||||||||
| bitmaps: list[BitMap] = [bitmaps_by_key.get(key, EMPTY_BITMAP) for key in range(max(bitmaps_by_key) + 1)] | ||||||||
| return cls(referenced_data_file, bitmaps) | ||||||||
|
|
||||||||
| @staticmethod | ||||||||
| def _deserialize_bitmap(pl: bytes) -> list[BitMap]: | ||||||||
| number_of_bitmaps = int.from_bytes(pl[0:8], byteorder="little") | ||||||||
|
|
@@ -67,6 +89,21 @@ def _deserialize_bitmap(pl: bytes) -> list[BitMap]: | |||||||
|
|
||||||||
| return bitmaps | ||||||||
|
|
||||||||
| @staticmethod | ||||||||
| def _serialize_bitmap(bitmaps: list[BitMap]) -> bytes: | ||||||||
| # Counterpart of _deserialize_bitmap: number of bitmaps (8 bytes, little-endian), then for each | ||||||||
| # non-empty bitmap in ascending key order its key (4 bytes, little-endian) and serialized payload. | ||||||||
| non_empty = [(key, bitmap) for key, bitmap in enumerate(bitmaps) if len(bitmap) > 0] | ||||||||
|
|
||||||||
| with io.BytesIO() as out: | ||||||||
| out.write(len(non_empty).to_bytes(8, "little")) | ||||||||
| for key, bitmap in non_empty: | ||||||||
| if key > MAX_JAVA_SIGNED: | ||||||||
| raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl") | ||||||||
| out.write(key.to_bytes(4, "little")) | ||||||||
| out.write(bitmap.serialize()) | ||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One gap I see from Java: Java's
Suggested change
|
||||||||
| return out.getvalue() | ||||||||
|
|
||||||||
| @staticmethod | ||||||||
| def _bitmaps_to_chunked_array(bitmaps: list[BitMap]) -> "pa.ChunkedArray": | ||||||||
| import pyarrow as pa | ||||||||
|
|
@@ -76,6 +113,29 @@ def _bitmaps_to_chunked_array(bitmaps: list[BitMap]) -> "pa.ChunkedArray": | |||||||
| def to_vector(self) -> "pa.ChunkedArray": | ||||||||
| return self._bitmaps_to_chunked_array(self._bitmaps) | ||||||||
|
|
||||||||
| def to_blob(self) -> PuffinBlob: | ||||||||
| vector_payload = self._serialize_bitmap(self._bitmaps) | ||||||||
|
|
||||||||
| # deletion-vector-v1 blob layout: combined length of magic and vector (4 bytes, big-endian), | ||||||||
| # the DV magic bytes, the serialized vector, and a CRC-32 checksum of magic + vector (4 bytes, big-endian) | ||||||||
| blob_content = DELETION_VECTOR_MAGIC + vector_payload | ||||||||
| payload = len(blob_content).to_bytes(4, "big") + blob_content + zlib.crc32(blob_content).to_bytes(4, "big") | ||||||||
|
|
||||||||
| cardinality = sum(len(bitmap) for bitmap in self._bitmaps) | ||||||||
| metadata = PuffinBlobMetadata( | ||||||||
| type="deletion-vector-v1", | ||||||||
| fields=[ROW_POSITION_FIELD_ID], | ||||||||
| # -1 means the snapshot id and sequence number are inherited at commit time | ||||||||
| snapshot_id=-1, | ||||||||
| sequence_number=-1, | ||||||||
| # offset and length are placeholders; PuffinWriter fills them in when assembling the file | ||||||||
| offset=0, | ||||||||
| length=0, | ||||||||
| properties={PROPERTY_REFERENCED_DATA_FILE: self.referenced_data_file, "cardinality": str(cardinality)}, | ||||||||
| compression_codec=None, | ||||||||
| ) | ||||||||
| return PuffinBlob(metadata=metadata, payload=payload) | ||||||||
|
|
||||||||
|
|
||||||||
| def deletion_vectors_from_puffin_file(puffin_file: PuffinFile) -> list[DeletionVector]: | ||||||||
| return [ | ||||||||
|
|
||||||||
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -14,10 +14,15 @@ | |||||||||
| # KIND, either express or implied. See the License for the | ||||||||||
| # specific language governing permissions and limitations | ||||||||||
| # under the License. | ||||||||||
| import io | ||||||||||
| from dataclasses import dataclass | ||||||||||
| from types import TracebackType | ||||||||||
| from typing import TYPE_CHECKING, Literal | ||||||||||
|
|
||||||||||
| from pydantic import Field | ||||||||||
|
|
||||||||||
| from pyiceberg import __version__ | ||||||||||
| from pyiceberg.io import OutputFile | ||||||||||
| from pyiceberg.typedef import IcebergBaseModel | ||||||||||
| from pyiceberg.utils.deprecated import deprecated | ||||||||||
|
|
||||||||||
|
|
@@ -75,3 +80,73 @@ def to_vector(self) -> dict[str, "pa.ChunkedArray"]: | |||||||||
| from pyiceberg.table.deletion_vector import deletion_vectors_from_puffin_file # local import avoids the cycle | ||||||||||
|
|
||||||||||
| return {dv.referenced_data_file: dv.to_vector() for dv in deletion_vectors_from_puffin_file(self)} | ||||||||||
|
|
||||||||||
|
|
||||||||||
| @dataclass(frozen=True) | ||||||||||
| class PuffinBlob: | ||||||||||
| """A blob to write into a Puffin file: its metadata and serialized payload.""" | ||||||||||
|
|
||||||||||
| metadata: PuffinBlobMetadata | ||||||||||
| payload: bytes | ||||||||||
|
|
||||||||||
|
|
||||||||||
| class PuffinWriter: | ||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This name looks too generic. We could consider renaming it to DeletionVectorWriter or a similar name. I've opened #3491 to extract DV-specific logic from puffin.py.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good to me. I'll wait for #3491 to land and then rebase this on top of it, renaming the writer to
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||
| """Assembles a Puffin file from blobs and writes it to an output file. | ||||||||||
|
|
||||||||||
| This writer is format-level and blob-agnostic: callers supply already-serialized blobs | ||||||||||
| (for example via DeletionVector.to_blob()). Use it as a context manager; the file is | ||||||||||
| written on exit, after which its size is available via len(output_file). | ||||||||||
| """ | ||||||||||
|
|
||||||||||
| closed: bool | ||||||||||
| _output_file: OutputFile | ||||||||||
| _blobs: list[PuffinBlob] | ||||||||||
| _created_by: str | ||||||||||
|
|
||||||||||
| def __init__(self, output_file: OutputFile, created_by: str | None = None) -> None: | ||||||||||
| self.closed = False | ||||||||||
| self._output_file = output_file | ||||||||||
| self._blobs = [] | ||||||||||
| self._created_by = created_by if created_by is not None else f"PyIceberg version {__version__}" | ||||||||||
|
|
||||||||||
| def __enter__(self) -> "PuffinWriter": | ||||||||||
| """Open the writer.""" | ||||||||||
| return self | ||||||||||
|
|
||||||||||
| def __exit__( | ||||||||||
| self, | ||||||||||
| exc_type: type[BaseException] | None, | ||||||||||
| exc_value: BaseException | None, | ||||||||||
| traceback: TracebackType | None, | ||||||||||
| ) -> None: | ||||||||||
| """Assemble the Puffin file and write it to the output file.""" | ||||||||||
| self.closed = True | ||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit (non-blocking): since orphaned files get reaped by the maintenance jobs anyway, this is fine as-is, but
Suggested change
WDYT? 🙂 |
||||||||||
|
|
||||||||||
| with io.BytesIO() as out: | ||||||||||
| out.write(MAGIC_BYTES) | ||||||||||
|
|
||||||||||
| blobs_metadata: list[PuffinBlobMetadata] = [] | ||||||||||
| for blob in self._blobs: | ||||||||||
| # offset and length are placeholders on the blob's metadata until the file is assembled here | ||||||||||
| blobs_metadata.append(blob.metadata.model_copy(update={"offset": out.tell(), "length": len(blob.payload)})) | ||||||||||
| out.write(blob.payload) | ||||||||||
|
|
||||||||||
| footer = Footer(blobs=blobs_metadata, properties={"created-by": self._created_by}) | ||||||||||
| footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8") | ||||||||||
|
|
||||||||||
| out.write(MAGIC_BYTES) | ||||||||||
| out.write(footer_payload_bytes) | ||||||||||
| out.write(len(footer_payload_bytes).to_bytes(4, "little")) | ||||||||||
| out.write((0).to_bytes(4, "little")) # flags | ||||||||||
| out.write(MAGIC_BYTES) | ||||||||||
|
|
||||||||||
| puffin_bytes = out.getvalue() | ||||||||||
|
|
||||||||||
| with self._output_file.create(overwrite=True) as output_stream: | ||||||||||
| output_stream.write(puffin_bytes) | ||||||||||
|
|
||||||||||
| def add_blob(self, blob: PuffinBlob) -> "PuffinWriter": | ||||||||||
| if self.closed: | ||||||||||
| raise RuntimeError("Cannot add blob to closed Puffin writer") | ||||||||||
| self._blobs.append(blob) | ||||||||||
| return self | ||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,114 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| from pathlib import Path | ||
|
|
||
| import pytest | ||
|
|
||
| from pyiceberg import __version__ | ||
| from pyiceberg.io.pyarrow import PyArrowFileIO | ||
| from pyiceberg.table.deletion_vector import DeletionVector, deletion_vectors_from_puffin_file | ||
| from pyiceberg.table.puffin import MAGIC_BYTES, PuffinFile, PuffinWriter | ||
|
|
||
|
|
||
| def _write(tmp_path: Path, *deletion_vectors: DeletionVector, created_by: str | None = None) -> Path: | ||
| puffin_path = tmp_path / "test.puffin" | ||
| with PuffinWriter(PyArrowFileIO().new_output(str(puffin_path)), created_by=created_by) as writer: | ||
| for dv in deletion_vectors: | ||
| writer.add_blob(dv.to_blob()) | ||
| return puffin_path | ||
|
|
||
|
|
||
| def test_puffin_writer_round_trips_single_blob(tmp_path: Path) -> None: | ||
| positions = [0, 1, 5, (1 << 32) + 7] | ||
| puffin_path = _write(tmp_path, DeletionVector.from_positions("file.parquet", positions)) | ||
|
|
||
| reader = PuffinFile(puffin_path.read_bytes()) | ||
| dvs = deletion_vectors_from_puffin_file(reader) | ||
|
|
||
| assert len(dvs) == 1 | ||
| assert dvs[0].referenced_data_file == "file.parquet" | ||
| assert dvs[0].to_vector().to_pylist() == sorted(positions) | ||
|
|
||
|
|
||
| def test_puffin_writer_round_trips_multiple_blobs(tmp_path: Path) -> None: | ||
| puffin_path = _write( | ||
| tmp_path, | ||
| DeletionVector.from_positions("file1.parquet", [1, 2, 3]), | ||
| DeletionVector.from_positions("file2.parquet", [4, 5, 6]), | ||
| ) | ||
|
|
||
| reader = PuffinFile(puffin_path.read_bytes()) | ||
| dvs = deletion_vectors_from_puffin_file(reader) | ||
|
|
||
| assert {dv.referenced_data_file: dv.to_vector().to_pylist() for dv in dvs} == { | ||
| "file1.parquet": [1, 2, 3], | ||
| "file2.parquet": [4, 5, 6], | ||
| } | ||
|
|
||
|
|
||
| def test_puffin_writer_writes_magic_bytes_and_offsets(tmp_path: Path) -> None: | ||
| puffin_path = _write(tmp_path, DeletionVector.from_positions("file.parquet", [1, 2, 3])) | ||
| puffin_bytes = puffin_path.read_bytes() | ||
|
|
||
| assert puffin_bytes[:4] == MAGIC_BYTES | ||
| assert puffin_bytes[-4:] == MAGIC_BYTES | ||
|
|
||
| blob = PuffinFile(puffin_bytes).footer.blobs[0] | ||
| # PuffinWriter fills in the placeholder offset and length while assembling the file | ||
| assert blob.offset > 0 | ||
| assert blob.length > 0 | ||
|
|
||
|
|
||
| def test_puffin_writer_default_created_by(tmp_path: Path) -> None: | ||
| puffin_path = _write(tmp_path, DeletionVector.from_positions("file.parquet", [1])) | ||
|
|
||
| reader = PuffinFile(puffin_path.read_bytes()) | ||
| assert reader.footer.properties["created-by"] == f"PyIceberg version {__version__}" | ||
|
|
||
|
|
||
| def test_puffin_writer_custom_created_by(tmp_path: Path) -> None: | ||
| puffin_path = _write(tmp_path, DeletionVector.from_positions("file.parquet", [1]), created_by="my-test-app") | ||
|
|
||
| reader = PuffinFile(puffin_path.read_bytes()) | ||
| assert reader.footer.properties["created-by"] == "my-test-app" | ||
|
|
||
|
|
||
| def test_puffin_writer_file_size_via_output_file(tmp_path: Path) -> None: | ||
| puffin_path = tmp_path / "test.puffin" | ||
| output_file = PyArrowFileIO().new_output(str(puffin_path)) | ||
| with PuffinWriter(output_file) as writer: | ||
| writer.add_blob(DeletionVector.from_positions("file.parquet", [1, 2, 3]).to_blob()) | ||
|
|
||
| assert len(output_file) == len(puffin_path.read_bytes()) | ||
|
|
||
|
|
||
| def test_puffin_writer_empty(tmp_path: Path) -> None: | ||
| puffin_path = _write(tmp_path) | ||
|
|
||
| reader = PuffinFile(puffin_path.read_bytes()) | ||
| assert reader.footer.blobs == [] | ||
| assert deletion_vectors_from_puffin_file(reader) == [] | ||
|
|
||
|
|
||
| def test_add_blob_to_closed_writer_raises(tmp_path: Path) -> None: | ||
| output_file = PyArrowFileIO().new_output(str(tmp_path / "test.puffin")) | ||
| writer = PuffinWriter(output_file) | ||
| with writer: | ||
| writer.add_blob(DeletionVector.from_positions("file.parquet", [1]).to_blob()) | ||
|
|
||
| with pytest.raises(RuntimeError, match="Cannot add blob to closed Puffin writer"): | ||
| writer.add_blob(DeletionVector.from_positions("file.parquet", [2]).to_blob()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
from_positionsguardsposition < 0but not the upper end, so a wildly out-of-range position blows up on therange(max(...) + 1)gap-fill (OOM) instead of erroring cleanly. Java'svalidatePositioncaps it atMAX_POSITION. Do we want to mirror that?with
MAX_POSITION = ((MAX_JAVA_SIGNED - 1) << 32) | 0x80000000(= Java'stoPosition(Integer.MAX_VALUE - 1, Integer.MIN_VALUE)).