Skip to content

Instantly share code, notes, and snippets.

@attie
Last active January 15, 2023 05:49
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save attie/be46b006bd35818327458e0bdddaa13d to your computer and use it in GitHub Desktop.
Save attie/be46b006bd35818327458e0bdddaa13d to your computer and use it in GitHub Desktop.
import logging
import asyncio
from aramanth import *
from ....support.endpoint import *
from ....gateware.pads import *
from ....gateware.pll import *
from ... import *
class VideoIkea2WireSubtarget(Elaboratable):
    """Gateware that serialises 4-byte frames from a FIFO onto one data pad.

    A frame is collected from ``out_fifo`` one byte at a time.  The first
    three bytes, together with the low three bits of the fourth byte, form a
    27-bit word which is shifted out most-significant bit first after a reset
    pulse.

    The pad's output value is tied to 1; only its output-enable is modulated.
    ``oe`` follows the internal ``d_out`` signal, which defaults to 1 and is
    forced to 0 during the low part of the reset pulse and of every bit slot.
    """

    def __init__(self, pads, out_fifo):
        self.out_fifo = out_fifo
        self.d_pad = pads.d_t

    def elaborate(self, platform):
        """Build and return the module containing the serialiser FSM."""
        clk_hz = platform.default_clk_frequency

        def cycles(seconds):
            # Whole number of clock cycles for a duration: floor plus one.
            return int(1 + clk_hz * seconds)

        # Bit slot timing: every slot lasts `t_period`; the line is held low
        # for `t_zero` cycles on a 0 bit and for `t_one` cycles on a 1 bit.
        t_zero = cycles(5.4e-6)
        t_one = cycles(12.2e-6)
        t_period = cycles(22.6e-6)
        # Reset timing: low for `t_reset_lo`, whole pulse lasts `t_reset_period`.
        t_reset_lo = cycles(26.6e-6)
        t_reset_period = cycles(86.6e-6)

        m = Module()

        d_out = Signal()
        buf_in = Signal(32)   # raw frame as received, newest byte in bits 0..7
        buf_out = Signal(27)  # word being shifted out, next bit at the top
        cyc_ctr = Signal(range(t_reset_period+1))
        bit_ctr = Signal(range(buf_in.width+1))

        # Python-side constants derived from the register widths.
        final_byte_offset = buf_in.width - 8
        final_bit_index = buf_out.width - 1

        # Defaults; the FSM below overrides `d_out` where the line must go low.
        m.d.comb += [
            d_out.eq(1),
            self.d_pad.o.eq(1),
            self.d_pad.oe.eq(d_out),
        ]

        with m.FSM():
            with m.State('INIT'):
                m.next = 'LOAD'
                m.d.sync += bit_ctr.eq(0)

            with m.State('LOAD'):
                # Request data continuously; a byte is taken whenever the
                # FIFO reports it is ready.
                m.d.comb += self.out_fifo.r_en.eq(1)
                with m.If(self.out_fifo.r_rdy):
                    # Shift the previous bytes up and insert the new one at
                    # the bottom.
                    m.d.sync += [
                        buf_in.eq(Cat(self.out_fifo.r_data, buf_in[:-8])),
                    ]
                    with m.If(bit_ctr < final_byte_offset):
                        m.d.sync += bit_ctr.eq(bit_ctr + 8)
                    with m.Else():
                        # That was the last byte of the frame.
                        m.next = 'LATCH'

            with m.State('LATCH'):
                m.next = 'RESET'
                m.d.sync += [
                    # Low 3 bits of the last byte, then the three earlier bytes.
                    buf_out.eq(Cat(buf_in[:3], buf_in[8:])),
                    cyc_ctr.eq(0),
                    bit_ctr.eq(0),
                ]

            with m.State('RESET'):
                with m.If(cyc_ctr < t_reset_lo):
                    m.d.sync += cyc_ctr.eq(cyc_ctr + 1)
                    m.d.comb += d_out.eq(0)
                with m.Elif(cyc_ctr < t_reset_period):
                    m.d.sync += cyc_ctr.eq(cyc_ctr + 1)
                with m.Else():
                    m.next = 'SEND'
                    m.d.sync += [
                        cyc_ctr.eq(0),
                        bit_ctr.eq(0),
                    ]

            with m.State('SEND'):
                with m.If(cyc_ctr < t_zero):
                    # Every bit slot starts low.
                    m.d.sync += cyc_ctr.eq(cyc_ctr + 1)
                    m.d.comb += d_out.eq(0)
                with m.Elif(cyc_ctr < t_one):
                    # A 1 bit keeps the line low for longer; a 0 bit releases it.
                    m.d.sync += cyc_ctr.eq(cyc_ctr + 1)
                    m.d.comb += d_out.eq(~buf_out[-1])
                with m.Elif(cyc_ctr < t_period):
                    m.d.sync += cyc_ctr.eq(cyc_ctr + 1)
                with m.Elif(bit_ctr < final_bit_index):
                    # Slot finished: move the next bit to the top of the word.
                    m.d.sync += [
                        buf_out.eq(Cat(0, buf_out[:-1])),
                        cyc_ctr.eq(0),
                        bit_ctr.eq(bit_ctr + 1),
                    ]
                with m.Else():
                    # All bits sent; go back and wait for the next frame.
                    m.next = 'INIT'

        return m
class VideoIkea2WireApplet(GlasgowApplet, name='ikea-2wire'):
    """Applet that feeds colour frames to ``VideoIkea2WireSubtarget``.

    ``build`` wires the subtarget to a single pin (``d``) and an output FIFO,
    ``run`` claims the matching host-side interface with that pin pulled high,
    and ``interact`` cycles a fixed demo pattern forever.
    """

    logger = logging.getLogger(__name__)
    help = 'drive IKEA 2-wire LED chain, 205.325.78'
    # The text is kept flush-left so the string stays byte-for-byte unchanged.
    description = """
Output RGB values to the whole chain of LEDs.
Connect the LED chain between the selected pin and 0v.
Add a 330 Ohm pull resistor (in parallel to the built-in 10 kOhm).
"""

    @classmethod
    def add_build_arguments(cls, parser, access):
        """Register the build-time arguments, including the ``d`` pin."""
        super().add_build_arguments(parser, access)
        access.add_pin_argument(parser, 'd', default=True)

    def build(self, target, args):
        """Claim a multiplexer interface and attach the subtarget to it.

        This is an instance method: the claimed interface is stored on the
        applet instance as ``self.mux_interface`` so that ``run`` can pick it
        up later, and the instance itself is what gets passed to
        ``claim_interface``.

        Returns the value produced by ``iface.add_subtarget``.
        """
        self.mux_interface = iface = target.multiplexer.claim_interface(self, args)
        subtarget = iface.add_subtarget(VideoIkea2WireSubtarget(
            pads=iface.get_pads(args, pins=('d',)),
            out_fifo=iface.get_out_fifo(),
        ))
        return subtarget

    async def run(self, device, args):
        """Claim the host-side interface, with the data pin pulled high.

        Returns whatever ``device.demultiplexer.claim_interface`` yields; that
        object is what ``interact`` receives as ``chain``.
        """
        return await device.demultiplexer.claim_interface(
            self, self.mux_interface, args,
            write_buffer_size=8, pull_high={args.pin_d})

    async def interact(self, device, args, chain):
        """Cycle green, red, blue and off on the chain, 0.25 s each, forever."""
        # the format is:
        #   8-bit : green
        #   8-bit : red
        #   8-bit : blue
        #   3-bit : trailer (not required)
        demo_frames = (
            ('Green', (0x40, 0x00, 0x00, 0x00)),
            ('Red',   (0x00, 0x40, 0x00, 0x00)),
            ('Blue',  (0x00, 0x00, 0x40, 0x00)),
            ('Off',   (0x00, 0x00, 0x00, 0x00)),
        )
        while True:
            for label, frame in demo_frames:
                print(label)
                # A fresh list per write, exactly as the literal lists were.
                await chain.write(list(frame))
                await chain.flush()
                await asyncio.sleep(0.25)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment