Skip to content

Instantly share code, notes, and snippets.

@PhilReinhold
Created January 25, 2015 18:49
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save PhilReinhold/05b475549e4c37e2f95a to your computer and use it in GitHub Desktop.
Save PhilReinhold/05b475549e4c37e2f95a to your computer and use it in GitHub Desktop.
PyQT Experiment GUI
from ipython_widget import QIPythonWidget
from PyQt4 import QtGui, QtCore
from PyQt4.QtCore import Qt
import matplotlib.pyplot as plt
from matplotlib.backends.backend_qt4agg import FigureCanvasQTAgg, NavigationToolbar2QT
import objectsharer as objsh
import socket
import os
import logging
from instrumentserver.instrument_gui import InstrumentsContainer
from qsci_simple_pythoneditor import SimplePythonEditor
import base_experiment
from YngwieDecoding import ResultRecord
import traceback
import fpga_config
import importlib
import subprocess
import time
import cgi
BLOCK_FMT = 'Data_0x%04d[0x00]B0_#%04d.bin'
class ExperimentTab(QtGui.QWidget):
tab_actions = [
('run', 'F5'),
('stop', 'F8'),
('clone', 'Ctrl+n'),
('reload', 'Ctrl+r'),
('update', 'Ctrl+u'),
('screenshot', None)
]
def __init__(self, experiment, window, name):
'''
:type experiment: base_experiment.BaseExperiment
:type window: ExperimentWindow
:type name: str
'''
super(ExperimentTab, self).__init__()
self.experiment = experiment
self.name = name
self.filename = str(os.path.join('experiments', *name.split('.'))) + '.py'
self.experiment.run_step.connect(self.update_progress)
self.experiment.run_finished.connect(self.run_finished)
self.window = window
self.view_box = QtGui.QTabWidget()
self.main_plot = PlotWidget()
self.code_widget = CodeWidget(self.filename)
self.history_widget = HistoryWidget(experiment, window.dataserver)
self.log_view = QtGui.QPlainTextEdit()
self.log_view.document().setDefaultFont(QtGui.QFont("Consolas", 10))
log_view_handler = LogViewHandler('Experiment', window)
log_view_handler.setFormatter(fpga_config.formatter)
self.experiment.logger.addHandler(log_view_handler)
layout = QtGui.QVBoxLayout(self)
layout.addWidget(self.view_box)
VBoxTab(self.view_box, 'plot', self.main_plot, key='Alt+p')
VBoxTab(self.view_box, 'code', self.code_widget, 'Alt+c')
VBoxTab(self.view_box, 'history', self.history_widget, 'Alt+h')
VBoxTab(self.view_box, 'log', self.log_view, 'Alt+l')
self.param_box = ParamsWidget(experiment)
layout.addWidget(self.param_box)
self.display_box = TextDisplay()
layout.addWidget(self.display_box)
button_layout = QtGui.QHBoxLayout()
layout.addLayout(button_layout)
self.buttons = {}
for name, keys in self.tab_actions:
proper_name = name.capitalize().replace('_', ' ')
action = QtGui.QAction(proper_name, self)
if keys is not None:
action.setShortcut(QtGui.QKeySequence(keys))
action.triggered.connect(lambda x=None, n=name: getattr(self, n)())
self.addAction(action)
self.buttons[name] = button = QtGui.QPushButton(proper_name)
button.clicked.connect(action.triggered)
button_layout.addWidget(button)
self.buttons['update'].setEnabled(hasattr(self.experiment, 'update'))
self.buttons['run'].clicked.connect(lambda: self.view_box.setCurrentIndex(0))
log_file = self.experiment.log_file
self.file_watcher = QtCore.QFileSystemWatcher([self.filename, log_file])
def file_changed(path):
path = str(path)
if path == self.filename:
style_sheet = "QPushButton { background-color: red }"
self.buttons['reload'].setStyleSheet(style_sheet)
QtCore.QTimer.singleShot(100, self.code_widget.load_from_disk)
elif path == log_file:
with open(log_file, 'r') as f:
text = f.read()
self.log_view.setPlainText(text)
QtGui.QApplication.instance().processEvents()
scroll_bar = self.log_view.verticalScrollBar()
scroll_bar.setValue(scroll_bar.maximum())
else:
logger.warn('Unknown path %s change notified', path)
self.file_watcher.fileChanged.connect(file_changed)
if os.path.exists(log_file):
file_changed(log_file)
def plot(self):
self.main_plot.fig.clear()
self.experiment.plot_last(self.main_plot.fig)
for name, val in self.experiment.fit_params.items():
fmt_str, denom = self.experiment.fit_fmt.get(name, ("%.2e", 1))
self.display_box.display(name, fmt_str % (val / denom))
self.main_plot.canvas.draw()
def clone(self):
logger.info('cloning %s, getting path', self.name)
expt_path = os.path.abspath('experiments')
new_path = str(QtGui.QFileDialog.getSaveFileName(
self, 'Copy %s to' % self.name, expt_path, 'Python Files (*.py)'
))
new_classname = os.path.basename(new_path).split('.py')[0]
self_path = os.path.join(expt_path, *self.name.split('.')) + '.py'
self_classname = os.path.basename(self_path).split('.py')[0]
with open(self_path, 'r') as f:
self_content = f.read()
logging.debug('old %s --> new %s', self_classname, new_classname)
new_content = self_content.replace('class %s' % self_classname, 'class %s' % new_classname)
with open(new_path, 'w') as f:
f.write(new_content)
logger.info('cloned %s --> %s', self_path, new_path)
def reload(self):
self.window.reload_tab(self)
def run(self):
logger.info('Running %s', self.name)
self.window.progress_bar.setMaximum(self.experiment.n_steps)
self.window.progress_bar.setValue(0)
self.window.instruments.save_instruments('ins_settings.txt')
self.experiment.run()
def stop(self):
logger.info('Aborted %s', self.name)
self.experiment.stop()
QtGui.QApplication.instance().processEvents()
def update(self):
self.experiment.update()
def run_finished(self):
self.window.progress_bar.setValue(0)
import numpy
vars = {
'np': numpy,
'exp': self.experiment,
'raw_data': self.experiment.raw_data,
'processed_data': self.experiment.processed_data,
'run_params': self.experiment.run_params,
'run_inst_params': self.experiment.run_inst_params,
'fit_params': self.experiment.fit_params,
'fig': self.main_plot.fig,
}
win.shell.pushVariables(vars)
logger.info('run finished')
def update_progress(self, cur_blocks, tot_blocks):
self.plot()
if self.experiment.is_running:
self.window.progress_bar.setMaximum(tot_blocks)
self.window.progress_bar.setValue(cur_blocks)
logger.info('%s / %s blocks', cur_blocks, tot_blocks)
def get_state(self):
return self.param_box.get_all_parameters()
def screenshot(self):
ss_dir = os.path.join(self.experiment.directory, 'screenshots')
if not os.path.exists(ss_dir):
os.mkdir(ss_dir)
fn = os.path.join(ss_dir, time.strftime('%Y%m%d_%H%M%S.png'))
QtGui.QPixmap.grabWidget(self, self.rect()).save(fn, 'PNG')
logger.info('Saving screenshot %s', fn)
# self.window.journal.add_image(fn)
class FPGAExperimentTab(ExperimentTab):
tab_actions = ExperimentTab.tab_actions + [
('make_tables', 'Ctrl+t'),
]
def __init__(self, experiment, window, name):
super(FPGAExperimentTab, self).__init__(experiment, window, name)
self.poll_timer = QtCore.QTimer()
self.poll_timer.timeout.connect(self.experiment.process_available_blocks)
self.experiment.run_started.connect(lambda: self.poll_timer.start(500))
self.tables_widget = TablesWidget(experiment)
self.rrec_widget = ResultRecordsWidget(experiment)
self.waves_widget = WavesWidget(experiment)
VBoxTab(self.view_box, 'tables', self.tables_widget, 'Alt+t')
VBoxTab(self.view_box, 'rrecs', self.rrec_widget, 'Alt+r')
VBoxTab(self.view_box, 'waves', self.waves_widget, 'Alt+w')
self.param_box.param_widgets['n_blocks'].textChanged.connect(self.update_seq_time)
self.param_box.param_widgets['averages_per_block'].textChanged.connect(self.update_seq_time)
self.buttons['run'].setDisabled(window.exp_is_running)
self.buttons['stop'].setEnabled(window.exp_is_running)
window.exp_running_sig.connect(self.buttons['run'].setDisabled)
window.exp_running_sig.connect(self.buttons['stop'].setEnabled)
# self.buttons['make_tables'].clicked.connect(lambda: self.view_box.setCurrentIndex(1))
def run(self):
self.make_tables()
super(FPGAExperimentTab, self).run()
self.window.exp_running_sig.emit(True)
def run_finished(self):
self.poll_timer.stop()
self.window.exp_running_sig.emit(False)
super(FPGAExperimentTab, self).run_finished()
def make_tables(self):
try:
self.experiment.compile_raw()
self.tables_widget.set_raw_seq()
except Exception as e:
self.tables_widget.dsl_table.setText(str(e))
try:
self.experiment.write_tables()
self.update_seq_time()
self.tables_widget.set_sequences()
self.waves_widget.load_waves()
master_seq = self.tables_widget.master_table.get_condensed()
self.rrec_widget.set_master_seq(master_seq)
except Exception as e:
self.tables_widget.master_table.condensed_view.setText(str(e))
raise
def update_seq_time(self):
self.seq_time = self.experiment.seq_time
averages_per_block = self.experiment.averages_per_block
n_blocks = self.experiment.n_blocks
averages = averages_per_block * n_blocks
expected_time = (4 * self.seq_time * averages) / 1e9
time_str = human_time(expected_time)
expected_block_time = (4 * self.seq_time * averages_per_block) / 1e9
self.display_box.display('Averages', averages)
self.display_box.display('Estimated time', time_str)
self.display_box.display('Block time', human_time(expected_block_time))
self.display_box.display('Result Shape', self.experiment.result_shape)
self.buttons['run'].setText('run (%s)' % time_str)
class ScanExperimentTab(ExperimentTab):
def __init__(self, experiment, window, name):
super(ScanExperimentTab, self).__init__(experiment, window, name)
self.points_combo_box = QtGui.QComboBox()
self.points_combo_box.currentIndexChanged.connect(self.point_selected)
self.points_plot = PlotWidget()
points_layout = VBoxTab(self.view_box, 'points')
points_layout.addWidget(self.points_combo_box)
points_layout.addWidget(self.points_plot)
def plot(self):
super(ScanExperimentTab, self).plot()
n_new = len(self.experiment.points_data)
n_old = self.points_combo_box.count()
if n_old > n_new:
self.points_combo_box.clear()
self.points_combo_box.addItems(map(str,range(n_new)))
for i in range(n_old, n_new):
self.points_combo_box.addItem(str(i))
def point_selected(self):
name = str(self.points_combo_box.currentText())
logger.debug('point: %s', name)
if name != '':
data = self.experiment.points_data[int(name)]
self.experiment.child_instance.plot(self.points_plot.fig, data)
self.points_plot.canvas.draw()
class ExperimentListModel(QtGui.QFileSystemModel):
def __init__(self):
super(ExperimentListModel, self).__init__()
self.setRootPath(os.path.abspath('experiments'))
self.setNameFilters(['[^_]?*.py'])
self.setNameFilterDisables(False)
def columnCount(self, index):
return 1
def data(self, index, role):
ret = super(ExperimentListModel, self).data(index, role)
if role == Qt.DisplayRole:
return str(ret).replace('.py', '')
if role == Qt.DecorationRole:
return None
return ret
class ExperimentList(QtGui.QTreeView):
def __init__(self):
super(ExperimentList, self).__init__()
model = ExperimentListModel()
self.setModel(model)
self.setRootIndex(model.index(os.path.abspath('experiments')))
enter_key = QtGui.QShortcut(Qt.Key_Return, self)
enter_key.setContext(Qt.WidgetShortcut)
enter_key.activated.connect(self.toggle_current)
def toggle_current(self):
idx = self.currentIndex()
if self.model().hasChildren(idx):
self.setExpanded(idx, not(self.isExpanded(idx)))
else:
self.doubleClicked.emit(idx)
class ExperimentWindow(QtGui.QMainWindow):
exp_running_sig = QtCore.pyqtSignal(bool)
exp_is_running = False
def __init__(self):
super(ExperimentWindow, self).__init__()
self.tab_widget = QtGui.QTabWidget()
self.tab_widget.setTabsClosable(True)
self.tab_widget.setMovable(True)
self.tab_widget.tabCloseRequested.connect(self.close_tab)
self.tab_widget.currentChanged.connect(self.set_experiment_actions)
self.opened = {}
self.experiments = {}
self.exp_list = ExperimentList()
self.exp_list.doubleClicked.connect(self.load_from_list)
self.shell = QIPythonWidget()
self.shell.pushVariables({'exps':self.experiments})
self.log_view = LogView()
# self.journal = JournalWidget()
self.calibrations = CalibrationsWidget()
self.get_remote_objects()
self.instruments_container = InstrumentsContainer(self.instruments)
self.progress_bar = QtGui.QProgressBar()
self.progress_bar.setStyleSheet(
"QProgressBar {"
" border: 2px solid grey;"
" border-radius: 0px;"
" text-align: center;}"
"QProgressBar::chunk {"
" background-color: #3add36;"
" width: 1px;}"
)
self.statusBar().addPermanentWidget(self.progress_bar)
self.exp_running_sig.connect(lambda val: setattr(self, 'exp_is_running', val))
file_menu = self.menuBar().addMenu('File')
file_menu.addAction('Reload all modules', self.reload_all, QtGui.QKeySequence('Ctrl+Shift+R'))
self.experiment_menu = self.menuBar().addMenu('Experiment')
self.view_menu = view_menu = self.menuBar().addMenu('View')
def increment_tab():
new_idx = self.tab_widget.currentIndex() + 1
new_idx %= self.tab_widget.count()
self.tab_widget.setCurrentIndex(new_idx)
def decrement_tab():
new_idx = self.tab_widget.currentIndex() - 1
new_idx %= self.tab_widget.count()
self.tab_widget.setCurrentIndex(new_idx)
view_menu.addAction('Previous Experiment Tab', decrement_tab, QtGui.QKeySequence('Ctrl+['))
view_menu.addAction('Next Experiment Tab', increment_tab, QtGui.QKeySequence('Ctrl+]'))
condense_tables_action = view_menu.addAction('Condensed Tables')
SequenceTable.condense_action = condense_tables_action
condense_tables_action.setCheckable(True)
condense_tables_action.setChecked(True)
self.setCentralWidget(self.tab_widget)
self.add_dock(self.exp_list, 'Experiments', 'Left', 150)
self.add_dock(self.shell, 'Shell', 'Right')
self.add_dock(self.log_view, 'Log', 'Left')
inst_dock = self.add_dock(self.instruments_container, 'Instruments', 'Right', 300)
calib_dock = self.add_dock(self.calibrations, 'Calibrations', 'Right')
self.tabifyDockWidget(inst_dock, calib_dock)
instruments_menu = self.menuBar().addMenu('Instruments')
instruments_view_menu = instruments_menu.addMenu('View')
for action in self.instruments_container.actions():
instruments_menu.addAction(action)
for name, action in self.instruments_container.show_tab_actions.items():
instruments_view_menu.addAction(action)
if name not in ('qubit_info', 'cavity_info', 'system_info', 'yngwie'):
action.setChecked(False)
objsh.backend.add_qt_timer()
logger.info('Ready')
def add_dock(self, widget, name, location, min_width=None):
dock = QtGui.QDockWidget(name)
dock.setWidget(widget)
self.view_menu.addAction(dock.toggleViewAction())
loc_const = getattr(Qt, location+'DockWidgetArea')
self.addDockWidget(loc_const, dock)
if min_width is not None:
dock.setMinimumWidth(min_width)
return dock
def setup_logging(self):
self.log_view.add_section('Experiment')
self.log_view.add_section('Errors')
def excepthook(type, value, tb):
tb_str = '\n'.join(traceback.format_exception(type, value, tb))
print tb_str
win.log_view.log_to('Errors', '<pre>%s</pre>' % cgi.escape(tb_str))
sys.excepthook = excepthook
logger.addHandler(StatusBarHandler(self))
logger.addHandler(LogViewHandler('GUI', self))
objsh.logger.addHandler(LogViewHandler('Objsh', self))
def set_experiment_actions(self, tab_index):
tab = self.tab_widget.widget(tab_index)
self.experiment_menu.clear()
if tab is None:
return
for action in tab.actions():
self.experiment_menu.addAction(action)
def get_remote_objects(self):
objsh.backend.start_server('127.0.0.1', 55557)
objsh.register(self, 'experiment_gui')
self.dataserver = self.get_dataserver()
self.instruments = self.get_instruments()
try:
self.yng = self.instruments['yngwie']
except KeyError:
logger.error('Yngwie not present in instruments, create it?')
# for i, name in enumerate(('yngwie', 'qubit_info', 'cavity_info', 'system_info')):
# tab = self.instruments_container._ins_widgets[name]
# index = self.instruments_container.indexOf(tab)
# self.instruments_container.tabBar().moveTab(index, i)
def load_from_list(self, index):
full_path = str(self.exp_list.model().filePath(index).replace('.py', ''))
path = os.path.relpath(full_path, 'experiments').split(os.sep)
name = '.'.join(path)
if name not in self.opened:
self.add_tab(name)
else:
tab_index = self.tab_widget.indexOf(self.opened[name])
self.tab_widget.setCurrentIndex(tab_index)
def reload_all(self):
logger.info('Reload all')
import fpga_compiler
reload(fpga_compiler)
import base_experiment
reload(base_experiment)
base_experiment.Parameter.gui = self
import fpga_dsl
reload(fpga_dsl)
for tab in self.opened.values():
self.reload_tab(tab, view=False)
def reload_tab(self, tab, view=True):
logger.info('Reloading %s', tab.name)
name = tab.experiment.name
index = self.tab_widget.indexOf(tab)
view_index = tab.view_box.currentIndex()
params = tab.experiment.get_params()
self.close_tab(index)
tab = self.add_tab(name)
tab.experiment.set_params(params)
def set_index():
self.opened[name].view_box.setCurrentIndex(view_index)
if view:
QtCore.QTimer.singleShot(600, set_index)
def add_tab(self, name):
if name in self.opened:
return self.opened[name]
logger.info('Adding tab %s', name)
mod_name = 'experiments.' + name
mod = importlib.import_module(mod_name)
reload(mod)
self.experiments[name] = exp = getattr(mod, name.split('.')[-1])(name)
tab_cls = FPGAExperimentTab
if isinstance(exp, base_experiment.ScanExperiment):
child_inst = self.add_tab(exp.child_experiment).experiment
exp.child_instance = child_inst
tab_cls = ScanExperimentTab
self.opened[name] = tab = tab_cls(exp, self, name)
index = self.tab_widget.addTab(tab, name)
self.tab_widget.setCurrentIndex(index)
return tab
def close_tab(self, tab_index):
tab = self.tab_widget.widget(tab_index)
if tab.experiment.is_running:
logger.error('Stop experiment before closing!')
return
logger.info('Closing %s', tab.name)
self.opened.pop(tab.name)
self.experiments.pop(tab.name)
self.tab_widget.removeTab(tab_index)
def get_param(self, name, param_name):
if name in self.opened:
return self.opened[name].param_box.get_param(param_name)
elif name in self.calibrations.mode_params:
return self.calibrations.mode_params[name].get_param(param_name)
else:
return None
def set_param(self, exp_name, param_name, val):
if exp_name in self.opened:
self.opened[exp_name].param_box.set_param(param_name, val)
def get_instruments(self):
working_dir = os.path.join(fpga_config.qrlab_directory, 'instrumentserver')
fn = os.path.join(working_dir, 'instruments_server.py')
return self.get_object('instruments', fn, 55555, cwd=working_dir)
def get_dataserver(self):
fn = os.path.join(
fpga_config.qrlab_directory, 'dataserver', 'dataserver.py'
)
return self.get_object('dataserver', fn, 55556)
def get_object(self, objname, fn, port, **kwargs):
addr = 'tcp://127.0.0.1:%d' % port
try:
objsh.backend.connect_to(addr)
except socket.error:
logger.warn('Could not connect to %s, launching server', addr)
DETACHED = 8 # magic
cmd = [sys.executable, fn]
logger.debug(cmd)
subprocess.Popen(cmd, creationflags=DETACHED, shell=True, **kwargs)
time.sleep(3)
objsh.backend.connect_to(addr)
logger.info('Connected to %s', addr)
obj = objsh.find_object(objname)
assert obj is not None, "Could not find object"
return obj
class VBoxTab(QtGui.QVBoxLayout):
def __init__(self, container, text, main_widget=None, key=None):
self.widget = QtGui.QWidget()
super(VBoxTab, self).__init__(self.widget)
container.addTab(self.widget, text)
if main_widget is not None:
self.addWidget(main_widget)
if key is not None:
key_action = QtGui.QAction('View %s' % text, container)
key_action.setShortcut(QtGui.QKeySequence(key))
def set_visible():
container.setCurrentIndex(container.indexOf(self.widget))
key_action.triggered.connect(set_visible)
container.addAction(key_action)
container.setContextMenuPolicy(Qt.ActionsContextMenu)
class TablesWidget(QtGui.QTabWidget):
def __init__(self, experiment):
super(TablesWidget, self).__init__()
self.experiment = experiment
self.master_table = MasterTable()
self.analog0_table = AnalogTable()
self.analog1_table = AnalogTable()
self.digital_table = DigitalTable()
self.dsl_table = QtGui.QTextEdit()
self.dsl_table.setReadOnly(True)
VBoxTab(self, 'Master', self.master_table)
VBoxTab(self, 'Analog 0', self.analog0_table)
VBoxTab(self, 'Analog 1', self.analog1_table)
VBoxTab(self, 'Digital', self.digital_table)
VBoxTab(self, 'DSL Raw', self.dsl_table)
def set_sequences(self):
logger.info('Updating tables')
mseq, (a0seq, a1seq), (dseq,) = self.experiment.sequences
self.master_table.set_sequence(mseq)
self.analog0_table.set_sequence(a0seq)
self.analog1_table.set_sequence(a1seq)
self.digital_table.set_sequence(dseq)
self.set_raw_seq()
def set_raw_seq(self):
self.dsl_table.setText(self.experiment.print_raw(to_string=True))
class SequenceTable(QtGui.QWidget):
base_headers = [
'addr',
'jump',
'label',
'length',
]
headers = []
aliases = {}
defaults = {}
condense_action = None
def __init__(self):
super(SequenceTable, self).__init__()
self.table_view = QtGui.QTableView()
self.table_model = QtGui.QStandardItemModel()
self.table_view.setModel(self.table_model)
self.condensed_view = QtGui.QTextEdit()
self.condensed_view.setReadOnly(True)
self.condensed_view.setCurrentFont(QtGui.QFont("Consolas", 10))
layout = QtGui.QVBoxLayout(self)
layout.addWidget(self.table_view)
layout.addWidget(self.condensed_view)
def set_view(condensed):
self.condensed_view.setVisible(condensed)
self.table_view.setVisible(not condensed)
SequenceTable.condense_action.toggled.connect(set_view)
set_view(SequenceTable.condense_action.isChecked())
@property
def all_headers(self):
return self.base_headers + self.headers
@property
def aliased_headers(self):
return [self.aliases.get(h, h) for h in self.all_headers]
def set_sequence(self, seq):
model = self.table_model
model.clear()
model.setHorizontalHeaderLabels(self.aliased_headers)
for inst in seq:
items = [QtGui.QStandardItem(str(i)) for i in self.get_items(inst)]
for i in items:
i.setEditable(False)
model.appendRow(items)
self.table_view.resizeColumnsToContents()
self.condensed_view.setText(self.get_condensed())
def get_items(self, inst):
for name in self.all_headers:
if hasattr(self, 'get_'+name):
yield getattr(self, 'get_'+name)(inst)
else:
yield getattr(inst, name)
def get_label(self, inst):
label = inst.label_str()
if label == 'none':
label = '...'
return label
def get_jump(self, inst):
return inst.branch_str()
def get_length(self, inst):
return inst.length_str().replace(' ', '')
def get_condensed(self):
model = self.table_model
text = ''
for addr, i in enumerate(range(model.rowCount())):
length = str(model.item(i, 3).text())
line = ('[%d: %s]' % (addr, length)).ljust(14)
goto = str(model.item(i, 1).text())
if goto != ('GOTO %d' % (addr+1)):
line += goto + ', '
for j, header in enumerate(self.headers, len(self.base_headers)):
item = str(model.item(i, j).text())
default = self.defaults.get(header, '...')
if item != default:
name = self.aliases.get(header, header)
line += '%s:%s, ' % (name, item)
text += line[:-2] + '\n'
return text
class MasterTable(SequenceTable):
headers = [
'register_instruction',
'counter0', 'counter1',
'internal_function', 'se_addr', 'trigger_levels',
'external_function0', 'external_function1',
]
aliases = {
'register_instruction': 'Reg Op',
'counter0': 'C0',
'counter1': 'C1',
'internal_function': 'Int Fn',
'se_addr': 'SE',
'trigger_levels': 'trigger',
'external_function0': 'Ext Fn0',
'external_function1': 'Ext Fn1',
}
defaults = {
'trigger_levels': '[[0], [0]]'
}
def get_se_addr(self, inst):
if inst.update_estimation_params:
return inst.estimation_params_addr
else:
return '...'
class AnalogTable(SequenceTable):
headers = ['wave_address', 'mixer_amplitudes', 'ssb_reload']
aliases = {
'wave_address':'Wave Addr',
'mixer_amplitudes': 'Mixer',
'ssb_reload': 'ssb',
}
defaults = {
'wave_address': '0',
'mixer_amplitudes': '[0 0]',
}
def get_ssb_reload(self, inst):
return 'LOAD' if inst.ssb_reload else '...'
def get_mixer_amplitudes(self, inst):
s0 = 'calc' if inst.mixer_mask[0] else inst.mixer_amplitudes[0]
s1 = 'calc' if inst.mixer_mask[0] else inst.mixer_amplitudes[1]
return '[%s %s]' % (s0, s1)
class DigitalTable(SequenceTable):
headers = ['marker_levels']
aliases = {'marker_levels': 'levels'}
defaults = {'marker_levels': '[[0], [0]]'}
class HistoryWidget(QtGui.QWidget):
def __init__(self, experiment, dataserver):
self.experiment = experiment
self.dataserver = dataserver
super(HistoryWidget, self).__init__()
layout = QtGui.QVBoxLayout(self)
self.date_selector = QtGui.QComboBox()
self.inst_selector = QtGui.QComboBox()
self.plot = PlotWidget()
self.display_box = TextDisplay()
refresh_button = QtGui.QPushButton('Refresh')
load_button = QtGui.QPushButton('Load')
for button in (refresh_button, load_button):
button.setSizePolicy(QtGui.QSizePolicy.Maximum, QtGui.QSizePolicy.Fixed)
selector_layout = QtGui.QHBoxLayout()
selector_layout.addWidget(refresh_button)
selector_layout.addWidget(self.date_selector)
selector_layout.addWidget(self.inst_selector)
selector_layout.addWidget(load_button)
layout.addLayout(selector_layout)
layout.addWidget(self.plot)
layout.addWidget(self.display_box)
self.date_selector.currentIndexChanged.connect(self.set_available_insts)
refresh_button.clicked.connect(self.set_available_files)
load_button.clicked.connect(self.set_plot)
self.set_available_files()
def set_available_files(self):
self.date_selector.clear()
data_dir = os.path.join(self.experiment.directory, 'archive')
if not os.path.isdir(data_dir):
return
for fn in sorted(os.listdir(data_dir), reverse=True):
if fn.endswith('.h5'):
name = fn[:-3]
self.date_selector.addItem(name)
def get_file(self):
name = str(self.date_selector.currentText())
if not name:
return None
fpath = os.path.join(self.experiment.directory, 'archive', name+'.h5')
return self.dataserver.get_file(fpath)
def set_available_insts(self, index):
self.inst_selector.clear()
f = self.get_file()
if f is None:
return
items = []
for key in f.keys():
attrs = f[key].get_attrs()
if 'timestamp' not in attrs:
logger.warn('No timestamp on dataset %s', key)
continue
items.append((key, attrs['timestamp']))
for key, timestamp in sorted(items, key=lambda x: x[1], reverse=True):
self.inst_selector.addItem(key + ': (%s)' % timestamp)
def set_plot(self):
f_name = str(self.date_selector.currentText())
f = self.get_file()
name = str(self.inst_selector.currentText()).split(': ')[0]
logger.info('Plotting dataset %s from %s.h5', name, f_name)
if not name:
return
g = f[name]
self.experiment.plot(self.plot.fig, g)
self.plot.canvas.draw()
for name, val in g.get_attrs().items():
self.display_box.display(name, val)
class WavesWidget(QtGui.QTabWidget):
def __init__(self, experiment):
self.experiment = experiment
super(WavesWidget, self).__init__()
def add_wave(self, name, wave):
VBoxTab(self, name, PlotWidget(wave))
def load_waves(self):
self.clear()
for pos, arr in self.experiment.waves.items():
self.add_wave(str(pos), arr)
class CodeWidget(QtGui.QWidget):
def __init__(self, filename):
super(CodeWidget, self).__init__()
self.filename = filename
layout = QtGui.QVBoxLayout(self)
self.editor = SimplePythonEditor()
self.editor.textChanged.connect(self.check_modified)
layout.addWidget(self.editor)
self.save_button = QtGui.QPushButton("save")
self.save_button.clicked.connect(self.save_to_disk)
self.load_button = QtGui.QPushButton("load")
self.load_button.clicked.connect(self.load_from_disk)
buttons_layout = QtGui.QHBoxLayout()
buttons_layout.addWidget(self.save_button)
buttons_layout.addWidget(self.load_button)
layout.addLayout(buttons_layout)
save_shortcut = QtGui.QShortcut(QtGui.QKeySequence.Save, self)
save_shortcut.activated.connect(self.save_to_disk)
self.load_from_disk()
def check_modified(self):
modified = str(self.editor.text()) != self.text_on_disk
self.save_button.setEnabled(modified)
self.load_button.setEnabled(modified)
color = 'red' if modified else 'black'
size = 2 if modified else 1
style_sheet = 'QWidget { border: %dpx solid %s }' % (size, color)
self.editor.setStyleSheet(style_sheet)
def load_from_disk(self):
with open(self.filename, 'r') as f:
self.text_on_disk = f.read()
self.editor.setText(self.text_on_disk)
self.check_modified()
def save_to_disk(self):
# self.text_on_disk = str(self.editor.text()).replace('\r', '')
lines = str(self.editor.text()).split('\n')
self.text_on_disk = '\n'.join([l.rstrip() for l in lines])
self.editor.setText(self.text_on_disk)
with open(self.filename, 'w') as f:
f.write(self.text_on_disk)
self.check_modified()
logger.info('%s saved to disk', self.filename)
class AutoGridLayout(QtGui.QGridLayout):
def __init__(self, n_col, widget=None):
super(AutoGridLayout, self).__init__(widget)
self.n_col = n_col
self.cur_col = self.cur_row = 0
def addWidget(self, widget):
widget.setSizePolicy(QtGui.QSizePolicy.Expanding, QtGui.QSizePolicy.Fixed)
super(AutoGridLayout, self).addWidget(widget, self.cur_row, self.cur_col)
self.cur_col += 1
self.cur_col %= self.n_col
if self.cur_col == 0:
self.cur_row += 1
def addRow(self, w1, w2):
widget = QtGui.QWidget()
layout = QtGui.QHBoxLayout(widget)
layout.setContentsMargins(0, 0, 0, 0)
layout.addWidget(w1)
layout.addWidget(w2)
self.addWidget(widget)
class ResultRecordsWidget(QtGui.QWidget):
headers = [
's0', 's1', 'c0', 'c1', 'x0', 'x1',
'r0', 'r1', 'ir', 'ex0', 'ex1', 'calc_rec',
'time_stamp', 'addr_next', 'addr_curr',
'cycle', 'addr_integ', 'cnt0', 'cnt1',
'is0', 'is1', 'reg0', 'reg1', 'reg2', 'reg3'
]
def __init__(self, experiment):
super(ResultRecordsWidget, self).__init__()
self.experiment = experiment
self.model = QtGui.QStandardItemModel()
self.table = QtGui.QTableView()
self.table.setModel(self.model)
self.table.selectionModel().currentRowChanged.connect(self.update_seq_view)
self.master_seq = []
self.rrecs = []
self.master_seq_view = QtGui.QTextEdit()
self.master_seq_view.setTextInteractionFlags(
Qt.TextSelectableByMouse | Qt.TextSelectableByKeyboard
)
headers_check_layout = QtGui.QGridLayout()
layout = QtGui.QVBoxLayout(self)
splitter = QtGui.QSplitter(Qt.Horizontal)
layout.addWidget(splitter)
splitter.addWidget(self.table)
splitter.addWidget(self.master_seq_view)
layout.addLayout(headers_check_layout)
n_col = 7
load_button = QtGui.QPushButton("load")
all_button = QtGui.QPushButton("all")
none_button = QtGui.QPushButton("none")
headers_check_layout.addWidget(load_button, 0, 0)
headers_check_layout.addWidget(all_button, 0, 1)
headers_check_layout.addWidget(none_button, 0, 2)
for n, name in enumerate(self.headers, 3):
check = QtGui.QCheckBox(name)
check.setChecked(True)
i, j = n / n_col, n % n_col
headers_check_layout.addWidget(check, i, j)
def toggle(enabled, cb=check, col_n=n-3):
cb.setChecked(enabled)
self.table.setColumnHidden(col_n, not(enabled))
check.clicked.connect(toggle)
all_button.clicked.connect(lambda checked, toggle=toggle: toggle(True))
none_button.clicked.connect(lambda checked, toggle=toggle: toggle(False))
load_button.clicked.connect(self.set_rrecs)
def set_master_seq(self, seq_str):
self.master_seq = seq_str.split('\n')
self.master_seq_view.setPlainText(seq_str)
def update_seq_view(self, selected, deselected):
addr = self.rrecs[selected.row()].addr_curr
text = '<style>.selected {background-color: yellow;}</style>'
for i, line in enumerate(self.master_seq):
if i == addr:
text += '<span class=selected>%s</span>' % line
else:
text += line
text += '<br>'
self.master_seq_view.setHtml(text)
scroll = self.master_seq_view.verticalScrollBar()
pct = float(addr) / len(self.master_seq)
pos = int(scroll.maximum()) * pct
scroll.setValue(pos)
def set_rrecs(self):
self.model.clear()
self.model.setHorizontalHeaderLabels(self.headers)
self.rrecs = map(ResultRecord, self.experiment.get_rrecs())
for rrec in self.rrecs:
row = []
for name in self.headers:
val = getattr(rrec, name)
if isinstance(val, bool):
item = QtGui.QStandardItem(" ")
color = QtGui.QColor("green" if val else "red")
item.setBackground(QtGui.QBrush(color))
else:
item = QtGui.QStandardItem(str(val))
row.append(item)
self.model.appendRow(row)
self.table.resizeColumnsToContents()
class ParamsWidget(QtGui.QWidget):
def __init__(self, experiment, num_cols=3, bool_cols=5):
super(ParamsWidget, self).__init__()
layout = QtGui.QVBoxLayout(self)
bool_layout = AutoGridLayout(bool_cols)
if num_cols == 1:
num_layout = QtGui.QFormLayout()
else:
num_layout = AutoGridLayout(num_cols)
str_layout = QtGui.QFormLayout()
layout.addLayout(bool_layout)
layout.addLayout(num_layout)
layout.addLayout(str_layout)
str_layout.setFieldGrowthPolicy(QtGui.QFormLayout.AllNonFixedFieldsGrow)
self.param_widgets = {}
self.param_getters = {}
self.param_setters = {}
for name, val, param_cls in experiment.get_param_info():
self.param_widgets[name] = widget = param_cls.widget_cls()
def param_get(name=name, param_cls=param_cls, widget=widget):
return param_cls.from_widget(widget)
def param_set(new_val, name=name, param_cls=param_cls, widget=widget):
param_cls.to_widget(widget, new_val)
self.param_getters[name] = param_get
self.param_setters[name] = param_set
param_cls.to_widget(widget, val)
if issubclass(param_cls, base_experiment.BoolParameter):
widget.setText(name)
bool_layout.addWidget(widget)
elif issubclass(param_cls, base_experiment.NumParameter):
num_layout.addRow(QtGui.QLabel(name), widget)
elif issubclass(param_cls, base_experiment.StringParameter):
str_layout.addRow(name, widget)
else:
raise ValueError("Unknown parameter type %s" % param_cls)
def get_param(self, param_name):
val = self.param_getters[param_name]()
return val
def set_param(self, param_name, val):
self.param_setters[param_name](val)
def get_all_parameters(self):
return {name: self.get_param(name) for name in self.param_getters}
class TextDisplay(QtGui.QWidget):
def __init__(self):
super(TextDisplay, self).__init__()
self._layout = AutoGridLayout(4, self)
self.display_labels = {}
def display(self, name, val):
if name not in self.display_labels:
self.display_labels[name] = label = QtGui.QLabel()
label.setTextInteractionFlags(Qt.TextSelectableByMouse)
self._layout.addWidget(label)
self.display_labels[name].setText(name+": "+str(val))
class LogView(QtGui.QTextEdit):
def __init__(self):
super(LogView, self).__init__()
self.sections = {}
self.lines = []
self.setTextInteractionFlags(
Qt.TextSelectableByMouse | Qt.TextSelectableByKeyboard
)
self.setContextMenuPolicy(Qt.ActionsContextMenu)
self.check_layout = QtGui.QHBoxLayout()
self.follow_action = QtGui.QAction('Follow Latest', self)
self.follow_action.setCheckable(True)
self.follow_action.setChecked(True)
self.addAction(self.follow_action)
def add_section(self, name):
if name in self.sections:
return
self.sections[name] = action = QtGui.QAction('View ' + name, self)
action.setCheckable(True)
self.addAction(action)
if name in ('Experiment', 'GUI', 'Errors'):
action.setChecked(True)
action.triggered.connect(self.update_view)
@property
def currently_checked(self):
return [name for name, action in self.sections.items() if action.isChecked()]
def update_view(self):
items = [text for name, text in self.lines if name in self.currently_checked]
body = '<br>'.join(items)
head = '<style>.ERROR,.CRITICAL {font-weight: bold;}</style>'
head += '<style>.WARNING,.ERROR,.CRITICAL {color: red;}</style>'
head += '<style>.DEBUG {color: green;}</style>'
self.setHtml(head + body)
if self.follow_action.isChecked():
QtGui.QApplication.instance().processEvents()
scroll_bar = self.verticalScrollBar()
scroll_bar.setValue(scroll_bar.maximum())
def log_to(self, name, record):
self.lines.append((name, str(record)))
self.update_view()
class StatusBarHandler(logging.Handler):
def __init__(self, window):
super(StatusBarHandler, self).__init__()
self.status_bar = window.statusBar()
def emit(self, record):
self.status_bar.showMessage(record.getMessage())
QtGui.QApplication.instance().processEvents()
class CalibrationsTab(ParamsWidget):
pass
class CalibrationsWidget(QtGui.QTabWidget):
def __init__(self):
super(CalibrationsWidget, self).__init__()
import fpga_dsl
self.mode_params = {}
for mode in fpga_dsl.Mode.modes:
self.add_mode(mode)
def add_mode(self, mode):
self.mode_params[mode.name] = params_widget = ParamsWidget(mode, 1, 3)
button = QtGui.QPushButton('Save')
button.clicked.connect(mode.save_params)
widget = QtGui.QWidget()
layout = QtGui.QVBoxLayout(widget)
layout.addWidget(params_widget)
layout.addWidget(button)
self.addTab(widget, mode.name)
class LogViewHandler(logging.Handler):
def __init__(self, name, window):
super(LogViewHandler, self).__init__()
self.name = name
self.log_view = window.log_view
self.log_view.add_section(name)
self.setFormatter(fpga_config.formatter)
def emit(self, record):
html_record = cgi.escape(self.format(record))
text = '<span class=%s>%s</span>' % (record.levelname, html_record)
self.log_view.log_to(self.name, text)
QtGui.QApplication.instance().processEvents()
class PlotWidget(QtGui.QWidget):
def __init__(self, data=None):
super(PlotWidget, self).__init__()
self.fig = plt.figure()
self.canvas = FigureCanvasQTAgg(self.fig)
self.toolbar = NavigationToolbar2QT(self.canvas, self)
layout = QtGui.QVBoxLayout(self)
layout.addWidget(self.toolbar)
layout.addWidget(self.canvas)
if data is not None:
plt.plot(data)
self.canvas.draw()
class JournalWidget(QtGui.QTextEdit):
filename = 'journal.html'
def __init__(self):
super(JournalWidget, self).__init__()
if os.path.exists(self.filename):
with open(self.filename, 'r') as f:
self.setHtml(f.read())
def save(self):
with open(self.filename, 'w') as f:
f.write(str(self.toHtml()))
def add_image(self, img_path):
html = "<a href=%s><img src='%s' width=350></a>" % (img_path, img_path)
fragment = QtGui.QTextDocumentFragment.fromHtml(html)
self.textCursor().insertFragment(fragment)
self.save()
# def setSource(self, url):
# os.system('start ' + str(url.path()))
def human_time(t_seconds):
if t_seconds > 7200:
return "%.1f Hours" % (t_seconds / 3600.0)
elif t_seconds > 120:
return "%.1f Minutes" % (t_seconds / 60.0)
else:
return "%.1f Seconds" % (t_seconds)
if __name__ == "__main__":
import sys
# from PyQt4.QtTest import QTest
app = QtGui.QApplication([])
logger = logging.getLogger('exp gui')
logger.setLevel(fpga_config.log_level)
win = ExperimentWindow()
base_experiment.Parameter.gui = win
win.setup_logging()
win.showMaximized()
win.move(0, 0)
def test():
pass
app.lastWindowClosed.connect(sys.exit)
QtCore.QTimer.singleShot(100, test)
app.exec_()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment