Skip to content

Instantly share code, notes, and snippets.

@Gribouillis
Last active January 10, 2023 20:11
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Gribouillis/e3595e4eee50c3127e4aa61db36a333d to your computer and use it in GitHub Desktop.
Save Gribouillis/e3595e4eee50c3127e4aa61db36a333d to your computer and use it in GitHub Desktop.
"""
History:
2018.11.25.19: argument path to as_dict(). New methods from_fs() from_dir()
2018.11.25 : bugfix in 'r' mode of openbin.
2018.10.01 : initial version
"""
import fs
import fs.base
import fs.errors
import fs.osfs
from fs.path import (iteratepath, normpath, relpath)
import fs.subfs
import fs.tree
import io
__version__ = '2018.11.25.19'
def _convert(dic):
return {
k: (_convert(v) if isinstance(v, dict)
else (v if isinstance(v, bytes) else str(v).encode()))
for (k, v) in dic.items()
}
class DictFS(fs.base.FS):
def __init__(self):
super().__init__()
self._dict = {}
def as_dict(self, path=''):
dic, name, npath = self._pathpos(path)
if not name:
return self._dict
if name in dic:
if not isinstance(dic[name], dict):
raise fs.errors.DirectoryExpected(path)
else:
return dic[name]
else:
raise fs.errors.ResourceNotFound(path)
@classmethod
def from_dict(cls, dic):
d = cls()
d._dict = _convert(dic)
return d
@classmethod
def from_dir(cls, root, path=''):
d = cls()
if not root.isdir(path):
raise fs.errors.DirectoryExpected(path)
fs.copy.copy_dir(root, path, d, 'target')
return cls.from_dict(d._dict['target'])
@classmethod
def from_fs(cls, root):
return cls.from_dir(root, '')
def listdir(self, path):
with self._lock:
npath = relpath(normpath(path))
iname = iteratepath(npath)
idic = self._idicompose(self._dict, iname)
if len(idic) < len(iname):
raise fs.errors.ResourceNotFound(path)
elif idic and not isinstance(idic[-1], dict):
raise fs.errors.DirectoryExpected(path)
else:
dic = idic[-1] if idic else self._dict
return sorted(dic.keys())
def _idicompose(self, dic, iname):
"""Follows a sequence of keys in a dictionary,
returning a sequence of items, all dictionaries
but perhaps the last one.
"""
result = []
for name in iname:
if isinstance(dic, dict) and name in dic:
dic = dic[name]
result.append(dic)
else:
break
return result
def _pathpos(self, path):
with self._lock:
npath = relpath(normpath(path))
iname = iteratepath(npath)
if iname:
idic = self._idicompose(self._dict, iname[:-1])
if len(idic) < len(iname) - 1:
raise fs.errors.ResourceNotFound(path)
elif idic and not isinstance(idic[-1], dict):
raise fs.errors.ResourceNotFound(path)
else:
return (idic[-1] if idic else self._dict), iname[-1], npath
else:
return self._dict, '', npath
def makedir(self, path, permissions=None, recreate=False):
with self._lock:
dic, name, npath = self._pathpos(path)
if not name:
raise fs.errors.DirectoryExists(path)
if (name in dic) and not recreate:
raise fs.errors.DirectoryExists(path)
else:
dic[name] = {}
return fs.subfs.SubFS(self, npath)
def remove(self, path):
with self._lock:
dic, name, npath = self._pathpos(path)
if not name:
raise fs.errors.FileExpected(path)
if name not in dic:
raise fs.errors.ResourceNotFound(path)
elif isinstance(dic[name], dict):
raise fs.errors.FileExpected(npath)
else:
del dic[name]
def removedir(self, path):
with self._lock:
dic, name, npath = self._pathpos(path)
if not name:
raise fs.errors.RemoveRootError(path)
if name not in dic:
raise fs.errors.ResourceNotFound(path)
elif not isinstance(dic[name], dict):
raise fs.errors.DirectoryExpected(npath)
elif dic[name]:
raise fs.errors.DirectoryNotEmpty(path)
else:
del dic[name]
def getinfo(self, path, namespaces=None):
with self._lock:
dic, name, npath = self._pathpos(path)
if not name:
return fs.info.Info({'basic':{'is_dir':True, 'name':''}})
if name not in dic:
raise fs.errors.ResourceNotFound(path)
if isinstance(dic[name], dict):
return fs.info.Info({'basic':{'is_dir':True, 'name':name}})
else:
return fs.info.Info({'basic':{'is_dir':False, 'name':name}})
def setinfo(self, path, info):
pass
def openbin(self, path, mode="r", buffering=-1, **options):
with self._lock:
mode = mode.rstrip('b')
if not mode in ('r', 'w', 'x', 'a', 'r+', 'w+', 'x+', 'a+'):
raise ValueError(('Invalid mode, expected [rwxa][+]? got',mode))
dic, name, npath = self._pathpos(path)
if not name:
raise fs.errors.FileExpected(path)
if 'x' in mode:
if name in dic:
raise fs.errors.FileExists(path)
else:
return DictFsBytesIO(dic, name, writing=True)
elif 'r' in mode:
if name in dic:
if isinstance(dic[name], dict):
raise fs.errors.FileExpected(path)
else:
return DictFsBytesIO(
dic, name,
writing=('+' in mode),
initial_bytes=dic[name])
else:
raise fs.errors.ResourceNotFound(path)
elif 'w' in mode:
if name in dic:
if isinstance(dic[name], dict):
raise fs.errors.FileExpected(path)
else:
del dic[name]
return DictFsBytesIO(dic, name, writing=True)
elif 'a' in mode:
if name in dic:
if isinstance(dic[name], dict):
raise fs.errors.FileExpected(path)
stream = DictFsBytesIO(
dic, name, writing=True,
initial_bytes = dic[name] if name in dic else b'')
stream.seek(0, io.SEEK_END)
return stream
DictFs = DictFS
class DictFsBytesIO(io.BytesIO):
def __init__(self, dic, name, initial_bytes=b'', writing=False):
io.BytesIO.__init__(self, initial_bytes)
self._dic = dic
self._name = name
self._writing = writing
def close(self):
if self._writing:
self._dic[self._name] = self.getvalue()
super().close()
def main():
d = DictFs()
with d.openbin('foo.txt', 'w') as ofile:
ofile.write('hello world'.encode())
with d.openbin('foo.txt', 'a') as ofile:
ofile.write(' me again'.encode())
d.makedir('spam')
d.makedirs('eggs/ham')
d.copy('foo.txt', 'eggs/bar.txt')
d.copydir('eggs', 'wibble', create=True)
with d.open('eggs/truc.txt', 'w') as ofile:
print('This is only a test', file=ofile)
d.tree()
from fs.osfs import OSFS
tmp = OSFS('tmp')
fs.copy.copy_dir(d, '', tmp, '')
# ou fs.copy.copy_fs(d, tmp)
print(d._dict)
z = DictFs.from_dict(
{'a':{'b': 'hi there', 'c': b'why?'}, 'e': 'really.'}
)
z.tree()
with z.open('a/c') as ifh:
print(ifh.read())
print(z.as_dict())
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment