Created
October 24, 2023 18:28
-
-
Save patryk4815/a5959d50eda44e32b98167fd950fc6ed to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import machine | |
import os | |
from micropython import const | |
import time | |
import network | |
import socket | |
_CMD_TIMEOUT = const(100) | |
_R1_IDLE_STATE = const(1 << 0) | |
# R1_ERASE_RESET = const(1 << 1) | |
_R1_ILLEGAL_COMMAND = const(1 << 2) | |
# R1_COM_CRC_ERROR = const(1 << 3) | |
# R1_ERASE_SEQUENCE_ERROR = const(1 << 4) | |
# R1_ADDRESS_ERROR = const(1 << 5) | |
# R1_PARAMETER_ERROR = const(1 << 6) | |
_TOKEN_CMD25 = const(0xFC) | |
_TOKEN_STOP_TRAN = const(0xFD) | |
_TOKEN_DATA = const(0xFE) | |
class SDCard: | |
def __init__(self, spi, cs, baudrate=1320000): | |
self.spi = spi | |
self.cs = cs | |
self.cmdbuf = bytearray(6) | |
self.dummybuf = bytearray(512) | |
self.tokenbuf = bytearray(1) | |
for i in range(512): | |
self.dummybuf[i] = 0xFF | |
self.dummybuf_memoryview = memoryview(self.dummybuf) | |
# initialise the card | |
self.init_card(baudrate) | |
def init_spi(self, baudrate): | |
try: | |
master = self.spi.MASTER | |
except AttributeError: | |
# on ESP8266 | |
self.spi.init(baudrate=baudrate, phase=0, polarity=0) | |
else: | |
# on pyboard | |
self.spi.init(master, baudrate=baudrate, phase=0, polarity=0) | |
def init_card(self, baudrate): | |
# init CS pin | |
self.cs.init(self.cs.OUT, value=1) | |
# init SPI bus; use low data rate for initialisation | |
self.init_spi(100000) | |
# clock card at least 100 cycles with cs high | |
for i in range(16): | |
self.spi.write(b"\xff") | |
# CMD0: init card; should return _R1_IDLE_STATE (allow 5 attempts) | |
for _ in range(5): | |
if self.cmd(0, 0, 0x95) == _R1_IDLE_STATE: | |
break | |
else: | |
raise OSError("no SD card") | |
# CMD8: determine card version | |
r = self.cmd(8, 0x01AA, 0x87, 4) | |
if r == _R1_IDLE_STATE: | |
self.init_card_v2() | |
elif r == (_R1_IDLE_STATE | _R1_ILLEGAL_COMMAND): | |
self.init_card_v1() | |
else: | |
raise OSError("couldn't determine SD card version") | |
# get the number of sectors | |
# CMD9: response R2 (R1 byte + 16-byte block read) | |
if self.cmd(9, 0, 0, 0, False) != 0: | |
raise OSError("no response from SD card") | |
csd = bytearray(16) | |
self.readinto(csd) | |
if csd[0] & 0xC0 == 0x40: # CSD version 2.0 | |
self.sectors = ((csd[8] << 8 | csd[9]) + 1) * 1024 | |
elif csd[0] & 0xC0 == 0x00: # CSD version 1.0 (old, <=2GB) | |
c_size = (csd[6] & 0b11) << 10 | csd[7] << 2 | csd[8] >> 6 | |
c_size_mult = (csd[9] & 0b11) << 1 | csd[10] >> 7 | |
read_bl_len = csd[5] & 0b1111 | |
capacity = (c_size + 1) * (2 ** (c_size_mult + 2)) * (2**read_bl_len) | |
self.sectors = capacity // 512 | |
else: | |
raise OSError("SD card CSD format not supported") | |
# print('sectors', self.sectors) | |
# CMD16: set block length to 512 bytes | |
if self.cmd(16, 512, 0) != 0: | |
raise OSError("can't set 512 block size") | |
# set to high data rate now that it's initialised | |
self.init_spi(baudrate) | |
def init_card_v1(self): | |
for i in range(_CMD_TIMEOUT): | |
time.sleep_ms(50) | |
self.cmd(55, 0, 0) | |
if self.cmd(41, 0, 0) == 0: | |
# SDSC card, uses byte addressing in read/write/erase commands | |
self.cdv = 512 | |
# print("[SDCard] v1 card") | |
return | |
raise OSError("timeout waiting for v1 card") | |
def init_card_v2(self): | |
for i in range(_CMD_TIMEOUT): | |
time.sleep_ms(50) | |
self.cmd(58, 0, 0, 4) | |
self.cmd(55, 0, 0) | |
if self.cmd(41, 0x40000000, 0) == 0: | |
self.cmd(58, 0, 0, -4) # 4-byte response, negative means keep the first byte | |
ocr = self.tokenbuf[0] # get first byte of response, which is OCR | |
if not ocr & 0x40: | |
# SDSC card, uses byte addressing in read/write/erase commands | |
self.cdv = 512 | |
else: | |
# SDHC/SDXC card, uses block addressing in read/write/erase commands | |
self.cdv = 1 | |
# print("[SDCard] v2 card") | |
return | |
raise OSError("timeout waiting for v2 card") | |
def cmd(self, cmd, arg, crc, final=0, release=True, skip1=False): | |
self.cs(0) | |
# create and send the command | |
buf = self.cmdbuf | |
buf[0] = 0x40 | cmd | |
buf[1] = arg >> 24 | |
buf[2] = arg >> 16 | |
buf[3] = arg >> 8 | |
buf[4] = arg | |
buf[5] = crc | |
self.spi.write(buf) | |
if skip1: | |
self.spi.readinto(self.tokenbuf, 0xFF) | |
# wait for the response (response[7] == 0) | |
for i in range(_CMD_TIMEOUT): | |
self.spi.readinto(self.tokenbuf, 0xFF) | |
response = self.tokenbuf[0] | |
if not (response & 0x80): | |
# this could be a big-endian integer that we are getting here | |
# if final<0 then store the first byte to tokenbuf and discard the rest | |
if final < 0: | |
self.spi.readinto(self.tokenbuf, 0xFF) | |
final = -1 - final | |
for j in range(final): | |
self.spi.write(b"\xff") | |
if release: | |
self.cs(1) | |
self.spi.write(b"\xff") | |
return response | |
# timeout | |
self.cs(1) | |
self.spi.write(b"\xff") | |
return -1 | |
def readinto(self, buf): | |
self.cs(0) | |
# read until start byte (0xff) | |
for i in range(_CMD_TIMEOUT): | |
self.spi.readinto(self.tokenbuf, 0xFF) | |
if self.tokenbuf[0] == _TOKEN_DATA: | |
break | |
time.sleep_ms(1) | |
else: | |
self.cs(1) | |
raise OSError("timeout waiting for response") | |
# read data | |
mv = self.dummybuf_memoryview | |
if len(buf) != len(mv): | |
mv = mv[: len(buf)] | |
self.spi.write_readinto(mv, buf) | |
# read checksum | |
self.spi.write(b"\xff") | |
self.spi.write(b"\xff") | |
self.cs(1) | |
self.spi.write(b"\xff") | |
def write(self, token, buf): | |
self.cs(0) | |
# send: start of block, data, checksum | |
self.spi.read(1, token) | |
self.spi.write(buf) | |
self.spi.write(b"\xff") | |
self.spi.write(b"\xff") | |
# check the response | |
if (self.spi.read(1, 0xFF)[0] & 0x1F) != 0x05: | |
self.cs(1) | |
self.spi.write(b"\xff") | |
return | |
# wait for write to finish | |
while self.spi.read(1, 0xFF)[0] == 0: | |
pass | |
self.cs(1) | |
self.spi.write(b"\xff") | |
def write_token(self, token): | |
self.cs(0) | |
self.spi.read(1, token) | |
self.spi.write(b"\xff") | |
# wait for write to finish | |
while self.spi.read(1, 0xFF)[0] == 0x00: | |
pass | |
self.cs(1) | |
self.spi.write(b"\xff") | |
def readblocks(self, block_num, buf): | |
# workaround for shared bus, required for (at least) some Kingston | |
# devices, ensure MOSI is high before starting transaction | |
self.spi.write(b"\xff") | |
nblocks = len(buf) // 512 | |
assert nblocks and not len(buf) % 512, "Buffer length is invalid" | |
if nblocks == 1: | |
# CMD17: set read address for single block | |
if self.cmd(17, block_num * self.cdv, 0, release=False) != 0: | |
# release the card | |
self.cs(1) | |
raise OSError(5) # EIO | |
# receive the data and release card | |
self.readinto(buf) | |
else: | |
# CMD18: set read address for multiple blocks | |
if self.cmd(18, block_num * self.cdv, 0, release=False) != 0: | |
# release the card | |
self.cs(1) | |
raise OSError(5) # EIO | |
offset = 0 | |
mv = memoryview(buf) | |
while nblocks: | |
# receive the data and release card | |
self.readinto(mv[offset : offset + 512]) | |
offset += 512 | |
nblocks -= 1 | |
if self.cmd(12, 0, 0xFF, skip1=True): | |
raise OSError(5) # EIO | |
def writeblocks(self, block_num, buf): | |
# workaround for shared bus, required for (at least) some Kingston | |
# devices, ensure MOSI is high before starting transaction | |
self.spi.write(b"\xff") | |
nblocks, err = divmod(len(buf), 512) | |
assert nblocks and not err, "Buffer length is invalid" | |
if nblocks == 1: | |
# CMD24: set write address for single block | |
if self.cmd(24, block_num * self.cdv, 0) != 0: | |
raise OSError(5) # EIO | |
# send the data | |
self.write(_TOKEN_DATA, buf) | |
else: | |
# CMD25: set write address for first block | |
if self.cmd(25, block_num * self.cdv, 0) != 0: | |
raise OSError(5) # EIO | |
# send the data | |
offset = 0 | |
mv = memoryview(buf) | |
while nblocks: | |
self.write(_TOKEN_CMD25, mv[offset : offset + 512]) | |
offset += 512 | |
nblocks -= 1 | |
self.write_token(_TOKEN_STOP_TRAN) | |
def ioctl(self, op, arg): | |
if op == 4: # get number of blocks | |
return self.sectors | |
if op == 5: # get block size in bytes | |
return 512 | |
# gpio12 - MISO | |
# gpio13 - MOSI | |
# gpio14 - SCK | |
# gpio4,5 - CS | |
CS_PIN = 4 | |
MISO_PIN = 12 | |
MOSI_PIN = 13 | |
SCLK_PIN = 14 | |
CS_SENSE_PIN = 5 | |
LED_PIN = 2 | |
class Main: | |
def __init__(self): | |
self.sd = None | |
self.server_socket = None | |
def led_on(self): | |
p = machine.Pin(LED_PIN) | |
p.on() | |
def led_off(slef): | |
p = machine.Pin(LED_PIN) | |
p.off() | |
def take_bus_control(self): | |
self.led_on() | |
# 4. CLK_XTAL | |
# 5. CLK_RTC | |
# 12. SPI_MISO | |
# 13. SPI_MOSI | |
# 14. SPI_CLK | |
# 15. SPI_SS | |
# UART RX/TX on pins 1 - 3, HSPI for pins 12-15 and CLK functions for pins 0, 4 and 5. | |
# enum { | |
# PIN_MODE_IN = 0, | |
# PIN_MODE_OUT, | |
# PIN_MODE_OPEN_DRAIN, | |
# PIN_MODE_ALT, | |
# PIN_MODE_SKIP, | |
# PIN_MODE_IT_RISING, | |
# PIN_MODE_IT_FALLING, | |
# PIN_MODE_IT_BOTH, | |
# }; | |
machine.Pin(MISO_PIN).mode(machine.Pin.OPEN_DRAIN) | |
machine.Pin(MOSI_PIN).mode(machine.Pin.OPEN_DRAIN) | |
machine.Pin(SCLK_PIN).mode(machine.Pin.OPEN_DRAIN) | |
machine.Pin(CS_PIN).mode(machine.Pin.OUT) | |
def relinquish_bus_control(self): | |
machine.Pin(MISO_PIN).mode(machine.Pin.IN) | |
machine.Pin(MOSI_PIN).mode(machine.Pin.IN) | |
machine.Pin(SCLK_PIN).mode(machine.Pin.IN) | |
machine.Pin(CS_PIN).mode(machine.Pin.IN) | |
self.led_off() | |
def connect_wifi(self): | |
wifi = network.WLAN(network.STA_IF) | |
wifi.active(True) | |
wifi.connect('YourNetworkSSID', 'YourNetworkPassword') | |
while not wifi.isconnected(): | |
time.sleep(1) | |
print('Connected to Wi-Fi:', wifi.ifconfig()) | |
def parser_header(self, data: bytes): | |
out = {} | |
lines = data.split(b'\r\n') | |
for line in lines[1:]: | |
key, value = line.split(b': ', 1) | |
out[key.decode().lower()] = value.decode() | |
firstdata = lines[0].decode().split(' ') | |
method = firstdata[0] | |
url = firstdata[1] | |
urlparams = '' | |
if '?' in url: | |
url, urlparams = url.split('?', 1) | |
return method, url, urlparams, out | |
def handle_start(self) -> bytes: | |
self.take_bus_control() | |
self.sd = SDCard( | |
spi=machine.SPI(sck=machine.Pin(SCLK_PIN), mosi=machine.Pin(MOSI_PIN), miso=machine.Pin(MISO_PIN)), | |
cs=machine.Pin(CS_PIN), | |
) | |
return b'OK' | |
def handle_stop(self) -> bytes: | |
self.relinquish_bus_control() | |
self.sd = None | |
return b'OK' | |
def handle_size(self) -> bytes: | |
if not self.sd: | |
return b'error not sdcard' | |
return f'sectors={self.sd.sectors}&blocksize={self.sd.cdv}'.encode() | |
def handle_read(self, sectorid: int) -> bytes: | |
if not self.sd: | |
return b'error not sdcard' | |
buf = bytes(bytearray([0]*512)) | |
self.sd.readblocks(sectorid, buf) | |
return buf | |
def handle_write(self, sectorid: int, buf: bytes) -> bytes: | |
if not self.sd: | |
return b'error not sdcard' | |
if len(buf) != 512: | |
return b'error size not match 512' | |
self.sd.writeblocks(sectorid, buf) | |
return b'OK' | |
def handle_data(self, client): | |
data = client.recv(1024) | |
headerdata, clientdata = data.split(b'\r\n\r\n', 1) | |
method, url, urlparams, header = self.parser_header(headerdata) | |
payload = b'' | |
if method == 'POST' and header.get('content-length'): | |
datalen = int(header.get('content-length')) - len(clientdata) | |
payload = clientdata | |
if datalen > 0: | |
payload += client.recv(datalen) | |
response = b'' | |
if method == 'GET' and url == '/start': | |
response = self.handle_start() | |
if method == 'GET' and url == '/stop': | |
response = self.handle_stop() | |
if method == 'GET' and url == '/size': | |
response = self.handle_size() | |
if method == 'GET' and url == '/read': | |
sectorid = int(urlparams) | |
response = self.handle_read(sectorid) | |
if method == 'POST' and url == '/write': | |
sectorid = int(urlparams) | |
response = self.handle_write(sectorid, payload) | |
client.send(b"HTTP/1.0 200 OK\r\nContent-Length: " + str(len(response)).encode() + b"\r\n\r\n") | |
client.send(response) | |
def start_server(self): | |
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
host = '0.0.0.0' | |
port = 8080 | |
self.server_socket.bind((host, port)) | |
self.server_socket.listen(5) | |
print("Server listening on port", port) | |
while True: | |
client_socket, client_address = self.server_socket.accept() | |
print("Accepted connection from", client_address) | |
self.handle_data(client_socket) | |
client_socket.close() | |
def run(self): | |
self.relinquish_bus_control() | |
self.connect_wifi() | |
self.start_server() | |
Main().run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment