Skip to content

Instantly share code, notes, and snippets.

@niansa
Created July 18, 2020 13:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save niansa/15770adee78ab4633037bebf36d694cc to your computer and use it in GitHub Desktop.
Save niansa/15770adee78ab4633037bebf36d694cc to your computer and use it in GitHub Desktop.
import os, io
from base64 import b64encode, b64decode
class itypes:
FILE = 1
DIRECTORY = 0
# READABLE WRITABLE APPENDS CREATES
FREAD_ONLY = "r" # YES NO NO NO
FREAD_WRITE = "r+" # YES YES NO NO
FWRITE_ONLY = "w" # NO YES NO YES
FWRITE_READ = "w+" # YES YES NO YES
FAPPEND = "a" # NO YES YES YES
FAPPEND_READ = "a+" # YES YES YES YES
FREADABLE = [FREAD_ONLY,FREAD_WRITE,FWRITE_READ,FAPPEND_READ]
FWRITABLE = [FREAD_WRITE,FWRITE_ONLY,FWRITE_READ,FAPPEND,FAPPEND_READ]
FAPPENDS = [FAPPEND,FAPPEND_READ]
FCREATES = [FWRITE_ONLY,FWRITE_READ,FAPPEND,FAPPEND_READ]
FALL = [FREAD_ONLY,FREAD_WRITE,FWRITE_ONLY,FWRITE_READ,FAPPEND,FAPPEND_READ]
class vfs_class:
def __init__(self, filesystem=None):
# If no filesystem given; create one
if not filesystem:
self.filesystem = {}
# Else; use existing one
else:
self.filesystem = filesystem
# Default current working directory to /
self.cwd = "/"
# No default update event
self._update_event = False
def _def_update_event(self, definition):
self._update_event = definition
def _takeover_fs(self):
for function in dir(os):
if function in dir(self) and not function.startswith("__"):
exec(f'os.{function} = self.{function}')
for function in dir(os):
if function in dir(self) and not function.startswith("__"):
exec(f'os.path.{function} = self.{function}')
#open = self.open
def _get_path_realsplit(self, path):
# Make sure path is not an empty string
if path == "":
raise FileNotFoundError("[Errno 2] No such file or directory: ''")
# Make sure path begins with CWD if it does not begin with either no / or ./
if not path.startswith("/"):
if path.startswith("./") or path == ".":
path = path[2:]
path = self.cwd + path
# Do the magic
path_split = path.split('/')
path_depth = 0
path_realsplit = []
for filename in path_split:
if filename != "":
if filename == ".":
continue
elif filename == "..":
try:
del path_realsplit[path_depth-1]
path_depth -= 1
except IndexError: pass
else:
path_realsplit.append(filename)
path_depth += 1
return path_realsplit
def _get_inode_data(self, path, firsterr=True):
# Get path in list type
path_realsplit = self._get_path_realsplit(path)
# Dig through the filesystem
fsdata = self.filesystem
try:
for filename in path_realsplit:
fsdata = fsdata[filename]
# No such file or directory
except:
if firsterr == False:
firsterr = True
else:
raise FileNotFoundError(f"[Errno 2] No such file or directory: '{path}'")
# Return the inode
return fsdata
def _set_inode(self, path, value):
# Trigger event
if self._update_event:
self._update_event(0, self.realpath(path))
# Get path list from string
path_realsplit = self._get_path_realsplit(path)
# Convert the list into a string
dstring = "self.filesystem"
for filename in path_realsplit:
dstring += f'["{filename}"]'
# Apply changes to the filesystem
try:
exec(dstring + value) # I know, I have to find a better way
except KeyError:
raise FileNotFoundError(f"[Errno 2] No such file or directory: '{path}'")
# Trigger event
if self._update_event:
self._update_event(1, self.realpath(path))
def _get_inode_type(self, inode):
# folder
if type(inode) == type({}):
return itypes.DIRECTORY
# file
elif type(inode) == type(""):
return itypes.FILE
# exception
else:
return False
def _remove_inode(self, path):
# Remove trailing /
if path[-1:] == "/":
path = path[:-1]
# Get important details
basepath = os.path.split(path)[0]
filename = os.path.basename(os.path.normpath(path))
# Set basepath if empty
if basepath == "":
basepath = "/"
# Undefine inode, then redefine it
self._set_inode(basepath, f'.pop("{filename}")')
# os.path functions
def realpath(self, path):
# Get real pathsplit
path_realsplit = self._get_path_realsplit(path)
# Get real path
path_real = ""
for filename in path_realsplit:
path_real += f'/{filename}'
if len(path_real) == 0:
path_real = "/"
return path_real
def isabs(self, path):
if path.startswith("./") or not path.startswith("/"):
return False
else:
return True
def isdir(self, path):
inode = self._get_inode_data(path)
return self._get_inode_type(inode) == itypes.DIRECTORY
def isfile(self, path):
inode = self._get_inode_data(path)
return self._get_inode_type(inode) == itypes.FILE
# Dummy functions
def access(self, path, mode): raise OSError(95, "Operation not supported")
def link(self, src): raise OSError(95, "Operation not supported")
def unlink(self, path): raise OSError(95, "Operation not supported")
def readlink(self, path): raise OSError(95, "Operation not supported")
def mknod(self, path): raise OSError(95, "Operation not supported")
def statvfs(self, path): raise OSError(95, "Operation not supported")
def utime(self, path): raise OSError(95, "Operation not supported")
def chmod(self, path, mode): raise OSError(95, "Operation not supported")
### DIRECTORY FUNCTIONS
def chdir(self, path):
# Get absolute path
path = self.realpath(path)
# Load inode
inode = self._get_inode_data(path)
# Check if path is a directory
if self._get_inode_type(inode) != itypes.DIRECTORY:
raise NotADirectoryError(f"[Errno 20] Not a directory: '{path}'")
# Add trailing / if required
if path[-1:] != "/":
path = f"{path}/"
# Set the current working directory to path
self.cwd = path
def getcwd(self):
if len(self.cwd) != 1:
return self.cwd[:-1]
else:
return self.cwd
def listdir(self, path):
inode = self._get_inode_data(path)
# Check if inode is a directory
if self._get_inode_type(inode) != itypes.DIRECTORY:
raise NotADirectoryError(f"[Errno 20] Not a directory: '{path}'")
# Create list
dirents = []
for sinode in inode.keys():
dirents.append(sinode)
# Sort the list
dirents_dirs = []
dirents_files = []
for file in dirents:
if self.isdir(file):
dirents_dirs.append(file)
if self.isfile(file):
dirents_files.append(file)
dirents_dirs.sort()
dirents_files.sort()
# Return the list
return dirents_dirs + dirents_files
def mkdir(self, path):
# Remove trailing /
if path[-1:] == "/":
path = path[:-1]
# Get absolute path
path = self.realpath(path)
# Check if inode exists already
nonexistent = False
try:
inode = self._get_inode_data(path)
except FileNotFoundError:
nonexistent = True
if not nonexistent:
raise FileNotFoundError(f"[Errno 2] No such file or directory: '{path}'")
# Check if target path is a directory
if self._get_inode_type(self._get_inode_data(os.path.split(path)[0], firsterr=False)) != itypes.DIRECTORY:
raise NotADirectoryError(f"[Errno 20] Not a directory: '{path}'")
inode = self._get_inode_data(path, firsterr=False)
# Get filename
filename = os.path.basename(os.path.normpath(path))
# Define inode
self._set_inode(path, ' = {}')
def rmdir(self, path):
inode = self._get_inode_data(path)
# Check if inode is a directory
if self._get_inode_type(inode) != itypes.DIRECTORY:
raise NotADirectoryError(f"[Errno 20] Not a directory: '{path}'")
# Check if directory is empty
if inode != {}:
raise OSError(39, f"Directory not empty: '{path}'")
# Remove the inode
self._remove_inode(path)
### FILE FUNCTIONS
def touch(self, path):
self.open(path, mode="w").close()
def remove(self, path):
inode = self._get_inode_data(path)
# Check weather inode is a file or a directory
if self._get_inode_type(inode) != itypes.FILE:
raise IsADirectoryError
self._remove_inode(path)
def rename(self, old, new):
inode = self._get_inode_data(old)
# Get important details for new inode
basepath = os.path.split(new)[0]
filename = os.path.basename(os.path.normpath(new))
# Create new inode
if self._get_inode_type(inode) == itypes.FILE:
inode = f'"{inode}"'
self._set_inode(new, f' = {inode}')
# Remove old inode
self._remove_inode(old)
def open(self, path, mode="r"):
# Check if mode is bytes
if 'b' in mode:
bytesfd = True
mode = mode.replace("b", "")
else:
bytesfd = False
# Check if mode is valid
if not mode in FALL:
raise ValueError(f"invalid mode: '{mode}'")
# Try to get the inode. If it does not exist and write mode is enabled; use a empty one
try:
inode = self._get_inode_data(path)
except FileNotFoundError:
if mode in FCREATES:
inode = ""
else:
raise FileNotFoundError(f"[Errno 2] No such file or directory: '{path}'")
# Check weather inode is a file or a directory
if self._get_inode_type(inode) != itypes.FILE:
raise IsADirectoryError
# Check for trailing /
elif path[-1:] == "/":
raise NotADirectoryError(f"[Errno 20] Not a directory: '{path}'")
# Decode base64
inode = b64decode(inode.encode("utf-8"))
# Create IO
if bytesfd:
fd = io.BytesIO(inode)
else:
fd = io.StringIO(inode.decode("utf-8"))
# Add functions to force read() and seek()
fd._fread = fd.read
fd._fseek = fd.seek
# Restrict write()
if not mode in FWRITABLE:
def fdwrite(data):
raise io.UnsupportedOperation("not writable")
fd.write = fdwrite
# Restrict read()
if not mode in FREADABLE:
def fdread():
raise io.UnsupportedOperation("not readable")
fd.read = fdread
# Append path to fd
fd.name = path
fd.mode = mode
fd.bytesfd = bytesfd
# Implement append mode if required
if mode in FAPPENDS:
inode_legenth = len(inode)
fd._fseek(inode_legenth)
def fdseek(position):
nonlocal inode_legenth
nonlocal fd
return fd._fseek(inode_legenth + position)
fd.seek = fdseek
# Write back data to inode on close()
def fdclose(*args):
nonlocal fd
nonlocal self
if mode in FWRITABLE:
fd._fseek(0)
inode = fd._fread()
# Convert to bytes object
if not fd.bytesfd:
inode = inode.encode('utf-8')
# Convert to base64
b64inode = b64encode(inode).decode('utf-8')
# Write to filesystem
self._set_inode(fd.name, f' = "{b64inode}"')
# Invalidate fd
fd = None
fd.close = fdclose
fd.__exit__ = fdclose
# Return created fd
return fd
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment