Skip to content

Instantly share code, notes, and snippets.

@tzechienchu
Last active December 16, 2022 03:19
Show Gist options
  • Save tzechienchu/43d85b837c6b5e42eb55b697e6d536ca to your computer and use it in GitHub Desktop.
Save tzechienchu/43d85b837c6b5e42eb55b697e6d536ca to your computer and use it in GitHub Desktop.
Amaranth HDL I2C Slave via amaranth-community-unofficial
from amaranth import Signal, Module, Cat, Elaboratable, Record, Mux
from amaranth.hdl.rec import DIR_FANIN, DIR_FANOUT
from amaranth.hdl.ast import Rose, Fell
from amaranth.compat import TSTriple
from amaranth.lib.cdc import FFSynchronizer
class I2CTarget(Elaboratable):
"""
Simple I2C target.
Clock stretching is not supported.
Builtin responses (identification, general call, etc.) are not provided.
Note that the start, stop, and restart strobes are transaction delimiters rather than direct
indicators of bus conditions. A transaction always starts with a start strobe and ends with
either a stop or a restart strobe. That is, a restart strobe, similarly to a stop strobe, may
be only followed by another start strobe (or no strobe at all if the device is not addressed
again).
:attr address:
The 7-bit address the target will respond to.
:attr start:
Start strobe. Active for one cycle immediately after acknowledging address.
:attr stop:
Stop stobe. Active for one cycle immediately after a stop condition that terminates
a transaction that addressed this device.
:attr restart:
Repeated start strobe. Active for one cycle immediately after a repeated start condition
that terminates a transaction that addressed this device.
:attr write:
Write strobe. Active for one cycle immediately after receiving a data octet.
:attr data_i:
Data octet received from the initiator. Valid when ``write`` is high.
:attr ack_o:
Acknowledge strobe. If active for at least one cycle during the acknowledge bit
setup period (one half-period after write strobe is asserted), acknowledge is asserted.
Otherwise, no acknowledge is asserted. May use combinatorial feedback from ``write``.
:attr read:
Read strobe. Active for one cycle immediately before latching ``data_o``.
:attr data_o:
Data octet to be transmitted to the initiator. Latched immediately after receiving
a read command.
"""
def __init__(self):
self.scl_t = TSTriple()
self.sda_t = TSTriple()
self.scl_i0 = Signal(name="scl_i") #self.scl_t.i
self.scl_o = Signal(name="scl_o",reset=1)
self.scl_oe = Signal(name="scl_oe") #self.scl_t.oe
self.sda_i0 = Signal(name="sda_i") #self.sda_t.i
self.sda_o = Signal(name="sda_o",reset=1)
self.sda_oe = Signal(name="sda_oe") #self.sda_t.oe
self.scl_i = Signal()
self.sda_i = Signal()
self.sample = Signal(name="bus_sample")
self.setup = Signal(name="bus_setup")
self.bus_start = Signal(name="bus_start")
self.bus_stop = Signal(name="bus_stop")
self.address = Signal(7)
self.busy = Signal() # clock stretching request (experimental, undocumented)
self.start = Signal()
self.stop = Signal()
self.restart = Signal()
self.write = Signal()
self.data_i = Signal(8)
self.ack_o = Signal()
self.read = Signal()
self.data_o = Signal(8)
self.ack_i = Signal()
def elaborate(self, platform):
m = Module()
bitno = Signal(3)
shreg_i = Signal(8)
shreg_o = Signal(8)
scl_r = Signal(reset=1,name="scl_r")
sda_r = Signal(reset=1,name="sda_r")
m.d.comb += [
self.scl_t.i.eq(self.scl_i0),
self.scl_t.o.eq(self.scl_o),
self.scl_t.oe.eq(self.scl_oe),
#self.scl_o.eq(0),
self.scl_oe.eq(~self.scl_o),
self.sda_t.i.eq(self.sda_i0),
self.sda_t.o.eq(self.sda_o),
self.sda_t.oe.eq(self.sda_oe),
#self.sda_o.eq(0),
self.sda_oe.eq(~self.sda_o),
self.sample.eq(~scl_r & self.scl_i),
self.setup.eq(scl_r & ~self.scl_i),
self.bus_start.eq(self.scl_i & sda_r & ~self.sda_i),
self.bus_stop.eq(self.scl_i & ~sda_r & self.sda_i),
]
m.d.sync += [
scl_r.eq(self.scl_i),
sda_r.eq(self.sda_i),
]
m.submodules += [
FFSynchronizer(self.scl_t.i, self.scl_i, reset=1),
FFSynchronizer(self.sda_t.i, self.sda_i, reset=1)
]
with m.FSM(name="i2c_fsm") as fsm:
with m.State('IDLE'):
with m.If(self.bus_start):
m.next = 'START'
with m.State('START'):
with m.If(self.bus_stop):
m.next = 'IDLE'
with m.Elif(self.setup):
m.next = 'ADDR-SHIFT'
m.d.sync += bitno.eq(0)
with m.State('ADDR-SHIFT'):
with m.If(self.bus_stop):
m.next = 'IDLE'
with m.Elif(self.bus_start):
m.next = 'START'
with m.Elif(self.sample):
m.d.sync += shreg_i.eq((shreg_i << 1) | self.sda_i)
with m.Elif(self.setup):
m.d.sync += bitno.eq(bitno + 1)
with m.If(bitno == 7):
with m.If(shreg_i[1:] == self.address):
m.d.comb += self.start.eq(1)
m.d.sync += [
self.sda_o.eq(0),
bitno.eq(0)
]
m.next = 'ADDR-ACK'
with m.Else():
m.next = 'IDLE'
with m.State('ADDR-ACK'):
with m.If(self.bus_stop):
m.d.comb += self.stop.eq(1)
m.next = 'IDLE'
with m.Elif(self.bus_start):
m.d.comb += self.restart.eq(1)
m.next = 'START'
with m.Elif(self.setup):
with m.If(~shreg_i[0]):
m.d.sync += self.sda_o.eq(1)
m.next = 'WRITE-SHIFT'
with m.Elif(self.sample):
with m.If(shreg_i[0]):
m.d.sync += shreg_o.eq(self.data_o)
m.d.comb += self.read.eq(1) #Befre Enter READ-STRETCH
m.next = 'READ-STRETCH'
with m.State('WRITE-SHIFT'):
with m.If(self.bus_stop):
m.d.comb += self.stop.eq(1)
m.next = 'IDLE'
with m.Elif(self.bus_start):
m.d.comb += self.restart.eq(1)
m.next = 'START'
with m.Elif(self.sample):
m.d.sync += shreg_i.eq((shreg_i << 1) | self.sda_i) ###SDA
with m.If(self.setup):
m.d.sync += bitno.eq(bitno + 1)
with m.If(bitno == 7):
m.d.sync += self.data_i.eq(shreg_i)
m.next = 'WRITE-ACK'
with m.State('WRITE-ACK'):
m.d.comb += self.write.eq(1) #After Enter Write Ack
with m.If(self.bus_stop):
m.d.comb += self.stop.eq(1)
m.next = 'IDLE'
with m.Elif(self.bus_start):
m.d.comb += self.restart.eq(1)
m.next = 'START'
with m.Elif(self.setup):
m.d.sync += self.sda_o.eq(1)
m.next = 'WRITE-SHIFT'
with m.Elif(~self.scl_i):
m.d.sync += self.scl_o.eq(~self.busy)
with m.If(self.ack_o):
m.d.sync += self.sda_o.eq(0)
with m.State('READ-STRETCH'):
with m.If(self.busy):
m.d.sync += shreg_o.eq(self.data_o)
with m.If(self.bus_stop):
m.d.comb += self.stop.eq(1)
m.next = 'IDLE'
with m.Elif(self.bus_start):
m.next = 'START'
with m.Elif(self.busy):
with m.If(~self.scl_i):
m.d.sync += self.scl_o.eq(0)
with m.Else():
with m.If(~self.scl_i):
m.d.sync += self.sda_o.eq(shreg_o[7])
m.d.sync += self.scl_o.eq(1)
m.next = "READ-SHIFT"
with m.State('READ-SHIFT'):
with m.If(self.bus_stop):
m.d.comb += self.stop.eq(1)
m.next = 'IDLE'
with m.Elif(self.bus_start):
m.d.comb += self.restart.eq(1)
m.next = 'START'
with m.Elif(self.setup):
m.d.sync += [
self.sda_o.eq(shreg_o[7]),
#shreg_o.eq(shreg_o << 1),
]
with m.Elif(self.sample):
m.d.sync += [
shreg_o.eq(shreg_o << 1),
bitno.eq(bitno + 1)
]
with m.If(bitno == 7):
m.next = 'READ-ACK'
with m.State('READ-ACK'):
with m.If(self.bus_stop):
m.d.comb += self.stop.eq(1)
m.next = 'IDLE'
with m.Elif(self.bus_start):
m.d.comb += self.restart.eq(1)
m.next = 'START'
with m.Elif(self.setup):
m.d.sync += self.sda_o.eq(1)
with m.Elif(self.sample):
with m.If(~self.sda_i):
m.d.sync += shreg_o.eq(self.data_o)
m.d.comb += self.read.eq(1)
m.next = 'READ-STRETCH'
with m.Else():
m.d.comb += self.stop.eq(1)
m.next = 'IDLE'
return m
if __name__ == "__main__":
from amaranth.sim import Simulator
top = I2CTarget()
sim = Simulator(top)
period = 8
wait_cycle = period // 4
def half_period():
for _ in range(period):
yield
def start(bus):
yield bus.sda_i0.eq(0)
yield from half_period()
def wait():
for _ in range(wait_cycle):
yield
def rep_start(bus):
yield bus.scl_i0.eq(0)
yield # tHD;DAT
yield bus.sda_i0.eq(1)
yield from half_period()
yield bus.scl_i0.eq(1)
yield from half_period()
yield from start()
def stop(bus):
yield bus.scl_i0.eq(0)
yield # tHD;DAT
yield bus.sda_i0.eq(0)
yield from half_period()
yield bus.scl_i0.eq(1)
yield from half_period()
yield bus.sda_i0.eq(1)
yield from half_period()
def write_bit(bus, bit):
yield bus.scl_i0.eq(0)
yield # tHD;DAT
yield bus.sda_i0.eq(bit)
yield from half_period()
yield bus.scl_i0.eq(1)
yield from half_period()
yield bus.sda_i0.eq(1)
def write_octet(bus, octet):
for bit in range(8)[::-1]:
yield from write_bit(bus, (octet >> bit) & 1)
def read_bit(bus):
yield bus.scl_i0.eq(0)
yield from half_period()
yield bus.scl_i0.eq(1)
bit = (yield bus.sda_o)
yield from half_period()
return bit
def read_octet(bus):
octet = 0
for bit in range(8):
octet = (octet << 1) | (yield from read_bit(bus))
return octet
def start_addr(bus, read):
yield from start(bus)
yield from write_octet(bus, 0b01010000 | read)
yield from read_bit(top)
def startup():
yield top.scl_i0.eq(1)
yield from half_period()
yield top.sda_i0.eq(1)
yield from half_period()
yield from half_period()
yield from half_period()
yield top.address.eq(0b0101000)
#Read
def test_bench():
yield from startup()
yield top.data_o.eq(0b10100101)
yield from start_addr(top, read=True)
yield from read_octet(top)
yield top.data_o.eq(0b00110011)
yield from write_bit(top, 0)
yield from read_octet(top)
yield from write_bit(top, 0)
#yield from self.assertState(tb, "READ-SHIFT")
#Read
def test_bench1():
yield from startup()
yield top.data_o.eq(0b10100101)
yield from start_addr(top, read=True)
yield from read_octet(top)
yield from write_bit(top, 0)
#yield from self.assertState(tb, "READ-SHIFT")
#Write
def test_bench2():
yield from startup()
yield from start_addr(top, read=False)
yield from write_octet(top,0b10100101)
yield from read_bit(top)
yield from write_octet(top,0b00100100)
yield from read_bit(top)
yield top.scl_i0.eq(0)
yield top.sda_i.eq(0)
yield from half_period()
yield top.scl_i0.eq(1)
yield from half_period()
yield top.sda_i0.eq(1)
yield from wait()
#yield from self.assertCondition(tb, lambda: (yield tb.dut.stop))
yield
#yield from self.assertState(tb, "IDLE")
#Read
def test_bench3():
yield from startup()
yield top.data_o.eq(0b10100101)
yield from start_addr(top, read=True)
yield from read_octet(top)
yield top.data_o.eq(0b00110011)
yield from write_bit(top,0)
yield from read_octet(top)
yield from write_bit(top,0)
#yield from self.assertState(tb, "READ-SHIFT")
yield from wait()
yield from wait()
sim.add_clock(10e-6) # 10 MHz
sim.add_sync_process(test_bench3)
with sim.write_vcd("i2cslave.vcd"):
sim.run()
from amaranth.back import verilog
with open("i2cslave.v", "w") as f:
f.write(verilog.convert(top, ports=[
top.start,
top.scl_i,
top.scl_o,
top.scl_oe,
top.sda_i,
top.sda_o,
top.sda_oe,
]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment