-
-
Save ryos36/cf50b30425a37b49f177fe8a827d37cd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from polyphony import testbench | |
from polyphony import rule | |
from polyphony import module | |
from polyphony.typing import bit8 | |
from polyphony.io import Queue, Port | |
from polyphony import is_worker_running | |
from polyphony.timing import clksleep | |
from ascii_const import EOT, ACK, NAK, CAN | |
from xmodem_mem import x_mem, CMD_NOP, CMD_WRITE, CMD_READ, BLOCK_SIZE | |
@module
class xmodem_q2m:
    # Receiving side of a simplified XMODEM-style transfer.
    #
    # Bytes arrive on `xmodem_in_q`; the payload of each accepted block is
    # streamed into the `x_mem` sub-module, and protocol replies (NAK / ACK /
    # CAN) are written to `xmodem_out_q`.  When a transfer ends, the worker
    # reads two blocks back from memory, compares them with a fixed test
    # pattern, and then writes 0 to the `idouk` port.

    # Status codes returned by transfer_queue2mem().
    GO_AHEAD = 0         # block stored and checksum matched; expect another
    FINISHED = 1         # EOT received (ACK was sent in reply)
    SUSPENDED = 2        # CAN received from the sender
    ERROR = 3            # block number or its complement was wrong (CAN sent)
    CHECK_SUM_ERROR = 4  # payload checksum did not match

    def __init__(self):
        self.xmodem_in_q = Queue(bit8, 'in', maxsize=128)
        self.xmodem_out_q = Queue(bit8, 'out', maxsize=128)
        self.mem = x_mem()
        # Written with 0 by the worker once the read-back pass has finished.
        self.idouk = Port(bit8, 'out', protocol='ready_valid')
        self.append_worker(self.worker)

    def transfer_queue2mem(self, correct_block_no):
        # Receive one block from `xmodem_in_q` and stream it into memory.
        #
        # correct_block_no: block number the next block is expected to carry.
        # Returns one of GO_AHEAD / FINISHED / SUSPENDED / ERROR /
        # CHECK_SUM_ERROR (see the class constants).
        #
        # The first byte is interpreted as EOT, CAN, or the block number; no
        # start-of-header byte is consumed here.
        # NOTE(review): a block number equal to the EOT or CAN byte value
        # would be taken as that control code -- the values live in
        # ascii_const and are not visible here; confirm against the sender.
        data:bit8 = self.xmodem_in_q.rd()
        if data == EOT:
            print("EOT")
            self.xmodem_out_q.wr(ACK)
            return self.FINISHED
        elif data == CAN:
            return self.SUSPENDED

        if data != correct_block_no:
            print("block No. error:", data, " ", correct_block_no)
            self.xmodem_out_q.wr(CAN)
            return self.ERROR

        # The second byte must be the exact one's complement of the block
        # number.  XOR yields 0xFF only for an exact complement; the previous
        # OR test also accepted any byte with extra bits set (e.g. 0xFF).
        data = self.xmodem_in_q.rd()
        data &= 0xFF
        if (data ^ correct_block_no) != 0xFF:
            print("not block No. error:", data, " ", correct_block_no)
            self.xmodem_out_q.wr(CAN)
            return self.ERROR

        # Put the memory into write mode, then forward the payload bytes while
        # accumulating their sum.
        self.mem.cmd.wr(CMD_WRITE)
        check_sum = 0
        with rule(scheduling='pipeline'):
            for i in range(BLOCK_SIZE):
                data = self.xmodem_in_q.rd()
                self.mem.bin_in_q.wr(data)
                check_sum += data

        # The trailing byte must equal the low 8 bits of the payload sum.
        # On a mismatch nothing is written to `xmodem_out_q`, and the payload
        # has already been forwarded to memory.
        data = self.xmodem_in_q.rd()
        if ((check_sum & 0xFF) ^ data) != 0:
            print("Check Sum error:", check_sum, " ", data)
            return self.CHECK_SUM_ERROR

        return self.GO_AHEAD

    def worker(self):
        # Main loop: request a transfer, receive blocks until one does not
        # return GO_AHEAD, then read the memory back and check its contents.
        while is_worker_running():
            block_no = 0
            self.mem.cmd.wr(CMD_NOP)
            # NAK is the first byte sent; the testbench waits for it before
            # it starts sending blocks.
            self.xmodem_out_q.wr(NAK)

            while True:
                block_no += 1
                rv = self.transfer_queue2mem(block_no)
                print("rv:", rv, " block_no:", block_no)
                if rv != self.GO_AHEAD:
                    break

            clksleep(100)
            print("verifiy:", block_no)
            self.mem.cmd.wr(CMD_NOP)
            print("NOP done")

            # Read back two blocks and compare each byte with the repeating
            # pattern 10, 20, ..., 80 that the testbench sends.
            for n in range(2):
                print("n done", n)
                self.mem.cmd.wr(CMD_READ)
                print("CMD_READ done", n, ":", BLOCK_SIZE)
                for i in range(BLOCK_SIZE):
                    d = self.mem.bin_out_q.rd()
                    #print("rd", n, ":", i, ":", d)
                    expected = ((i % 8) + 1) * 10
                    if d != expected:
                        print("error:", d, " vs ", expected)

            # Tell the testbench that the read-back pass is complete.
            self.idouk.wr(0)
@testbench
def test(m):
    # Testbench acting as the sender: waits for the module's first reply,
    # sends two blocks filled with a repeating 8-byte pattern, sends EOT,
    # then waits for the module to signal completion on `idouk`.
    pattern = [10, 20, 30, 40, 50, 60, 70, 80]
    block_no:bit8 = 1

    # Do not send anything if the first reply from the module is CAN.
    reply = m.xmodem_out_q.rd()
    if reply == CAN:
        return

    for blk_n in range(2):
        # Block header: block number followed by its bitwise complement.
        m.xmodem_in_q.wr(block_no)
        m.xmodem_in_q.wr(~block_no)

        total = 0
        sent = 0  # running count of payload bytes; not read anywhere below
        # Payload: the 8-byte pattern repeated 16 times.
        for rep in range(16):
            for idx in range(len(pattern)):
                pos = idx % 8
                value = pattern[pos]
                m.xmodem_in_q.wr(value)
                total += value
                sent += 1

        # Block trailer: low 8 bits of the payload sum.
        m.xmodem_in_q.wr(total & 0xFF)
        block_no += 1

    m.xmodem_in_q.wr(EOT)

    # Blocks until the module writes to `idouk`.
    done = m.idouk.rd()
    print("idouk:", done)
# Instantiate the module under test and run the testbench against it.
m = xmodem_q2m()
test(m)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment