Last active
May 18, 2021 02:52
-
-
Save brandonscript/1fea94a9c11bbadad246655a99dd160f to your computer and use it in GitHub Desktop.
WalkPath: A wrapper for pathlib.Path – deeply search the specified path and return all files and dirs (using os.walk), mapped to WalkPath objects that maintain a hierarchical matrix of pathlib.Path objects.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from pathlib import Path | |
from typing import Union, Iterable | |
def is_sys_file(path: Union[str, 'Path', 'WalkPath']) -> bool:
    """Check whether a path names a hidden or OS-generated "system" file.

    A name counts as a system file when it starts with a dot (for example
    ``.DS_Store``) or equals ``Thumbs.db`` or ``@eaDir``, compared
    case-insensitively.

    Args:
        path (str or Pathlike): Path or bare file name to check. Only the
            final path component is inspected.

    Returns:
        bool: True if it's a system file, otherwise False.
    """
    try:
        # Path-like objects expose their final component as ``.name``.
        name = path.name
    except AttributeError:
        # Plain strings have no ``.name``. Reduce them to their last
        # component so 'dir/.hidden' is detected and './visible.txt' is not.
        # ``Path('.').name`` is empty, so fall back to the raw text there.
        text = str(path)
        name = Path(text).name or text
    lowered = name.lower()
    return lowered.startswith('.') or lowered in ('thumbs.db', '@eadir')
class WalkPath(Path):
    """A ``pathlib.Path`` that remembers the root (``origin``) of a walk.

    Besides the normal ``Path`` behaviour, each instance carries:

    * ``origin`` - a plain ``Path`` identifying where the walk started; it is
      propagated to paths derived through ``joinpath``, ``/``, ``parent``,
      ``parents``, ``relative_to`` and ``iterdir``.
    * ``dirs`` / ``files`` - child directories / files, either supplied by the
      caller (e.g. from ``os.walk``) or listed lazily from disk.
    """

    # pathlib before Python 3.12 needs Path subclasses to provide the concrete
    # flavour themselves; guard the lookup so newer versions that dropped the
    # private attribute do not fail at class-creation time.
    if hasattr(type(Path()), '_flavour'):
        _flavour = type(Path())._flavour

    # Class-level fallbacks: older pathlib builds some derived paths without
    # calling ``__init__``, so these keep attribute access from raising there.
    origin = None
    _dirs = None
    _files = None

    def __init__(self, *args, origin: 'Path' = None,
                 dirs: 'list' = None, files: 'list' = None):
        """Initialize WalkPath object.

        Args:
            *args: Path segments, exactly as accepted by ``pathlib.Path``.
            origin (Path): Root of the walk. When omitted it is inherited
                from ``args[0].origin`` if that exists, otherwise it defaults
                to this path itself.
            dirs (list): Dirs provided by os.walk(), defaults to None
                (listed from disk on first access).
            files (list): Files provided by os.walk(), defaults to None
                (listed from disk on first access).
        """
        if hasattr(Path, 'with_segments'):
            # Python 3.12+: the segments are consumed by __init__.
            super().__init__(*args)
        else:
            # Older pathlib parses the segments in __new__, and
            # object.__init__ accepts no extra arguments.
            super().__init__()
        inherited = getattr(args[0], 'origin', None) if args else None
        self.origin = Path(origin or inherited or self)
        self._dirs = list(map(WalkPath, dirs)) if dirs else None
        self._files = list(map(WalkPath, files)) if files else None

    def with_segments(self, *pathsegments):
        """Build a derived path that keeps ``origin`` (Python 3.12+ hook)."""
        return type(self)(*pathsegments, origin=self.origin)

    # @overrides(__reduce__)
    def __reduce__(self):
        """Pickle as the path parts plus every instance attribute."""
        # The public ``parts`` tuple is used instead of the private
        # ``_parts`` list; the '_parts' key is kept so the payload layout
        # expected by ``_from_kwargs`` does not change.
        state = {'_parts': tuple(self.parts), **self.__dict__}
        return (self.__class__._from_kwargs, tuple(state.items()))

    # @overrides(_make_child)
    def _make_child(self, args):
        """Join ``args`` onto this path, carrying ``origin`` to the child.

        Private pathlib hook used by ``joinpath`` and ``/`` before Python
        3.12; newer versions go through ``with_segments`` instead.
        """
        drv, root, parts = self._parse_args(args)
        drv, root, parts = self._flavour.join_parsed_parts(
            self._drv, self._root, self._parts, drv, root, parts)
        new = self._from_parsed_parts(drv, root, parts)
        # The attribute is ``origin``; the previous ``_origin`` was never set
        # anywhere and made every join raise AttributeError.
        new.origin = self.origin
        return new

    # @overrides(iterdir)
    def iterdir(self) -> Iterable['WalkPath']:
        """Yield the directory's children as WalkPath objects."""
        for child in super().iterdir():
            yield WalkPath(child, origin=self.origin)

    # @overrides(joinpath)
    def joinpath(self, path, *others) -> 'WalkPath':
        """Return this path joined with ``path`` (and any further segments)."""
        return WalkPath(super().joinpath(path, *others), origin=self.origin)

    # @overrides(relative_to)
    def relative_to(self, path) -> 'WalkPath':
        """Return this path relative to ``path`` as a WalkPath."""
        return WalkPath(super().relative_to(path), origin=self.origin)

    # @overrides(parent)
    @property
    def parent(self) -> 'WalkPath':
        """The logical parent as a WalkPath sharing this path's origin."""
        return WalkPath(super().parent, origin=self.origin)

    # @overrides(parents)
    @property
    def parents(self) -> 'list[WalkPath]':
        """All logical ancestors as a list of WalkPath objects."""
        ancestors = super().parents
        return [WalkPath(p, origin=self.origin) for p in ancestors]

    @classmethod
    def _from_kwargs(cls, *args):
        """Rebuild an instance from the item pairs made by ``__reduce__``."""
        kwargs = dict(args)
        parts = kwargs.pop('_parts')
        new = cls(*parts, origin=kwargs.get('origin'))
        new.__dict__.update(kwargs)
        return new

    @property
    def dirs(self) -> 'list[WalkPath]':
        """Child directories of this path.

        A non-empty cached list is returned as-is. Otherwise the directory
        is listed from disk when this path is an absolute, existing
        directory; in every other case the result is an empty list.
        """
        if not self._dirs:
            if self.is_absolute() and self.is_dir():
                self._dirs = [WalkPath(d, origin=self.origin)
                              for d in self.iterdir() if d.is_dir()]
            else:
                self._dirs = []
        return self._dirs

    @property
    def files(self) -> 'list[WalkPath]':
        """Child files of this path.

        A non-empty cached list is returned as-is. Otherwise the directory
        is listed from disk when this path is an absolute, existing
        directory; in every other case the result is an empty list.
        """
        if not self._files:
            if self.is_absolute() and self.is_dir():
                self._files = [WalkPath(f, origin=self.origin)
                               for f in self.iterdir() if f.is_file()]
            else:
                self._files = []
        return self._files

    @property
    def is_terminus(self) -> bool:
        """True for a file, or for a path with no child directories."""
        return self.is_file() or not self.dirs
class Find:
    """Helpers that enumerate a directory tree as WalkPath objects."""

    @staticmethod
    def deep(path: Union[str, Path, 'WalkPath'],
             hide_sys_files=True) -> Iterable['WalkPath']:
        """Recursively yield every directory and file under ``path``.

        Uses ``os.walk``. For each directory visited, the directory itself is
        yielded first (with its ``_dirs`` / ``_files`` pre-populated from the
        walk), followed by its files. All yielded paths share ``path`` as
        their origin. If ``path`` is a single file, only that file is yielded.

        Args:
            path (str or Path): Root path to search.
            hide_sys_files (bool): Skip files for which ``is_sys_file`` is
                true. Directories are not filtered. Default is True.

        Yields:
            WalkPath: Directories and files in ``os.walk`` order.
        """
        # Imported here because this file's import block does not bring in
        # ``os``.
        import os

        origin = WalkPath(path)
        if origin.is_file():
            # This is a generator: a ``return [origin]`` would be discarded
            # by iteration, so the file has to be yielded.
            yield origin
            return
        for root, dirs, files in os.walk(path):
            if hide_sys_files:
                files = [f for f in files if not is_sys_file(f)]
            this = WalkPath(root, origin=origin)
            this._dirs = [WalkPath(root, d, origin=origin) for d in dirs]
            this._files = [WalkPath(root, f, origin=origin) for f in files]
            yield this
            # Yield the list built above instead of reading ``this.files``:
            # the property re-lists the directory when the cache is empty,
            # which would bring hidden files back, and it returns nothing
            # for relative paths.
            yield from this._files

    @staticmethod
    def deep_sorted(path: Union[str, Path, 'WalkPath'],
                    sort_key=lambda p: str(p).lower(),
                    hide_sys_files=True) -> Iterable['WalkPath']:
        """Yield the results of ``deep`` sorted by ``sort_key``.

        Args:
            path (str or Path): Root path to search.
            sort_key (callable): Sort key; defaults to the lower-cased
                string form of each path.
            hide_sys_files (bool): Passed through to ``deep``.
        """
        yield from sorted(Find.deep(path, hide_sys_files=hide_sys_files),
                          key=sort_key)

    @staticmethod
    def shallow(path: Union[str, Path, 'WalkPath'],
                sort_key=lambda p: str(p).lower(),
                hide_sys_files=True) -> Iterable['WalkPath']:
        """Yield the immediate children of ``path``, sorted by ``sort_key``.

        If ``path`` is a single file, only that file is yielded.

        Args:
            path (str or Path): Directory to list.
            sort_key (callable): Sort key; defaults to the lower-cased
                string form of each path.
            hide_sys_files (bool): Skip entries for which ``is_sys_file`` is
                true. Default is True.
        """
        origin = WalkPath(path)
        if origin.is_file():
            # Generator: yield the file rather than returning a list.
            yield origin
            return
        for entry in sorted(origin.iterdir(), key=sort_key):
            if hide_sys_files and is_sys_file(entry):
                continue
            yield WalkPath(entry, origin=origin)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import pathlib | |
from pathlib import Path | |
from typing import Union | |
class WalkPath(Path):
    """A ``pathlib.Path`` that can cache its child directories and files.

    ``dirs`` / ``files`` are either supplied by the caller (e.g. from
    ``os.walk``) or listed lazily from disk on first access.
    """

    # pathlib before Python 3.12 needs Path subclasses to provide the concrete
    # flavour themselves; guard the lookup so newer versions that dropped the
    # private attribute do not fail at class-creation time.
    if hasattr(type(Path()), '_flavour'):
        _flavour = type(Path())._flavour

    # Class-level fallbacks: older pathlib builds derived paths (e.g. via
    # ``/``) without calling ``__init__``; these keep ``dirs`` / ``files``
    # from raising AttributeError on such instances.
    _dirs = None
    _files = None

    def __init__(self, *args, dirs: 'list' = None, files: 'list' = None):
        """Initialize WalkPath object.

        Args:
            *args: Path segments, exactly as accepted by ``pathlib.Path``.
            dirs (list): Dirs provided by os.walk(), defaults to None
                (listed from disk on first access).
            files (list): Files provided by os.walk(), defaults to None
                (listed from disk on first access).
        """
        if hasattr(Path, 'with_segments'):
            # Python 3.12+: the segments are consumed by __init__.
            super().__init__(*args)
        else:
            # Older pathlib parses the segments in __new__, and
            # object.__init__ accepts no extra arguments.
            super().__init__()
        self._dirs = list(map(WalkPath, dirs)) if dirs else None
        self._files = list(map(WalkPath, files)) if files else None

    # @overrides(joinpath)
    def joinpath(self, path, *others) -> 'WalkPath':
        """Return this path joined with ``path`` (and any further segments)."""
        return WalkPath(super().joinpath(path, *others))

    # @overrides(relative_to)
    def relative_to(self, path) -> 'WalkPath':
        """Return this path relative to ``path`` as a WalkPath."""
        return WalkPath(super().relative_to(path))

    # @overrides(parent)
    @property
    def parent(self) -> 'WalkPath':
        """The logical parent as a WalkPath."""
        return WalkPath(super().parent)

    # @overrides(parents)
    @property
    def parents(self) -> 'list[WalkPath]':
        """All logical ancestors as a list of WalkPath objects."""
        ancestors = super().parents
        return [WalkPath(p) for p in ancestors]

    @property
    def dirs(self) -> 'list[WalkPath]':
        """Child directories of this path.

        A non-empty cached list is returned as-is. Otherwise the directory
        is listed from disk when this path is an absolute, existing
        directory; in every other case the result is an empty list.
        """
        if not self._dirs:
            if self.is_absolute() and self.is_dir():
                self._dirs = [WalkPath(d)
                              for d in self.iterdir() if d.is_dir()]
            else:
                self._dirs = []
        return self._dirs

    @property
    def files(self) -> 'list[WalkPath]':
        """Child files of this path.

        A non-empty cached list is returned as-is. Otherwise the directory
        is listed from disk when this path is an absolute, existing
        directory; in every other case the result is an empty list.
        """
        if not self._files:
            if self.is_absolute() and self.is_dir():
                self._files = [WalkPath(f)
                               for f in self.iterdir() if f.is_file()]
            else:
                self._files = []
        return self._files

    @property
    def is_terminus(self) -> bool:
        """True for a file, or for a path with no child directories."""
        return self.is_file() or not self.dirs
class Find:
    """Helpers that enumerate a directory tree as WalkPath objects."""

    @staticmethod
    def deep(path: Union[str, Path, 'WalkPath'],
             hide_sys_files=True) -> 'Iterable[WalkPath]':
        """Deeply search the specified path and yield all files and dirs
        (using os.walk), mapped to WalkPath objects.

        For each directory visited, the directory itself is yielded first
        (with its ``_dirs`` / ``_files`` pre-populated from the walk),
        followed by its files. Note: the original path root is always the
        first item. If the path provided is a single file, only that file
        is yielded.

        Args:
            path (str or Path): Root path to search for files.
            hide_sys_files (bool): Skip files whose name starts with '.' or
                equals 'thumbs.db' (case-insensitive). Directories are not
                filtered. Default is True.

        Yields:
            WalkPath: Directories and files in ``os.walk`` order.
        """
        if Path(path).is_file():
            # This is a generator: a ``return [..]`` would be discarded by
            # iteration, so the file has to be yielded.
            yield WalkPath(path)
            return
        for root, dirs, files in os.walk(WalkPath(path)):
            if hide_sys_files:
                files = [f for f in files
                         if not f.startswith('.')
                         and f.lower() != 'thumbs.db']
            this = WalkPath(root)
            this._dirs = [this.joinpath(d) for d in dirs]
            this._files = [this.joinpath(f) for f in files]
            yield this
            # Yield the list built above instead of reading ``this.files``:
            # the property re-lists the directory when the cache is empty,
            # which would bring hidden files back, and it returns nothing
            # for relative paths.
            yield from this._files

    @staticmethod
    def deep_sorted(path: Union[str, Path, 'WalkPath'],
                    sort_key=lambda p: str(p).lower(),
                    hide_sys_files=True) -> 'Iterable[WalkPath]':
        """Yield the results of ``deep`` sorted by ``sort_key``.

        Args:
            path (str or Path): Root path to search for files.
            sort_key (lambda, optional): Sort function, defaults
                to alpha case-insensitive.
            hide_sys_files (bool): Passed through to ``deep``.
        """
        # Walk the ``path`` argument; the module-level name ``root`` used
        # here before ignored the caller's input.
        yield from sorted(Find.deep(path, hide_sys_files=hide_sys_files),
                          key=sort_key)
# Demo: walk 'tests/files' in sorted order and print a summary of each entry.
root = WalkPath('tests/files').resolve()
for entry in Find.deep_sorted(root):
    # Show the sub-directory list itself unless it has more than ten items,
    # in which case only its length is printed.
    if len(entry.dirs) > 10:
        dirs_summary = len(entry.dirs)
    else:
        dirs_summary = entry.dirs
    containing_dir = entry.parent
    print(entry,
          entry.exists(),
          dirs_summary,
          len(entry.files),
          containing_dir.name,
          len(containing_dir.dirs))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment