Created
January 25, 2015 18:49
-
-
Save PhilReinhold/05b475549e4c37e2f95a to your computer and use it in GitHub Desktop.
PyQT Experiment GUI
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from ipython_widget import QIPythonWidget | |
from PyQt4 import QtGui, QtCore | |
from PyQt4.QtCore import Qt | |
import matplotlib.pyplot as plt | |
from matplotlib.backends.backend_qt4agg import FigureCanvasQTAgg, NavigationToolbar2QT | |
import objectsharer as objsh | |
import socket | |
import os | |
import logging | |
from instrumentserver.instrument_gui import InstrumentsContainer | |
from qsci_simple_pythoneditor import SimplePythonEditor | |
import base_experiment | |
from YngwieDecoding import ResultRecord | |
import traceback | |
import fpga_config | |
import importlib | |
import subprocess | |
import time | |
import cgi | |
BLOCK_FMT = 'Data_0x%04d[0x00]B0_#%04d.bin' | |
class ExperimentTab(QtGui.QWidget): | |
tab_actions = [ | |
('run', 'F5'), | |
('stop', 'F8'), | |
('clone', 'Ctrl+n'), | |
('reload', 'Ctrl+r'), | |
('update', 'Ctrl+u'), | |
('screenshot', None) | |
] | |
def __init__(self, experiment, window, name): | |
''' | |
:type experiment: base_experiment.BaseExperiment | |
:type window: ExperimentWindow | |
:type name: str | |
''' | |
super(ExperimentTab, self).__init__() | |
self.experiment = experiment | |
self.name = name | |
self.filename = str(os.path.join('experiments', *name.split('.'))) + '.py' | |
self.experiment.run_step.connect(self.update_progress) | |
self.experiment.run_finished.connect(self.run_finished) | |
self.window = window | |
self.view_box = QtGui.QTabWidget() | |
self.main_plot = PlotWidget() | |
self.code_widget = CodeWidget(self.filename) | |
self.history_widget = HistoryWidget(experiment, window.dataserver) | |
self.log_view = QtGui.QPlainTextEdit() | |
self.log_view.document().setDefaultFont(QtGui.QFont("Consolas", 10)) | |
log_view_handler = LogViewHandler('Experiment', window) | |
log_view_handler.setFormatter(fpga_config.formatter) | |
self.experiment.logger.addHandler(log_view_handler) | |
layout = QtGui.QVBoxLayout(self) | |
layout.addWidget(self.view_box) | |
VBoxTab(self.view_box, 'plot', self.main_plot, key='Alt+p') | |
VBoxTab(self.view_box, 'code', self.code_widget, 'Alt+c') | |
VBoxTab(self.view_box, 'history', self.history_widget, 'Alt+h') | |
VBoxTab(self.view_box, 'log', self.log_view, 'Alt+l') | |
self.param_box = ParamsWidget(experiment) | |
layout.addWidget(self.param_box) | |
self.display_box = TextDisplay() | |
layout.addWidget(self.display_box) | |
button_layout = QtGui.QHBoxLayout() | |
layout.addLayout(button_layout) | |
self.buttons = {} | |
for name, keys in self.tab_actions: | |
proper_name = name.capitalize().replace('_', ' ') | |
action = QtGui.QAction(proper_name, self) | |
if keys is not None: | |
action.setShortcut(QtGui.QKeySequence(keys)) | |
action.triggered.connect(lambda x=None, n=name: getattr(self, n)()) | |
self.addAction(action) | |
self.buttons[name] = button = QtGui.QPushButton(proper_name) | |
button.clicked.connect(action.triggered) | |
button_layout.addWidget(button) | |
self.buttons['update'].setEnabled(hasattr(self.experiment, 'update')) | |
self.buttons['run'].clicked.connect(lambda: self.view_box.setCurrentIndex(0)) | |
log_file = self.experiment.log_file | |
self.file_watcher = QtCore.QFileSystemWatcher([self.filename, log_file]) | |
def file_changed(path): | |
path = str(path) | |
if path == self.filename: | |
style_sheet = "QPushButton { background-color: red }" | |
self.buttons['reload'].setStyleSheet(style_sheet) | |
QtCore.QTimer.singleShot(100, self.code_widget.load_from_disk) | |
elif path == log_file: | |
with open(log_file, 'r') as f: | |
text = f.read() | |
self.log_view.setPlainText(text) | |
QtGui.QApplication.instance().processEvents() | |
scroll_bar = self.log_view.verticalScrollBar() | |
scroll_bar.setValue(scroll_bar.maximum()) | |
else: | |
logger.warn('Unknown path %s change notified', path) | |
self.file_watcher.fileChanged.connect(file_changed) | |
if os.path.exists(log_file): | |
file_changed(log_file) | |
def plot(self): | |
self.main_plot.fig.clear() | |
self.experiment.plot_last(self.main_plot.fig) | |
for name, val in self.experiment.fit_params.items(): | |
fmt_str, denom = self.experiment.fit_fmt.get(name, ("%.2e", 1)) | |
self.display_box.display(name, fmt_str % (val / denom)) | |
self.main_plot.canvas.draw() | |
def clone(self): | |
logger.info('cloning %s, getting path', self.name) | |
expt_path = os.path.abspath('experiments') | |
new_path = str(QtGui.QFileDialog.getSaveFileName( | |
self, 'Copy %s to' % self.name, expt_path, 'Python Files (*.py)' | |
)) | |
new_classname = os.path.basename(new_path).split('.py')[0] | |
self_path = os.path.join(expt_path, *self.name.split('.')) + '.py' | |
self_classname = os.path.basename(self_path).split('.py')[0] | |
with open(self_path, 'r') as f: | |
self_content = f.read() | |
logging.debug('old %s --> new %s', self_classname, new_classname) | |
new_content = self_content.replace('class %s' % self_classname, 'class %s' % new_classname) | |
with open(new_path, 'w') as f: | |
f.write(new_content) | |
logger.info('cloned %s --> %s', self_path, new_path) | |
def reload(self): | |
self.window.reload_tab(self) | |
def run(self): | |
logger.info('Running %s', self.name) | |
self.window.progress_bar.setMaximum(self.experiment.n_steps) | |
self.window.progress_bar.setValue(0) | |
self.window.instruments.save_instruments('ins_settings.txt') | |
self.experiment.run() | |
def stop(self): | |
logger.info('Aborted %s', self.name) | |
self.experiment.stop() | |
QtGui.QApplication.instance().processEvents() | |
def update(self): | |
self.experiment.update() | |
def run_finished(self): | |
self.window.progress_bar.setValue(0) | |
import numpy | |
vars = { | |
'np': numpy, | |
'exp': self.experiment, | |
'raw_data': self.experiment.raw_data, | |
'processed_data': self.experiment.processed_data, | |
'run_params': self.experiment.run_params, | |
'run_inst_params': self.experiment.run_inst_params, | |
'fit_params': self.experiment.fit_params, | |
'fig': self.main_plot.fig, | |
} | |
win.shell.pushVariables(vars) | |
logger.info('run finished') | |
def update_progress(self, cur_blocks, tot_blocks): | |
self.plot() | |
if self.experiment.is_running: | |
self.window.progress_bar.setMaximum(tot_blocks) | |
self.window.progress_bar.setValue(cur_blocks) | |
logger.info('%s / %s blocks', cur_blocks, tot_blocks) | |
def get_state(self): | |
return self.param_box.get_all_parameters() | |
def screenshot(self): | |
ss_dir = os.path.join(self.experiment.directory, 'screenshots') | |
if not os.path.exists(ss_dir): | |
os.mkdir(ss_dir) | |
fn = os.path.join(ss_dir, time.strftime('%Y%m%d_%H%M%S.png')) | |
QtGui.QPixmap.grabWidget(self, self.rect()).save(fn, 'PNG') | |
logger.info('Saving screenshot %s', fn) | |
# self.window.journal.add_image(fn) | |
class FPGAExperimentTab(ExperimentTab): | |
tab_actions = ExperimentTab.tab_actions + [ | |
('make_tables', 'Ctrl+t'), | |
] | |
def __init__(self, experiment, window, name): | |
super(FPGAExperimentTab, self).__init__(experiment, window, name) | |
self.poll_timer = QtCore.QTimer() | |
self.poll_timer.timeout.connect(self.experiment.process_available_blocks) | |
self.experiment.run_started.connect(lambda: self.poll_timer.start(500)) | |
self.tables_widget = TablesWidget(experiment) | |
self.rrec_widget = ResultRecordsWidget(experiment) | |
self.waves_widget = WavesWidget(experiment) | |
VBoxTab(self.view_box, 'tables', self.tables_widget, 'Alt+t') | |
VBoxTab(self.view_box, 'rrecs', self.rrec_widget, 'Alt+r') | |
VBoxTab(self.view_box, 'waves', self.waves_widget, 'Alt+w') | |
self.param_box.param_widgets['n_blocks'].textChanged.connect(self.update_seq_time) | |
self.param_box.param_widgets['averages_per_block'].textChanged.connect(self.update_seq_time) | |
self.buttons['run'].setDisabled(window.exp_is_running) | |
self.buttons['stop'].setEnabled(window.exp_is_running) | |
window.exp_running_sig.connect(self.buttons['run'].setDisabled) | |
window.exp_running_sig.connect(self.buttons['stop'].setEnabled) | |
# self.buttons['make_tables'].clicked.connect(lambda: self.view_box.setCurrentIndex(1)) | |
def run(self): | |
self.make_tables() | |
super(FPGAExperimentTab, self).run() | |
self.window.exp_running_sig.emit(True) | |
def run_finished(self): | |
self.poll_timer.stop() | |
self.window.exp_running_sig.emit(False) | |
super(FPGAExperimentTab, self).run_finished() | |
def make_tables(self): | |
try: | |
self.experiment.compile_raw() | |
self.tables_widget.set_raw_seq() | |
except Exception as e: | |
self.tables_widget.dsl_table.setText(str(e)) | |
try: | |
self.experiment.write_tables() | |
self.update_seq_time() | |
self.tables_widget.set_sequences() | |
self.waves_widget.load_waves() | |
master_seq = self.tables_widget.master_table.get_condensed() | |
self.rrec_widget.set_master_seq(master_seq) | |
except Exception as e: | |
self.tables_widget.master_table.condensed_view.setText(str(e)) | |
raise | |
def update_seq_time(self): | |
self.seq_time = self.experiment.seq_time | |
averages_per_block = self.experiment.averages_per_block | |
n_blocks = self.experiment.n_blocks | |
averages = averages_per_block * n_blocks | |
expected_time = (4 * self.seq_time * averages) / 1e9 | |
time_str = human_time(expected_time) | |
expected_block_time = (4 * self.seq_time * averages_per_block) / 1e9 | |
self.display_box.display('Averages', averages) | |
self.display_box.display('Estimated time', time_str) | |
self.display_box.display('Block time', human_time(expected_block_time)) | |
self.display_box.display('Result Shape', self.experiment.result_shape) | |
self.buttons['run'].setText('run (%s)' % time_str) | |
class ScanExperimentTab(ExperimentTab): | |
def __init__(self, experiment, window, name): | |
super(ScanExperimentTab, self).__init__(experiment, window, name) | |
self.points_combo_box = QtGui.QComboBox() | |
self.points_combo_box.currentIndexChanged.connect(self.point_selected) | |
self.points_plot = PlotWidget() | |
points_layout = VBoxTab(self.view_box, 'points') | |
points_layout.addWidget(self.points_combo_box) | |
points_layout.addWidget(self.points_plot) | |
def plot(self): | |
super(ScanExperimentTab, self).plot() | |
n_new = len(self.experiment.points_data) | |
n_old = self.points_combo_box.count() | |
if n_old > n_new: | |
self.points_combo_box.clear() | |
self.points_combo_box.addItems(map(str,range(n_new))) | |
for i in range(n_old, n_new): | |
self.points_combo_box.addItem(str(i)) | |
def point_selected(self): | |
name = str(self.points_combo_box.currentText()) | |
logger.debug('point: %s', name) | |
if name != '': | |
data = self.experiment.points_data[int(name)] | |
self.experiment.child_instance.plot(self.points_plot.fig, data) | |
self.points_plot.canvas.draw() | |
class ExperimentListModel(QtGui.QFileSystemModel): | |
def __init__(self): | |
super(ExperimentListModel, self).__init__() | |
self.setRootPath(os.path.abspath('experiments')) | |
self.setNameFilters(['[^_]?*.py']) | |
self.setNameFilterDisables(False) | |
def columnCount(self, index): | |
return 1 | |
def data(self, index, role): | |
ret = super(ExperimentListModel, self).data(index, role) | |
if role == Qt.DisplayRole: | |
return str(ret).replace('.py', '') | |
if role == Qt.DecorationRole: | |
return None | |
return ret | |
class ExperimentList(QtGui.QTreeView): | |
def __init__(self): | |
super(ExperimentList, self).__init__() | |
model = ExperimentListModel() | |
self.setModel(model) | |
self.setRootIndex(model.index(os.path.abspath('experiments'))) | |
enter_key = QtGui.QShortcut(Qt.Key_Return, self) | |
enter_key.setContext(Qt.WidgetShortcut) | |
enter_key.activated.connect(self.toggle_current) | |
def toggle_current(self): | |
idx = self.currentIndex() | |
if self.model().hasChildren(idx): | |
self.setExpanded(idx, not(self.isExpanded(idx))) | |
else: | |
self.doubleClicked.emit(idx) | |
class ExperimentWindow(QtGui.QMainWindow): | |
exp_running_sig = QtCore.pyqtSignal(bool) | |
exp_is_running = False | |
def __init__(self): | |
super(ExperimentWindow, self).__init__() | |
self.tab_widget = QtGui.QTabWidget() | |
self.tab_widget.setTabsClosable(True) | |
self.tab_widget.setMovable(True) | |
self.tab_widget.tabCloseRequested.connect(self.close_tab) | |
self.tab_widget.currentChanged.connect(self.set_experiment_actions) | |
self.opened = {} | |
self.experiments = {} | |
self.exp_list = ExperimentList() | |
self.exp_list.doubleClicked.connect(self.load_from_list) | |
self.shell = QIPythonWidget() | |
self.shell.pushVariables({'exps':self.experiments}) | |
self.log_view = LogView() | |
# self.journal = JournalWidget() | |
self.calibrations = CalibrationsWidget() | |
self.get_remote_objects() | |
self.instruments_container = InstrumentsContainer(self.instruments) | |
self.progress_bar = QtGui.QProgressBar() | |
self.progress_bar.setStyleSheet( | |
"QProgressBar {" | |
" border: 2px solid grey;" | |
" border-radius: 0px;" | |
" text-align: center;}" | |
"QProgressBar::chunk {" | |
" background-color: #3add36;" | |
" width: 1px;}" | |
) | |
self.statusBar().addPermanentWidget(self.progress_bar) | |
self.exp_running_sig.connect(lambda val: setattr(self, 'exp_is_running', val)) | |
file_menu = self.menuBar().addMenu('File') | |
file_menu.addAction('Reload all modules', self.reload_all, QtGui.QKeySequence('Ctrl+Shift+R')) | |
self.experiment_menu = self.menuBar().addMenu('Experiment') | |
self.view_menu = view_menu = self.menuBar().addMenu('View') | |
def increment_tab(): | |
new_idx = self.tab_widget.currentIndex() + 1 | |
new_idx %= self.tab_widget.count() | |
self.tab_widget.setCurrentIndex(new_idx) | |
def decrement_tab(): | |
new_idx = self.tab_widget.currentIndex() - 1 | |
new_idx %= self.tab_widget.count() | |
self.tab_widget.setCurrentIndex(new_idx) | |
view_menu.addAction('Previous Experiment Tab', decrement_tab, QtGui.QKeySequence('Ctrl+[')) | |
view_menu.addAction('Next Experiment Tab', increment_tab, QtGui.QKeySequence('Ctrl+]')) | |
condense_tables_action = view_menu.addAction('Condensed Tables') | |
SequenceTable.condense_action = condense_tables_action | |
condense_tables_action.setCheckable(True) | |
condense_tables_action.setChecked(True) | |
self.setCentralWidget(self.tab_widget) | |
self.add_dock(self.exp_list, 'Experiments', 'Left', 150) | |
self.add_dock(self.shell, 'Shell', 'Right') | |
self.add_dock(self.log_view, 'Log', 'Left') | |
inst_dock = self.add_dock(self.instruments_container, 'Instruments', 'Right', 300) | |
calib_dock = self.add_dock(self.calibrations, 'Calibrations', 'Right') | |
self.tabifyDockWidget(inst_dock, calib_dock) | |
instruments_menu = self.menuBar().addMenu('Instruments') | |
instruments_view_menu = instruments_menu.addMenu('View') | |
for action in self.instruments_container.actions(): | |
instruments_menu.addAction(action) | |
for name, action in self.instruments_container.show_tab_actions.items(): | |
instruments_view_menu.addAction(action) | |
if name not in ('qubit_info', 'cavity_info', 'system_info', 'yngwie'): | |
action.setChecked(False) | |
objsh.backend.add_qt_timer() | |
logger.info('Ready') | |
def add_dock(self, widget, name, location, min_width=None): | |
dock = QtGui.QDockWidget(name) | |
dock.setWidget(widget) | |
self.view_menu.addAction(dock.toggleViewAction()) | |
loc_const = getattr(Qt, location+'DockWidgetArea') | |
self.addDockWidget(loc_const, dock) | |
if min_width is not None: | |
dock.setMinimumWidth(min_width) | |
return dock | |
def setup_logging(self): | |
self.log_view.add_section('Experiment') | |
self.log_view.add_section('Errors') | |
def excepthook(type, value, tb): | |
tb_str = '\n'.join(traceback.format_exception(type, value, tb)) | |
print tb_str | |
win.log_view.log_to('Errors', '<pre>%s</pre>' % cgi.escape(tb_str)) | |
sys.excepthook = excepthook | |
logger.addHandler(StatusBarHandler(self)) | |
logger.addHandler(LogViewHandler('GUI', self)) | |
objsh.logger.addHandler(LogViewHandler('Objsh', self)) | |
def set_experiment_actions(self, tab_index): | |
tab = self.tab_widget.widget(tab_index) | |
self.experiment_menu.clear() | |
if tab is None: | |
return | |
for action in tab.actions(): | |
self.experiment_menu.addAction(action) | |
def get_remote_objects(self): | |
objsh.backend.start_server('127.0.0.1', 55557) | |
objsh.register(self, 'experiment_gui') | |
self.dataserver = self.get_dataserver() | |
self.instruments = self.get_instruments() | |
try: | |
self.yng = self.instruments['yngwie'] | |
except KeyError: | |
logger.error('Yngwie not present in instruments, create it?') | |
# for i, name in enumerate(('yngwie', 'qubit_info', 'cavity_info', 'system_info')): | |
# tab = self.instruments_container._ins_widgets[name] | |
# index = self.instruments_container.indexOf(tab) | |
# self.instruments_container.tabBar().moveTab(index, i) | |
def load_from_list(self, index): | |
full_path = str(self.exp_list.model().filePath(index).replace('.py', '')) | |
path = os.path.relpath(full_path, 'experiments').split(os.sep) | |
name = '.'.join(path) | |
if name not in self.opened: | |
self.add_tab(name) | |
else: | |
tab_index = self.tab_widget.indexOf(self.opened[name]) | |
self.tab_widget.setCurrentIndex(tab_index) | |
def reload_all(self): | |
logger.info('Reload all') | |
import fpga_compiler | |
reload(fpga_compiler) | |
import base_experiment | |
reload(base_experiment) | |
base_experiment.Parameter.gui = self | |
import fpga_dsl | |
reload(fpga_dsl) | |
for tab in self.opened.values(): | |
self.reload_tab(tab, view=False) | |
def reload_tab(self, tab, view=True): | |
logger.info('Reloading %s', tab.name) | |
name = tab.experiment.name | |
index = self.tab_widget.indexOf(tab) | |
view_index = tab.view_box.currentIndex() | |
params = tab.experiment.get_params() | |
self.close_tab(index) | |
tab = self.add_tab(name) | |
tab.experiment.set_params(params) | |
def set_index(): | |
self.opened[name].view_box.setCurrentIndex(view_index) | |
if view: | |
QtCore.QTimer.singleShot(600, set_index) | |
def add_tab(self, name): | |
if name in self.opened: | |
return self.opened[name] | |
logger.info('Adding tab %s', name) | |
mod_name = 'experiments.' + name | |
mod = importlib.import_module(mod_name) | |
reload(mod) | |
self.experiments[name] = exp = getattr(mod, name.split('.')[-1])(name) | |
tab_cls = FPGAExperimentTab | |
if isinstance(exp, base_experiment.ScanExperiment): | |
child_inst = self.add_tab(exp.child_experiment).experiment | |
exp.child_instance = child_inst | |
tab_cls = ScanExperimentTab | |
self.opened[name] = tab = tab_cls(exp, self, name) | |
index = self.tab_widget.addTab(tab, name) | |
self.tab_widget.setCurrentIndex(index) | |
return tab | |
def close_tab(self, tab_index): | |
tab = self.tab_widget.widget(tab_index) | |
if tab.experiment.is_running: | |
logger.error('Stop experiment before closing!') | |
return | |
logger.info('Closing %s', tab.name) | |
self.opened.pop(tab.name) | |
self.experiments.pop(tab.name) | |
self.tab_widget.removeTab(tab_index) | |
def get_param(self, name, param_name): | |
if name in self.opened: | |
return self.opened[name].param_box.get_param(param_name) | |
elif name in self.calibrations.mode_params: | |
return self.calibrations.mode_params[name].get_param(param_name) | |
else: | |
return None | |
def set_param(self, exp_name, param_name, val): | |
if exp_name in self.opened: | |
self.opened[exp_name].param_box.set_param(param_name, val) | |
def get_instruments(self): | |
working_dir = os.path.join(fpga_config.qrlab_directory, 'instrumentserver') | |
fn = os.path.join(working_dir, 'instruments_server.py') | |
return self.get_object('instruments', fn, 55555, cwd=working_dir) | |
def get_dataserver(self): | |
fn = os.path.join( | |
fpga_config.qrlab_directory, 'dataserver', 'dataserver.py' | |
) | |
return self.get_object('dataserver', fn, 55556) | |
def get_object(self, objname, fn, port, **kwargs): | |
addr = 'tcp://127.0.0.1:%d' % port | |
try: | |
objsh.backend.connect_to(addr) | |
except socket.error: | |
logger.warn('Could not connect to %s, launching server', addr) | |
DETACHED = 8 # magic | |
cmd = [sys.executable, fn] | |
logger.debug(cmd) | |
subprocess.Popen(cmd, creationflags=DETACHED, shell=True, **kwargs) | |
time.sleep(3) | |
objsh.backend.connect_to(addr) | |
logger.info('Connected to %s', addr) | |
obj = objsh.find_object(objname) | |
assert obj is not None, "Could not find object" | |
return obj | |
class VBoxTab(QtGui.QVBoxLayout): | |
def __init__(self, container, text, main_widget=None, key=None): | |
self.widget = QtGui.QWidget() | |
super(VBoxTab, self).__init__(self.widget) | |
container.addTab(self.widget, text) | |
if main_widget is not None: | |
self.addWidget(main_widget) | |
if key is not None: | |
key_action = QtGui.QAction('View %s' % text, container) | |
key_action.setShortcut(QtGui.QKeySequence(key)) | |
def set_visible(): | |
container.setCurrentIndex(container.indexOf(self.widget)) | |
key_action.triggered.connect(set_visible) | |
container.addAction(key_action) | |
container.setContextMenuPolicy(Qt.ActionsContextMenu) | |
class TablesWidget(QtGui.QTabWidget): | |
def __init__(self, experiment): | |
super(TablesWidget, self).__init__() | |
self.experiment = experiment | |
self.master_table = MasterTable() | |
self.analog0_table = AnalogTable() | |
self.analog1_table = AnalogTable() | |
self.digital_table = DigitalTable() | |
self.dsl_table = QtGui.QTextEdit() | |
self.dsl_table.setReadOnly(True) | |
VBoxTab(self, 'Master', self.master_table) | |
VBoxTab(self, 'Analog 0', self.analog0_table) | |
VBoxTab(self, 'Analog 1', self.analog1_table) | |
VBoxTab(self, 'Digital', self.digital_table) | |
VBoxTab(self, 'DSL Raw', self.dsl_table) | |
def set_sequences(self): | |
logger.info('Updating tables') | |
mseq, (a0seq, a1seq), (dseq,) = self.experiment.sequences | |
self.master_table.set_sequence(mseq) | |
self.analog0_table.set_sequence(a0seq) | |
self.analog1_table.set_sequence(a1seq) | |
self.digital_table.set_sequence(dseq) | |
self.set_raw_seq() | |
def set_raw_seq(self): | |
self.dsl_table.setText(self.experiment.print_raw(to_string=True)) | |
class SequenceTable(QtGui.QWidget): | |
base_headers = [ | |
'addr', | |
'jump', | |
'label', | |
'length', | |
] | |
headers = [] | |
aliases = {} | |
defaults = {} | |
condense_action = None | |
def __init__(self): | |
super(SequenceTable, self).__init__() | |
self.table_view = QtGui.QTableView() | |
self.table_model = QtGui.QStandardItemModel() | |
self.table_view.setModel(self.table_model) | |
self.condensed_view = QtGui.QTextEdit() | |
self.condensed_view.setReadOnly(True) | |
self.condensed_view.setCurrentFont(QtGui.QFont("Consolas", 10)) | |
layout = QtGui.QVBoxLayout(self) | |
layout.addWidget(self.table_view) | |
layout.addWidget(self.condensed_view) | |
def set_view(condensed): | |
self.condensed_view.setVisible(condensed) | |
self.table_view.setVisible(not condensed) | |
SequenceTable.condense_action.toggled.connect(set_view) | |
set_view(SequenceTable.condense_action.isChecked()) | |
@property | |
def all_headers(self): | |
return self.base_headers + self.headers | |
@property | |
def aliased_headers(self): | |
return [self.aliases.get(h, h) for h in self.all_headers] | |
def set_sequence(self, seq): | |
model = self.table_model | |
model.clear() | |
model.setHorizontalHeaderLabels(self.aliased_headers) | |
for inst in seq: | |
items = [QtGui.QStandardItem(str(i)) for i in self.get_items(inst)] | |
for i in items: | |
i.setEditable(False) | |
model.appendRow(items) | |
self.table_view.resizeColumnsToContents() | |
self.condensed_view.setText(self.get_condensed()) | |
def get_items(self, inst): | |
for name in self.all_headers: | |
if hasattr(self, 'get_'+name): | |
yield getattr(self, 'get_'+name)(inst) | |
else: | |
yield getattr(inst, name) | |
def get_label(self, inst): | |
label = inst.label_str() | |
if label == 'none': | |
label = '...' | |
return label | |
def get_jump(self, inst): | |
return inst.branch_str() | |
def get_length(self, inst): | |
return inst.length_str().replace(' ', '') | |
def get_condensed(self): | |
model = self.table_model | |
text = '' | |
for addr, i in enumerate(range(model.rowCount())): | |
length = str(model.item(i, 3).text()) | |
line = ('[%d: %s]' % (addr, length)).ljust(14) | |
goto = str(model.item(i, 1).text()) | |
if goto != ('GOTO %d' % (addr+1)): | |
line += goto + ', ' | |
for j, header in enumerate(self.headers, len(self.base_headers)): | |
item = str(model.item(i, j).text()) | |
default = self.defaults.get(header, '...') | |
if item != default: | |
name = self.aliases.get(header, header) | |
line += '%s:%s, ' % (name, item) | |
text += line[:-2] + '\n' | |
return text | |
class MasterTable(SequenceTable): | |
headers = [ | |
'register_instruction', | |
'counter0', 'counter1', | |
'internal_function', 'se_addr', 'trigger_levels', | |
'external_function0', 'external_function1', | |
] | |
aliases = { | |
'register_instruction': 'Reg Op', | |
'counter0': 'C0', | |
'counter1': 'C1', | |
'internal_function': 'Int Fn', | |
'se_addr': 'SE', | |
'trigger_levels': 'trigger', | |
'external_function0': 'Ext Fn0', | |
'external_function1': 'Ext Fn1', | |
} | |
defaults = { | |
'trigger_levels': '[[0], [0]]' | |
} | |
def get_se_addr(self, inst): | |
if inst.update_estimation_params: | |
return inst.estimation_params_addr | |
else: | |
return '...' | |
class AnalogTable(SequenceTable): | |
headers = ['wave_address', 'mixer_amplitudes', 'ssb_reload'] | |
aliases = { | |
'wave_address':'Wave Addr', | |
'mixer_amplitudes': 'Mixer', | |
'ssb_reload': 'ssb', | |
} | |
defaults = { | |
'wave_address': '0', | |
'mixer_amplitudes': '[0 0]', | |
} | |
def get_ssb_reload(self, inst): | |
return 'LOAD' if inst.ssb_reload else '...' | |
def get_mixer_amplitudes(self, inst): | |
s0 = 'calc' if inst.mixer_mask[0] else inst.mixer_amplitudes[0] | |
s1 = 'calc' if inst.mixer_mask[0] else inst.mixer_amplitudes[1] | |
return '[%s %s]' % (s0, s1) | |
class DigitalTable(SequenceTable): | |
headers = ['marker_levels'] | |
aliases = {'marker_levels': 'levels'} | |
defaults = {'marker_levels': '[[0], [0]]'} | |
class HistoryWidget(QtGui.QWidget): | |
def __init__(self, experiment, dataserver): | |
self.experiment = experiment | |
self.dataserver = dataserver | |
super(HistoryWidget, self).__init__() | |
layout = QtGui.QVBoxLayout(self) | |
self.date_selector = QtGui.QComboBox() | |
self.inst_selector = QtGui.QComboBox() | |
self.plot = PlotWidget() | |
self.display_box = TextDisplay() | |
refresh_button = QtGui.QPushButton('Refresh') | |
load_button = QtGui.QPushButton('Load') | |
for button in (refresh_button, load_button): | |
button.setSizePolicy(QtGui.QSizePolicy.Maximum, QtGui.QSizePolicy.Fixed) | |
selector_layout = QtGui.QHBoxLayout() | |
selector_layout.addWidget(refresh_button) | |
selector_layout.addWidget(self.date_selector) | |
selector_layout.addWidget(self.inst_selector) | |
selector_layout.addWidget(load_button) | |
layout.addLayout(selector_layout) | |
layout.addWidget(self.plot) | |
layout.addWidget(self.display_box) | |
self.date_selector.currentIndexChanged.connect(self.set_available_insts) | |
refresh_button.clicked.connect(self.set_available_files) | |
load_button.clicked.connect(self.set_plot) | |
self.set_available_files() | |
def set_available_files(self): | |
self.date_selector.clear() | |
data_dir = os.path.join(self.experiment.directory, 'archive') | |
if not os.path.isdir(data_dir): | |
return | |
for fn in sorted(os.listdir(data_dir), reverse=True): | |
if fn.endswith('.h5'): | |
name = fn[:-3] | |
self.date_selector.addItem(name) | |
def get_file(self): | |
name = str(self.date_selector.currentText()) | |
if not name: | |
return None | |
fpath = os.path.join(self.experiment.directory, 'archive', name+'.h5') | |
return self.dataserver.get_file(fpath) | |
def set_available_insts(self, index): | |
self.inst_selector.clear() | |
f = self.get_file() | |
if f is None: | |
return | |
items = [] | |
for key in f.keys(): | |
attrs = f[key].get_attrs() | |
if 'timestamp' not in attrs: | |
logger.warn('No timestamp on dataset %s', key) | |
continue | |
items.append((key, attrs['timestamp'])) | |
for key, timestamp in sorted(items, key=lambda x: x[1], reverse=True): | |
self.inst_selector.addItem(key + ': (%s)' % timestamp) | |
def set_plot(self): | |
f_name = str(self.date_selector.currentText()) | |
f = self.get_file() | |
name = str(self.inst_selector.currentText()).split(': ')[0] | |
logger.info('Plotting dataset %s from %s.h5', name, f_name) | |
if not name: | |
return | |
g = f[name] | |
self.experiment.plot(self.plot.fig, g) | |
self.plot.canvas.draw() | |
for name, val in g.get_attrs().items(): | |
self.display_box.display(name, val) | |
class WavesWidget(QtGui.QTabWidget): | |
def __init__(self, experiment): | |
self.experiment = experiment | |
super(WavesWidget, self).__init__() | |
def add_wave(self, name, wave): | |
VBoxTab(self, name, PlotWidget(wave)) | |
def load_waves(self): | |
self.clear() | |
for pos, arr in self.experiment.waves.items(): | |
self.add_wave(str(pos), arr) | |
class CodeWidget(QtGui.QWidget): | |
def __init__(self, filename): | |
super(CodeWidget, self).__init__() | |
self.filename = filename | |
layout = QtGui.QVBoxLayout(self) | |
self.editor = SimplePythonEditor() | |
self.editor.textChanged.connect(self.check_modified) | |
layout.addWidget(self.editor) | |
self.save_button = QtGui.QPushButton("save") | |
self.save_button.clicked.connect(self.save_to_disk) | |
self.load_button = QtGui.QPushButton("load") | |
self.load_button.clicked.connect(self.load_from_disk) | |
buttons_layout = QtGui.QHBoxLayout() | |
buttons_layout.addWidget(self.save_button) | |
buttons_layout.addWidget(self.load_button) | |
layout.addLayout(buttons_layout) | |
save_shortcut = QtGui.QShortcut(QtGui.QKeySequence.Save, self) | |
save_shortcut.activated.connect(self.save_to_disk) | |
self.load_from_disk() | |
def check_modified(self): | |
modified = str(self.editor.text()) != self.text_on_disk | |
self.save_button.setEnabled(modified) | |
self.load_button.setEnabled(modified) | |
color = 'red' if modified else 'black' | |
size = 2 if modified else 1 | |
style_sheet = 'QWidget { border: %dpx solid %s }' % (size, color) | |
self.editor.setStyleSheet(style_sheet) | |
def load_from_disk(self): | |
with open(self.filename, 'r') as f: | |
self.text_on_disk = f.read() | |
self.editor.setText(self.text_on_disk) | |
self.check_modified() | |
def save_to_disk(self): | |
# self.text_on_disk = str(self.editor.text()).replace('\r', '') | |
lines = str(self.editor.text()).split('\n') | |
self.text_on_disk = '\n'.join([l.rstrip() for l in lines]) | |
self.editor.setText(self.text_on_disk) | |
with open(self.filename, 'w') as f: | |
f.write(self.text_on_disk) | |
self.check_modified() | |
logger.info('%s saved to disk', self.filename) | |
class AutoGridLayout(QtGui.QGridLayout): | |
def __init__(self, n_col, widget=None): | |
super(AutoGridLayout, self).__init__(widget) | |
self.n_col = n_col | |
self.cur_col = self.cur_row = 0 | |
def addWidget(self, widget): | |
widget.setSizePolicy(QtGui.QSizePolicy.Expanding, QtGui.QSizePolicy.Fixed) | |
super(AutoGridLayout, self).addWidget(widget, self.cur_row, self.cur_col) | |
self.cur_col += 1 | |
self.cur_col %= self.n_col | |
if self.cur_col == 0: | |
self.cur_row += 1 | |
def addRow(self, w1, w2): | |
widget = QtGui.QWidget() | |
layout = QtGui.QHBoxLayout(widget) | |
layout.setContentsMargins(0, 0, 0, 0) | |
layout.addWidget(w1) | |
layout.addWidget(w2) | |
self.addWidget(widget) | |
class ResultRecordsWidget(QtGui.QWidget): | |
headers = [ | |
's0', 's1', 'c0', 'c1', 'x0', 'x1', | |
'r0', 'r1', 'ir', 'ex0', 'ex1', 'calc_rec', | |
'time_stamp', 'addr_next', 'addr_curr', | |
'cycle', 'addr_integ', 'cnt0', 'cnt1', | |
'is0', 'is1', 'reg0', 'reg1', 'reg2', 'reg3' | |
] | |
def __init__(self, experiment): | |
super(ResultRecordsWidget, self).__init__() | |
self.experiment = experiment | |
self.model = QtGui.QStandardItemModel() | |
self.table = QtGui.QTableView() | |
self.table.setModel(self.model) | |
self.table.selectionModel().currentRowChanged.connect(self.update_seq_view) | |
self.master_seq = [] | |
self.rrecs = [] | |
self.master_seq_view = QtGui.QTextEdit() | |
self.master_seq_view.setTextInteractionFlags( | |
Qt.TextSelectableByMouse | Qt.TextSelectableByKeyboard | |
) | |
headers_check_layout = QtGui.QGridLayout() | |
layout = QtGui.QVBoxLayout(self) | |
splitter = QtGui.QSplitter(Qt.Horizontal) | |
layout.addWidget(splitter) | |
splitter.addWidget(self.table) | |
splitter.addWidget(self.master_seq_view) | |
layout.addLayout(headers_check_layout) | |
n_col = 7 | |
load_button = QtGui.QPushButton("load") | |
all_button = QtGui.QPushButton("all") | |
none_button = QtGui.QPushButton("none") | |
headers_check_layout.addWidget(load_button, 0, 0) | |
headers_check_layout.addWidget(all_button, 0, 1) | |
headers_check_layout.addWidget(none_button, 0, 2) | |
for n, name in enumerate(self.headers, 3): | |
check = QtGui.QCheckBox(name) | |
check.setChecked(True) | |
i, j = n / n_col, n % n_col | |
headers_check_layout.addWidget(check, i, j) | |
def toggle(enabled, cb=check, col_n=n-3): | |
cb.setChecked(enabled) | |
self.table.setColumnHidden(col_n, not(enabled)) | |
check.clicked.connect(toggle) | |
all_button.clicked.connect(lambda checked, toggle=toggle: toggle(True)) | |
none_button.clicked.connect(lambda checked, toggle=toggle: toggle(False)) | |
load_button.clicked.connect(self.set_rrecs) | |
def set_master_seq(self, seq_str): | |
self.master_seq = seq_str.split('\n') | |
self.master_seq_view.setPlainText(seq_str) | |
def update_seq_view(self, selected, deselected): | |
addr = self.rrecs[selected.row()].addr_curr | |
text = '<style>.selected {background-color: yellow;}</style>' | |
for i, line in enumerate(self.master_seq): | |
if i == addr: | |
text += '<span class=selected>%s</span>' % line | |
else: | |
text += line | |
text += '<br>' | |
self.master_seq_view.setHtml(text) | |
scroll = self.master_seq_view.verticalScrollBar() | |
pct = float(addr) / len(self.master_seq) | |
pos = int(scroll.maximum()) * pct | |
scroll.setValue(pos) | |
def set_rrecs(self): | |
self.model.clear() | |
self.model.setHorizontalHeaderLabels(self.headers) | |
self.rrecs = map(ResultRecord, self.experiment.get_rrecs()) | |
for rrec in self.rrecs: | |
row = [] | |
for name in self.headers: | |
val = getattr(rrec, name) | |
if isinstance(val, bool): | |
item = QtGui.QStandardItem(" ") | |
color = QtGui.QColor("green" if val else "red") | |
item.setBackground(QtGui.QBrush(color)) | |
else: | |
item = QtGui.QStandardItem(str(val)) | |
row.append(item) | |
self.model.appendRow(row) | |
self.table.resizeColumnsToContents() | |
class ParamsWidget(QtGui.QWidget): | |
def __init__(self, experiment, num_cols=3, bool_cols=5): | |
super(ParamsWidget, self).__init__() | |
layout = QtGui.QVBoxLayout(self) | |
bool_layout = AutoGridLayout(bool_cols) | |
if num_cols == 1: | |
num_layout = QtGui.QFormLayout() | |
else: | |
num_layout = AutoGridLayout(num_cols) | |
str_layout = QtGui.QFormLayout() | |
layout.addLayout(bool_layout) | |
layout.addLayout(num_layout) | |
layout.addLayout(str_layout) | |
str_layout.setFieldGrowthPolicy(QtGui.QFormLayout.AllNonFixedFieldsGrow) | |
self.param_widgets = {} | |
self.param_getters = {} | |
self.param_setters = {} | |
for name, val, param_cls in experiment.get_param_info(): | |
self.param_widgets[name] = widget = param_cls.widget_cls() | |
def param_get(name=name, param_cls=param_cls, widget=widget): | |
return param_cls.from_widget(widget) | |
def param_set(new_val, name=name, param_cls=param_cls, widget=widget): | |
param_cls.to_widget(widget, new_val) | |
self.param_getters[name] = param_get | |
self.param_setters[name] = param_set | |
param_cls.to_widget(widget, val) | |
if issubclass(param_cls, base_experiment.BoolParameter): | |
widget.setText(name) | |
bool_layout.addWidget(widget) | |
elif issubclass(param_cls, base_experiment.NumParameter): | |
num_layout.addRow(QtGui.QLabel(name), widget) | |
elif issubclass(param_cls, base_experiment.StringParameter): | |
str_layout.addRow(name, widget) | |
else: | |
raise ValueError("Unknown parameter type %s" % param_cls) | |
def get_param(self, param_name): | |
val = self.param_getters[param_name]() | |
return val | |
def set_param(self, param_name, val): | |
self.param_setters[param_name](val) | |
def get_all_parameters(self): | |
return {name: self.get_param(name) for name in self.param_getters} | |
class TextDisplay(QtGui.QWidget): | |
def __init__(self): | |
super(TextDisplay, self).__init__() | |
self._layout = AutoGridLayout(4, self) | |
self.display_labels = {} | |
def display(self, name, val): | |
if name not in self.display_labels: | |
self.display_labels[name] = label = QtGui.QLabel() | |
label.setTextInteractionFlags(Qt.TextSelectableByMouse) | |
self._layout.addWidget(label) | |
self.display_labels[name].setText(name+": "+str(val)) | |
class LogView(QtGui.QTextEdit): | |
def __init__(self): | |
super(LogView, self).__init__() | |
self.sections = {} | |
self.lines = [] | |
self.setTextInteractionFlags( | |
Qt.TextSelectableByMouse | Qt.TextSelectableByKeyboard | |
) | |
self.setContextMenuPolicy(Qt.ActionsContextMenu) | |
self.check_layout = QtGui.QHBoxLayout() | |
self.follow_action = QtGui.QAction('Follow Latest', self) | |
self.follow_action.setCheckable(True) | |
self.follow_action.setChecked(True) | |
self.addAction(self.follow_action) | |
def add_section(self, name): | |
if name in self.sections: | |
return | |
self.sections[name] = action = QtGui.QAction('View ' + name, self) | |
action.setCheckable(True) | |
self.addAction(action) | |
if name in ('Experiment', 'GUI', 'Errors'): | |
action.setChecked(True) | |
action.triggered.connect(self.update_view) | |
@property | |
def currently_checked(self): | |
return [name for name, action in self.sections.items() if action.isChecked()] | |
def update_view(self): | |
items = [text for name, text in self.lines if name in self.currently_checked] | |
body = '<br>'.join(items) | |
head = '<style>.ERROR,.CRITICAL {font-weight: bold;}</style>' | |
head += '<style>.WARNING,.ERROR,.CRITICAL {color: red;}</style>' | |
head += '<style>.DEBUG {color: green;}</style>' | |
self.setHtml(head + body) | |
if self.follow_action.isChecked(): | |
QtGui.QApplication.instance().processEvents() | |
scroll_bar = self.verticalScrollBar() | |
scroll_bar.setValue(scroll_bar.maximum()) | |
def log_to(self, name, record): | |
self.lines.append((name, str(record))) | |
self.update_view() | |
class StatusBarHandler(logging.Handler): | |
def __init__(self, window): | |
super(StatusBarHandler, self).__init__() | |
self.status_bar = window.statusBar() | |
def emit(self, record): | |
self.status_bar.showMessage(record.getMessage()) | |
QtGui.QApplication.instance().processEvents() | |
class CalibrationsTab(ParamsWidget): | |
pass | |
class CalibrationsWidget(QtGui.QTabWidget): | |
def __init__(self): | |
super(CalibrationsWidget, self).__init__() | |
import fpga_dsl | |
self.mode_params = {} | |
for mode in fpga_dsl.Mode.modes: | |
self.add_mode(mode) | |
def add_mode(self, mode): | |
self.mode_params[mode.name] = params_widget = ParamsWidget(mode, 1, 3) | |
button = QtGui.QPushButton('Save') | |
button.clicked.connect(mode.save_params) | |
widget = QtGui.QWidget() | |
layout = QtGui.QVBoxLayout(widget) | |
layout.addWidget(params_widget) | |
layout.addWidget(button) | |
self.addTab(widget, mode.name) | |
class LogViewHandler(logging.Handler): | |
def __init__(self, name, window): | |
super(LogViewHandler, self).__init__() | |
self.name = name | |
self.log_view = window.log_view | |
self.log_view.add_section(name) | |
self.setFormatter(fpga_config.formatter) | |
def emit(self, record): | |
html_record = cgi.escape(self.format(record)) | |
text = '<span class=%s>%s</span>' % (record.levelname, html_record) | |
self.log_view.log_to(self.name, text) | |
QtGui.QApplication.instance().processEvents() | |
class PlotWidget(QtGui.QWidget): | |
def __init__(self, data=None): | |
super(PlotWidget, self).__init__() | |
self.fig = plt.figure() | |
self.canvas = FigureCanvasQTAgg(self.fig) | |
self.toolbar = NavigationToolbar2QT(self.canvas, self) | |
layout = QtGui.QVBoxLayout(self) | |
layout.addWidget(self.toolbar) | |
layout.addWidget(self.canvas) | |
if data is not None: | |
plt.plot(data) | |
self.canvas.draw() | |
class JournalWidget(QtGui.QTextEdit): | |
filename = 'journal.html' | |
def __init__(self): | |
super(JournalWidget, self).__init__() | |
if os.path.exists(self.filename): | |
with open(self.filename, 'r') as f: | |
self.setHtml(f.read()) | |
def save(self): | |
with open(self.filename, 'w') as f: | |
f.write(str(self.toHtml())) | |
def add_image(self, img_path): | |
html = "<a href=%s><img src='%s' width=350></a>" % (img_path, img_path) | |
fragment = QtGui.QTextDocumentFragment.fromHtml(html) | |
self.textCursor().insertFragment(fragment) | |
self.save() | |
# def setSource(self, url): | |
# os.system('start ' + str(url.path())) | |
def human_time(t_seconds): | |
if t_seconds > 7200: | |
return "%.1f Hours" % (t_seconds / 3600.0) | |
elif t_seconds > 120: | |
return "%.1f Minutes" % (t_seconds / 60.0) | |
else: | |
return "%.1f Seconds" % (t_seconds) | |
if __name__ == "__main__": | |
import sys | |
# from PyQt4.QtTest import QTest | |
app = QtGui.QApplication([]) | |
logger = logging.getLogger('exp gui') | |
logger.setLevel(fpga_config.log_level) | |
win = ExperimentWindow() | |
base_experiment.Parameter.gui = win | |
win.setup_logging() | |
win.showMaximized() | |
win.move(0, 0) | |
def test(): | |
pass | |
app.lastWindowClosed.connect(sys.exit) | |
QtCore.QTimer.singleShot(100, test) | |
app.exec_() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment