Last active
January 10, 2023 20:11
-
-
Save Gribouillis/e3595e4eee50c3127e4aa61db36a333d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
History:
2018.11.25.19: added the `path` argument to as_dict(); new methods from_fs() and from_dir()
2018.11.25   : bugfix in 'r' mode of openbin.
2018.10.01   : initial version
""" | |
import io

import fs
import fs.base
import fs.copy
import fs.errors
import fs.info
import fs.osfs
from fs.path import (iteratepath, normpath, relpath)
import fs.subfs
import fs.tree
__version__ = '2018.11.25.19' | |
def _convert(dic): | |
return { | |
k: (_convert(v) if isinstance(v, dict) | |
else (v if isinstance(v, bytes) else str(v).encode())) | |
for (k, v) in dic.items() | |
} | |
class DictFS(fs.base.FS):
    """Filesystem stored in nested dictionaries.

    Every directory is a ``dict`` mapping entry names to children and every
    file is stored as a ``bytes`` value. The root directory is ``self._dict``.
    """

    def __init__(self):
        """Create an empty filesystem."""
        super().__init__()
        self._dict = {}

    def as_dict(self, path=''):
        """Return the dictionary backing the directory at *path*.

        The returned object is the internal dictionary itself, not a copy.

        Raises:
            fs.errors.ResourceNotFound: if *path* does not exist.
            fs.errors.DirectoryExpected: if *path* is a file.
        """
        dic, name, _ = self._pathpos(path)
        if not name:
            return self._dict
        if name not in dic:
            raise fs.errors.ResourceNotFound(path)
        node = dic[name]
        if not isinstance(node, dict):
            raise fs.errors.DirectoryExpected(path)
        return node

    @classmethod
    def from_dict(cls, dic):
        """Build a filesystem from the nested dictionary *dic*.

        The input is passed through ``_convert``, so the new filesystem
        does not share its dictionaries with *dic*.
        """
        d = cls()
        d._dict = _convert(dic)
        return d

    @classmethod
    def from_dir(cls, root, path=''):
        """Build a filesystem from directory *path* of the filesystem *root*.

        Raises:
            fs.errors.DirectoryExpected: if ``root.isdir(path)`` is false.
        """
        d = cls()
        if not root.isdir(path):
            raise fs.errors.DirectoryExpected(path)
        # Copy into a scratch sub-directory, then promote it to the root.
        fs.copy.copy_dir(root, path, d, 'target')
        return cls.from_dict(d._dict['target'])

    @classmethod
    def from_fs(cls, root):
        """Build a filesystem from the whole content of *root*."""
        return cls.from_dir(root, '')

    def listdir(self, path):
        """Return the sorted entry names of the directory at *path*.

        Raises:
            fs.errors.ResourceNotFound: if *path* does not exist.
            fs.errors.DirectoryExpected: if *path* is a file.
        """
        with self._lock:
            names = iteratepath(relpath(normpath(path)))
            chain = self._idicompose(self._dict, names)
            if len(chain) < len(names):
                raise fs.errors.ResourceNotFound(path)
            if not chain:
                # Empty path: list the root directory.
                return sorted(self._dict)
            target = chain[-1]
            if not isinstance(target, dict):
                raise fs.errors.DirectoryExpected(path)
            return sorted(target)

    def _idicompose(self, dic, iname):
        """Follow the key sequence *iname* downwards from *dic*.

        Returns the list of values met along the way; all of them are
        dictionaries except perhaps the last one. The walk stops at the
        first key that cannot be resolved, so the result may be shorter
        than *iname*.
        """
        result = []
        for name in iname:
            if not (isinstance(dic, dict) and name in dic):
                break
            dic = dic[name]
            result.append(dic)
        return result

    def _pathpos(self, path):
        """Locate *path* and return ``(parent_dict, last_name, npath)``.

        *npath* is the normalized relative form of *path*. For the root,
        the result is ``(self._dict, '', npath)``. The last component
        itself need not exist; only its parent directory must.

        Raises:
            fs.errors.ResourceNotFound: if the parent of *path* is missing
                or is not a directory.
        """
        with self._lock:
            npath = relpath(normpath(path))
            iname = iteratepath(npath)
            if not iname:
                return self._dict, '', npath
            parents = self._idicompose(self._dict, iname[:-1])
            if len(parents) < len(iname) - 1:
                raise fs.errors.ResourceNotFound(path)
            if parents and not isinstance(parents[-1], dict):
                raise fs.errors.ResourceNotFound(path)
            parent = parents[-1] if parents else self._dict
            return parent, iname[-1], npath

    def makedir(self, path, permissions=None, recreate=False):
        """Create the directory *path* and return a SubFS rooted at it.

        *permissions* is accepted for interface compatibility and ignored.
        With ``recreate=True`` an existing directory (including the root)
        is left untouched and simply returned.

        Raises:
            fs.errors.DirectoryExists: if *path* already exists and
                *recreate* is false, or if *path* is an existing file.
            fs.errors.ResourceNotFound: if the parent of *path* is missing.
        """
        with self._lock:
            dic, name, npath = self._pathpos(path)
            if not name:
                if not recreate:
                    raise fs.errors.DirectoryExists(path)
            elif name in dic:
                # Never replace an existing entry: overwriting it with {}
                # would silently destroy the directory's content.
                if not (recreate and isinstance(dic[name], dict)):
                    raise fs.errors.DirectoryExists(path)
            else:
                dic[name] = {}
            return fs.subfs.SubFS(self, npath)

    def remove(self, path):
        """Delete the file at *path*.

        Raises:
            fs.errors.FileExpected: if *path* is the root or a directory.
            fs.errors.ResourceNotFound: if *path* does not exist.
        """
        with self._lock:
            dic, name, npath = self._pathpos(path)
            if not name:
                raise fs.errors.FileExpected(path)
            if name not in dic:
                raise fs.errors.ResourceNotFound(path)
            if isinstance(dic[name], dict):
                raise fs.errors.FileExpected(npath)
            del dic[name]

    def removedir(self, path):
        """Delete the empty directory at *path*.

        Raises:
            fs.errors.RemoveRootError: if *path* is the root.
            fs.errors.ResourceNotFound: if *path* does not exist.
            fs.errors.DirectoryExpected: if *path* is a file.
            fs.errors.DirectoryNotEmpty: if the directory has entries.
        """
        with self._lock:
            dic, name, npath = self._pathpos(path)
            if not name:
                raise fs.errors.RemoveRootError(path)
            if name not in dic:
                raise fs.errors.ResourceNotFound(path)
            if not isinstance(dic[name], dict):
                raise fs.errors.DirectoryExpected(npath)
            if dic[name]:
                raise fs.errors.DirectoryNotEmpty(path)
            del dic[name]

    def getinfo(self, path, namespaces=None):
        """Return an ``fs.info.Info`` object describing *path*.

        The ``basic`` namespace (``name``, ``is_dir``) is always present.
        The ``details`` namespace (``type`` and ``size``) is added when it
        is listed in *namespaces*; directories report a size of 0.

        Raises:
            fs.errors.ResourceNotFound: if *path* does not exist.
        """
        with self._lock:
            dic, name, _ = self._pathpos(path)
            if not name:
                node = self._dict
            elif name in dic:
                node = dic[name]
            else:
                raise fs.errors.ResourceNotFound(path)
            is_dir = isinstance(node, dict)
            raw = {'basic': {'is_dir': is_dir, 'name': name}}
            if namespaces and 'details' in namespaces:
                kind = (fs.ResourceType.directory if is_dir
                        else fs.ResourceType.file)
                raw['details'] = {
                    'type': int(kind),
                    'size': 0 if is_dir else len(node),
                }
            return fs.info.Info(raw)

    def setinfo(self, path, info):
        """Do nothing: this filesystem stores no settable metadata."""
        pass

    def openbin(self, path, mode="r", buffering=-1, **options):
        """Open the file at *path* and return a binary in-memory stream.

        *mode* is one of ``r``, ``w``, ``x``, ``a``, optionally followed by
        ``+``; any ``b`` flag is accepted and dropped. *buffering* and
        *options* are ignored. For writing modes the file is created (or
        truncated for ``w``) immediately, and the written data is stored
        in the filesystem when the stream is closed.

        Raises:
            ValueError: if *mode* is not a supported mode.
            fs.errors.FileExpected: if *path* is the root or a directory.
            fs.errors.FileExists: if *mode* is exclusive and *path* exists.
            fs.errors.ResourceNotFound: if *path* (in ``r`` mode) or its
                parent directory does not exist.
        """
        with self._lock:
            # Drop 'b' wherever it is, so that 'rb+' is handled like 'r+b'.
            mode = mode.replace('b', '')
            if mode not in ('r', 'w', 'x', 'a', 'r+', 'w+', 'x+', 'a+'):
                raise ValueError(
                    'Invalid mode, expected [rwxa][+]? got {!r}'.format(mode))
            dic, name, _ = self._pathpos(path)
            if not name:
                raise fs.errors.FileExpected(path)
            exists = name in dic
            if 'x' in mode:
                if exists:
                    raise fs.errors.FileExists(path)
                dic[name] = b''
                return DictFsBytesIO(dic, name, writing=True)
            if exists and isinstance(dic[name], dict):
                raise fs.errors.FileExpected(path)
            if 'r' in mode:
                if not exists:
                    raise fs.errors.ResourceNotFound(path)
                return DictFsBytesIO(
                    dic, name,
                    writing=('+' in mode),
                    initial_bytes=dic[name])
            if 'w' in mode:
                # Truncate or create right away, so the file is visible
                # even before the stream is closed.
                dic[name] = b''
                return DictFsBytesIO(dic, name, writing=True)
            # Only append mode is left.
            stream = DictFsBytesIO(
                dic, name, writing=True,
                initial_bytes=dic.setdefault(name, b''))
            stream.seek(0, io.SEEK_END)
            return stream


# Alternative spelling of the class name, used by main().
DictFs = DictFS
class DictFsBytesIO(io.BytesIO):
    """In-memory byte stream that stores its content in a dictionary on close.

    Args:
        dic: dictionary that receives the content.
        name: key under which the content is stored in *dic*.
        initial_bytes: initial content of the stream.
        writing: when true, ``close()`` stores the stream's content in
            ``dic[name]``; when false, *dic* is never modified.
    """

    def __init__(self, dic, name, initial_bytes=b'', writing=False):
        super().__init__(initial_bytes)
        self._dic = dic
        self._name = name
        self._writing = writing

    def close(self):
        """Store the content in the dictionary (if writing) and close.

        Calling ``close()`` again on a closed stream has no effect.
        """
        # getvalue() raises ValueError on a closed buffer, so the content
        # is only saved on the first close.
        if self._writing and not self.closed:
            self._dic[self._name] = self.getvalue()
        super().close()
def main():
    """Demonstrate DictFS.

    Builds a small in-memory tree, copies it into the ``tmp`` directory of
    the current working directory, then builds a second filesystem from a
    plain dictionary and reads one file back.
    """
    d = DictFs()
    with d.openbin('foo.txt', 'w') as ofile:
        ofile.write('hello world'.encode())
    with d.openbin('foo.txt', 'a') as ofile:
        ofile.write(' me again'.encode())
    d.makedir('spam')
    d.makedirs('eggs/ham')
    d.copy('foo.txt', 'eggs/bar.txt')
    d.copydir('eggs', 'wibble', create=True)
    with d.open('eggs/truc.txt', 'w') as ofile:
        print('This is only a test', file=ofile)
    d.tree()

    # create=True: without it OSFS raises CreateFailed when ./tmp is missing.
    with fs.osfs.OSFS('tmp', create=True) as tmp:
        fs.copy.copy_dir(d, '', tmp, '')
        # or fs.copy.copy_fs(d, tmp)
    print(d._dict)

    z = DictFs.from_dict(
        {'a': {'b': 'hi there', 'c': b'why?'}, 'e': 'really.'}
    )
    z.tree()
    with z.open('a/c') as ifh:
        print(ifh.read())
    print(z.as_dict())


# Run the demonstration only when the module is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment