Created
February 14, 2023 07:37
-
-
Save b0o/5ab6d7b65b178d789d3ed1362cc02740 to your computer and use it in GitHub Desktop.
python multi-filesystem adapter with support for s3fs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
File system abstraction with support for additional kinds of file systems, including: | |
- regular file system | |
- s3 via s3fs | |
""" | |
from datetime import datetime | |
import io | |
import os | |
from functools import wraps | |
from . import aws | |
def s3_stat(res):
    """
    Build an ``os.stat_result`` describing a file on s3.

    ``res`` is a mapping that must provide ``"Size"`` (reported as
    ``st_size``) and ``"LastModified"`` (an object with a ``timestamp()``
    method, reported as both ``st_mtime`` and ``st_ctime``). ``st_atime`` is
    the current time, since ``res`` is not consulted for an access time.
    Mode, inode, device, link count, uid and gid are all reported as 0.
    """
    size = res["Size"]
    accessed = int(datetime.now().timestamp())
    modified = int(res["LastModified"].timestamp())
    # os.stat_result field order:
    # mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime
    not_provided = (0,) * 6
    return os.stat_result(not_provided + (size, accessed, modified, modified))
# Registry of the non-local file systems, keyed by the path prefix that
# selects them ("<prefix>://...").  Each entry holds:
#   adapter      - callable returning the object whose methods do the work
#   transformers - result converters, keyed by the wrapper's own name (not
#                  the aliased one), applied to what the adapter returns
#   aliases      - wrapper name -> adapter method name, where the two differ
filesystems = {
    "s3": {
        "adapter": aws.s3fs,
        "transformers": {
            # keep only the last path component of every listed entry's key
            "listdir": lambda x: [
                os.path.basename(entry["Key"]) for entry in x
            ],
            "stat": s3_stat,
            "lstat": s3_stat,
        },
        "aliases": {
            "remove": "rm",
            "utime": "touch",
            "lstat": "stat",
            "getsize": "size",
        },
    },
}
def parse_path(path):
    """
    Split a path into its file system entry and the prefix-less path.

    A string of the form ``<prefix>://<rest>`` whose prefix is a key of
    ``filesystems`` yields ``(filesystems[prefix], rest)``. Any other path
    yields ``(None, path)`` with the path untouched, which callers treat as
    "use the regular file system".

    Values that are not ``str`` (``os.PathLike`` objects, file descriptors,
    ...) are never matched against a prefix; they are returned unchanged
    instead of failing on the missing ``str`` methods.
    """
    if not isinstance(path, str):
        return None, path
    for prefix, fs in filesystems.items():
        marker = prefix + '://'
        if path.startswith(marker):
            return fs, path.removeprefix(marker)
    return None, path


def auto_fs(base_func,
            func_name=None,
            default_kwargs=None,
            args_to_kwargs=None):
    """
    Build a decorator that routes a path-based call to the right file system.

    The decorated function's body is never executed; only its ``__name__`` is
    used. The wrapper that replaces it takes the path as first argument and

    - for a path without a known prefix, calls ``base_func``;
    - for a prefixed path, instantiates the file system's ``adapter``, looks
      the function name up in its ``aliases`` (if any), calls the resulting
      adapter method with the prefix-less path and passes the result through
      the matching ``transformers`` entry (if any) so that it matches the
      base function's return spec.

    Args:
        base_func: function used for the regular file system; the wrapper
            also takes over its name and docstring (``functools.wraps``).
        func_name: name used for the alias/transformer/adapter-method lookup;
            defaults to the decorated function's ``__name__``.
        default_kwargs: keyword arguments passed on every call unless the
            caller overrides them.
        args_to_kwargs: maps a positional index to a keyword name; the
            argument at that position is passed by keyword instead. The path
            is index 0, so the first extra argument has index 1.

    Returns:
        The decorator.

    Raises:
        ValueError: (from the wrapper) if a converted argument is also given
            by keyword, or if a positional argument follows a converted one
            and therefore has no well-defined position any more.
    """
    default_kwargs = default_kwargs or {}
    args_to_kwargs = args_to_kwargs or {}

    def decorator(func):
        # name under which aliases, transformers and adapter methods are
        # looked up
        base_name = func_name or func.__name__

        # NOTE: metadata is copied from base_func (not func), so the wrapper
        # reports the base function's name and docstring.
        @wraps(base_func)
        def wrapper(path, *args, **kwargs):
            args = list(args)
            converted = []
            # convert positional arguments to keyword arguments
            # if they are specified in the args_to_kwargs dict
            for args_list_index, arg in enumerate(args):
                all_args_index = args_list_index + 1
                if all_args_index not in args_to_kwargs:
                    continue
                kwarg_name = args_to_kwargs[all_args_index]
                if kwarg_name in kwargs:
                    raise ValueError(
                        f"Argument {all_args_index} ({kwarg_name}) " +
                        "is specified both as an argument and a keyword argument"
                    )
                kwargs[kwarg_name] = arg
                # leave a placeholder so later arguments keep their index
                args[args_list_index] = None
                converted.append(args_list_index)
            # remove trailing None values from args
            while args and args[-1] is None:
                args.pop()
            # a placeholder that survived has a real argument after it; that
            # argument cannot be forwarded at its original position
            if any(index < len(args) for index in converted):
                raise ValueError(
                    f"Argument {len(args)} must be specified as a keyword "
                    "argument because an earlier argument is converted to one"
                )

            # parse the path and get the file system entry, if any
            fs, path = parse_path(path)
            # stays None for the regular file system and for file systems
            # that define no transformer for this function
            fs_transformer = None
            if fs is None:
                fs_func = base_func
            else:
                fs_adapter = fs["adapter"]()
                # aliases map our name to the adapter's method name
                fs_func_name = fs.get("aliases", {}).get(base_name, base_name)
                # transformers are keyed by our name, not the aliased one
                fs_transformer = fs.get("transformers", {}).get(base_name)
                fs_func = getattr(fs_adapter, fs_func_name)

            # forward the remaining positional arguments as well; they were
            # previously dropped, which broke e.g. rename(src, dst)
            res = fs_func(path, *args, **{**default_kwargs, **kwargs})
            if fs_transformer:
                res = fs_transformer(res)
            return res

        return wrapper

    return decorator
@auto_fs(os.path.exists)
def exists(*_args, **_kwargs):
    """
    Check whether a path exists on any supported file system type.

    Placeholder for ``auto_fs``: plain paths are answered by
    ``os.path.exists()``, prefixed paths by the ``exists`` method of the
    matching file system adapter.
    """
@auto_fs(os.path.getsize)
def getsize(*_args, **_kwargs):
    """
    Return the size of a file on any supported file system type.

    Placeholder for ``auto_fs``: plain paths are answered by
    ``os.path.getsize()``, prefixed paths by the adapter method the file
    system aliases ``getsize`` to (``size`` for s3).
    """
@auto_fs(os.path.isdir)
def isdir(*_args, **_kwargs):
    """
    Check whether a path is a directory on any supported file system type.

    Placeholder for ``auto_fs``: plain paths are answered by
    ``os.path.isdir()``, prefixed paths by the ``isdir`` method of the
    matching file system adapter.
    """
@auto_fs(os.path.isfile)
def isfile(*_args, **_kwargs):
    """
    Check whether a path is a regular file on any supported file system type.

    Placeholder for ``auto_fs``: plain paths are answered by
    ``os.path.isfile()``, prefixed paths by the ``isfile`` method of the
    matching file system adapter.
    """
@auto_fs(os.path.lexists)
def lexists(*_args, **_kwargs):
    """
    Check whether a path exists, on any supported file system type.

    Placeholder for ``auto_fs``: plain paths are answered by
    ``os.path.lexists()``, prefixed paths by the ``lexists`` method of the
    matching file system adapter.
    """
@auto_fs(os.listdir)
def listdir(*_args, **_kwargs):
    """
    List the entries of a directory on any supported file system type.

    Placeholder for ``auto_fs``: plain paths are answered by ``os.listdir()``,
    prefixed paths by the ``listdir`` method of the matching file system
    adapter, whose result goes through the file system's ``listdir``
    transformer (for s3: the base name of every entry's ``"Key"``).
    """
@auto_fs(os.mkdir)
def mkdir(*_args, **_kwargs):
    """
    Create a directory on any supported file system type.

    Placeholder for ``auto_fs``: plain paths are handled by ``os.mkdir()``,
    prefixed paths by the ``mkdir`` method of the matching file system
    adapter.
    """
@auto_fs(os.makedirs)
def makedirs(*_args, **_kwargs):
    """
    Create a directory tree on any supported file system type.

    Placeholder for ``auto_fs``: plain paths are handled by ``os.makedirs()``,
    prefixed paths by the ``makedirs`` method of the matching file system
    adapter.
    """
@auto_fs(
    io.open,
    # looked up as "open" on adapters, since this function is not named so
    func_name="open",
    # the second positional argument is passed on as ``mode=...``; index 0 is
    # the path itself, which the wrapper always passes positionally
    args_to_kwargs={0: "file", 1: "mode"},
    default_kwargs={"mode": "r"},
)
def open_file(*_args, **_kwargs):
    """
    Open a file on any supported file system type.

    Placeholder for ``auto_fs``: plain paths are opened with ``io.open()``,
    prefixed paths with the ``open`` method of the matching file system
    adapter. The mode defaults to ``"r"``.
    """
@auto_fs(os.rename)
def rename(*_args, **_kwargs):
    """
    Rename a file on any supported file system type.

    Placeholder for ``auto_fs``: plain paths are handled by ``os.rename()``,
    prefixed paths by the ``rename`` method of the matching file system
    adapter.
    """
@auto_fs(os.remove)
def remove(*_args, **_kwargs):
    """
    Remove a file on any supported file system type.

    Placeholder for ``auto_fs``: plain paths are handled by ``os.remove()``,
    prefixed paths by the adapter method the file system aliases ``remove``
    to (``rm`` for s3).
    """
@auto_fs(os.rmdir)
def rmdir(*_args, **_kwargs):
    """
    Remove a directory on any supported file system type.

    Placeholder for ``auto_fs``: plain paths are handled by ``os.rmdir()``,
    prefixed paths by the ``rmdir`` method of the matching file system
    adapter.
    """
@auto_fs(os.stat)
def stat(*_args, **_kwargs):
    """
    Stat a file on any supported file system type.

    Placeholder for ``auto_fs``: plain paths are answered by ``os.stat()``,
    prefixed paths by the ``stat`` method of the matching file system
    adapter, whose result goes through the file system's ``stat`` transformer
    (``s3_stat`` for s3).
    """
@auto_fs(os.lstat)
def lstat(*_args, **_kwargs):
    """
    Lstat a file on any supported file system type.

    Placeholder for ``auto_fs``: plain paths are answered by ``os.lstat()``,
    prefixed paths by the adapter method the file system aliases ``lstat`` to
    (``stat`` for s3), whose result goes through the file system's ``lstat``
    transformer (``s3_stat`` for s3).
    """
@auto_fs(os.utime)
def utime(*_args, **_kwargs):
    """
    Update the timestamps of a file on any supported file system type.

    Placeholder for ``auto_fs``: plain paths are handled by ``os.utime()``,
    prefixed paths by the adapter method the file system aliases ``utime`` to
    (``touch`` for s3).
    """
if __name__ == "__main__": | |
import sys | |
p = sys.argv[1] | |
if exists(p): | |
print("size", getsize(p)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment