Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions src/docx/opc/phys_pkg.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""Provides a general interface to a `physical` OPC package, such as a zip file."""

import os
from zipfile import ZIP_DEFLATED, ZipFile, is_zipfile
import zlib
from zipfile import ZIP_DEFLATED, BadZipFile, ZipFile, is_zipfile

from docx.opc.exceptions import PackageNotFoundError
from docx.opc.packuri import CONTENT_TYPES_URI
Expand Down Expand Up @@ -73,14 +74,24 @@ class _ZipPkgReader(PhysPkgReader):

def __init__(self, pkg_file):
    """Open `pkg_file` as a zip archive for reading.

    Raises |PackageNotFoundError| if `pkg_file` is not a valid zip archive.
    """
    super(_ZipPkgReader, self).__init__()
    # Open the archive exactly once, inside the guard, so a malformed file is
    # always reported as PackageNotFoundError and no extra handle is left open.
    try:
        self._zipf = ZipFile(pkg_file, "r")
    except BadZipFile as e:
        raise PackageNotFoundError("Package is not a valid zip file: %s" % e) from e

def blob_for(self, pack_uri):
    """Return blob corresponding to `pack_uri`.

    Raises |KeyError| if no matching member is present in zip archive.
    Raises |PackageNotFoundError| if the zip entry cannot be read due to corruption,
    truncation, or encryption.
    """
    # A single guarded read: BadZipFile covers CRC/header problems, zlib.error a
    # damaged deflate stream, EOFError a truncated member, and RuntimeError an
    # encrypted member or (via its NotImplementedError subclass) an unsupported
    # compression method. KeyError for a missing member is deliberately not
    # caught so callers can still detect an absent part.
    try:
        return self._zipf.read(pack_uri.membername)
    except (BadZipFile, zlib.error, EOFError, RuntimeError) as e:
        raise PackageNotFoundError(
            "Package member '%s' could not be read: %s" % (pack_uri.membername, e)
        ) from e

def close(self):
"""Close the zip archive, releasing any resources it is using."""
Expand Down
48 changes: 47 additions & 1 deletion tests/opc/test_phys_pkg.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import hashlib
import io
from zipfile import ZIP_DEFLATED, ZipFile
import struct
import zlib
from zipfile import ZIP_DEFLATED, BadZipFile, ZipFile

import pytest

Expand Down Expand Up @@ -70,6 +72,50 @@ def it_raises_when_pkg_path_is_not_a_package(self):


class DescribeZipPkgReader:
def it_raises_PackageNotFoundError_when_stream_is_not_a_zip(self):
    """Constructing a reader on a non-zip stream raises PackageNotFoundError."""
    not_a_zip_stream = io.BytesIO(b"not a zip file")
    with pytest.raises(PackageNotFoundError, match="not a valid zip file"):
        _ZipPkgReader(not_a_zip_stream)

def it_raises_PackageNotFoundError_when_blob_has_bad_crc(self):
    """BadZipFile (CRC mismatch) from ZipFile.read() is wrapped."""
    # Write the member uncompressed (ZipFile's default) so there is no inflate
    # step that could fail first; a flipped payload byte can then only be
    # detected by the CRC-32 check performed at read time.
    payload = b"<Types/>" * 50
    buf = io.BytesIO()
    with ZipFile(buf, "w") as zf:
        zf.writestr("[Content_Types].xml", payload)
    raw = bytearray(buf.getvalue())
    # Stored data appears verbatim in the archive, so locate it directly.
    data_start = raw.index(payload)
    raw[data_start + len(payload) // 2] ^= 0xFF
    reader = _ZipPkgReader(io.BytesIO(bytes(raw)))
    pack_uri = PackURI("/[Content_Types].xml")
    with pytest.raises(PackageNotFoundError, match="could not be read") as exc_info:
        reader.blob_for(pack_uri)
    # Pin the code path: the wrapped cause must be the CRC failure itself.
    assert isinstance(exc_info.value.__cause__, BadZipFile)

def it_raises_PackageNotFoundError_when_zlib_data_is_corrupt(self):
    """zlib.error from ZipFile.read() is wrapped in PackageNotFoundError."""
    # Zip members hold a raw DEFLATE stream (no zlib header). Corrupt the very
    # first block header so inflate rejects the data before any CRC is checked.
    buf = io.BytesIO()
    with ZipFile(buf, "w", compression=ZIP_DEFLATED) as zf:
        zf.writestr("[Content_Types].xml", b"<Types/>" * 1000)
    raw = bytearray(buf.getvalue())
    # Walk the local file header to find where the compressed payload begins.
    sig_pos = raw.find(b"PK\x03\x04")
    fname_len = struct.unpack_from("<H", raw, sig_pos + 26)[0]
    extra_len = struct.unpack_from("<H", raw, sig_pos + 28)[0]
    data_start = sig_pos + 30 + fname_len + extra_len
    # Bits 1-2 of the first byte are BTYPE; forcing both on selects the reserved
    # block type (0b11), which inflate always reports as invalid.
    raw[data_start] |= 0x06
    reader = _ZipPkgReader(io.BytesIO(bytes(raw)))
    pack_uri = PackURI("/[Content_Types].xml")
    with pytest.raises(PackageNotFoundError, match="could not be read") as exc_info:
        reader.blob_for(pack_uri)
    # Pin the code path: the wrapped cause must be the decompression failure.
    assert isinstance(exc_info.value.__cause__, zlib.error)

def it_is_used_by_PhysPkgReader_when_pkg_is_a_zip(self):
phys_reader = PhysPkgReader(zip_pkg_path)
assert isinstance(phys_reader, _ZipPkgReader)
Expand Down