Last active
January 30, 2020 20:22
-
-
Save sritchie/02d80b6af62305158b97f416188675b0 to your computer and use it in GitHub Desktop.
Content addressable file manager, rewritten to use pyfilesystem2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hashlib | |
import io | |
import os | |
from contextlib import closing | |
from typing import Iterable, Optional, Tuple, Union | |
import fs as pyfs | |
import util as u | |
from fs.permissions import Permissions | |
Key = Union[str, u.HashAddress] | |
class HashFS(object): | |
"""Content addressable file manager, rewritten from | |
https://github.com/dgilland/hashfs to use pyfilesystem2. | |
Attributes: | |
root: Directory path used as root of storage space. | |
depth (int, optional): Depth of subfolders to create when saving a | |
file. | |
width (int, optional): Width of each subfolder to create when saving a | |
file. | |
algorithm (str): Hash algorithm to use when computing file hash. | |
Algorithm should be available in ``hashlib`` module, ie, a member | |
of `hashlib.algorithms_available`. Defaults to ``'sha256'``. | |
dmode (int, optional): Directory mode permission to set for | |
subdirectories. Defaults to ``0o755`` which allows owner/group to | |
read/write and everyone else to read and everyone to execute. | |
""" | |
def __init__(self, | |
root: Union[pyfs.base.FS, str], | |
depth: Optional[int] = 4, | |
width: Optional[int] = 1, | |
algorithm: hashlib.algorithms_available = "sha256", | |
dmode: Optional[int] = 0o755): | |
self.fs = u.load_fs(root) | |
self.depth = depth | |
self.width = width | |
self.algorithm = algorithm | |
self.dmode = dmode | |
def put(self, content, extension: Optional[str] = None) -> u.HashAddress: | |
"""Store contents of `content` in the backing filesystem using its content hash | |
for the address. | |
Args: | |
content: Readable object or path to file. | |
extension: Optional extension to append to file when saving. | |
Returns: | |
File's hash address. | |
""" | |
with closing(u.Stream(content, fs=self.fs)) as stream: | |
hashid = self._computehash(stream) | |
path, is_duplicate = self._copy(stream, hashid, extension) | |
return u.HashAddress(hashid, path, is_duplicate) | |
def get(self, k: Key) -> Optional[u.HashAddress]: | |
"""Return :class:`HashAddress` from given id or path. If `k` does not refer to | |
a valid file, then `None` is returned. | |
Args: | |
k: Address ID or path of file. | |
Returns: | |
File's hash address. | |
""" | |
path = self._fs_path(k) | |
if path is None: | |
return None | |
return u.HashAddress(self._unshard(path), path) | |
def open(self, k: Key, mode: str = "rb") -> io.IOBase: | |
"""Return open IOBase object from given id or path. | |
Args: | |
k: Address ID or path of file. | |
mode (str, optional): Mode to open file in. Defaults to ``'rb'``. | |
Returns: | |
Buffer: An ``io`` buffer dependent on the `mode`. | |
Raises: | |
IOError: If file doesn't exist. | |
""" | |
path = self._fs_path(k) | |
if path is None: | |
raise IOError("Could not locate file: {0}".format(k)) | |
return self.fs.open(path, mode) | |
def delete(self, k: Key) -> None: | |
"""Delete file using id or path. Remove any empty directories after | |
deleting. No exception is raised if file doesn't exist. | |
Args: | |
file (str): Address ID or path of file. | |
""" | |
path = self._fs_path(k) | |
if path is None: | |
return | |
try: | |
self.fs.remove(path) | |
except OSError: # pragma: no cover | |
# Attempting to delete a directory. | |
pass | |
else: | |
self._remove_empty(pyfs.path.dirname(path)) | |
def files(self) -> Iterable[str]: | |
"""Return generator that yields all files in the :attr:`fs`. | |
""" | |
return self.fs.walk.files() | |
def folders(self) -> Iterable[str]: | |
"""Return generator that yields all directories in the :attr:`fs` that contain | |
files. | |
""" | |
for step in self.fs.walk(): | |
if step.files: | |
yield step.path | |
def count(self) -> int: | |
"""Return count of the number of files in the backing :attr:`fs`. | |
""" | |
return sum(1 for _, info in self.fs.walk.info() if not info.is_file) | |
def size(self) -> int: | |
"""Return the total size in bytes of all files in the :attr:`root` | |
directory. | |
""" | |
return sum(info.size | |
for _, info in self.fs.walk.info(namespaces=['details']) | |
if not info.is_dir) | |
def exists(self, k: Key) -> bool: | |
"""Check whether a given file id or path exists on disk.""" | |
return bool(self._fs_path(k)) | |
def repair(self, extensions: bool = True) -> Iterable[str]: | |
"""Repair any file locations whose content address doesn't match it's | |
file path. | |
""" | |
repaired = [] | |
corrupted = self._corrupted(extensions=extensions) | |
for path, address in corrupted: | |
if self.fs.isfile(path): | |
# File already exists so just delete corrupted path. | |
self.fs.remove(path) | |
else: | |
# File doesn't exist, so move it. | |
self._makedirs(pyfs.path.dirname(path)) | |
self.fs.move(path, address.relpath) | |
repaired.append((path, address)) | |
# check for empty directories created by the repair. | |
for d in {pyfs.path.dirname(p) for p, _ in repaired}: | |
self._remove_empty(d) | |
return repaired | |
def __contains__(self, k: Key) -> bool: | |
"""Return whether a given file id or path is contained in the | |
:attr:`root` directory. | |
""" | |
return self.exists(k) | |
def __iter__(self) -> Iterable[str]: | |
"""Iterate over all files in the backing store.""" | |
return self.files() | |
def __len__(self) -> int: | |
"""Return count of the number of files tracked by the backing filesystem. | |
""" | |
return self.count() | |
def _computehash(self, stream: u.Stream) -> str: | |
"""Compute hash of file using :attr:`algorithm`.""" | |
return u.computehash(stream, self.algorithm) | |
def _copy(self, | |
stream: u.Stream, | |
hashid: str, | |
extension: Optional[str] = None): | |
"""Copy the contents of `stream` onto disk with an optional file extension | |
appended. | |
Returns a pair of | |
- relative path, | |
- boolean noting whether or not we have a duplicate. | |
""" | |
path = self._hashid_to_path(hashid, extension) | |
if self.fs.isfile(path): | |
is_duplicate = True | |
else: | |
# Only move file if it doesn't already exist. | |
is_duplicate = False | |
self._makedirs(pyfs.path.dirname(path)) | |
with closing(self.fs.open(path, mode='wb')) as p: | |
for data in stream: | |
p.write(u.to_bytes(data)) | |
return (path, is_duplicate) | |
def _remove_empty(self, path: str) -> None: | |
"""Successively remove all empty folders starting with `subpath` and | |
proceeding "up" through directory tree until reaching the :attr:`root` | |
folder. | |
""" | |
try: | |
pyfs.tools.remove_empty(self.fs, path) | |
except pyfs.errors.ResourceNotFound: | |
# Guard against paths that don't exist in the FS. | |
return None | |
def _makedirs(self, dir_path): | |
"""Physically create the folder path on disk.""" | |
try: | |
# this is creating a directory, so we use dmode here. | |
perms = Permissions.create(self.dmode) | |
self.fs.makedirs(dir_path, permissions=perms, recreate=True) | |
except pyfs.errors.DirectoryExpected: | |
assert self.fs.isdir(dir_path), f"expected {dir_path} to be a directory" | |
def _fs_path(self, k: Union[str, u.HashAddress]) -> Optional[str]: | |
"""Attempt to determine the real path of a file id or path through successive | |
checking of candidate paths. If the real path is stored with an extension, | |
the path is considered a match if the basename matches 'the expected file | |
path of the id. | |
""" | |
# if the input is ALREADY a hash address, pull out the relative path. | |
if isinstance(k, u.HashAddress): | |
k = k.relpath | |
# Check if input was a fs path already. | |
if self.fs.isfile(k): | |
return k | |
# Check if input was an ID. | |
filepath = self._hashid_to_path(k) | |
if self.fs.isfile(filepath): | |
return filepath | |
# Check the generated filepath to see if any version of the path exist with | |
# some extension; return that if it exists.. | |
paths = self.fs.glob("{0}.*".format(filepath)) | |
if paths.count().files > 0: | |
return next(iter(paths)).path | |
# Could not determine a match. | |
return None | |
def _hashid_to_path(self, hashid: str, extension: str = "") -> str: | |
"""Build the relative file path for a given hash id. Optionally, append a file | |
extension. | |
""" | |
paths = self._shard(hashid) | |
if extension and not extension.startswith(os.extsep): | |
extension = os.extsep + extension | |
elif not extension: | |
extension = "" | |
return pyfs.path.join(*paths) + extension | |
def _shard(self, hashid: str) -> str: | |
"""Shard content ID into subfolders.""" | |
return u.shard(hashid, self.depth, self.width) | |
def _unshard(self, path: str) -> str: | |
"""Unshard path to determine hash value.""" | |
if not self.fs.isfile(path): | |
raise ValueError("Cannot unshard path. The path {0!r} doesn't exist" | |
"in the filesystem. {1!r}") | |
return pyfs.path.splitext(path)[0].replace(os.sep, "") | |
def _corrupted(self, extensions: bool = True | |
) -> Iterable[Tuple[str, u.HashAddress]]: | |
"""Return generator that yields corrupted files as ``(path, address)``, where | |
``path`` is the path of the corrupted file and ``address`` is the | |
:class:`HashAddress` of the expected location. | |
""" | |
for path in self.files(): | |
with closing(u.Stream(path, fs=self.fs)) as stream: | |
hashid = self._computehash(stream) | |
extension = pyfs.path.splitext(path)[1] if extensions else None | |
expected_path = self._hashid_to_path(hashid, extension) | |
if pyfs.path.abspath(expected_path) != pyfs.path.abspath(path): | |
yield ( | |
path, | |
u.HashAddress(hashid, expected_path), | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Utilities for sharding etc.""" | |
import hashlib | |
import io | |
import os | |
from collections import namedtuple | |
from typing import Any, List, Optional, Union | |
import fs as pyfs | |
from absl import logging | |
from fs.base import FS | |
def compact(items: List[Optional[Any]]) -> List[Any]: | |
"""Return only truthy elements of `items`.""" | |
return [item for item in items if item] | |
def to_bytes(item: Union[str, bytes]) -> bytes: | |
"""Accepts either a bytes instance or a string; if str, returns a bytes | |
instance, else acts as identity. | |
""" | |
ret = item | |
if not isinstance(item, bytes): | |
ret = bytes(item, "utf8") | |
return ret | |
def computehash(stream, algorithm: hashlib.algorithms_available) -> str: | |
"""Compute hash of file using the supplied `algorithm`.""" | |
hashobj = hashlib.new(algorithm) | |
for data in stream: | |
hashobj.update(to_bytes(data)) | |
return hashobj.hexdigest() | |
def shard(digest: str, depth: int, width: int) -> str: | |
"""This creates a list of `depth` number of tokens with width `width` from the | |
first part of the id plus the remainder. | |
TODO examine Clojure's Blocks to see if there's some nicer style here. | |
""" | |
first = [digest[i * width:width * (i + 1)] for i in range(depth)] | |
remaining = [digest[depth * width:]] | |
return compact(first + remaining) | |
def load_fs(root: Union[FS, str]) -> FS: | |
"""If str is supplied, returns an instance of OSFS, backed by the filesystem.""" | |
if isinstance(root, str): | |
return pyfs.open_fs(root) | |
if isinstance(root, FS): | |
return root | |
raise Exception("Not a filesystem or path!") | |
def syspath(fs: FS, path: str) -> Optional[str]: | |
"""""" | |
try: | |
return fs.getsyspath(path) | |
except pyfs.errors.NoSysPath: | |
logging.error("Can't get a path.") | |
# TODO add the hash here | |
# TODO add a to and from string method | |
class HashAddress(namedtuple("HashAddress", ["id", "relpath", "is_duplicate"])): | |
"""File address containing file's path on disk and it's content hash ID. | |
Attributes: | |
id (str): Hash ID (hexdigest) of file contents. | |
relpath (str): Relative path location to :attr:`HashFS.root`. | |
is_duplicate (boolean, optional): Whether the hash address created was | |
a duplicate of a previously existing file. Can only be ``True`` | |
after a put operation. Defaults to ``False``. | |
""" | |
def __new__(cls, id, relpath, is_duplicate=False): | |
return super(HashAddress, cls).__new__(cls, id, relpath, is_duplicate) | |
def __eq__(self, obj): | |
return isinstance(obj, HashAddress) and \ | |
obj.id == self.id and \ | |
obj.relpath == self.relpath | |
# TODO examine, allow this to handle wrapping another stream in addition to itself. | |
class Stream(object): | |
"""Common interface for file-like objects. | |
The input `obj` can be a file-like object or a path to a file. If `obj` is | |
a path to a file, then it will be opened until :meth:`close` is called. | |
If `obj` is a file-like object, then it's original position will be | |
restored when :meth:`close` is called instead of closing the object | |
automatically. Closing of the stream is deferred to whatever process passed | |
the stream in. | |
Successive readings of the stream is supported without having to manually | |
set it's position back to ``0``. | |
""" | |
def __init__(self, obj, fs: Optional[FS] = None): | |
if hasattr(obj, "read"): | |
pos = obj.tell() | |
elif fs: | |
if fs.isfile(obj): | |
obj = fs.open(obj, "rb") | |
pos = None | |
else: | |
raise ValueError( | |
"Object must be a valid file path or a readable object") | |
else: | |
raise ValueError( | |
"Object must be readable, OR you must supply a filesystem.") | |
try: | |
file_stat = fs.getinfo(obj.name, namespaces=['stat']) | |
buffer_size = file_stat.st_blksize | |
except Exception: | |
buffer_size = 8192 | |
self._obj = obj | |
self._pos = pos | |
self._buffer_size = buffer_size | |
def __iter__(self): | |
"""Read underlying IO object and yield results. Return object to | |
original position if we didn't open it originally. | |
""" | |
self._obj.seek(0) | |
while True: | |
data = self._obj.read(self._buffer_size) | |
if not data: | |
break | |
yield data | |
if self._pos is not None: | |
self._obj.seek(self._pos) | |
def close(self): | |
"""Close underlying IO object if we opened it, else return it to | |
original position. | |
""" | |
if self._pos is None: | |
self._obj.close() | |
else: | |
self._obj.seek(self._pos) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment