Skip to content

Instantly share code, notes, and snippets.

@enjoy-digital
Last active January 27, 2022 22:37
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save enjoy-digital/dc20a680035b7207a35ace88e526b5ef to your computer and use it in GitHub Desktop.
Save enjoy-digital/dc20a680035b7207a35ace88e526b5ef to your computer and use it in GitHub Desktop.
LiteEth Interboard demo between Arty and Butterstick
#!/usr/bin/env python3
# LiteEth UDP Inter-board stream demo.
#
# Copyright (c) 2022 Florent Kermarrec <florent@enjoy-digital.fr>
# SPDX-License-Identifier: BSD-2-Clause
# ./arty.py --build --load
import os
import argparse
from migen import *
from migen.genlib.cdc import MultiReg
from litex_boards.platforms import digilent_arty
from litex.soc.cores.clock import *
from litex.soc.integration.soc_core import *
from litex.soc.integration.builder import *
from liteeth.phy.mii import LiteEthPHYMII
from liteeth.core import LiteEthUDPIPCore
from liteeth.frontend.stream import LiteEthUDPStreamer
# CRG ----------------------------------------------------------------------------------------------
class _CRG(Module):
    """Clock/reset generator for the Arty demo.

    Declares the ``sys`` and ``eth`` clock domains and drives both from a
    single PLL fed by the board's ``clk100`` input. The ``eth`` clock is also
    forwarded to the ``eth_ref_clk`` pad.

    Parameters
    ----------
    platform
        Board platform used to request ``clk100``, ``cpu_reset`` and
        ``eth_ref_clk``.
    sys_clk_freq
        Frequency requested for the ``sys`` clock domain.
    with_rst
        When true, the inverted ``cpu_reset`` pad is OR-ed into the PLL reset.
    """
    def __init__(self, platform, sys_clk_freq, with_rst=True):
        self.rst = Signal()
        self.clock_domains.cd_sys = ClockDomain()
        self.clock_domains.cd_eth = ClockDomain()

        # # #

        # Board clock, plus the optional board reset (pad is inverted before use).
        clk100 = platform.request("clk100")
        if with_rst:
            rst = ~platform.request("cpu_reset")
        else:
            rst = 0

        # PLL: reset by the board reset or by ``self.rst``; outputs are created
        # in the same order as the clock domains were declared.
        pll = S7PLL(speedgrade=-1)
        self.submodules.pll = pll
        self.comb += pll.reset.eq(rst | self.rst)
        pll.register_clkin(clk100, 100e6)
        pll.create_clkout(self.cd_sys, sys_clk_freq)
        pll.create_clkout(self.cd_eth, 25e6)

        # Export the 25MHz eth clock on the eth_ref_clk pad.
        eth_ref_clk = platform.request("eth_ref_clk")
        self.comb += eth_ref_clk.eq(self.cd_eth.clk)

        # Ignore sys_clk to pll.clkin path created by SoC's rst.
        platform.add_false_path_constraints(self.cd_sys.clk, pll.clkin)
# BaseSoC ------------------------------------------------------------------------------------------
class BaseSoC(SoCMini):
    """Arty side of the UDP inter-board demo.

    The SoC owns a LiteEth MII PHY and UDP/IP core (local address
    192.168.1.50) and a UDP streamer configured with address 192.168.1.51,
    port 2000, 8-bit data.

    * TX: the resynchronized buttons (bits 0-3) and switches (bits 4-7) form
      one byte that is pushed to the streamer each time it changes.
    * RX: every byte presented by the streamer is written, inverted, to the
      user LEDs.

    Parameters
    ----------
    sys_clk_freq
        System clock frequency, passed to the CRG, the SoC and the UDP/IP core.
    """
    def __init__(self, sys_clk_freq=int(100e6)):
        platform = digilent_arty.Platform()

        # CRG --------------------------------------------------------------------------------------
        self.submodules.crg = _CRG(platform, sys_clk_freq)

        # SoCMini ----------------------------------------------------------------------------------
        super().__init__(platform, sys_clk_freq)

        # LiteEth UDP/IP ---------------------------------------------------------------------------
        eth_clock_pads = self.platform.request("eth_clocks")
        eth_pads       = self.platform.request("eth")
        self.submodules.ethphy = LiteEthPHYMII(clock_pads=eth_clock_pads, pads=eth_pads)
        self.submodules.ethcore = LiteEthUDPIPCore(
            phy         = self.ethphy,
            mac_address = 0x10e2d5000001,
            ip_address  = "192.168.1.50",
            clk_freq    = sys_clk_freq,
        )

        # UDP Streamer -----------------------------------------------------------------------------
        udp_streamer = LiteEthUDPStreamer(
            self.ethcore.udp,
            ip_address = "192.168.1.51",
            udp_port   = 2000,
            data_width = 8,
        )
        self.submodules.udp_streamer = udp_streamer
        sink   = udp_streamer.sink
        source = udp_streamer.source

        # UDP TX -----------------------------------------------------------------------------------

        # Bring the asynchronous switches/buttons into the sys clock domain.
        switches = Signal(4)
        buttons  = Signal(4)
        self.specials += [
            MultiReg(platform.request_all("user_sw"),  switches),
            MultiReg(platform.request_all("user_btn"), buttons),
        ]

        # tx_data = {switches, buttons}; tx_data_d is tx_data delayed by one sys cycle,
        # so the two differ exactly on the cycle where an input changed.
        tx_data   = Signal(8)
        tx_data_d = Signal(8)
        self.comb += tx_data.eq(Cat(buttons, switches))
        self.sync += tx_data_d.eq(tx_data)

        # IDLE waits for a change, SEND holds valid (with the current tx_data)
        # until the streamer accepts the byte.
        fsm = FSM(reset_state="IDLE")
        self.submodules.fsm = fsm
        fsm.act("IDLE", If(tx_data != tx_data_d, NextState("SEND")))
        fsm.act("SEND",
            sink.valid.eq(1),
            sink.data.eq(tx_data),
            If(sink.ready, NextState("IDLE")),
        )

        # UDP RX -----------------------------------------------------------------------------------

        # Always accept incoming bytes and latch their inverse on the LEDs.
        leds = platform.request_all("user_led")
        self.comb += source.ready.eq(1)
        self.sync += If(source.valid, leds.eq(~source.data))
# Build --------------------------------------------------------------------------------------------
def main():
    """Command-line entry point for the Arty demo.

    Options:
      --build  passed as ``run`` to ``Builder.build`` (which is always called).
      --load   load ``<build_name>.bit`` from the builder's gateware directory
               with the platform's programmer.
    """
    # Description fixed: the original read "iteEth ..." (missing leading "L").
    parser = argparse.ArgumentParser(description="LiteEth UDP Inter-board stream demo on Arty")
    parser.add_argument("--build", action="store_true", help="Build bitstream")
    parser.add_argument("--load",  action="store_true", help="Load bitstream")
    args = parser.parse_args()

    soc     = BaseSoC()
    builder = Builder(soc, csr_csv="csr.csv")
    builder.build(run=args.build)

    if args.load:
        prog = soc.platform.create_programmer()
        prog.load_bitstream(os.path.join(builder.gateware_dir, soc.build_name + ".bit"))


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
# LiteEth UDP Inter-board stream demo.
#
# Copyright (c) 2022 Florent Kermarrec <florent@enjoy-digital.fr>
# SPDX-License-Identifier: BSD-2-Clause
# ./butterstick.py --build --load
import os
import argparse
from migen import *
from migen.genlib.cdc import MultiReg
from litex_boards.platforms import gsd_butterstick
from litex.soc.cores.clock import *
from litex.soc.integration.soc_core import *
from litex.soc.integration.builder import *
from liteeth.common import convert_ip
from liteeth.phy.ecp5rgmii import LiteEthPHYRGMII
from liteeth.core import LiteEthUDPIPCore
from liteeth.frontend.stream import LiteEthUDPStreamer
# CRG ----------------------------------------------------------------------------------------------
class _CRG(Module):
    """Clock/reset generator for the ButterStick demo.

    Declares a reset-less ``por`` domain clocked directly from ``clk30`` and a
    ``sys`` domain driven by a PLL. A 16-bit down-counter in the ``por``
    domain keeps the PLL in reset until it reaches zero.

    Parameters
    ----------
    platform
        Board platform used to request ``clk30``.
    sys_clk_freq
        Frequency requested for the ``sys`` clock domain.
    """
    def __init__(self, platform, sys_clk_freq):
        self.clock_domains.cd_por = ClockDomain(reset_less=True)
        self.clock_domains.cd_sys = ClockDomain()

        # # #

        # Clk
        clk30 = platform.request("clk30")

        # Power on reset: count down from 0xffff, por_done goes high at zero.
        por_count = Signal(16, reset=2**16-1)
        por_done  = Signal()
        self.comb += self.cd_por.clk.eq(clk30)
        self.comb += por_done.eq(por_count == 0)
        self.sync.por += If(~por_done, por_count.eq(por_count - 1))

        # PLL
        # The original OR-ed in ``~rst_n`` with ``rst_n = 1``. On a Python int
        # ``~1`` is -2 (not 0), so that term only worked because bit 0 of -2 is
        # clear. No external reset is used here (BaseSoC requests all
        # ``user_btn`` pads as data), so the PLL reset is the POR alone.
        self.submodules.pll = pll = ECP5PLL()
        self.comb += pll.reset.eq(~por_done)
        pll.register_clkin(clk30, 30e6)
        pll.create_clkout(self.cd_sys, sys_clk_freq)
# BaseSoC ------------------------------------------------------------------------------------------
class BaseSoC(SoCMini):
    """ButterStick side of the UDP inter-board demo.

    The SoC owns a LiteEth RGMII PHY and UDP/IP core (local address
    192.168.1.51) and a UDP streamer configured with address 192.168.1.50,
    port 2000, 8-bit data.

    * TX: the two resynchronized buttons drive bits 0-1 of the transmitted
      byte (the other bits are not driven in this class); the byte is pushed
      to the streamer each time it changes.
    * RX: bits 0-3 of every received byte go to ``user_led``, bits 4-7 to
      ``user_led_color``.

    Parameters
    ----------
    sys_clk_freq
        System clock frequency, passed to the CRG, the SoC and the UDP/IP core.
    """
    def __init__(self, sys_clk_freq=int(125e6)):
        platform = gsd_butterstick.Platform()

        # CRG --------------------------------------------------------------------------------------
        self.submodules.crg = _CRG(platform, sys_clk_freq)

        # SoCMini ----------------------------------------------------------------------------------
        super().__init__(platform, sys_clk_freq)

        # LiteEth UDP/IP ---------------------------------------------------------------------------
        eth_clock_pads = self.platform.request("eth_clocks")
        eth_pads       = self.platform.request("eth")
        self.submodules.ethphy = LiteEthPHYRGMII(clock_pads=eth_clock_pads, pads=eth_pads)
        self.submodules.ethcore = LiteEthUDPIPCore(
            phy         = self.ethphy,
            mac_address = 0x10e2d5000002,
            ip_address  = "192.168.1.51",
            clk_freq    = sys_clk_freq,
        )

        # UDP Streamer -----------------------------------------------------------------------------
        udp_streamer = LiteEthUDPStreamer(
            self.ethcore.udp,
            ip_address = "192.168.1.50",
            udp_port   = 2000,
            data_width = 8,
        )
        self.submodules.udp_streamer = udp_streamer
        sink   = udp_streamer.sink
        source = udp_streamer.source

        # UDP TX -----------------------------------------------------------------------------------

        # Bring the asynchronous buttons into the sys clock domain.
        buttons = Signal(2)
        self.specials += MultiReg(platform.request_all("user_btn"), buttons)

        # Only the two low bits of tx_data carry information; tx_data_d is
        # tx_data delayed by one sys cycle and is used for change detection.
        tx_data   = Signal(8)
        tx_data_d = Signal(8)
        self.comb += tx_data[0:2].eq(buttons)
        self.sync += tx_data_d.eq(tx_data)

        # IDLE waits for a change, SEND holds valid (with the current tx_data)
        # until the streamer accepts the byte.
        fsm = FSM(reset_state="IDLE")
        self.submodules.fsm = fsm
        fsm.act("IDLE", If(tx_data != tx_data_d, NextState("SEND")))
        fsm.act("SEND",
            sink.valid.eq(1),
            sink.data.eq(tx_data),
            If(sink.ready, NextState("IDLE")),
        )

        # UDP RX -----------------------------------------------------------------------------------

        # Always accept incoming bytes; low nibble -> leds, high nibble -> led color.
        leds  = platform.request_all("user_led")
        color = platform.request("user_led_color")
        self.comb += source.ready.eq(1)
        self.sync += If(source.valid,
            leds.eq(source.data[0:4]),
            color.eq(source.data[4:8]),
        )
# Build --------------------------------------------------------------------------------------------
def main():
    """Command-line entry point for the ButterStick demo.

    Options:
      --build  passed as ``run`` to ``Builder.build`` (which is always called).
      --load   load ``<build_name>.svf`` from the builder's gateware directory
               with the platform's programmer.
    """
    # Description fixed: the original ("VeriQloud Test SoC on ButterStick") was a
    # leftover from another design and did not describe this script.
    parser = argparse.ArgumentParser(description="LiteEth UDP Inter-board stream demo on ButterStick")
    parser.add_argument("--build", action="store_true", help="Build bitstream")
    parser.add_argument("--load",  action="store_true", help="Load bitstream")
    args = parser.parse_args()

    soc     = BaseSoC()
    builder = Builder(soc, csr_csv="csr.csv")
    builder.build(run=args.build)

    if args.load:
        prog = soc.platform.create_programmer()
        prog.load_bitstream(os.path.join(builder.gateware_dir, soc.build_name + ".svf"))


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
#
# This file is part of 360nosc0pe/scope project.
#
# Copyright (c) 2021 Felix Domke <tmbinc@elitedvb.net>
# Copyright (c) 2021 Florent Kermarrec <florent@enjoy-digital.fr>
# SPDX-License-Identifier: BSD-2-Clause
# ./sds1104xe.py --build --load
import os
import math
import argparse
from migen import *
from litex.build.generic_platform import *
from litex_boards.platforms import sds1104xe
from litex.soc.cores.clock import *
from litex.soc.integration.soc_core import *
from litex.soc.integration.builder import *
from litex.soc.cores.video import VideoVGAPHY
from litex.soc.interconnect import stream
from litedram.modules import MT41K64M16
from litedram.phy import s7ddrphy
from liteeth.phy.mii import LiteEthPHYMII
from liteeth.core import LiteEthUDPIPCore
from liteeth.frontend.stream import LiteEthUDPStreamer
from litescope import LiteScopeAnalyzer
from peripherals.frontpanel import FrontpanelLeds, FrontpanelButtons, FP_BTNS, FP_LEDS
# CRG ----------------------------------------------------------------------------------------------
class _CRG(Module):
    """Clock generator for the SDS1104X-E design.

    A single PLL fed by a 25MHz clock produces the ``sys``, ``sys4x``,
    ``sys4x_dqs`` (90 degree phase), ``idelay`` (200MHz) and ``lcd`` (33.3MHz)
    clock domains. An IDELAYCTRL is instantiated on the ``idelay`` domain.

    Parameters
    ----------
    platform
        Board platform; used to request ``eth_clocks`` when ``with_ethernet``
        is false and to add the false-path constraint.
    sys_clk_freq
        Frequency requested for ``sys``; ``sys4x``/``sys4x_dqs`` run at 4x.
    with_ethernet
        When true, the PLL input is the ``eth_tx`` clock domain's clock;
        otherwise it is the ``rx`` member of the ``eth_clocks`` pads.
    """
    def __init__(self, platform, sys_clk_freq, with_ethernet=False):
        # NOTE(review): ``rst`` is declared here but not connected to anything
        # inside this class - confirm whether the PLL is meant to be resettable.
        self.rst = Signal()
        self.clock_domains.cd_sys       = ClockDomain()
        self.clock_domains.cd_sys4x     = ClockDomain(reset_less=True)
        self.clock_domains.cd_sys4x_dqs = ClockDomain(reset_less=True)
        self.clock_domains.cd_idelay    = ClockDomain()
        self.clock_domains.cd_lcd       = ClockDomain()

        # # #

        # 25MHz reference selection.
        if with_ethernet:
            clk25 = ClockSignal("eth_tx")
        else:
            clk25 = platform.request("eth_clocks").rx

        # PLL (outputs created in domain declaration order).
        pll = S7PLL(speedgrade=-1)
        self.submodules.pll = pll
        pll.register_clkin(clk25, 25e6)
        pll.create_clkout(self.cd_sys,       sys_clk_freq)
        pll.create_clkout(self.cd_sys4x,     4*sys_clk_freq)
        pll.create_clkout(self.cd_sys4x_dqs, 4*sys_clk_freq, phase=90)
        pll.create_clkout(self.cd_idelay,    200e6)
        pll.create_clkout(self.cd_lcd,       33.3e6)

        # Ignore sys_clk to pll.clkin path created by SoC's rst.
        platform.add_false_path_constraints(self.cd_sys.clk, pll.clkin)

        # IDELAYCTRL on the 200MHz idelay domain.
        self.submodules.idelayctrl = S7IDELAYCTRL(self.cd_idelay)
# ScopeSoC -----------------------------------------------------------------------------------------
class ScopeSoC(SoCCore):
    """SoC for the Siglent SDS1104X-E taking part in the UDP inter-board demo.

    On top of a VexRiscv "lite" SoC with DDR3 it instantiates:

    * a LiteEth MII PHY and UDP/IP core (local address 192.168.1.50) with a
      UDP streamer configured with address 192.168.1.51, port 2000;
    * the front-panel LED and button peripherals;
    * an LCD output multiplexed between a framebuffer and a terminal, toggled
      by the MENU_ON_OFF button.

    TX byte: bits 0-3 are the inverted CHANNEL_1..4 button states, bits 4-7
    are the constant ``0b010``; it is sent each time it changes. The low two
    bits of the last received byte drive (inverted) the RUN/STOP green and red
    LEDs.

    Parameters
    ----------
    sys_clk_freq
        System clock frequency, passed to the SoC, DDR3 PHY, CRG, UDP/IP core
        and front-panel peripherals.
    """
    def __init__(self, sys_clk_freq=int(100e6)):
        # Platform ---------------------------------------------------------------------------------
        platform = sds1104xe.Platform()

        # SoCCore ----------------------------------------------------------------------------------
        super().__init__(platform, sys_clk_freq,
            ident               = "ScopeSoC on Siglent SDS1104X-E",
            ident_version       = True,
            uart_name           = "crossover",
            cpu_type            = "vexriscv",
            cpu_variant         = "lite", # CPU only used to initialize DDR3 for now, Lite is enough.
            integrated_rom_size = 0x10000,
        )
        self.uart.add_auto_tx_flush(sys_clk_freq=sys_clk_freq, timeout=1, interval=128)

        # DDR3 SDRAM -------------------------------------------------------------------------------
        self.submodules.ddrphy = s7ddrphy.A7DDRPHY(
            platform.request("ddram"),
            memtype      = "DDR3",
            nphases      = 4,
            sys_clk_freq = sys_clk_freq,
        )
        self.add_sdram("sdram",
            phy              = self.ddrphy,
            module           = MT41K64M16(sys_clk_freq, "1:4"),
            l2_cache_size    = 1024,
            l2_cache_reverse = False,
        )

        # CRG --------------------------------------------------------------------------------------
        self.submodules.crg = _CRG(platform, sys_clk_freq, with_ethernet=True)

        # LiteEth UDP/IP ---------------------------------------------------------------------------
        eth_clock_pads = self.platform.request("eth_clocks")
        eth_pads       = self.platform.request("eth")
        self.submodules.ethphy = LiteEthPHYMII(clock_pads=eth_clock_pads, pads=eth_pads)
        self.submodules.ethcore = LiteEthUDPIPCore(
            phy         = self.ethphy,
            mac_address = 0x10e2d5000001,
            ip_address  = "192.168.1.50",
            clk_freq    = sys_clk_freq,
        )

        # Frontpanel Leds --------------------------------------------------------------------------
        self.submodules.fpleds = FrontpanelLeds(
            platform.request("led_frontpanel"), sys_clk_freq, with_csr=False)

        # Frontpanel Buttons -----------------------------------------------------------------------
        self.submodules.fpbtns = FrontpanelButtons(
            platform.request("btn_frontpanel"), sys_clk_freq, with_csr=False)

        # UDP Streamer -----------------------------------------------------------------------------
        udp_streamer = LiteEthUDPStreamer(
            self.ethcore.udp,
            ip_address = "192.168.1.51",
            udp_port   = 2000,
            data_width = 8,
        )
        self.submodules.udp_streamer = udp_streamer
        sink   = udp_streamer.sink
        source = udp_streamer.source

        # UDP TX -----------------------------------------------------------------------------------

        # Bit i of tx_data is the inverted state of the CHANNEL_(i+1) button; the
        # high nibble is a constant. tx_data_d is tx_data delayed by one sys cycle.
        # NOTE(review): masks are used here without ``.value`` whereas MENU_ON_OFF
        # below uses ``.value`` - presumably FP_BTNS is int-compatible; confirm.
        tx_data   = Signal(8)
        tx_data_d = Signal(8)
        channel_btns = (
            FP_BTNS.CHANNEL_1,
            FP_BTNS.CHANNEL_2,
            FP_BTNS.CHANNEL_3,
            FP_BTNS.CHANNEL_4,
        )
        for bit, btn_mask in enumerate(channel_btns):
            self.comb += tx_data[bit].eq(~((self.fpbtns.value & btn_mask) != 0))
        self.comb += tx_data[4:8].eq(0b010)
        self.sync += tx_data_d.eq(tx_data)

        # IDLE waits for a change, SEND holds valid (with the current tx_data)
        # until the streamer accepts the byte.
        fsm = FSM(reset_state="IDLE")
        self.submodules.fsm = fsm
        fsm.act("IDLE", If(tx_data != tx_data_d, NextState("SEND")))
        fsm.act("SEND",
            sink.valid.eq(1),
            sink.data.eq(tx_data),
            If(sink.ready, NextState("IDLE")),
        )

        # UDP RX -----------------------------------------------------------------------------------

        # Always accept incoming bytes and latch the last one in ``leds``.
        leds = Signal(8)
        self.comb += source.ready.eq(1)
        self.sync += If(source.valid, leds.eq(source.data))

        # Leds -------------------------------------------------------------------------------------

        # Each FP_LEDS member is turned into a bit index of fpleds.value with log2.
        # Channel LEDs mirror the TX bits; RUN/STOP LEDs show the inverted RX bits 0/1.
        led_drivers = [
            (FP_LEDS.CHANNEL_1,      tx_data[0]),
            (FP_LEDS.CHANNEL_2,      tx_data[1]),
            (FP_LEDS.CHANNEL_3,      tx_data[2]),
            (FP_LEDS.CHANNEL_4,      tx_data[3]),
            (FP_LEDS.RUN_STOP_GREEN, ~leds[0]),
            (FP_LEDS.RUN_STOP_RED,   ~leds[1]),
        ]
        for led_mask, driver in led_drivers:
            self.comb += self.fpleds.value[int(math.log2(led_mask))].eq(driver)

        # LCD --------------------------------------------------------------------------------------
        lcd_timings = {
            "pix_clk"       : 33.3e6,
            "h_active"      : 800,
            "h_blanking"    : 256,
            "h_sync_offset" : 210,
            "h_sync_width"  : 1,
            "v_active"      : 480,
            "v_blanking"    : 45,
            "v_sync_offset" : 22,
            "v_sync_width"  : 1,
        }
        video_timings = ("800x480@60Hz", lcd_timings)

        # Framebuffer on mux input 0, terminal on mux input 1, mux output to the LCD PHY.
        self.submodules.lcdphy     = VideoVGAPHY(platform.request("lcd"), clock_domain="lcd")
        self.submodules.lcdphy_mux = stream.Multiplexer(self.lcdphy.sink.description, n=2)
        self.add_video_framebuffer(phy=self.lcdphy_mux.sink0, timings=video_timings, clock_domain="lcd")
        self.add_video_terminal(phy=self.lcdphy_mux.sink1, timings=video_timings, clock_domain="lcd")
        self.comb += self.lcdphy_mux.source.connect(self.lcdphy.sink)

        # Toggle the mux selection on each rising edge of the MENU_ON_OFF button.
        menu_on_off   = Signal()
        menu_on_off_d = Signal()
        self.sync += [
            menu_on_off.eq((self.fpbtns.value & FP_BTNS.MENU_ON_OFF.value) != 0),
            menu_on_off_d.eq(menu_on_off),
            If(menu_on_off & ~menu_on_off_d,
                self.lcdphy_mux.sel.eq(~self.lcdphy_mux.sel)
            ),
        ]
# Build --------------------------------------------------------------------------------------------
def main():
    """Command-line entry point for the SDS1104X-E design.

    Options:
      --build  passed as ``run`` to ``Builder.build`` (which is always called).
      --load   load ``<build_name>.bit`` from the builder's gateware directory
               with the platform's programmer, using ``device=1``.
    """
    cli = argparse.ArgumentParser(description="Experiments with a SDS1104X-E Scope and LiteX.")
    for flag, text in (("--build", "Build bitstream."), ("--load", "Load bitstream.")):
        cli.add_argument(flag, action="store_true", help=text)
    opts = cli.parse_args()

    soc     = ScopeSoC()
    builder = Builder(soc, csr_csv="test/csr.csv")
    builder.build(run=opts.build)

    # Nothing more to do unless a load was requested.
    if not opts.load:
        return
    prog      = soc.platform.create_programmer()
    bitstream = os.path.join(builder.gateware_dir, soc.build_name + ".bit")
    prog.load_bitstream(bitstream, device=1)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment