# x_mem.py -- secret GitHub Gist by @ryos36 (created December 9, 2017 14:17).
# Gist: ryos36/62d48e37346048b00f502ce12d29565f
from polyphony import testbench
from polyphony import rule
from polyphony import module
from polyphony.typing import List, bit, bit2, bit8
from polyphony.io import Queue, Port
from polyphony import is_worker_running
from polyphony.timing import clksleep, clkfence
# Number of bytes moved by one CMD_WRITE / CMD_READ command; the worker's
# buffer holds max_block_n * BLOCK_SIZE entries.
BLOCK_SIZE = 128
# Command codes sent on the x_mem.cmd queue (declared as bit2).
# CMD_NOP: reset the worker's write and read indices to 0.
CMD_NOP = 0
# CMD_WRITE: copy BLOCK_SIZE bytes from bin_in_q into the buffer.
CMD_WRITE = 1
# CMD_READ: copy BLOCK_SIZE bytes from the buffer to bin_out_q.
CMD_READ = 2
@module
class x_mem:
    # Command-driven byte buffer.
    #
    # Ports:
    #   bin_in_q  - input queue of bytes consumed by CMD_WRITE
    #   bin_out_q - output queue of bytes produced by CMD_READ
    #   cmd       - input queue of command codes (CMD_NOP/CMD_WRITE/CMD_READ)
    #   cmd_busy  - output port, 1 while a WRITE/READ command is in progress
    #
    # max_block_n is the buffer capacity in units of BLOCK_SIZE bytes.
    def __init__(self, max_block_n=8):
        self.max_block_n = max_block_n
        self.bin_in_q = Queue(bit8, 'in', maxsize=128)
        self.bin_out_q = Queue(bit8, 'out', maxsize=128)
        self.cmd = Queue(bit2, 'in')
        self.cmd_busy = Port(bit, 'out', init=0)
        self.append_worker(self.worker)

    # Worker loop: wait for a command, then move one BLOCK_SIZE chunk
    # between the queues and the internal buffer.
    #   CMD_NOP   - reset both indices to the start of the buffer
    #   CMD_WRITE - append BLOCK_SIZE bytes from bin_in_q at the write index
    #   CMD_READ  - emit BLOCK_SIZE bytes to bin_out_q from the read index
    def worker(self):
        block = [0] * self.max_block_n * BLOCK_SIZE  # type: List[bit8]
        # Total number of entries in `block`; used to keep indices in range.
        buf_size = self.max_block_n * BLOCK_SIZE
        wr_block_i = 0
        rd_block_i = 0
        while is_worker_running():
            cmd = self.cmd.rd()
            if cmd == CMD_NOP:
                print("do CMD_NOP")
                wr_block_i = 0
                rd_block_i = 0
                continue

            self.cmd_busy.wr(1)
            bytes_n = BLOCK_SIZE
            print("cmd start:", cmd, " wr_block_i:", wr_block_i, " rd_block_i:", rd_block_i)
            if cmd == CMD_WRITE:
                # Indices only ever advance by whole blocks, so checking once
                # per command (outside the pipelined loop) is enough to stop
                # the index from running past the end of the buffer. After
                # max_block_n blocks the write position wraps to block 0.
                if wr_block_i >= buf_size:
                    wr_block_i = 0
                with rule(scheduling='pipeline'):
                    for i in range(bytes_n):
                        block[wr_block_i] = self.bin_in_q.rd()
                        wr_block_i += 1
            else:
                assert cmd == CMD_READ
                print("read bytes", bytes_n)
                # Same wrap rule as for writes, applied to the read position.
                if rd_block_i >= buf_size:
                    rd_block_i = 0
                with rule(scheduling='pipeline'):
                    for i in range(bytes_n):
                        self.bin_out_q.wr(block[rd_block_i])
                        rd_block_i += 1
            print("cmd done:", cmd)
            self.cmd_busy.wr(0)
#----------------------------------------------------------------
@testbench
def test(m):
    # Write one block of known bytes into the module, wait for the command
    # to finish, read the block back and report any mismatching byte.
    block_n = 1
    # Start from an int-typed list (every element is overwritten below) so
    # the list never holds a mix of None and int.
    data = [0] * (block_n * BLOCK_SIZE)
    for i in range(len(data)):
        data[i] = i

    # CMD_NOP resets the module's read/write indices before the transfer.
    m.cmd.wr(CMD_NOP)
    m.cmd.wr(CMD_WRITE)
    for i in range(len(data)):
        m.bin_in_q.wr(data[i])

    # Poll cmd_busy (one clock per attempt, at most 1000 attempts) until
    # the module reports that the write command has completed.
    for i in range(1000):
        b = m.cmd_busy.rd()
        if b == 0:
            print("detect done:")
            break
        clksleep(1)
    if b == 1:
        # Still busy after the polling loop ran out of attempts.
        print("Error !! Busy")

    m.cmd.wr(CMD_READ)
    for i in range(len(data)):
        d = m.bin_out_q.rd()
        if d != data[i]:
            print("error data[", i, "] = ", data[i], " d = ", d)
#----------------------------------------------------------------
# Script entry point: instantiate the module with its default capacity and
# run the testbench against it.
if __name__ == '__main__':
    m = x_mem()
    test(m)
# End of gist x_mem.py.