Skip to content

Instantly share code, notes, and snippets.

@MatthewWilkes
Created March 31, 2024 22:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MatthewWilkes/e817df454fd814d0568901d072ece100 to your computer and use it in GitHub Desktop.
Save MatthewWilkes/e817df454fd814d0568901d072ece100 to your computer and use it in GitHub Desktop.
import math
import tidal
import utime
tidal.enable_peripheral_I2C()
from tidal import i2c, i2cp
class RAMBlockDev:
    """RAM-backed block device exposing readblocks/writeblocks/ioctl.

    Storage is a single bytearray of ``block_size * num_blocks`` bytes.
    """

    def __init__(self, block_size, num_blocks):
        """Allocate zero-filled backing storage for ``num_blocks`` blocks."""
        self.data = bytearray(block_size * num_blocks)
        self.block_size = block_size

    def readblocks(self, block_num, buf, offset=0):
        """Fill ``buf`` from storage, starting ``offset`` bytes into ``block_num``."""
        source = self.data
        cursor = block_num * self.block_size + offset
        for slot in range(len(buf)):
            buf[slot] = source[cursor]
            cursor += 1

    def writeblocks(self, block_num, buf, offset=None):
        """Copy ``buf`` into storage, starting ``offset`` bytes into ``block_num``.

        When ``offset`` is None, every whole block covered by ``buf`` is first
        passed to ``ioctl(6, ...)`` and the write then starts at offset 0.
        """
        if offset is None:
            # do erase, then write
            whole_blocks = len(buf) // self.block_size
            for target in range(block_num, block_num + whole_blocks):
                self.ioctl(6, target)
            offset = 0
        start = block_num * self.block_size + offset
        for position, value in enumerate(buf):
            self.data[start + position] = value

    def ioctl(self, op, arg):
        """Answer a control request; ops other than 4, 5 and 6 yield None."""
        if op == 4:
            # block count
            return len(self.data) // self.block_size
        elif op == 5:
            # block size
            return self.block_size
        elif op == 6:
            # block erase: nothing to do for RAM, just report success
            return 0
        return None
class EEPROM:
    """Block-device style driver for an I2C EEPROM using 2-byte word addresses.

    Exposes ``readblocks``/``writeblocks``/``ioctl`` on top of an I2C bus
    object that provides ``writeto`` and ``readfrom_into``.  Bus errors
    (``OSError``) are retried a bounded number of times, so a device that is
    momentarily busy is waited for, while a missing device eventually raises
    instead of hanging forever.
    """

    # Largest payload sent in a single write transaction.  Writes are split so
    # that no transaction crosses a multiple of this size.
    # NOTE(review): assumed to equal the device's page size -- confirm against
    # the datasheet of the fitted part.
    INTERNAL_BLOCK_SIZE = 32
    # Block size reported through ioctl(5).
    BLOCK_SIZE = 128
    # Total device size in bytes; ioctl(4) reports CAPACITY // BLOCK_SIZE.
    CAPACITY = 4096
    # Retry policy for bus transactions that raise OSError.
    RETRY_DELAY = 0.005  # seconds slept between attempts
    MAX_ATTEMPTS = 200  # attempts before the OSError is propagated

    def __init__(self, bus, address):
        """Remember the I2C ``bus`` object and the device's bus ``address``."""
        self.bus = bus
        self.address = address

    def block_address(self, block_origin, block_offset, offset):
        """Return the 2-byte big-endian word address for a position.

        The position is ``(block_origin + block_offset) * BLOCK_SIZE + offset``;
        only its low 16 bits are encoded.
        """
        address = ((block_origin + block_offset) * self.BLOCK_SIZE) + offset
        return bytes(((address >> 8) & 0xFF, (address >> 0) & 0xFF))

    def readblocks(self, block_num, buf, offset=0):
        """Fill ``buf`` with data starting ``offset`` bytes into ``block_num``."""
        print(f"readblocks({block_num}, buf<{len(buf)}>, {offset})")
        address = self.block_address(block_num, 0, offset)
        self.retrying_read(address, buf)

    def retrying_read(self, address, buf):
        """Send the word ``address`` and read ``len(buf)`` bytes into ``buf``.

        Both transactions are repeated together when either raises OSError.

        Raises:
            OSError: if the bus still fails after ``MAX_ATTEMPTS`` attempts.
        """
        attempts_left = self.MAX_ATTEMPTS
        while True:
            try:
                self.bus.writeto(self.address, address)
                self.bus.readfrom_into(self.address, buf)
                return
            except OSError:
                attempts_left -= 1
                if attempts_left <= 0:
                    raise
                utime.sleep(self.RETRY_DELAY)

    def retrying_write(self, data):
        """Write ``data`` (word address followed by payload) to the device.

        The transaction is repeated when it raises OSError.

        Raises:
            OSError: if the bus still fails after ``MAX_ATTEMPTS`` attempts.
        """
        attempts_left = self.MAX_ATTEMPTS
        while True:
            try:
                self.bus.writeto(self.address, data)
                return
            except OSError:
                attempts_left -= 1
                if attempts_left <= 0:
                    raise
                utime.sleep(self.RETRY_DELAY)

    def writeblocks(self, block_num, buf, offset=0):
        """Write ``buf`` starting ``offset`` bytes into ``block_num``.

        The data is sent in chunks of at most ``INTERNAL_BLOCK_SIZE`` bytes,
        each ending at or before the next ``INTERNAL_BLOCK_SIZE`` boundary.
        ``offset=None`` is treated as 0.

        Raises:
            OSError: if a transaction still fails after ``MAX_ATTEMPTS``
                attempts.
        """
        if offset is None:
            offset = 0
        page_size = self.INTERNAL_BLOCK_SIZE
        total = len(buf)
        written = 0
        while written < total:
            position = (block_num * self.BLOCK_SIZE) + offset + written
            # Stop at the boundary so one transaction never spans two pages.
            room_in_page = page_size - (position % page_size)
            chunk_end = min(written + room_in_page, total)
            address = self.block_address(block_num, 0, offset + written)
            # A chunk sent while the previous one is still being stored is
            # expected to fail with OSError; retrying_write waits it out.
            self.retrying_write(address + buf[written:chunk_end])
            written = chunk_end
        if total:
            # Address-only write (the same transaction a read starts with):
            # it is retried until the bus accepts it, so the device is
            # responsive again before this method returns.
            self.retrying_write(self.block_address(block_num, 0, offset))

    def ioctl(self, op, arg):
        """Answer a control request; ops other than 4, 5 and 6 yield None."""
        print(f"ioctl({op}, {arg})")
        if op == 4:  # get number of blocks
            return self.CAPACITY // self.BLOCK_SIZE
        if op == 5:  # get block size
            return self.BLOCK_SIZE
        if op == 6:  # Erase block
            # "Erase" is implemented as overwriting the block with zeros.
            self.writeblocks(arg, bytearray(self.BLOCK_SIZE))
            return 0
        return None
# Build one EEPROM driver for every address returned by a scan of the
# peripheral I2C bus (i2cp).
eeproms = [
    EEPROM(i2cp, device_address)
    for device_address in i2cp.scan()
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment