Last active
January 29, 2022 11:36
-
-
Save ansemjo/85d47b4547deda6d5374ba21be49bed2 to your computer and use it in GitHub Desktop.
FUSE driver to interface with Jussi Kilpelainen's core memory shield. This gives you a four byte file to store all your precious memories in.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
FUSE driver to interface with Jussi Kilpelainen's core memory shield [0].
This gives you a four-byte file to store all your precious memories in.

Get the Arduino code on his homepage [1]. I used the 2019-10-31 version.

0: https://www.tindie.com/products/kilpelaj/core-memory-shield-for-arduino/
1: https://jussikilpelainen.kapsi.fi/wordpress/?p=213
""" | |
import os, errno, time, re, binascii | |
# Both third-party dependencies are imported under one guard so that a single
# hint covers whichever of them is missing.
try:
    from fuse import FUSE, FuseOSError, Operations
    from serial import Serial
except ImportError:
    print("required packages: fusepy, pyserial")
    # `exit` is a convenience injected by the `site` module for interactive
    # sessions and is not guaranteed to exist (e.g. `python -S`); raising
    # SystemExit is the portable way to stop with status 1.
    raise SystemExit(1)
class Corememory(Operations):
    """FUSE operations exposing the shield's memory word as one four-byte file.

    The filesystem contains a root directory and a single regular file
    (``/cores``).  Reads fetch the current word from the shield over the
    serial line; writes and truncation push the whole word back.  Every
    operation that would change the directory layout or file metadata is
    rejected with ``EACCES``.
    """

    # only allow access to this single file
    cores = "/cores"

    # Default content, one single-byte ``bytes`` object per memory byte.
    # Instances take their own copy in ``__init__`` so that in-place updates
    # never leak into this class-level list.
    mem = [b"\x00", b"\x00", b"\x00", b"\x00"]

    def __init__(self, serial, trace=False, foreground=False) -> None:
        """Open the serial port, check the greeting and load the current word.

        Args:
            serial: path of the serial device the Arduino is attached to.
            trace: print every filesystem operation as it is invoked.
            foreground: print shield reads/writes and adjust the ready banner.

        Raises:
            Exception: if the greeting is not the expected one or the initial
                read from the shield cannot be parsed.
        """
        super().__init__()
        self.trace = trace
        self.fg = foreground
        # Per-instance buffer; the class attribute only serves as a template.
        self.mem = list(self.mem)
        self.serial = Serial(serial, 115200, timeout=0.2, exclusive=True)
        try:
            # Block until the device has sent something, then grab the banner.
            while self.serial.in_waiting == 0:
                time.sleep(0.1)
            welcome = self.serial.read(1024)
            if not welcome.startswith(b"Welcome! If you're new, try"):
                raise Exception("unexpected greeting. is this a core memory shield?")
            self.readshield()
        except BaseException:
            # Do not keep the (exclusively opened) port if setup fails.
            self.serial.close()
            raise
        print("READY." + ("" if self.fg else " Backgrounding ..."))

    # read complete word from shield
    def readshield(self):
        """Send ``R`` to the shield and refresh ``self.mem`` from its reply.

        Raises:
            Exception: if the reply does not contain the expected
                ``Core data read: 0x...`` line.
        """
        # Drop stale input so the reply parsed below belongs to this request.
        self.serial.reset_input_buffer()
        self.serial.write(b"R")
        out = self.serial.read(128)
        m = re.match(rb"Core data read: 0x([0-9A-Fa-f]+) .*", out)
        if m is None:
            if self.fg:
                print(out)
            raise Exception("couldn't read memory from shield")
        # Left-pad with zeros and keep the last eight hex digits (four bytes).
        word = binascii.unhexlify((b"00000000" + m.group(1))[-8:])
        if self.fg:
            print(f"RD 0x{word.hex()}")
        for i in range(len(word)):
            self.mem[i] = word[i:i + 1]

    # write complete word to shield
    def writeshield(self, data):
        """Send ``W`` followed by the hex form of ``data`` to the shield.

        Args:
            data: sequence of exactly four single-byte ``bytes`` objects.

        Raises:
            ValueError: if ``data`` does not have four elements.
        """
        if len(data) != 4:
            raise ValueError("expected four bytes")
        self.serial.write(b"W")
        hx = b"".join(data).hex()
        if self.fg:
            print(f"WR 0x{hx}")
        self.serial.write(hx.encode())
        # Wait until everything is transmitted.  The previously used
        # flushOutput() is an alias of reset_output_buffer(), which *discards*
        # pending output instead of draining it.
        self.serial.flush()

    def checkfile(self, path, err):
        """Raise ``FuseOSError(err)`` unless ``path`` is the root or the file."""
        if path not in (".", "..", "/", self.cores):
            raise FuseOSError(err)

    def access(self, path, mode):
        """Permit access to known paths; unknown paths raise ``ENOENT``."""
        if self.trace:
            print("access", path, mode)
        self.checkfile(path, errno.ENOENT)

    def getattr(self, path, fh=None):
        """Return the stat dictionary for the root directory or the file."""
        if self.trace:
            print("getattr", path, fh)
        self.checkfile(path, errno.ENOENT)
        attr = {
            "st_atime": 0,
            "st_ctime": 0,
            "st_mtime": 0,
            "st_uid": os.getuid(),
            "st_gid": os.getgid(),
            "st_nlink": 1,
        }
        # checkfile() guarantees path is either the file or a directory alias.
        if path == self.cores:
            attr["st_mode"] = 0o100644  # regular file, rw-r--r--
            attr["st_size"] = 4
        else:
            attr["st_mode"] = 0o040755  # directory, rwxr-xr-x
            attr["st_size"] = 4096
        return attr

    def readdir(self, path, fh=None):
        """Yield the entries of the root directory; other paths raise ``ENOENT``."""
        if path != "/":
            raise FuseOSError(errno.ENOENT)
        if self.trace:
            print("readdir", path, fh)
        yield from (".", "..", self.cores.lstrip("/"))

    def open(self, path, flags):
        """Open a known path; the returned file handle is always 0."""
        if self.trace:
            print("open", path, flags)
        self.checkfile(path, errno.ENOENT)
        if self.trace:
            print("opening file")
        return 0

    def create(self, path, mode, fi=None):
        """Creating files is not supported."""
        if self.trace:
            print("create", path, mode, fi)
        raise FuseOSError(errno.EACCES)

    def read(self, path, length, offset, fh):
        """Re-read the shield and return up to ``length`` bytes from ``offset``.

        Raises:
            FuseOSError: ``ENOENT`` for unknown paths, ``ERANGE`` if ``offset``
                lies beyond the end of the four-byte word.
        """
        if self.trace:
            print("read", path, length, offset, fh)
        self.checkfile(path, errno.ENOENT)
        if offset > 4:
            raise FuseOSError(errno.ERANGE)
        self.readshield()
        # Clamp the end of the slice to the word size; never hand back more
        # than the caller asked for.
        return b"".join(self.mem[offset:min(4, offset + length)])

    def write(self, path, buf, offset, fh):
        """Store ``buf`` at ``offset`` and push the whole word to the shield.

        Bytes that fit into the four-byte word are stored and written to the
        shield even when the buffer as a whole is too long.

        Returns:
            The number of bytes written (0 for an empty buffer).

        Raises:
            FuseOSError: ``ENOENT`` for unknown paths, ``ENOSPC`` if ``buf``
                extends past the end of the word.
        """
        if self.trace:
            print("write", path, buf, offset, fh)
        self.checkfile(path, errno.ENOENT)
        room = max(0, 4 - offset)
        accepted = buf[:room]
        for i in range(len(accepted)):
            self.mem[offset + i] = accepted[i:i + 1]
        self.writeshield(self.mem)
        if len(accepted) < len(buf):
            raise FuseOSError(errno.ENOSPC)
        return len(accepted)

    def truncate(self, path, length, fh=None):
        """Zero the word when truncating to length 0; other lengths raise ``ERANGE``."""
        if self.trace:
            print("truncate", path, length, fh)
        if path != self.cores:
            raise FuseOSError(errno.ENOENT)
        if length != 0:
            raise FuseOSError(errno.ERANGE)
        self.mem = [b"\x00", b"\x00", b"\x00", b"\x00"]
        self.writeshield(self.mem)

    def chmod(self, path, mode):
        """Changing permissions is not supported."""
        if self.trace:
            print("chmod", path, mode)
        self.checkfile(path, errno.ENOENT)
        raise FuseOSError(errno.EACCES)

    def chown(self, path, uid, gid):
        """Changing ownership is not supported."""
        if self.trace:
            print("chown", path, uid, gid)
        self.checkfile(path, errno.ENOENT)
        raise FuseOSError(errno.EACCES)

    def unlink(self, path):
        """Removing the file is not supported."""
        if self.trace:
            print("unlink", path)
        self.checkfile(path, errno.ENOENT)
        raise FuseOSError(errno.EACCES)

    def symlink(self, path, target):
        """Creating symbolic links is not supported."""
        if self.trace:
            print("symlink", path, target)
        self.checkfile(path, errno.ENOENT)
        raise FuseOSError(errno.EACCES)

    def rename(self, path, new):
        """Renaming is not supported."""
        if self.trace:
            print("rename", path, new)
        self.checkfile(path, errno.ENOENT)
        raise FuseOSError(errno.EACCES)

    def link(self, target, path):
        """Creating hard links is not supported."""
        if self.trace:
            print("link", target, path)
        self.checkfile(path, errno.ENOENT)
        raise FuseOSError(errno.EACCES)

    def utimens(self, path, times=None):
        """Changing timestamps is not supported."""
        if self.trace:
            print("utimens", path, times)
        self.checkfile(path, errno.ENOENT)
        raise FuseOSError(errno.EACCES)

    def mknod(self, path, mode, dev):
        """Creating device nodes is not supported."""
        if self.trace:
            print("mknod", path, mode, dev)
        raise FuseOSError(errno.EACCES)

    def rmdir(self, path):
        """Removing directories is not supported."""
        if self.trace:
            print("rmdir", path)
        self.checkfile(path, errno.ENOENT)
        raise FuseOSError(errno.EACCES)

    def mkdir(self, path, mode):
        """Creating directories is not supported."""
        if self.trace:
            print("mkdir", path, mode)
        raise FuseOSError(errno.EACCES)

    def flush(self, path, fh):
        """Nothing is buffered on this side; only traced."""
        if self.trace:
            print("flush", path, fh)

    def release(self, path, fh):
        """No per-handle state to release; only traced."""
        if self.trace:
            print("release", path, fh)

    def fsync(self, path, fdatasync, fh):
        """Writes go to the shield immediately; only traced."""
        if self.trace:
            print("fsync", path, fdatasync, fh)
if __name__ == "__main__":
    import argparse

    # Command line: two optional switches, then the serial device and the
    # directory to mount the filesystem on.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--trace",
        action="store_true",
        help="enable tracing of all filesystem commands",
    )
    cli.add_argument(
        "--foreground", "-f",
        action="store_true",
        help="keep running in foreground",
    )
    cli.add_argument(
        "corememory",
        help="path to serial interface of arduino with core memory shield",
    )
    cli.add_argument(
        "mountpoint",
        help="where to mount the filesystem, duh'",
    )
    opts = cli.parse_args()

    # Set up the shield connection first, then hand the driver to FUSE.
    driver = Corememory(
        opts.corememory,
        trace=opts.trace,
        foreground=opts.foreground,
    )
    FUSE(driver, opts.mountpoint, foreground=opts.foreground)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment