Skip to content

Instantly share code, notes, and snippets.

@patryk4815
Created October 24, 2023 18:28
Show Gist options
  • Save patryk4815/a5959d50eda44e32b98167fd950fc6ed to your computer and use it in GitHub Desktop.
Save patryk4815/a5959d50eda44e32b98167fd950fc6ed to your computer and use it in GitHub Desktop.
import machine
import os
from micropython import const
import time
import network
import socket
_CMD_TIMEOUT = const(100)
_R1_IDLE_STATE = const(1 << 0)
# R1_ERASE_RESET = const(1 << 1)
_R1_ILLEGAL_COMMAND = const(1 << 2)
# R1_COM_CRC_ERROR = const(1 << 3)
# R1_ERASE_SEQUENCE_ERROR = const(1 << 4)
# R1_ADDRESS_ERROR = const(1 << 5)
# R1_PARAMETER_ERROR = const(1 << 6)
_TOKEN_CMD25 = const(0xFC)
_TOKEN_STOP_TRAN = const(0xFD)
_TOKEN_DATA = const(0xFE)
class SDCard:
def __init__(self, spi, cs, baudrate=1320000):
self.spi = spi
self.cs = cs
self.cmdbuf = bytearray(6)
self.dummybuf = bytearray(512)
self.tokenbuf = bytearray(1)
for i in range(512):
self.dummybuf[i] = 0xFF
self.dummybuf_memoryview = memoryview(self.dummybuf)
# initialise the card
self.init_card(baudrate)
def init_spi(self, baudrate):
try:
master = self.spi.MASTER
except AttributeError:
# on ESP8266
self.spi.init(baudrate=baudrate, phase=0, polarity=0)
else:
# on pyboard
self.spi.init(master, baudrate=baudrate, phase=0, polarity=0)
def init_card(self, baudrate):
# init CS pin
self.cs.init(self.cs.OUT, value=1)
# init SPI bus; use low data rate for initialisation
self.init_spi(100000)
# clock card at least 100 cycles with cs high
for i in range(16):
self.spi.write(b"\xff")
# CMD0: init card; should return _R1_IDLE_STATE (allow 5 attempts)
for _ in range(5):
if self.cmd(0, 0, 0x95) == _R1_IDLE_STATE:
break
else:
raise OSError("no SD card")
# CMD8: determine card version
r = self.cmd(8, 0x01AA, 0x87, 4)
if r == _R1_IDLE_STATE:
self.init_card_v2()
elif r == (_R1_IDLE_STATE | _R1_ILLEGAL_COMMAND):
self.init_card_v1()
else:
raise OSError("couldn't determine SD card version")
# get the number of sectors
# CMD9: response R2 (R1 byte + 16-byte block read)
if self.cmd(9, 0, 0, 0, False) != 0:
raise OSError("no response from SD card")
csd = bytearray(16)
self.readinto(csd)
if csd[0] & 0xC0 == 0x40: # CSD version 2.0
self.sectors = ((csd[8] << 8 | csd[9]) + 1) * 1024
elif csd[0] & 0xC0 == 0x00: # CSD version 1.0 (old, <=2GB)
c_size = (csd[6] & 0b11) << 10 | csd[7] << 2 | csd[8] >> 6
c_size_mult = (csd[9] & 0b11) << 1 | csd[10] >> 7
read_bl_len = csd[5] & 0b1111
capacity = (c_size + 1) * (2 ** (c_size_mult + 2)) * (2**read_bl_len)
self.sectors = capacity // 512
else:
raise OSError("SD card CSD format not supported")
# print('sectors', self.sectors)
# CMD16: set block length to 512 bytes
if self.cmd(16, 512, 0) != 0:
raise OSError("can't set 512 block size")
# set to high data rate now that it's initialised
self.init_spi(baudrate)
def init_card_v1(self):
for i in range(_CMD_TIMEOUT):
time.sleep_ms(50)
self.cmd(55, 0, 0)
if self.cmd(41, 0, 0) == 0:
# SDSC card, uses byte addressing in read/write/erase commands
self.cdv = 512
# print("[SDCard] v1 card")
return
raise OSError("timeout waiting for v1 card")
def init_card_v2(self):
for i in range(_CMD_TIMEOUT):
time.sleep_ms(50)
self.cmd(58, 0, 0, 4)
self.cmd(55, 0, 0)
if self.cmd(41, 0x40000000, 0) == 0:
self.cmd(58, 0, 0, -4) # 4-byte response, negative means keep the first byte
ocr = self.tokenbuf[0] # get first byte of response, which is OCR
if not ocr & 0x40:
# SDSC card, uses byte addressing in read/write/erase commands
self.cdv = 512
else:
# SDHC/SDXC card, uses block addressing in read/write/erase commands
self.cdv = 1
# print("[SDCard] v2 card")
return
raise OSError("timeout waiting for v2 card")
def cmd(self, cmd, arg, crc, final=0, release=True, skip1=False):
self.cs(0)
# create and send the command
buf = self.cmdbuf
buf[0] = 0x40 | cmd
buf[1] = arg >> 24
buf[2] = arg >> 16
buf[3] = arg >> 8
buf[4] = arg
buf[5] = crc
self.spi.write(buf)
if skip1:
self.spi.readinto(self.tokenbuf, 0xFF)
# wait for the response (response[7] == 0)
for i in range(_CMD_TIMEOUT):
self.spi.readinto(self.tokenbuf, 0xFF)
response = self.tokenbuf[0]
if not (response & 0x80):
# this could be a big-endian integer that we are getting here
# if final<0 then store the first byte to tokenbuf and discard the rest
if final < 0:
self.spi.readinto(self.tokenbuf, 0xFF)
final = -1 - final
for j in range(final):
self.spi.write(b"\xff")
if release:
self.cs(1)
self.spi.write(b"\xff")
return response
# timeout
self.cs(1)
self.spi.write(b"\xff")
return -1
def readinto(self, buf):
self.cs(0)
# read until start byte (0xff)
for i in range(_CMD_TIMEOUT):
self.spi.readinto(self.tokenbuf, 0xFF)
if self.tokenbuf[0] == _TOKEN_DATA:
break
time.sleep_ms(1)
else:
self.cs(1)
raise OSError("timeout waiting for response")
# read data
mv = self.dummybuf_memoryview
if len(buf) != len(mv):
mv = mv[: len(buf)]
self.spi.write_readinto(mv, buf)
# read checksum
self.spi.write(b"\xff")
self.spi.write(b"\xff")
self.cs(1)
self.spi.write(b"\xff")
def write(self, token, buf):
self.cs(0)
# send: start of block, data, checksum
self.spi.read(1, token)
self.spi.write(buf)
self.spi.write(b"\xff")
self.spi.write(b"\xff")
# check the response
if (self.spi.read(1, 0xFF)[0] & 0x1F) != 0x05:
self.cs(1)
self.spi.write(b"\xff")
return
# wait for write to finish
while self.spi.read(1, 0xFF)[0] == 0:
pass
self.cs(1)
self.spi.write(b"\xff")
def write_token(self, token):
self.cs(0)
self.spi.read(1, token)
self.spi.write(b"\xff")
# wait for write to finish
while self.spi.read(1, 0xFF)[0] == 0x00:
pass
self.cs(1)
self.spi.write(b"\xff")
def readblocks(self, block_num, buf):
# workaround for shared bus, required for (at least) some Kingston
# devices, ensure MOSI is high before starting transaction
self.spi.write(b"\xff")
nblocks = len(buf) // 512
assert nblocks and not len(buf) % 512, "Buffer length is invalid"
if nblocks == 1:
# CMD17: set read address for single block
if self.cmd(17, block_num * self.cdv, 0, release=False) != 0:
# release the card
self.cs(1)
raise OSError(5) # EIO
# receive the data and release card
self.readinto(buf)
else:
# CMD18: set read address for multiple blocks
if self.cmd(18, block_num * self.cdv, 0, release=False) != 0:
# release the card
self.cs(1)
raise OSError(5) # EIO
offset = 0
mv = memoryview(buf)
while nblocks:
# receive the data and release card
self.readinto(mv[offset : offset + 512])
offset += 512
nblocks -= 1
if self.cmd(12, 0, 0xFF, skip1=True):
raise OSError(5) # EIO
def writeblocks(self, block_num, buf):
# workaround for shared bus, required for (at least) some Kingston
# devices, ensure MOSI is high before starting transaction
self.spi.write(b"\xff")
nblocks, err = divmod(len(buf), 512)
assert nblocks and not err, "Buffer length is invalid"
if nblocks == 1:
# CMD24: set write address for single block
if self.cmd(24, block_num * self.cdv, 0) != 0:
raise OSError(5) # EIO
# send the data
self.write(_TOKEN_DATA, buf)
else:
# CMD25: set write address for first block
if self.cmd(25, block_num * self.cdv, 0) != 0:
raise OSError(5) # EIO
# send the data
offset = 0
mv = memoryview(buf)
while nblocks:
self.write(_TOKEN_CMD25, mv[offset : offset + 512])
offset += 512
nblocks -= 1
self.write_token(_TOKEN_STOP_TRAN)
def ioctl(self, op, arg):
if op == 4: # get number of blocks
return self.sectors
if op == 5: # get block size in bytes
return 512
# gpio12 - MISO
# gpio13 - MOSI
# gpio14 - SCK
# gpio4,5 - CS
CS_PIN = 4
MISO_PIN = 12
MOSI_PIN = 13
SCLK_PIN = 14
CS_SENSE_PIN = 5
LED_PIN = 2
class Main:
def __init__(self):
self.sd = None
self.server_socket = None
def led_on(self):
p = machine.Pin(LED_PIN)
p.on()
def led_off(slef):
p = machine.Pin(LED_PIN)
p.off()
def take_bus_control(self):
self.led_on()
# 4. CLK_XTAL
# 5. CLK_RTC
# 12. SPI_MISO
# 13. SPI_MOSI
# 14. SPI_CLK
# 15. SPI_SS
# UART RX/TX on pins 1 - 3, HSPI for pins 12-15 and CLK functions for pins 0, 4 and 5.
# enum {
# PIN_MODE_IN = 0,
# PIN_MODE_OUT,
# PIN_MODE_OPEN_DRAIN,
# PIN_MODE_ALT,
# PIN_MODE_SKIP,
# PIN_MODE_IT_RISING,
# PIN_MODE_IT_FALLING,
# PIN_MODE_IT_BOTH,
# };
machine.Pin(MISO_PIN).mode(machine.Pin.OPEN_DRAIN)
machine.Pin(MOSI_PIN).mode(machine.Pin.OPEN_DRAIN)
machine.Pin(SCLK_PIN).mode(machine.Pin.OPEN_DRAIN)
machine.Pin(CS_PIN).mode(machine.Pin.OUT)
def relinquish_bus_control(self):
machine.Pin(MISO_PIN).mode(machine.Pin.IN)
machine.Pin(MOSI_PIN).mode(machine.Pin.IN)
machine.Pin(SCLK_PIN).mode(machine.Pin.IN)
machine.Pin(CS_PIN).mode(machine.Pin.IN)
self.led_off()
def connect_wifi(self):
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
wifi.connect('YourNetworkSSID', 'YourNetworkPassword')
while not wifi.isconnected():
time.sleep(1)
print('Connected to Wi-Fi:', wifi.ifconfig())
def parser_header(self, data: bytes):
out = {}
lines = data.split(b'\r\n')
for line in lines[1:]:
key, value = line.split(b': ', 1)
out[key.decode().lower()] = value.decode()
firstdata = lines[0].decode().split(' ')
method = firstdata[0]
url = firstdata[1]
urlparams = ''
if '?' in url:
url, urlparams = url.split('?', 1)
return method, url, urlparams, out
def handle_start(self) -> bytes:
self.take_bus_control()
self.sd = SDCard(
spi=machine.SPI(sck=machine.Pin(SCLK_PIN), mosi=machine.Pin(MOSI_PIN), miso=machine.Pin(MISO_PIN)),
cs=machine.Pin(CS_PIN),
)
return b'OK'
def handle_stop(self) -> bytes:
self.relinquish_bus_control()
self.sd = None
return b'OK'
def handle_size(self) -> bytes:
if not self.sd:
return b'error not sdcard'
return f'sectors={self.sd.sectors}&blocksize={self.sd.cdv}'.encode()
def handle_read(self, sectorid: int) -> bytes:
if not self.sd:
return b'error not sdcard'
buf = bytes(bytearray([0]*512))
self.sd.readblocks(sectorid, buf)
return buf
def handle_write(self, sectorid: int, buf: bytes) -> bytes:
if not self.sd:
return b'error not sdcard'
if len(buf) != 512:
return b'error size not match 512'
self.sd.writeblocks(sectorid, buf)
return b'OK'
def handle_data(self, client):
data = client.recv(1024)
headerdata, clientdata = data.split(b'\r\n\r\n', 1)
method, url, urlparams, header = self.parser_header(headerdata)
payload = b''
if method == 'POST' and header.get('content-length'):
datalen = int(header.get('content-length')) - len(clientdata)
payload = clientdata
if datalen > 0:
payload += client.recv(datalen)
response = b''
if method == 'GET' and url == '/start':
response = self.handle_start()
if method == 'GET' and url == '/stop':
response = self.handle_stop()
if method == 'GET' and url == '/size':
response = self.handle_size()
if method == 'GET' and url == '/read':
sectorid = int(urlparams)
response = self.handle_read(sectorid)
if method == 'POST' and url == '/write':
sectorid = int(urlparams)
response = self.handle_write(sectorid, payload)
client.send(b"HTTP/1.0 200 OK\r\nContent-Length: " + str(len(response)).encode() + b"\r\n\r\n")
client.send(response)
def start_server(self):
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = '0.0.0.0'
port = 8080
self.server_socket.bind((host, port))
self.server_socket.listen(5)
print("Server listening on port", port)
while True:
client_socket, client_address = self.server_socket.accept()
print("Accepted connection from", client_address)
self.handle_data(client_socket)
client_socket.close()
def run(self):
self.relinquish_bus_control()
self.connect_wifi()
self.start_server()
Main().run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment