Skip to content

Instantly share code, notes, and snippets.

@cfelton
Created August 12, 2015 05:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cfelton/7af8ad50c36e7c62d223 to your computer and use it in GitHub Desktop.
Save cfelton/7af8ad50c36e7c62d223 to your computer and use it in GitHub Desktop.
from __future__ import division
from __future__ import print_function
from random import shuffle, randint
import myhdl
from myhdl import (Signal, ResetSignal, intbv, always_seq, always,
delay, instance, traceSignals, Simulation)
class DataStream:
def __init__(self, val, nbits):
""" Interface between processing elements """
self._ival = val
self.data_valid = Signal(bool(0))
self.data = Signal(intbv(val)[nbits:])
def __repr__(self):
return str(self._ival)
def pe(clock, reset, sti, sto, toggle_bits=0, row=0, col=0):
""" Simple processing element """
@always_seq(clock.posedge, reset=reset)
def rtl():
if sti.data_valid:
sto.data.next = sti.data ^ toggle_bits
sto.data_valid.next = True
else:
sto.data_valid.next = False
return rtl
def pe_matrix(clock, reset, data_in, data_out,
number_of_columns=48, number_of_rows=32):
""" Create a matrix of processing elements
This top-level creates a matrix of randomly connected
processing elements. Each column is randomly connected
to the subsequent column. An input arbitrator feeds the
matrix and an output arbitrator collects the data from
the matrix. The PE matrix ogranization is illusgtrated in
the following:
row/col 0 1 2 3
0 [pe] [pe] [pe]
1 [arb in] [pe] [pe] [pe] [arb out]
2 [pe] [pe] [pe]
"""
assert len(data_in) == len(data_out)
nbits = len(data_in)
# A list-of-signals (list of rows) to wire up the
# processing emlements, maxtrix_wires[row][col]
matrix_wires = [[None for col in range(number_of_columns+1)]
for row in range(number_of_rows)]
# A list-of-generators (module instances) to keep track
matrix_inst = [[None for col in range(number_of_columns)]
for row in range(number_of_rows)]
# fill in the first column with interface objects
for row in range(number_of_rows):
matrix_wires[row][0] = DataStream(row*number_of_rows, nbits)
# instantiated the input arbitrator
dci = extract_column(matrix_wires, 0)
arb_input_inst = pe_input_arb(clock, reset, data_in, dci)
# create the matrix of instances (generators) of the PEs.
for col in range(1, number_of_columns+1):
rowass = list(range(number_of_rows))
# create the random row assignment from previous column
# to the current wire column (output of the current PE)
shuffle(rowass)
for row, rnd in enumerate(rowass):
sti = matrix_wires[rnd][col-1]
sto = DataStream(row*number_of_rows+col, nbits)
matrix_wires[row][col] = sto
# instantiate and map the PE
toggle_bits = randint(0, sto.data.max-1)
assert matrix_inst[row][col-1] is None
matrix_inst[row][col-1] = pe(clock, reset, sti, sto,
toggle_bits, row, col-1)
# instantiated the input arbitrator
dco = extract_column(matrix_wires, number_of_columns)
arb_output_inst = pe_output_arb(clock, reset, dco, data_out)
# flatten the matrix of instances (generators)
pe_insts = [matrix_inst[row][col] for col in range(number_of_columns)
for row in range(number_of_rows)]
assert None not in pe_insts
return arb_input_inst, pe_insts, arb_output_inst
def extract_column(matrix, col):
nrows = len(matrix)
rowlos = [None for _ in range(nrows)]
for row in range(nrows):
rowlos[row] = matrix[row][col]
return rowlos
def extract_interface(data_col):
data = [rr.data for rr in data_col]
dv = [rr.data_valid for rr in data_col]
return data, dv
def pe_input_arb(clock, reset, data_in, data_out_col):
nrows = len(data_out_col)
cnt = Signal(intbv(0, min=0, max=nrows))
data, dv = extract_interface(data_out_col)
@always_seq(clock.posedge, reset=reset)
def rtl():
if cnt == nrows-1:
cnt.next = 0
else:
cnt.next = cnt + 1
for row in range(nrows):
if row == cnt:
data[row].next = data_in
dv[row].next = True
else:
data[row].next = 0
dv[row].next = False
return rtl
def pe_output_arb(clock, reset, data_in_col, data_out):
nrows = len(data_in_col)
data, dv = extract_interface(data_in_col)
@always_seq(clock.posedge, reset=reset)
def rtl():
for row in range(nrows):
if dv[row]:
data_out.next = data[row]
# @todo: check for multiple dv!
return rtl
def test():
""" Simple test to verify "something" happens """
clock = Signal(bool(1))
reset = ResetSignal(0, active=1, async=False)
data_in = Signal(intbv(0)[4:])
data_out = Signal(intbv(0)[4:])
def _bench():
tbdut = pe_matrix(clock, reset, data_in, data_out)
@always(delay(5))
def tbclk():
clock.next = not clock
@instance
def tbstim():
for ii in range(1000):
data_in.next = x = randint(0, data_in.max-1)
yield clock.posedge
raise myhdl.StopSimulation
return tbdut, tbclk, tbstim
Simulation(traceSignals(_bench)).run()
myhdl.toVerilog(pe_matrix, clock, reset, data_in, data_out)
myhdl.toVHDL(pe_matrix, clock, reset, data_in, data_out)
if __name__ == '__main__':
test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment