Skip to content

Instantly share code, notes, and snippets.

@telegraphic
Created June 21, 2022 05:02
Show Gist options
  • Save telegraphic/b7243becf9ffccb9dc3702298f55ea8e to your computer and use it in GitHub Desktop.
Save telegraphic/b7243becf9ffccb9dc3702298f55ea8e to your computer and use it in GitHub Desktop.
Gist for controlling multiple SNAP boards through `casperfpga`, using one thread per board and a `multiprocessing` queue to collect results
from utmost2d_snap import UtmostSnap
from rpi_bf_control import RpiBeamformerController
import time
from multiprocessing import JoinableQueue
from threading import Thread
class BoardManager(object):
    """ Base class for multithreaded control of multiple boards

    Args:
        board_cfg_list (list): List of board config files, or list of IP addresses,
                               or other string that identifies board IDs.
        board_controller (fn): A board controller class, such as the UtmostSnap
                               or RpiBeamformerController. It is called once per
                               entry of board_cfg_list.

    Notes:
        The main method is _run_on_all(), which will run a given method from the
        controller class on all boards. For example, if the class had a method
        self.program(fpg_filename), you could call _run_on_all('program', fpg_filename)

        The board_controller must have an attribute self.host, which is used to
        name each worker thread and to look up per-board values in dictionaries
        passed as arguments.
    """

    def __init__(self, board_cfg_list, board_controller):
        self.controller = board_controller
        self.boards = [self.controller(cfg) for cfg in board_cfg_list]
        # Not used by the methods of this class; kept as a public attribute.
        self.task_queue = JoinableQueue()

    def __repr__(self):
        lines = [f"<BoardManager - controller: {self.controller}>"]
        lines.extend(" %s" % str(brd) for brd in self.boards)
        return "\n".join(lines)

    def _run(self, q, proc_id, fn_to_run, *args, **kwargs):
        """ Worker body: run one call and always report the outcome on the queue

        Args:
            q (JoinableQueue): Queue used to hand the outcome back to the caller
            proc_id (str): Unique name for this worker (the board's host)
            fn_to_run (fn): Python method to run
            args: positional arguments to pass to fn_to_run
            kwargs: keyword arguments to pass to fn_to_run

        Notes:
            Exactly one item is put on the queue, even if fn_to_run raises:
            ``[proc_id, True, return_value]`` on success, or
            ``[proc_id, False, traceback_text]`` on failure. Reporting failures
            is what stops _run_on_all() from waiting forever on a dead worker.
        """
        try:
            outcome = [proc_id, True, fn_to_run(*args, **kwargs)]
        except BaseException:
            # BaseException so that even SystemExit raised in a worker is reported.
            import traceback
            outcome = [proc_id, False, traceback.format_exc()]
        q.put(outcome)
        q.task_done()

    def _run_on_all(self, fn_to_run, *args, **kwargs):
        """ Run a command in parallel across all boards

        Args:
            fn_to_run (str): Name of the controller method to run on each board
            args: arguments to pass to the method
            kwargs: keyword arguments to pass to the method

        Returns:
            dict mapping each board's host to the value its method returned.

        Raises:
            RuntimeError: if a board does not have the requested method, or if
                          the method raised on one or more boards (the message
                          contains each failing host and its traceback).
            KeyError: if a dictionary argument has no entry for a board's host.

        Notes:
            If different values need to be passed to different boards, you can
            pass a python dictionary, e.g.
            `regvalue = {'host1': regval1, 'host2': regval2}`
            Any positional argument that is a dict is treated this way.
        """
        q = JoinableQueue()

        # Resolve methods and per-board arguments for every board first, so that
        # a lookup error cannot leave the command running on only some boards.
        jobs = []
        for board in self.boards:
            host = board.host
            try:
                method = getattr(board, fn_to_run)
            except AttributeError:
                raise RuntimeError("Cannot find method %s" % fn_to_run)
            call_args = [q, host, method]
            call_args.extend(arg[host] if isinstance(arg, dict) else arg
                             for arg in args)
            jobs.append((host, call_args))

        workers = []
        for host, call_args in jobs:
            worker = Thread(target=self._run, name=host,
                            args=call_args, kwargs=kwargs)
            worker.daemon = True
            worker.start()
            workers.append(worker)

        # Every worker puts exactly one item, so collecting len(workers) items
        # is what waits for completion.
        outdict = {}
        failures = {}
        for _ in workers:
            host, ok, payload = q.get()
            if ok:
                outdict[host] = payload
            else:
                failures[host] = payload
        for worker in workers:
            worker.join()

        if failures:
            details = "\n".join("[%s]\n%s" % item for item in failures.items())
            raise RuntimeError("%s failed on %i of %i board(s):\n%s"
                               % (fn_to_run, len(failures), len(workers), details))
        return outdict
class UtmostSnapManager(BoardManager):
    """ Manager for multiple UtmostSnap boards

    Args:
        snapdict (dict): Dictionary of SNAP board configs, as set in config.py

    Notes:
        snapdict should be of the form:
        ```
        snapdict = {0: {'ip': snap0_ip, 'config': snap0_config},
                    1: {'ip': snap1_ip, 'config': snap1_config},
                    ...}
        ```
        Only the 'config' entry of each board is used here. On construction,
        get_system_information() is run on every board.
    """

    def __init__(self, snapdict):
        config_files = [entry['config'] for entry in snapdict.values()]
        super(UtmostSnapManager, self).__init__(config_files, UtmostSnap)
        self._run_on_all('get_system_information')

    def configure_coarse_delays(self, delay_dict, wait_for_pps=True):
        """ Configure coarse delays across boards

        Runs the UtmostSnap.configure_coarse_delays() method, applying
        given delays for each board.

        Args:
            delay_dict (dict): Delay values for each board, keyed by board host
            wait_for_pps (bool): Wait for PPS or apply immediately? (Default True)
        """
        if wait_for_pps:
            self.wait_for_pps()
        # Per-board 'verbose' argument: keep the individual boards quiet.
        quiet = {b.host: False for b in self.boards}
        self._run_on_all('configure_coarse_delays', delay_dict, quiet)

    def read_coarse_delays(self):
        """ Read coarse delays from SNAP registers

        Returns: dict of delays per board
        """
        return self._run_on_all('read_coarse_delays')

    def wait_for_pps(self, verbose=False):
        """ Wait for a PPS

        Polls the 'pps_count' register of boards[0], with 1 ms waits between
        register reads, and returns once the value changes.

        Args:
            verbose (bool): Print progress messages to screen
        """
        if verbose:
            # end=' ' keeps 'OK.' on the same line; flush so the message is
            # visible while waiting.
            print("Waiting for next PPS...", end=' ', flush=True)
        first_board = self.boards[0]
        pps_at_start = first_board.read_int('pps_count')
        while pps_at_start == first_board.read_int('pps_count'):
            time.sleep(0.001)
        if verbose:
            print('OK.')
class RpiBeamformerManager(BoardManager):
    """ Manager for multiple RpiBeamformer boards

    Args:
        bfdict (dict): Dictionary of RPI Beamformer configs, as set in config.py

    Notes:
        bfdict should be of the form:
        ```
        bfdict = {
            'M1' : {'ip': '172.17.228.209', 'active_addrs': [0, 5], 'active_pols': ['hp', 'vp']},
            'NO' : {'ip': '172.17.228.210', 'active_addrs': [1], 'active_pols': ['hp', 'vp']},
            ...}
        ```
        The configs are re-keyed by IP address in self.cfg, because the IP is
        what each controller exposes as its host.
    """

    def __init__(self, bfdict):
        self.cfg = {entry['ip']: entry for entry in bfdict.values()}
        ip_list = [entry['ip'] for entry in bfdict.values()]
        super(RpiBeamformerManager, self).__init__(ip_list, RpiBeamformerController)

    def point(self, angle, verbose=False, print_cmd=None):
        """ Point all boards towards given angle

        Runs RpiBeamformerController.point_multi() method on all boards, using
        each board's 'active_addrs' and 'active_pols' from its config.

        Args:
            angle (int): Value between -60 to 60 degrees.
            verbose (bool): Print commands to screen
            print_cmd (bool): Alias for verbose, matching the keyword used by
                              RpiBeamformerController. Takes precedence over
                              verbose when given.
        """
        if print_cmd is not None:
            verbose = print_cmd
        addr_dict = {ip: cfg['active_addrs'] for ip, cfg in self.cfg.items()}
        pol_dict = {ip: cfg['active_pols'] for ip, cfg in self.cfg.items()}
        angle_dict = {ip: angle for ip in self.cfg}
        print_dict = {ip: verbose for ip in self.cfg}
        self._run_on_all('point_multi', addr_dict, pol_dict, angle_dict, print_dict)
if __name__ == "__main__":
    # Demo: point all beamformers, then apply test coarse delays to the SNAPs
    # while the data monitor is paused.
    import config
    import redis
    import time

    r = redis.Redis(config.redis_db_ip)

    rm = RpiBeamformerManager(config.bfdict)
    # RpiBeamformerManager.point() takes 'verbose', not 'print_cmd'.
    rm.point(30, verbose=True)

    # Pause any SNAP monitors by setting redis flag
    print("Pausing data monitor...", end=' ', flush=True)
    r.set('pause_data_monitor', 'T')
    print("OK")

    try:
        sm = UtmostSnapManager(config.snapdict)
        #sm.wait_for_pps()
        t0 = time.time()
        test_delays = {'172.17.228.248': [3] * 12,
                       '172.17.228.242': [4] * 12,
                       '172.17.228.244': [5] * 12}
        sm.configure_coarse_delays(test_delays)
        t1 = time.time() - t0
        print(f"Time elapsed: {t1:2.2f}s")
    finally:
        # Always clear the flag, even if configuring the boards failed.
        r.set('pause_data_monitor', 'F')
#!/usr/bin/env python
"""
# rpi_bf_control.py
Control raspberry Pi beamformers
"""
import sys, redis, time, os
import config
import subprocess
class RpiBeamformerController(object):
    """ Controller class for Raspberry Pi powered beamformer boards

    Each command is run on the Pi over ssh by invoking the remote control
    script with ``--<command> <args...>``.

    Args:
        ip (str): IP address of the Raspberry Pi
    """

    def __init__(self, ip):
        self.ip = ip
        self.host = ip  # Used in multiboard controller

    def _run_cmd(self, command, *args, print_cmd=False):
        """ Run one control-script command on the Pi over ssh

        Args:
            command (str): Control script option name, without the leading '--'
            args: Values appended after the option, space separated
            print_cmd (bool): Print the command line and its stdout to screen

        Returns:
            str: Standard output of the ssh command.
        """
        sshcmd = 'ssh -i ~/.ssh/id_rsa_pi'
        ctlscript = 'sudo /mnt/utmost2d/beam_control_local/beam_bidir_v1/RPI_BIDIRECTIONAL_V1.py'
        argstr = ' '.join(map(str, args))
        bf_cmd = '{ssh} pi@{ip} {ctlscript} --{cmd} {args}'.format(
            ssh=sshcmd, ip=self.ip, ctlscript=ctlscript, cmd=command, args=argstr)
        if print_cmd:
            print(bf_cmd)

        # NOTE(review): the command is a shell string (shell=True is needed for
        # the '~' expansion); the ip and arguments must come from trusted config.
        sp = subprocess.Popen(bf_cmd, shell=True,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              universal_newlines=True)
        out, err = sp.communicate()
        rc = sp.returncode
        time.sleep(1)
        if print_cmd:
            print(out)
        if rc != 0:
            # Report failures instead of dropping the exit status silently.
            print("WARNING: '%s' on %s exited with status %i: %s"
                  % (command, self.ip, rc, err.strip()))
        return out

    def __repr__(self):
        return f"<RpiBeamformerController: {self.host}>"

    def _check_addr_pol(self, addr, pol, zero_ok=True):
        """ Check address and pol are valid

        Args:
            addr (int): Address 1--6, or 0 when zero_ok is True
            pol (str): Either 'hp' or 'vp' (case insensitive)
            zero_ok (bool): Accept address 0

        Raises:
            RuntimeError: if addr or pol is not valid
        """
        addr_range = range(0, 7) if zero_ok else range(1, 7)
        if addr not in addr_range:
            raise RuntimeError("Address must be between %i--6" % addr_range[0])
        if pol.lower() not in ('hp', 'vp'):
            raise RuntimeError("Pol must be 'vp' or 'hp'")

    def point(self, addr, pol, angle, print_cmd=False):
        """ Point the beamformer

        Args:
            addr (int): Address 1--6 for single BF pointing or 0 for broadcasting to all
            pol (str): Either 'hp' or 'vp'
            angle (int): Angle between -60 to 60 degrees (rounded before sending)
            print_cmd (bool): Print the runtime command to screen

        Raises:
            RuntimeError: if addr, pol or angle is out of range
        """
        self._check_addr_pol(addr, pol)
        if not (-60 <= angle <= 60):
            raise RuntimeError("Angle must be between -60 and 60")
        self._run_cmd('point', addr, pol, round(angle), print_cmd=print_cmd)

    def point_multi(self, addr_list, pol_list, angle, print_cmd=False):
        """ Point multiple pols for multiple addresses to given angle

        Args:
            addr_list (list): List of addresses
            pol_list (list): List of pols (must be list even if ['vp'])
            angle (int): Angle between -60 to 60 degrees
            print_cmd (bool): Print runtime command to screen
        """
        for addr in addr_list:
            for pol in pol_list:
                self.point(addr, pol, angle, print_cmd=print_cmd)

    def lstate(self, addr, pol, state, print_cmd=False):
        """ control the state of the LNAs

        Args:
            addr (int): Address 1--6 for single BF pointing or 0 for broadcasting to all
            pol (str): Either 'hp' or 'vp'
            state (str or int): A LNA state setting, either 'on', 'off' or 8-bit word of 0's and 1's
            print_cmd (bool): Print the runtime command to screen
        """
        self._check_addr_pol(addr, pol)
        # str() first: state may be given as an int, which has no .lower()
        self._run_cmd('lstate', addr, pol, str(state).lower(), print_cmd=print_cmd)

    def query(self, addr, pol, print_cmd=False):
        """ Query current switch settings

        Args:
            addr (int): Address 1--6 for single BF pointing
            pol (str): Either 'hp' or 'vp'
            print_cmd (bool): Print the runtime command to screen

        Returns:
            str: Standard output of the remote command.
        """
        self._check_addr_pol(addr, pol, zero_ok=False)
        return self._run_cmd('query', addr, pol, print_cmd=print_cmd)

    def cur(self, addr, pol, print_cmd=False):
        """ Query the current

        Args:
            addr (int): Address 1--6 for single BF pointing
            pol (str): Either 'hp' or 'vp'
            print_cmd (bool): Print the runtime command to screen

        Returns:
            str: Standard output of the remote command.
        """
        self._check_addr_pol(addr, pol, zero_ok=False)
        return self._run_cmd('cur', addr, pol, print_cmd=print_cmd)

    def ver(self, addr, pol, print_cmd=False):
        """ Query BF software version

        Args:
            addr (int): Address 1--6 for single BF pointing
            pol (str): Either 'hp' or 'vp'
            print_cmd (bool): Print the runtime command to screen

        Returns:
            str: Standard output of the remote command.
        """
        self._check_addr_pol(addr, pol, zero_ok=False)
        return self._run_cmd('ver', addr, pol, print_cmd=print_cmd)

    def out(self, addr, pol, state, print_cmd=False):
        """ turn the output amplifier on or off

        Args:
            addr (int): Address 1--6 for single BF pointing
            pol (str): Either 'hp' or 'vp'
            state (str): Output amplifier state, passed through to the control script
            print_cmd (bool): Print the runtime command to screen
        """
        # NOTE(review): address 0 is accepted here although the docstring says
        # 1--6; confirm whether broadcasting is intended for this command.
        self._check_addr_pol(addr, pol)
        self._run_cmd('out', addr, pol, state, print_cmd=print_cmd)

    def sstate(self, state, print_cmd=False):
        """ Put modules to sleep or wake them

        Args:
            state (str): Either 'on' or 'off'
            print_cmd (bool): Print the runtime command to screen
        """
        # NOTE(review): this sends the 'out' option, not 'sstate', and without
        # addr/pol. It looks like a copy-paste slip; confirm against the control
        # script before changing.
        self._run_cmd('out', state.lower(), print_cmd=print_cmd)
if __name__ == "__main__":
    # Build a controller for the IP address given on the command line.
    import argparse

    parser = argparse.ArgumentParser(description='RPi beamformer controller',
                                     prefix_chars='@')
    parser.add_argument('ip', type=str, help='IP address of beamformer')
    cli_args = parser.parse_args()

    # The controller is only constructed here; no command is sent.
    bf = RpiBeamformerController(cli_args.ip)
#!/usr/bin/env python
"""
# utmost2d_snap.py
Monitor and control class for SNAP boards in UTMOST-2D project.
Provides a class, UtmostSnap, which wraps the CasperFpga class, adding
UTMOST-specific features for control and configuration.
Usage:
from utmost2d_snap import UtmostSnap
snap = UtmostSnap('../path/to/config.txt')
snap.program()
d_adc = snap.grab_adc_samples()
d_spec = snap.grab_spectra()
"""
import casperfpga
from casperfpga import Mac, IpAddress
import numpy as np
import struct
import time
import sys
import os
from config_parser import read_config_file, validate_config
class UtmostSnap(casperfpga.CasperFpga):
    """ Python control class for UTMOST-2D SNAP board

    Args:
        config_file (str): Path to the board config file, read with read_config_file()
        katcp_port (int): Port passed to the CasperFpga constructor
        validate (bool): Passed through to read_config_file()
    """

    def __init__(self, config_file, katcp_port=7147, validate=True):
        paramdict = read_config_file(config_file, validate=validate)

        # Basic setup
        self.ADC_CLK = float(paramdict['ADC_CLK'])  # ADC Clock frequency, in MHz
        self.ADC_GAIN = paramdict['ADC_GAIN']       # Digital gain on ADC
        self.PI_IP = paramdict['PI_IP']             # string, IP of host raspberry pi
        self.BOFFILE = paramdict['BOFFILE']         # string, name of bof
        self.SNAPID = paramdict['SNAPID']           # 8 bit integer ID for board

        # Ethernet
        self.SNAPMAC1 = Mac(paramdict['SNAPMAC1']).mac_int      # MAC address for GbE1
        self.SNAPMAC2 = Mac(paramdict['SNAPMAC2']).mac_int      # MAC address for GbE2
        self.SNAPIP1 = IpAddress(paramdict['SNAPIP1']).ip_int   # IP address for GbE1
        self.SNAPIP2 = IpAddress(paramdict['SNAPIP2']).ip_int   # IP address for GbE2
        self.SNAPPORT1 = paramdict['SNAPPORT1']     # integer, port number for GbE1
        self.SNAPPORT2 = paramdict['SNAPPORT2']     # integer, port number for GbE2
        self.DESTMACS1 = paramdict['DESTMACS1']     # 256x MACs for GbE1 ARP table
        self.DESTMACS2 = paramdict['DESTMACS2']     # 256x MACs for GbE2 ARP table
        self.DESTIPS1 = paramdict['DESTIPS1']       # 10x dest IPs for each subband
        self.DESTIPS2 = paramdict['DESTIPS2']       # 10x dest IPs for each subband
        self.DESTPORTS1 = paramdict['DESTPORTS1']   # List of 10 destination IP ports
        self.DESTPORTS2 = paramdict['DESTPORTS2']   # List of 10 destination IP ports

        # Convert to integers
        self.DESTMACS1 = [Mac(m).mac_int for m in self.DESTMACS1]
        self.DESTMACS2 = [Mac(m).mac_int for m in self.DESTMACS2]

        # Channel selection
        self.START_CHANNEL_1 = paramdict['START_CHANNEL_1']  # Lowest chan for 320 channels on GbE1
        self.START_CHANNEL_2 = paramdict['START_CHANNEL_2']  # Lowest chan for 320 channels on GbE2

        # Coarse delay and F-engine
        self.COARSE_DELAYS = paramdict['COARSE_DELAYS']  # Coarse delay (clk cycles) ADC0-11
        self.PFB_FFTSHIFT = paramdict['FFTSHIFT']
        self.PFB_GAIN = paramdict['GAIN']
        self.FLIP_BAND = paramdict['FLIP_BAND']

        # Setup frequency values (currently hardcoded Nyquist zone)
        NYQ_ZONE = 8
        self.freqs = np.arange(0, 1024) * self.ADC_CLK / 2 / 1024 + self.ADC_CLK / 2 * NYQ_ZONE

        # Mapping of antenna ID to [stream ID, offset within stream] for spectrometer
        self._spec_stream_map = {
            0: [0, 0],
            2: [0, 1024],
            1: [1, 0],
            3: [1, 1024],
            4: [2, 0],
            6: [2, 1024],
            5: [3, 0],
            7: [3, 1024],
            8: [4, 0],
            10: [4, 1024],
            9: [5, 0],
            11: [5, 1024]
        }

        # Mapping of stream ID to the two antenna IDs it carries
        self._spec_ant_id_map = {
            0: [0, 2],
            1: [1, 3],
            2: [4, 6],
            3: [5, 7],
            4: [8, 10],
            5: [9, 11]
        }

        super(UtmostSnap, self).__init__(self.PI_IP, katcp_port)

    def __repr__(self):
        return "<UtmostSnap IP:{ip} CLK:{clk}MHz FPG:{fpg}>".format(
            ip=self.PI_IP, clk=self.ADC_CLK, fpg=os.path.basename(self.BOFFILE))

    def program(self, fpg=None):
        """ Program SNAP board with firmware and calibrate ADCS

        Args:
            fpg (str): Name of fpg file to upload and program. Defaults to None.
                       (Will use fpg from config file as default).

        Raises:
            RuntimeError: if ADC initialization reports a calibration error.
        """
        # Honour the fpg argument; fall back to the file named in the config.
        name_of_bof = self.BOFFILE if fpg is None else fpg

        print("Upload and program %s..." % name_of_bof)
        self.upload_to_ram_and_program(name_of_bof)

        print("Initialize ADCs...")
        adc = self.adcs.adc_snap_adc
        is_cal = adc.init(sample_rate=self.ADC_CLK, num_channel=4)
        if is_cal != 0:
            print('ADC CALIBRATION ERROR')
            # Raise instead of calling exit(), so callers (and worker threads)
            # see an ordinary error.
            raise RuntimeError('ADC CALIBRATION ERROR')
        print('ADC calibrated')

        print("\tADC digital gain: %i" % self.ADC_GAIN)
        adc.set_gain([self.ADC_GAIN]*4, use_linear_step=True)

        print("Checking ethernet status...", end=' ', flush=True)
        time.sleep(1)
        status1 = self.read_int('eth_gbe1_status')
        # GbE2 status is read but not checked (its check is disabled).
        status2 = self.read_int('eth_gbe2_status')

        # NOTE(review): 'Programmed OK.' was in a branch that could never run;
        # it is assumed to belong to the success path.
        if status1 == 4:
            print('Ready to configure ethernet and packetizer.')
            print("Programmed OK.")
        else:
            print("There's a problem, ethernet1 status register = ", status1)
            if status1 < 4:
                print("Status msb = 0 means that the ethernet link is not up.")
            print("ERROR programming not fully successful")

    def _configure_destinations(self, gbe_idx, dest_ips, dest_ports):
        """ Write destination IP/port registers for one GbE core (1 or 2) """
        for idx, dest_ip in enumerate(dest_ips):
            dest_port = dest_ports[idx]
            print('\tGBE%i-%i %s:%i' % (gbe_idx, idx, dest_ip, dest_port))
            self.write_int('eth_gbe%i_dest_ip%i' % (gbe_idx, idx), IpAddress(dest_ip).ip_int)
            self.write_int('eth_gbe%i_dest_port%i' % (gbe_idx, idx), dest_port)

    def configure_ethernet(self):
        """ Configure 10 GbE Ethernet cores

        Ethernet settings are read from the config_file.
        """
        print('Configuring 10Gb ethernet...')

        # set SNAP board identifier
        self.write_int('eth_gbe1_header_id', self.SNAPID)
        self.write_int('eth_gbe2_header_id', self.SNAPID)

        self.gbes.eth_gbe1.setup(mac=self.SNAPMAC1, ipaddress=self.SNAPIP1, port=self.SNAPPORT1,
                                 gateway=1, subnet_mask='255.255.255.0')
        self.gbes.eth_gbe2.setup(mac=self.SNAPMAC2, ipaddress=self.SNAPIP2, port=self.SNAPPORT2,
                                 gateway=1, subnet_mask='255.255.255.0')

        self.gbes.eth_gbe1.set_arp_table(self.DESTMACS1)
        self.gbes.eth_gbe2.set_arp_table(self.DESTMACS2)

        self.gbes.eth_gbe1.configure_core()
        self.gbes.eth_gbe2.configure_core()

        print('Configuring destination IP addresses and ports...')
        self._configure_destinations(1, self.DESTIPS1, self.DESTPORTS1)
        self._configure_destinations(2, self.DESTIPS2, self.DESTPORTS2)

    def reset_and_sync(self, use_pps=True):
        """ reset ethernet block and send sync pulse

        Args:
            use_pps (bool): Use the pulse-per-second to arm sync pulse. If false, a
                            'fake' PPS signal will be sent manually.
        """
        if use_pps:
            print("Checking PPS status...", end=' ', flush=True)
            self.write_int('software_start', 0)
            self.write_int('pps_counter_rst', 0)
            self.write_int('pps_counter_rst', 1)
            self.write_int('pps_counter_rst', 0)

            # Wait until next unix tick
            t0 = time.time()
            td = t0 - int(t0)
            time.sleep(1 - td + 0.1)

            # Read PPS
            pps0 = self.read_int('pps_count')
            time.sleep(1.01)
            pps1 = self.read_int('pps_count')

            # NOTE(review): a PPS error is only reported; the sync below is
            # still attempted (and waits until pps_count changes). Confirm this
            # is the intended behaviour.
            if pps1 - pps0 != 1:
                print("\nError: PPS is not being receieved each second!")
                print("Error: PPS0: {} PPS1: {}".format(pps0, pps1))
            else:
                print("OK.")

            print("Waiting for next PPS...", end=' ', flush=True)
            while pps1 == self.read_int('pps_count'):
                time.sleep(0.01)
            print("OK.")

            pps_start = self.read_int('pps_count') + 1
            unix_start = int(time.time()) + 1
            self.write_int('software_start', 1)
            time.sleep(1.01)
            self.write_int('software_start', 0)
            self.write_int('eth_tx_enable', 1)
            print("Starting on PPS {}. Unix start {}".format(pps_start, unix_start))
        else:
            print('Sending manual pulse to reset ethernet and sync...')
            self.write_int('force_new_sync', 1)
            time.sleep(0.1)
            self.write_int('force_new_sync', 0)
            time.sleep(1.01)
            self.write_int('eth_tx_enable', 1)
        # NOTE(review): final wait applied after enabling transmission in both modes.
        time.sleep(1.01)

    def check_ethernet_status(self):
        """ Read ethernet status registers

        If value == 4 everything is good and link is up.
        If value < 4 the physical link isn't up.
        """
        print('Ethernet1 status = %i' % self.read_int('eth_gbe1_status'))
        print('Ethernet2 status = %i' % self.read_int('eth_gbe2_status'))

    def configure_coarse_delays(self, coarse_delays=None, verbose=True):
        """ Apply coarse delays to ADC inputs.

        If no arguments supplied, will use values from config file.

        Args:
            coarse_delays (list/array): Integer delays to apply to each input.
                                        Should be list or array of length 12.
            verbose (bool): Print the delay written to each input.

        Raises:
            RuntimeError: if coarse_delays is given and is not of length 12.
        """
        if coarse_delays is not None:
            # Explicit check rather than assert, which is stripped under -O.
            if len(coarse_delays) != 12:
                raise RuntimeError("Coarse delays must be of length 12")
            self.COARSE_DELAYS = coarse_delays

        if verbose:
            print('Configuring coarse delays.')
        for ii in range(12):
            if verbose:
                print('\tADC %i: %i' % (ii, self.COARSE_DELAYS[ii]))
            self.write_int('adc_delay%i' % ii, self.COARSE_DELAYS[ii])

    def read_coarse_delays(self):
        """ Read current coarse delays

        Returns:
            coarse_delays (np.array): Integer delay read back for each input,
                                      int32 array of length 12.
        """
        coarse_delays = np.zeros(12, dtype='int32')
        for ii in range(12):
            coarse_delays[ii] = self.read_int('adc_delay%i' % ii)
        return coarse_delays

    def configure_fengine(self, fft_shift=None, rq_gain=None):
        """ Configure the F-engine

        Applies FFT shift, requantization gain, and sets up data reorder
        for packetization.

        If no arguments supplied, will use settings from config file

        Args:
            fft_shift (int): Shift schedule to apply (see docs).
            rq_gain (int): Gain value to use in requantization (see docs).
        """
        # Honour the overrides, storing them the same way
        # configure_coarse_delays() stores its override.
        if fft_shift is not None:
            self.PFB_FFTSHIFT = fft_shift
        if rq_gain is not None:
            self.PFB_GAIN = rq_gain

        print("Configuring F-engine...")
        self.write_int('pfb_fftshift', self.PFB_FFTSHIFT)
        self.write_int('pfb_gain', self.PFB_GAIN)
        self.write_int('band_flip', self.FLIP_BAND)
        print("\tFFT shift schedule: %s" % bin(self.PFB_FFTSHIFT))
        print("\tRequantization gain: %s" % bin(self.PFB_GAIN))

        print("Configuring reorder map...")
        # set reorder map
        T = 8    # number of spectra per packet
        Bt = 32  # TOTAL number of subbands in spectrum
        B = 10   # number of subbands TO SEND
        F = 32   # number of frequency channels per subband
        M = 2    # number of modules per fft output stream
        reorder_length = T*Bt*F*M

        start1 = self.START_CHANNEL_1
        start2 = self.START_CHANNEL_2

        # Entries that are never assigned below keep the initial value 1.
        reorder = np.ones(reorder_length)
        j = 0
        for b in range(B):
            for t in range(T):
                for f in range(F):
                    for m in range(M):
                        base = Bt*F*M*t + F*b + f + Bt*F*m
                        reorder[2*j] = base + start1
                        reorder[2*j + 1] = base + start2
                        j += 1
            # defines gap between packets (gap between one set of subbands and the next)
            # NOTE(review): applied once per subband; this is the only placement
            # inside the loops that stays within the map length. Confirm
            # against the firmware packet layout.
            j += 128

        reordermap = reorder.astype(int)
        s = struct.Struct('>16384H')
        self.write('reorder_reorder1_map1', s.pack(*reordermap))
        time.sleep(0.1)

    def grab_adc_samples(self):
        """ Read raw ADC samples

        Returns:
            Dictionary of arrays, each 4096 samples long. Keys are 'a0' through 'a11'.
            data = { 'a0': [sample0, sample1, ... sample4095],
                     'a1': [sample0, ...],
                     'a11': ... }
        """
        ss = self.devices['adc_samples_ss']
        raw = ss.read(man_trig=True, man_valid=True)['data']
        return {key: np.array(samples) for key, samples in raw.items()}

    def _grab_spectra_stream(self, stream=0):
        """ Grab spectra for given input stream

        Two antpols on each stream stored in BRAM, use grab_spectrum(ant_id).

        Args:
            stream (int): Stream index (0-5 in the stream maps). Two antpols on each stream.

        Returns: np.array of length 2048, dtype='uint64'.
        """
        raw = self.read('spec_bram%i' % stream, 2048 * 8)
        # frombuffer replaces the deprecated binary mode of np.fromstring;
        # byteswap() returns a swapped copy, as before.
        return np.frombuffer(raw, dtype='uint64').byteswap()

    def grab_spectrum(self, ant_id):
        """ Grab spectrum for given antpol input (0-11).

        Args:
            ant_id (int): Antpol ID (0-11)

        Returns: np.array of length 1024, dtype='float32'
        """
        stream_id, stream_offset = self._spec_stream_map[ant_id]
        d_stream = self._grab_spectra_stream(stream_id).astype('float32')
        return d_stream[stream_offset:stream_offset+1024] / 2**32

    def grab_spectra(self):
        """ Grab spectra for all 12 inputs from on-board spectrometer.

        Returns: python dict {'a0': np.array([chan0, .. chan1023], dtype='float32'), ...
                              'a11': np.array([chan0, .. chan1023], dtype='float32)}
        """
        d = {}
        for stream, (ant0, ant1) in self._spec_ant_id_map.items():
            dstream = self._grab_spectra_stream(stream)
            d['a%i' % ant0] = dstream[:1024].astype('float32') / 2**32
            d['a%i' % ant1] = dstream[1024:].astype('float32') / 2**32
        return d
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment