Skip to content

Instantly share code, notes, and snippets.

@ryos36
Created December 9, 2017 14:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ryos36/cf50b30425a37b49f177fe8a827d37cd to your computer and use it in GitHub Desktop.
Save ryos36/cf50b30425a37b49f177fe8a827d37cd to your computer and use it in GitHub Desktop.
from polyphony import testbench
from polyphony import rule
from polyphony import module
from polyphony.typing import bit8
from polyphony.io import Queue, Port
from polyphony import is_worker_running
from polyphony.timing import clksleep
from ascii_const import EOT, ACK, NAK, CAN
from xmodem_mem import x_mem, CMD_NOP, CMD_WRITE, CMD_READ, BLOCK_SIZE
@module
class xmodem_q2m:
    # XMODEM-style receiver: reads protocol bytes from xmodem_in_q, forwards
    # each block's payload bytes to self.mem (an x_mem instance) and replies
    # to the sender through xmodem_out_q.

    # Status codes returned by transfer_queue2mem().
    GO_AHEAD = 0         # block received and checksum matched
    FINISHED = 1         # sender signalled EOT
    SUSPENDED = 2        # sender signalled CAN
    ERROR = 3            # block number (or its complement) did not match
    CHECK_SUM_ERROR = 4  # payload checksum did not match

    def __init__(self):
        # Byte stream from the sender / replies back to the sender.
        self.xmodem_in_q = Queue(bit8, 'in', maxsize=128)
        self.xmodem_out_q = Queue(bit8, 'out', maxsize=128)
        # Memory block that receives the payload (driven via cmd/bin_in_q/bin_out_q).
        self.mem = x_mem()
        # Output port written once per worker pass, after the read-back check.
        self.idouk = Port(bit8, 'out', protocol='ready_valid')
        self.append_worker(self.worker)

    def transfer_queue2mem(self, correct_block_no):
        # Receive one block from xmodem_in_q and stream its payload to self.mem.
        #
        # correct_block_no: block number the caller expects next. Only its low
        #     8 bits are compared, because the number on the wire is one byte
        #     and wraps around after 255.
        # Returns one of GO_AHEAD / FINISHED / SUSPENDED / ERROR / CHECK_SUM_ERROR.
        data:bit8 = self.xmodem_in_q.rd()
        if data == EOT:
            print("EOT")
            self.xmodem_out_q.wr(ACK)
            return self.FINISHED
        elif data == CAN:
            return self.SUSPENDED

        # Compare against the 8-bit view of the expected number so that the
        # caller's ever-growing counter keeps matching after it passes 255.
        expected_no = correct_block_no & 0xFF
        if data != expected_no:
            print("block No. error:", data, " ", correct_block_no)
            self.xmodem_out_q.wr(CAN)
            return self.ERROR

        # The second byte must be the exact one's complement of the block
        # number. XOR yields 0xFF only for a true complement; an OR test would
        # also accept bytes such as 0xFF for every block number.
        data = self.xmodem_in_q.rd()
        data &= 0xFF
        if (data ^ expected_no) != 0xFF:
            print("not block No. error:", data, " ", correct_block_no)
            self.xmodem_out_q.wr(CAN)
            return self.ERROR

        # Header accepted: put the memory in write mode and stream the payload,
        # accumulating the arithmetic checksum on the way.
        self.mem.cmd.wr(CMD_WRITE)
        check_sum = 0
        with rule(scheduling='pipeline'):
            for i in range(BLOCK_SIZE):
                data = self.xmodem_in_q.rd()
                self.mem.bin_in_q.wr(data)
                check_sum += data

        # Trailer byte: low 8 bits of the payload sum.
        # NOTE(review): nothing is written to xmodem_out_q on a checksum
        # mismatch or after a good block - confirm this is intended.
        data = self.xmodem_in_q.rd()
        if ((check_sum & 0xFF) ^ data) != 0 :
            print("Check Sum error:", check_sum, " ", data)
            return self.CHECK_SUM_ERROR
        return self.GO_AHEAD

    def worker(self):
        # Main loop: start a transfer with NAK, receive blocks until something
        # other than GO_AHEAD comes back, then read the memory back and compare
        # it with the fixed test pattern 10, 20, ..., 80 (repeating).
        while is_worker_running():
            block_no = 0
            self.mem.cmd.wr(CMD_NOP)
            self.xmodem_out_q.wr(NAK)
            while True:
                block_no += 1
                rv = self.transfer_queue2mem(block_no)
                print("rv:", rv, " block_no:", block_no)
                if rv != self.GO_AHEAD:
                    break

            clksleep(100)
            print("verifiy:", block_no)
            self.mem.cmd.wr(CMD_NOP)
            print("NOP done")
            # Read back two blocks and check each byte against the pattern.
            for n in range(2):
                print("n done", n)
                self.mem.cmd.wr(CMD_READ)
                print("CMD_READ done", n, ":", BLOCK_SIZE)
                for i in range(BLOCK_SIZE):
                    d = self.mem.bin_out_q.rd()
                    #print("rd", n, ":", i, ":", d)
                    if d != (((i % 8) + 1) * 10):
                        print("error:", d, " vs ", (((i % 8) + 1) * 10))
            # Signal that the pass (transfer + read-back) has completed.
            self.idouk.wr(0)
@testbench
def test(m):
    # Testbench acting as the sender: waits for the receiver's opening byte,
    # sends two blocks (number, complemented number, 16 repetitions of the
    # 8-byte pattern, checksum byte), then EOT, and finally reads m.idouk.
    pattern = [10, 20, 30, 40, 50, 60, 70, 80]
    seq_no:bit8 = 1

    # Abort right away if the receiver opens with CAN.
    first = m.xmodem_out_q.rd()
    if first == CAN:
        return

    for blk in range(2):
        # Header: block number followed by its bitwise complement.
        m.xmodem_in_q.wr(seq_no)
        m.xmodem_in_q.wr(~seq_no)

        # Payload: the pattern repeated 16 times, summed for the trailer.
        total = 0
        for rep in range(16):
            for idx in range(len(pattern)):
                byte = pattern[idx]
                m.xmodem_in_q.wr(byte)
                total = total + byte

        # Trailer: low 8 bits of the payload sum.
        m.xmodem_in_q.wr(total & 0xFF)
        seq_no = seq_no + 1

    m.xmodem_in_q.wr(EOT)
    # Wait for the module's completion signal and show it.
    d = m.idouk.rd()
    print("idouk:" , d)
# Instantiate the receiver module and run the testbench against it.
m = xmodem_q2m()
test(m)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment