Skip to content

Instantly share code, notes, and snippets.

@jaraco
Created September 2, 2024 13:46
Show Gist options
  • Save jaraco/5bceae062a5eafb1310cc4668bbc4eaf to your computer and use it in GitHub Desktop.
Save jaraco/5bceae062a5eafb1310cc4668bbc4eaf to your computer and use it in GitHub Desktop.
diff --git a/Lib/zipfile.py b/Lib/zipfile.py
index 9b66a9f054d..b21889694da 100644
--- a/Lib/zipfile.py
+++ b/Lib/zipfile.py
@@ -2188,10 +2188,33 @@ def _difference(minuend, subtrahend):
return itertools.filterfalse(set(subtrahend).__contains__, minuend)
-class CompleteDirs(ZipFile):
+class InitializedState:
+ """
+ Mix-in to save the initialization state for pickling.
+ """
+
+ def __init__(self, *args, **kwargs):
+ self.__args = args
+ self.__kwargs = kwargs
+ super().__init__(*args, **kwargs)
+
+ def __getstate__(self):
+ return self.__args, self.__kwargs
+
+ def __setstate__(self, state):
+ args, kwargs = state
+ super().__init__(*args, **kwargs)
+
+
+class CompleteDirs(InitializedState, ZipFile):
"""
A ZipFile subclass that ensures that implied directories
are always included in the namelist.
+
+ >>> list(CompleteDirs._implied_dirs(['foo/bar.txt', 'foo/bar/baz.txt']))
+ ['foo/', 'foo/bar/']
+ >>> list(CompleteDirs._implied_dirs(['foo/bar.txt', 'foo/bar/baz.txt', 'foo/bar/']))
+ ['foo/']
"""
@staticmethod
@@ -2201,7 +2224,7 @@ def _implied_dirs(names):
return _dedupe(_difference(as_dirs, names))
def namelist(self):
- names = super(CompleteDirs, self).namelist()
+ names = super().namelist()
return names + list(self._implied_dirs(names))
def _name_set(self):
@@ -2247,6 +2270,16 @@ def make(cls, source):
source.__class__ = cls
return source
+ @classmethod
+ def inject(cls, zf: ZipFile) -> ZipFile:
+ """
+ Given a writable zip file zf, inject directory entries for
+ any directories implied by the presence of children.
+ """
+ for name in cls._implied_dirs(zf.namelist()):
+ zf.writestr(name, b"")
+ return zf
+
class FastLookup(CompleteDirs):
"""
@@ -2257,24 +2290,29 @@ class FastLookup(CompleteDirs):
def namelist(self):
with contextlib.suppress(AttributeError):
return self.__names
- self.__names = super(FastLookup, self).namelist()
+ self.__names = super().namelist()
return self.__names
def _name_set(self):
with contextlib.suppress(AttributeError):
return self.__lookup
- self.__lookup = super(FastLookup, self)._name_set()
+ self.__lookup = super()._name_set()
return self.__lookup
def _extract_text_encoding(encoding=None, *args, **kwargs):
- # stacklevel=3 so that the caller of the caller see any warning.
- return io.text_encoding(encoding, 3), args, kwargs
+ # compute stack level (one deeper on PyPy) so that the caller of the caller sees any warning.
+ is_pypy = sys.implementation.name == 'pypy'
+ stack_level = 3 + is_pypy
+ return io.text_encoding(encoding, stack_level), args, kwargs
class Path:
"""
- A pathlib-compatible interface for zip files.
+ A :class:`importlib.resources.abc.Traversable` interface for zip files.
+
+ Implements many of the features users enjoy from
+ :class:`pathlib.Path`.
Consider a zip file with this structure::
@@ -2294,13 +2332,13 @@ class Path:
Path accepts the zipfile object itself or a filename
- >>> root = Path(zf)
+ >>> path = Path(zf)
From there, several path operations are available.
Directory iteration (including the zip file itself):
- >>> a, b = root.iterdir()
+ >>> a, b = path.iterdir()
>>> a
Path('mem/abcde.zip', 'a.txt')
>>> b
@@ -2321,7 +2359,7 @@ class Path:
Read text:
- >>> c.read_text()
+ >>> c.read_text(encoding='utf-8')
'content of c'
existence:
@@ -2338,16 +2376,38 @@ class Path:
'mem/abcde.zip/b/c.txt'
At the root, ``name``, ``filename``, and ``parent``
- resolve to the zipfile. Note these attributes are not
- valid and will raise a ``ValueError`` if the zipfile
- has no filename.
+ resolve to the zipfile.
- >>> root.name
+ >>> str(path)
+ 'mem/abcde.zip/'
+ >>> path.name
'abcde.zip'
- >>> str(root.filename).replace(os.sep, posixpath.sep)
- 'mem/abcde.zip'
- >>> str(root.parent)
+ >>> path.filename == pathlib.Path('mem/abcde.zip')
+ True
+ >>> str(path.parent)
'mem'
+
+ If the zipfile has no filename, such attributes are not
+ valid and accessing them will raise a ``TypeError``.
+
+ >>> zf.filename = None
+ >>> path.name
+ Traceback (most recent call last):
+ ...
+ TypeError: ...
+
+ >>> path.filename
+ Traceback (most recent call last):
+ ...
+ TypeError: ...
+
+ >>> path.parent
+ Traceback (most recent call last):
+ ...
+ TypeError: ...
+
+ # workaround python/cpython#106763
+ >>> pass
"""
__repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})"
@@ -2365,6 +2425,18 @@ def __init__(self, root, at=""):
self.root = FastLookup.make(root)
self.at = at
+ def __eq__(self, other):
+ """
+ >>> Path(ZipFile(io.BytesIO(), 'w')) == 'foo'
+ False
+ """
+ if self.__class__ is not other.__class__:
+ return NotImplemented
+ return (self.root, self.at) == (other.root, other.at)
+
+ def __hash__(self):
+ return hash((self.root, self.at))
+
def open(self, mode='r', *args, pwd=None, **kwargs):
"""
Open this entry as text or binary following the semantics
@@ -2385,9 +2457,24 @@ def open(self, mode='r', *args, pwd=None, **kwargs):
encoding, args, kwargs = _extract_text_encoding(*args, **kwargs)
return io.TextIOWrapper(stream, encoding, *args, **kwargs)
+ def _base(self):
+ return pathlib.PurePosixPath(self.at or self.root.filename)
+
@property
def name(self):
- return pathlib.Path(self.at).name or self.filename.name
+ return self._base().name
+
+ @property
+ def suffix(self):
+ return self._base().suffix
+
+ @property
+ def suffixes(self):
+ return self._base().suffixes
+
+ @property
+ def stem(self):
+ return self._base().stem
@property
def filename(self):
@@ -2423,6 +2510,32 @@ def iterdir(self):
subs = map(self._next, self.root.namelist())
return filter(self._is_child, subs)
+ def match(self, path_pattern):
+ return pathlib.PurePosixPath(self.at).match(path_pattern)
+
+ def is_symlink(self):
+ """
+ Return whether this path is a symlink.
+ """
+ info = self.root.getinfo(self.at)
+ mode = info.external_attr >> 16
+ return stat.S_ISLNK(mode)
+
+ def glob(self, pattern):
+ if not pattern:
+ raise ValueError(f"Unacceptable pattern: {pattern!r}")
+
+ prefix = re.escape(self.at)
+ tr = Translator(seps='/')
+ matches = re.compile(prefix + tr.translate(pattern)).fullmatch
+ return map(self._next, filter(matches, self.root.namelist()))
+
+ def rglob(self, pattern):
+ return self.glob(f'**/{pattern}')
+
+ def relative_to(self, other, *extra):
+ return posixpath.relpath(str(self), str(other.joinpath(*extra)))
+
def __str__(self):
return posixpath.join(self.root.filename, self.at)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment