Skip to content

Instantly share code, notes, and snippets.

@telegraphic
Created February 19, 2015 17:20
Show Gist options
  • Save telegraphic/37de73d5227017582c17 to your computer and use it in GitHub Desktop.
Save telegraphic/37de73d5227017582c17 to your computer and use it in GitHub Desktop.
ROACH2-MMAP: Access ROACH2 FPGA data from the PowerPC
"""
fpga_mmap.py
------------
Utilities for reading FPGA data via the memory-mapped /dev/roach/mem file
on the ROACH2 filesystem. This file needs to be run from the ROACH2, and
is an alternative to reading data via KATCP. This provides access only;
you'll need to write your own functions for transferring / storing data.
"""
import os
import mmap
import struct
import time
class FpgaCore(object):
    """ ROACH-based reading of FPGA register data via memory-mapping.

    Parameters
    ----------
    name (str): Name of register
    offset (int): Offset from start of file in bytes (base 10)
    n_bytes (int): Number of bytes of data, e.g. a 32-bit register is 4 bytes
    mem_path (str): Optional path of the memory-mapped file to read from.
                    Defaults to MEM_PATH (/dev/roach/mem).

    Attributes
    ----------
    mm_offset (int): `offset` rounded down to a multiple of
                     mmap.ALLOCATIONGRANULARITY; used as the mmap offset.
    mm_bytes (int): Number of bytes mapped per read: the payload plus the
                    bytes between mm_offset and offset.
    """

    # Default file that exposes the FPGA address space.
    MEM_PATH = "/dev/roach/mem"

    def __init__(self, name, offset, n_bytes, mem_path=None):
        super(FpgaCore, self).__init__()
        self.name = name
        self.offset = offset
        self.n_bytes = n_bytes
        self.mem_path = self.MEM_PATH if mem_path is None else mem_path

        # mmap only accepts offsets that are a multiple of the allocation
        # granularity, so map from the preceding boundary and extend the
        # mapping by the number of bytes skipped.
        lead_in = offset % mmap.ALLOCATIONGRANULARITY
        self.mm_offset = offset - lead_in
        self.mm_bytes = n_bytes + lead_in

    def __repr__(self):
        return "Reg: %s Offset: %s N_bytes: %s" % (self.name, self.offset, self.n_bytes)

    def read(self):
        """ Read register / BRAM data

        Returns the `n_bytes` of raw data found at `offset`. The file and the
        mapping are opened for this call only and are always closed again,
        so repeated reads do not accumulate file descriptors or mappings.
        """
        mem = open(self.mem_path, "rb")
        try:
            mm = mmap.mmap(mem.fileno(), self.mm_bytes,
                           access=mmap.ACCESS_READ, offset=self.mm_offset)
            try:
                start_offset = self.offset - self.mm_offset
                # Slicing copies the bytes out of the mapping, so the result
                # remains valid after the mapping is closed.
                return mm[start_offset:start_offset + self.n_bytes]
            finally:
                mm.close()
        finally:
            mem.close()

    def info(self):
        """ Print some information to screen. For debugging """
        print("MMAP offset: %s" % self.mm_offset)
        print("File offset: %s" % self.offset)
        print("FOFF - MOFF: %s" % (self.offset - self.mm_offset))
        print("Bytes per MMAP read: %s" % self.mm_bytes)
        print("Bytes data payload: %s" % self.n_bytes)
class FpgaMmap(object):
    """ Read contents of FPGA memory (registers & BRAM) via mmap

    Parameters
    ----------
    core_info_file (str): Path to the core_info.tab file that describes the
                          layout of data within /dev/roach/mem. This file is
                          generated by the CASPER toolflow
    """

    def __init__(self, core_info_file):
        super(FpgaMmap, self).__init__()
        self.cores = self.parse_core_info(core_info_file)
        # No method of this class reads through this handle; it is kept so
        # that the `mem` attribute stays available and so that construction
        # still fails if /dev/roach/mem cannot be opened. Release it with
        # close().
        self.mem = open("/dev/roach/mem", "rb")

    def close(self):
        """ Close the /dev/roach/mem handle opened by the constructor """
        self.mem.close()

    def read(self, core_id):
        """ Read the contents of a core

        core_id is the core's name as listed in the core_info file. Raises
        KeyError if no core with that name was parsed.
        """
        core = self.cores[core_id]
        return core.read()

    def read_int(self, regname):
        """ Read a register and return value as integer

        The data is unpacked as one big-endian unsigned 32-bit value, so the
        register must be exactly 4 bytes long (struct.error otherwise).
        """
        return struct.unpack('>L', self.read(regname))[0]

    def parse_core_info(self, filepath):
        """ Parse core_info.tab file into python dictionary of FpgaCores

        Each line is split on whitespace into at least four fields: name,
        type, offset and size, the last three in hexadecimal. Lines that do
        not match (blank lines, too few fields, non-hex numbers) are printed
        and skipped, so a bad line can never register a core built from
        values left over from an earlier line.

        Returns the dictionary (keyed by core name) and also stores it in
        self.cores.
        """
        core_info = {}
        with open(filepath) as f:
            for line in f:
                ld = line.split()
                try:
                    r_name = ld[0]
                    int(ld[1], 16)             # Type: parsed only to validate
                    r_offset = int(ld[2], 16)  # Offset is in hex
                    r_size = int(ld[3], 16)
                except (IndexError, ValueError):
                    print(ld)
                    continue
                core_info[r_name] = FpgaCore(r_name, r_offset, r_size)
        self.cores = core_info
        return core_info
if __name__ == "__main__":
    # ---------------------------------------------------------------
    # Example usage: four ways of looking at a register named acc_cnt
    # ---------------------------------------------------------------

    # Parsing core_info.tab gives an FpgaMmap whose `cores` dict holds one
    # FpgaCore object per entry in the file.
    fpga = FpgaMmap("core_info.tab")

    # 1) Fetch the raw bytes by name and unpack them by hand.
    raw_acc_cnt = fpga.read("acc_cnt")
    print(struct.unpack('>L', raw_acc_cnt))

    # 2) Let the helper do the unpacking.
    print(fpga.read_int("acc_cnt"))

    # 3) Take the core object out of the dict and read from it directly.
    acc_cnt = fpga.cores["acc_cnt"]
    print(struct.unpack('>L', acc_cnt.read()))

    # 4) Dump the core's MMAP bookkeeping; mostly of interest for debugging.
    acc_cnt.info()
"""
ledaspec_mmap.py
----------------
Read LEDA spectrometer data via the MMAP /dev/roach/mem file
"""
import time
import struct
import spectrometer_config as config
import fpga_mmap
class LedaspecMmap(fpga_mmap.FpgaMmap):
    """ FpgaMmap with convenience readers for the LEDA spectrometer

    Register and BRAM names are taken from the spectrometer_config module.
    """

    def snap_outriggers(self):
        """ Read the snap BRAM of every antenna in config.antenna_fft_mapping

        Returns a dict that maps each antenna ID to the raw data read from
        'fft_f<i>_spec<j>_snap_bram', where i and j are the first two items
        of that antenna's mapping entry.
        """
        return dict(
            (ant_id, self.read('fft_f%i_spec%i_snap_bram' % (idxs[0], idxs[1])))
            for ant_id, idxs in config.antenna_fft_mapping.items()
        )

    def read_acc_cnt(self):
        """ Read the accumulation counter register as an integer """
        return self.read_int(config.acc_cnt_reg)

    def wait_for_acc(self, acc_cnt=None, timeout=10):
        """ Block until the accumulation counter exceeds a reference value

        acc_cnt: reference count; when None the counter is read once and
                 that value is used as the reference.
        timeout: seconds after which RuntimeError is raised.

        The counter is polled every 10 ms. Returns the first counter value
        that is greater than the reference.
        """
        baseline = self.read_acc_cnt() if acc_cnt is None else acc_cnt
        latest = baseline
        started = time.time()
        while latest <= baseline:
            time.sleep(0.01)
            latest = self.read_acc_cnt()
            # The deadline is checked after every poll, including the one
            # that finally sees the counter advance.
            elapsed = time.time() - started
            if elapsed > timeout:
                raise RuntimeError("Acc timeout: acc_cnt %i" % baseline)
        return latest
"""
run_spec.py
-----------
Script to send spectrometer data via UDP packets to a storage server.
This uses memory-mapped data access from the FPGA via the FpgaMmap class.
Packet format:

    [antenna ID][accumulation ID][segment ID][data payload]
    [    4B    ][       8B      ][    4B    ][    4096B   ]

The LEDA spectra are 4k channels, 32-bit per channel, so they don't fit in a
single UDP packet: the segment ID (0 to 3) indicates which part of the band a
packet carries. There's a lot of blank space in the packet header, but it
really doesn't matter.
"""
from ledaspec_mmap import *
import socket
import math
def packetize(ant_id, acc_id, data, payload_bytes=4096):
    """ Packetize spectrum into UDP packets

    ant_id: 4B string of antenna number (252A)
    acc_id: accumulation counter integer
    data: packed struct of spectral data
    payload_bytes: maximum number of data bytes per packet (default 4096)

    Returns a list of messages, one per segment, each made of the header
    "<ant_id><acc_id, width 8><segment index, width 4>" followed by that
    segment's slice of `data`. The numeric fields are space-padded to their
    width and grow beyond it if the number has more digits.

    At least one message is always returned, even for empty data, and a
    trailing partial segment gets its own (shorter) message instead of being
    dropped.
    """
    # Ceiling division, done in integers so that it also works where `/`
    # yields a float.
    n_packets = max((len(data) + payload_bytes - 1) // payload_bytes, 1)

    msglist = []
    for ii in range(n_packets):
        chunk = data[ii * payload_bytes:(ii + 1) * payload_bytes]
        header = "%s%8d%4d" % (ant_id, acc_id, ii)
        # Where bytes and str are distinct types, %-formatting a bytes
        # payload would embed its repr, so give the header the payload's
        # type and concatenate instead.
        if not isinstance(chunk, str):
            header = header.encode("ascii")
        msglist.append(header + chunk)
    return msglist
if __name__ == '__main__':
    # Stream every new accumulation to the storage server as UDP packets.

    # Set up UDP socket
    UDP_IP = "169.254.128.1"
    UDP_PORT = 12345
    print("UDP target IP: %s" % UDP_IP)
    print("UDP target port: %s" % UDP_PORT)
    sock = socket.socket(socket.AF_INET,     # Internet
                         socket.SOCK_DGRAM)  # UDP

    # The capture loop only ends through an exception (e.g. the timeout
    # raised by wait_for_acc, or Ctrl-C), so release the socket on the way
    # out.
    try:
        # Connect to FPGA via mmap
        print("Connecting to FPGA...")
        fpga = LedaspecMmap('core_info.tab')
        time.sleep(1)

        # start data capture
        print("Snapping data..")
        acc_cnt = 0
        while True:
            t1 = time.time()
            acc_cnt = fpga.wait_for_acc(acc_cnt)
            t2 = time.time()
            outrigs = fpga.snap_outriggers()
            t3 = time.time()

            for key, data in outrigs.items():
                for msg in packetize(key, acc_cnt, data):
                    sock.sendto(msg, (UDP_IP, UDP_PORT))
                    # NOTE(review): the original indentation was lost; this
                    # 1 ms pause is assumed to follow every packet - confirm.
                    time.sleep(1e-3)
            t4 = time.time()

            print("-----------------------")
            print("Integration ID: %s" % acc_cnt)
            print("Time: acc_wait: %2.2fs" % (t2 - t1))
            print("Time: snap_all: %2.2fs" % (t3 - t2))
            print("Time: udp_send: %2.2fs" % (t4 - t3))
            print("Time: total : %2.2fs" % (t4 - t1))
    finally:
        sock.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment