Skip to content

Instantly share code, notes, and snippets.

@ansemjo
Last active January 29, 2022 11:36
Show Gist options
  • Save ansemjo/85d47b4547deda6d5374ba21be49bed2 to your computer and use it in GitHub Desktop.
Save ansemjo/85d47b4547deda6d5374ba21be49bed2 to your computer and use it in GitHub Desktop.
FUSE driver to interface with Jussi Kilpelainen's core memory shield. This gives you a four byte file to store all your precious memories in.
#!/usr/bin/env python3
"""
FUSE driver to interface with Jussi Kilpelainen's core memory shield [0].
This gives you a four-byte file to store all your precious memories in.
Get the Arduino code on his homepage [1]. I used the 2019-10-31 version.
0: https://www.tindie.com/products/kilpelaj/core-memory-shield-for-arduino/
1: https://jussikilpelainen.kapsi.fi/wordpress/?p=213
"""
import os, errno, time, re, binascii
try:
from fuse import FUSE, FuseOSError, Operations
from serial import Serial
except ImportError:
print("required packages: fusepy, pyserial")
exit(1)
class Corememory(Operations):
    """FUSE filesystem that exposes the core memory shield as a single file.

    The mounted directory contains exactly one regular file (``/cores``).
    Its four bytes mirror the word stored on the shield: every ``read``
    fetches the word over the serial line and every ``write`` pushes the
    complete word back. All operations that would change the directory
    layout or file metadata are refused with ``EACCES``.
    """

    # number of bytes in the shield's memory word
    WORD_SIZE = 4

    # cached copy of the memory word, one single-byte ``bytes`` object per
    # cell; this class-level value is only a default, __init__ and
    # readshield() bind a private list on each instance
    mem = [b"\x00", b"\x00", b"\x00", b"\x00"]

    # only allow access to this single file
    cores = "/cores"

    def __init__(self, serial, trace=False, foreground=False) -> None:
        """Open the serial port, check the greeting and load the current word.

        Args:
            serial: path of the serial device the Arduino is attached to.
            trace: print every filesystem call that reaches this driver.
            foreground: print shield traffic (``RD``/``WR`` lines).

        Raises:
            Exception: if the greeting is not the expected welcome text or
                the initial read of the memory word fails.
        """
        super().__init__()
        self.trace = trace
        self.fg = foreground
        # per-instance cache so that several drivers never share one list
        self.mem = [b"\x00"] * self.WORD_SIZE
        self.serial = Serial(serial, 115200, timeout=0.2, exclusive=True)
        try:
            # block until the board has sent at least one byte
            while self.serial.in_waiting == 0:
                time.sleep(0.1)
            welcome = self.serial.read(1024)
            if not welcome.startswith(b"Welcome! If you're new, try"):
                raise Exception("unexpected greeting. is this a core memory shield?")
            self.readshield()
        except BaseException:
            # do not leave the exclusively opened port behind on failure
            self.serial.close()
            raise
        print("READY." + ("" if self.fg else " Backgrounding ..."))

    def _log_call(self, *args):
        """Print a filesystem call and its arguments when tracing is enabled."""
        if self.trace:
            print(*args)

    # read complete word from shield
    def readshield(self):
        """Send ``R`` to the shield and refresh ``self.mem`` from the reply.

        Raises:
            Exception: if the reply does not start with the expected
                ``Core data read: 0x<hex> `` line.
        """
        # drop any stale bytes (e.g. the echo of an earlier command)
        self.serial.reset_input_buffer()
        self.serial.write(b"R")
        out = self.serial.read(128)
        m = re.match(rb"Core data read: 0x([0-9A-Fa-f]+) .*", out)
        if m is None:
            if self.fg:
                print(out)
            raise Exception("couldn't read memory from shield")
        # left-pad with zeros and keep the last 2 * WORD_SIZE hex digits
        digits = 2 * self.WORD_SIZE
        mem = binascii.unhexlify((b"0" * digits + m.group(1))[-digits:])
        if self.fg:
            print(f"RD 0x{mem.hex()}")
        # rebind instead of mutating in place: never touches the class default
        self.mem = [mem[i:i + 1] for i in range(len(mem))]

    # write complete word to shield
    def writeshield(self, data):
        """Send ``W`` followed by the hex encoding of ``data`` to the shield.

        Args:
            data: sequence of exactly four single-byte ``bytes`` objects.

        Raises:
            ValueError: if ``data`` does not have four elements.
        """
        if len(data) != self.WORD_SIZE:
            raise ValueError("expected four bytes")
        self.serial.write(b"W")
        hx = b"".join(data).hex()
        if self.fg:
            print(f"WR 0x{hx}")
        self.serial.write(hx.encode())
        # wait until everything is transmitted; the deprecated flushOutput()
        # would instead discard whatever is still pending in the buffer
        self.serial.flush()

    def checkfile(self, path, err):
        """Raise ``FuseOSError(err)`` unless ``path`` is the root or the file."""
        if path not in [".", "..", "/", self.cores]:
            raise FuseOSError(err)

    def access(self, path, mode):
        """Allow access to every known path, ``ENOENT`` for anything else."""
        self._log_call("access", path, mode)
        self.checkfile(path, errno.ENOENT)

    def getattr(self, path, fh=None):
        """Return the stat dictionary for the root directory or the file.

        Timestamps are always zero, owner and group are those of the current
        process. Unknown paths raise ``FuseOSError(ENOENT)``.
        """
        self._log_call("getattr", path, fh)
        self.checkfile(path, errno.ENOENT)
        attr = {
            "st_atime": 0,
            "st_ctime": 0,
            "st_mtime": 0,
            "st_uid": os.getuid(),
            "st_gid": os.getgid(),
            "st_nlink": 1,
        }
        if path == self.cores:
            # regular file, rw-r--r--
            attr["st_mode"] = 0o100644
            attr["st_size"] = self.WORD_SIZE
        else:
            # checkfile() leaves only ".", ".." and "/": directory, rwxr-xr-x
            attr["st_mode"] = 0o040755
            attr["st_size"] = 4096
        return attr

    def readdir(self, path, fh=None):
        """Yield the entries of the root directory; ``ENOENT`` elsewhere."""
        if path != "/":
            raise FuseOSError(errno.ENOENT)
        self._log_call("readdir", path, fh)
        yield from [".", "..", self.cores.lstrip("/")]

    def open(self, path, flags):
        """Open a known path; the returned file handle is always ``0``."""
        self._log_call("open", path, flags)
        self.checkfile(path, errno.ENOENT)
        self._log_call("opening file")
        return 0

    def create(self, path, mode, fi=None):
        """Refuse to create files."""
        self._log_call("create", path, mode, fi)
        raise FuseOSError(errno.EACCES)

    def read(self, path, length, offset, fh):
        """Fetch the word from the shield and return the requested slice.

        Returns at most ``length`` bytes starting at ``offset``; an offset
        equal to the file size yields ``b""``.

        Raises:
            FuseOSError: ``ENOENT`` for unknown paths, ``ERANGE`` if
                ``offset`` lies beyond the end of the file.
        """
        self._log_call("read", path, length, offset, fh)
        self.checkfile(path, errno.ENOENT)
        if offset > self.WORD_SIZE:
            raise FuseOSError(errno.ERANGE)
        self.readshield()
        # clamp to the end of the word; never hand back more than requested
        end = min(self.WORD_SIZE, offset + length)
        return b"".join(self.mem[offset:end])

    def write(self, path, buf, offset, fh):
        """Store ``buf`` at ``offset`` and push the whole word to the shield.

        Bytes that fit into the word are written even when ``buf`` is too
        long; in that case ``ENOSPC`` is raised afterwards.

        Returns:
            The number of bytes written (``0`` for an empty ``buf``).

        Raises:
            FuseOSError: ``ENOENT`` for unknown paths, ``ENOSPC`` if ``buf``
                extends past the end of the word.
        """
        self._log_call("write", path, buf, offset, fh)
        self.checkfile(path, errno.ENOENT)
        room = max(0, self.WORD_SIZE - offset)
        chunk = buf[:room]
        for i in range(len(chunk)):
            self.mem[offset + i] = chunk[i:i + 1]
        if chunk:
            # only talk to the hardware when a cell actually changed
            self.writeshield(self.mem)
        if len(chunk) < len(buf):
            raise FuseOSError(errno.ENOSPC)
        return len(chunk)

    def truncate(self, path, length, fh=None):
        """Zero the memory word on truncation to length 0.

        Raises:
            FuseOSError: ``ENOENT`` if ``path`` is not the memory file,
                ``ERANGE`` for any length other than ``0``.
        """
        self._log_call("truncate", path, length, fh)
        if path != self.cores:
            raise FuseOSError(errno.ENOENT)
        if length != 0:
            raise FuseOSError(errno.ERANGE)
        self.mem = [b"\x00"] * self.WORD_SIZE
        self.writeshield(self.mem)

    # --- everything below either refuses the operation or does nothing ---

    def chmod(self, path, mode):
        """Refuse to change permissions."""
        self._log_call("chmod", path, mode)
        self.checkfile(path, errno.ENOENT)
        raise FuseOSError(errno.EACCES)

    def chown(self, path, uid, gid):
        """Refuse to change ownership."""
        self._log_call("chown", path, uid, gid)
        self.checkfile(path, errno.ENOENT)
        raise FuseOSError(errno.EACCES)

    def unlink(self, path):
        """Refuse to remove files."""
        self._log_call("unlink", path)
        self.checkfile(path, errno.ENOENT)
        raise FuseOSError(errno.EACCES)

    def symlink(self, path, target):
        """Refuse to create symbolic links."""
        self._log_call("symlink", path, target)
        self.checkfile(path, errno.ENOENT)
        raise FuseOSError(errno.EACCES)

    def rename(self, path, new):
        """Refuse to rename anything."""
        self._log_call("rename", path, new)
        self.checkfile(path, errno.ENOENT)
        raise FuseOSError(errno.EACCES)

    def link(self, target, path):
        """Refuse to create hard links."""
        self._log_call("link", target, path)
        self.checkfile(path, errno.ENOENT)
        raise FuseOSError(errno.EACCES)

    def utimens(self, path, times=None):
        """Refuse to change timestamps."""
        self._log_call("utimens", path, times)
        self.checkfile(path, errno.ENOENT)
        raise FuseOSError(errno.EACCES)

    def mknod(self, path, mode, dev):
        """Refuse to create device nodes."""
        self._log_call("mknod", path, mode, dev)
        raise FuseOSError(errno.EACCES)

    def rmdir(self, path):
        """Refuse to remove directories."""
        self._log_call("rmdir", path)
        self.checkfile(path, errno.ENOENT)
        raise FuseOSError(errno.EACCES)

    def mkdir(self, path, mode):
        """Refuse to create directories."""
        self._log_call("mkdir", path, mode)
        raise FuseOSError(errno.EACCES)

    def flush(self, path, fh):
        """Nothing to flush; every write already went to the shield."""
        self._log_call("flush", path, fh)

    def release(self, path, fh):
        """Nothing to release; no per-handle state is kept."""
        self._log_call("release", path, fh)

    def fsync(self, path, fdatasync, fh):
        """Nothing to sync; every write already went to the shield."""
        self._log_call("fsync", path, fdatasync, fh)
if __name__ == "__main__":
    import argparse

    # usage: [--trace] [--foreground|-f] <corememory> <mountpoint>
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--trace",
        action="store_true",
        help="enable tracing of all filesystem commands",
    )
    cli.add_argument(
        "--foreground", "-f",
        action="store_true",
        help="keep running in foreground",
    )
    cli.add_argument(
        "corememory",
        help="path to serial interface of arduino with core memory shield",
    )
    cli.add_argument(
        "mountpoint",
        help="where to mount the filesystem, duh'",
    )
    opts = cli.parse_args()

    # talk to the shield first, then hand the driver over to FUSE
    driver = Corememory(
        opts.corememory,
        trace=opts.trace,
        foreground=opts.foreground,
    )
    FUSE(driver, opts.mountpoint, foreground=opts.foreground)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment