Skip to content

Instantly share code, notes, and snippets.

@yspreen
Created March 23, 2021 23:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yspreen/3bd5300d1b84decced23e5ddf1979c1a to your computer and use it in GitHub Desktop.
Save yspreen/3bd5300d1b84decced23e5ddf1979c1a to your computer and use it in GitHub Desktop.
PyQt Recorder
import time
import qtawesome as qta
import gseos_qt.globalvars as glob
import copy
import pickle
import gzip
import os
from typing import *
from datetime import datetime
from PyQt5.uic import loadUi
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *
from PyQt5.QtGui import *
from functools import wraps
from contextlib import suppress
from gseos_qt.utils.misc import WrappedMessageHandler, call_async, format_time
# File-dialog type filter for recording files; passed as ``type_filter`` to
# ``select_file`` by the save and load actions of ``RecorderWindow``.
RECORDING_FILTER = "GSpy Recording (*.gspy)"
class StolenObject:
    """Attribute proxy that hijacks selected attribute names of another object.

    Looking up a name listed in ``intercept`` yields ``replacement``; every
    other name is delegated to ``original``.

    :param original: the wrapped object that receives all non-intercepted lookups.
    :param intercept: container of attribute names to redirect.
    :param replacement: object returned for every intercepted name.
    """

    def __init__(self, original, intercept, replacement):
        self.original = original
        self.intercept = intercept
        self.replacement = replacement

    def __getattr__(self, item):
        """Resolve ``item`` after normal instance lookup has failed.

        :raises AttributeError: if the proxy is not fully initialised, or if
            ``original`` does not provide ``item``.
        """
        # Read our own state straight from ``__dict__``: going through
        # ``self.<name>`` would re-enter ``__getattr__`` and recurse forever
        # when the attribute is missing (e.g. on an instance made by
        # ``__new__`` without ``__init__``).
        state = self.__dict__
        if any(key not in state for key in ("original", "intercept", "replacement")):
            raise AttributeError(item)
        if item in state["intercept"]:
            return state["replacement"]
        # ``getattr`` (unlike ``object.__getattribute__``) honours a custom
        # ``__getattr__`` on the wrapped object, so proxies can be nested.
        return getattr(state["original"], item)
class RecorderWindow(QMainWindow):
    """Window with the record / playback controls for ``glob.Recorder``.

    The layout is loaded from ``ui/recorder.ui``. While the window is open,
    the recorder's ``info`` and ``success`` messages are redirected to the
    window's status-bar label through a ``StolenObject`` proxy; the proxy is
    removed again when the window closes.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        from .widget import select_file

        self.ui = loadUi("ui/recorder.ui", self)
        self.list_record: QListView = self.list_record
        self.list_play: QListView = self.list_play

        # Transport buttons: icon only, no caption.
        self.play.setIcon(qta.icon("fa.play", color="green"))
        self.play.setText("")
        self.play.clicked.connect(self.play_click)
        self.f_forward.setIcon(qta.icon("fa.fast-forward", color="green"))
        self.f_forward.setText("")
        self.f_forward.clicked.connect(self.f_forward_click)
        self.record.setIcon(qta.icon("fa.circle", color="red"))
        self.record.setText("")
        self.record.clicked.connect(self.record_click)
        self.stop.setIcon(qta.icon("fa.stop"))
        self.stop.setText("")
        self.stop.clicked.connect(self.stop_click)
        self.stop.setEnabled(False)
        self.step.setIcon(qta.icon("fa.step-forward", color="orange"))
        self.step.setText("")
        self.step.clicked.connect(self.step_click)

        glob.Recorder.finished.connect(self.finished)
        glob.Recorder.recording_changed.connect(self.on_new_recording)

        # Both file actions start in the directory of the last saved/loaded file.
        self.action_save.triggered.connect(lambda: select_file(
            self.save_file,
            type_filter=RECORDING_FILTER,
            path=glob.Settings.value("record_path", "."),
            create_new=True
        ))
        self.action_load.triggered.connect(lambda: select_file(
            self.load_file,
            type_filter=RECORDING_FILTER,
            path=glob.Settings.value("record_path", ".")
        ))

        self.record_name.textChanged.connect(self.update_name)
        self.list_record.setModel(glob.Recorder.record_items_model)
        self.on_new_recording()

        # Best effort: a missing or invalid stored geometry must not prevent
        # the window from opening.
        with suppress(Exception):
            self.restoreGeometry(glob.Settings.value("recorder_geometry", None))

        self.status_label = QLabel("Start or Load Recording", self)
        self.statusbar.addPermanentWidget(self.status_label, 1)

        # Keep a reference to our own proxy so closeEvent can remove exactly
        # this one, whatever happened to ``glob.Recorder.m_h`` in the meantime.
        self._status_proxy = StolenObject(glob.Recorder.m_h, ["info", "success"], self.status_set)
        glob.Recorder.m_h = self._status_proxy
        self.show()

    def status_set(self, message):
        """Show ``message`` in the status-bar label."""
        self.status_label.setText(message)

    def update_name(self, *_):
        """Copy the text of the name field into the current recording."""
        glob.Recorder.recording.name = self.record_name.text()

    @pyqtSlot(str)
    def save_file(self, file: str):
        """Write the current recording to ``file`` as a gzip-compressed pickle."""
        with gzip.open(file, 'wb') as f:
            pickle.dump(glob.Recorder.recording.to_dump(), f)
        glob.Settings.setValue("record_path", os.path.dirname(file))
        glob.Recorder.m_h.info("File saved. (%d Steps)" % len(glob.Recorder.recording.recordings))

    @pyqtSlot(str)
    def load_file(self, file: str):
        """Replace the current recording with the one stored in ``file``."""
        # SECURITY: unpickling executes arbitrary code embedded in the file.
        # Only load recordings from trusted sources.
        with gzip.open(file, 'rb') as f:
            dump = pickle.load(f)
        glob.Recorder.set_recording(Recording.from_dump(dump))
        glob.Settings.setValue("record_path", os.path.dirname(file))
        glob.Recorder.m_h.info("File loaded. (%d Steps)" % len(glob.Recorder.recording.recordings))

    def on_new_recording(self):
        """Point the playback list and the name field at the recorder's current recording."""
        self.list_play.setModel(glob.Recorder.recording.play_items_model)
        self.record_name.setText(glob.Recorder.recording.name)

    def _release_status_proxy(self):
        """Remove this window's message proxy from the recorder's handler chain.

        The original code always assigned ``m_h.original``. That raised
        AttributeError if the handler had been replaced, and removed the wrong
        proxy if another ``StolenObject`` had been stacked on top of ours.
        """
        proxy = getattr(self, "_status_proxy", None)
        if proxy is None:
            return
        self._status_proxy = None  # makes a repeated close a no-op
        handler = glob.Recorder.m_h
        if handler is proxy:
            glob.Recorder.m_h = proxy.original
            return
        # Our proxy may sit below a newer one: splice it out of the chain.
        while isinstance(handler, StolenObject):
            if handler.original is proxy:
                handler.original = proxy.original
                return
            handler = handler.original
        # Not found: the handler was replaced wholesale, nothing to undo.

    def closeEvent(self, *args, **kwargs):
        """Persist the geometry, detach from the recorder's messages and schedule deletion."""
        glob.Settings.setValue("recorder_geometry", self.saveGeometry())
        self._release_status_proxy()
        self.deleteLater()
        QMainWindow.closeEvent(self, *args, **kwargs)

    @pyqtSlot()
    def record_click(self):
        """Toggle recording and update the buttons to match."""
        if glob.Recorder.is_recording:
            self.record.setIcon(qta.icon("fa.circle", color="red"))
            self.play.setEnabled(True)
            self.f_forward.setEnabled(True)
            self.step.setEnabled(True)
            glob.Recorder.stop_recording()
        else:
            self.record.setIcon(qta.icon("fa.stop", color="red"))
            self.play.setEnabled(False)
            self.f_forward.setEnabled(False)
            self.step.setEnabled(False)
            self.stop.setEnabled(False)
            self.list_play.setEnabled(True)
            glob.Recorder.start_recording()

    def step_click(self):
        """Play back a single step."""
        self.stop.setEnabled(True)
        self.list_play.setEnabled(False)
        glob.Recorder.step_playback()

    def stop_click(self):
        """Abort the running playback."""
        glob.Recorder.stop_playback()

    @pyqtSlot()
    def finished(self):
        """Reset all controls once the recorder signals the end of playback."""
        self.play.setIcon(qta.icon("fa.play", color="green"))
        self.play.setEnabled(True)
        self.f_forward.setEnabled(True)
        self.step.setEnabled(True)
        self.stop.setEnabled(False)
        self.list_play.setEnabled(True)
        self.record.setEnabled(True)

    def f_forward_click(self):
        """Start fast playback; all transport buttons stay disabled until ``finished`` fires."""
        self.play.setEnabled(False)
        self.f_forward.setEnabled(False)
        self.step.setEnabled(False)
        self.stop.setEnabled(False)
        self.list_play.setEnabled(True)
        glob.Recorder.fast_playback()

    def play_click(self):
        """Start, pause or resume playback depending on the recorder state."""
        self.stop.setEnabled(True)
        self.list_play.setEnabled(False)
        if glob.Recorder.is_playing_back and not glob.Recorder.is_paused:
            # Running -> pause; the button shows "play" again.
            self.play.setIcon(qta.icon("fa.play", color="green"))
            self.f_forward.setEnabled(True)
            self.step.setEnabled(True)
            glob.Recorder.pause_playback()
        else:
            self.play.setIcon(qta.icon("fa.pause", color="green"))
            self.f_forward.setEnabled(False)
            self.step.setEnabled(False)
            if glob.Recorder.is_paused:
                glob.Recorder.continue_playback()
            else:
                glob.Recorder.start_playback()
class PlayAction:
    """Container for one recorded action payload.

    :param action: the payload (string, dict or list).
    :param copy_action: when true (the default) a deep copy of ``action`` is
        stored; otherwise the given object is stored as-is.

    Remaining positional and keyword arguments are forwarded to the next
    ``__init__`` in the MRO.
    """

    def __init__(self, action: Union[str, dict, list], *args, copy_action=True, **kwargs):
        super().__init__(*args, **kwargs)
        self.action = copy.deepcopy(action) if copy_action else action
class Recordable:
    """Mixin for objects whose method calls can be recorded and played back.

    Instances register themselves with ``glob.Recorder`` under a class name
    and an instance name. Methods decorated with :meth:`record_method` are
    reported to the recorder; :meth:`play_back` re-invokes such a call from
    its recorded description.
    """

    @staticmethod
    def record_method():
        """Return a decorator that reports each call of the method to the recorder.

        The wrapped method always receives a ``play_back`` keyword argument:
        ``True`` when re-invoked from a recording (the call is then not
        recorded again), ``False`` for a live call.
        """
        def _wrapper(func):
            # ``wraps`` keeps ``func.__name__`` intact. Without it, stacking
            # this decorator on another one would record "_decorator" as the
            # method name and playback could not find the method.
            @wraps(func)
            def _decorator(self: "Recordable", *args, **kwargs):
                if kwargs.get("play_back", False):
                    return func(self, *args, **kwargs)
                # The recorded kwargs carry play_back=True so that replaying
                # the action takes the branch above.
                kwargs["play_back"] = True
                action = PlayAction({
                    "method": func.__name__,
                    "args": args,
                    "kwargs": kwargs
                })
                glob.Recorder.record(self.cls_name, self.instance_name, action)
                kwargs["play_back"] = False
                return func(self, *args, **kwargs)
            return _decorator
        return _wrapper

    @staticmethod
    def intercept_on_playback():
        """Return a decorator that ignores live calls while a playback is running.

        During playback the method only runs when called with
        ``play_back=True``; any other call returns ``None`` immediately.
        """
        def _wrapper(func):
            @wraps(func)
            def _decorator(self: "Recordable", *args, **kwargs):
                if self.playing_back and not kwargs.get("play_back", False):
                    return
                return func(self, *args, **kwargs)
            return _decorator
        return _wrapper

    def __init__(self, *args, instance_name=None, cls_name=None, singleton: Optional[bool] = None, register=True,
                 **kwargs):
        """Store the identification names and optionally register with the recorder.

        :param instance_name: name of this instance; defaults to ``"STATIC"``.
        :param cls_name: name of the class; defaults to ``<module>.<class>``.
        :param singleton: forwarded to ``Recorder.register_instance``.
        :param register: when true, call :meth:`recorder_register` right away.
        """
        super().__init__(*args, **kwargs)
        if instance_name is None:
            instance_name = "STATIC"
        if cls_name is None:
            cls_name = self.__module__ + "." + self.__class__.__name__
        if singleton is None:
            # NOTE(review): this compares the *class* name with "STATIC", which
            # is only true when a caller passes cls_name="STATIC". Possibly
            # ``instance_name`` was meant. Left unchanged because switching
            # would make a second unnamed instance raise in register_instance.
            singleton = cls_name == "STATIC"
        self.cls_name = cls_name
        self.instance_name = instance_name
        self.singleton = singleton
        if register:
            self.recorder_register()

    def recorder_register(self):
        """Register this instance with ``glob.Recorder``."""
        glob.Recorder.register_instance(self.__class__, self.cls_name, self, self.instance_name, self.singleton)

    @property
    def playing_back(self) -> bool:
        """Whether the global recorder is currently playing back."""
        return glob.Recorder.is_playing_back

    def play_back(self, action: "PlayAction"):
        """Execute a recorded action on this instance.

        ``action.action`` must be a dict with a ``"method"`` entry naming a
        callable attribute of this instance, plus optional ``"args"`` (tuple)
        and ``"kwargs"`` (dict).

        :returns: whatever the invoked method returns.
        :raises NotImplementedError: if the description has any other shape or
            the method does not exist.
        """
        spec = action.action
        if isinstance(spec, dict):
            name = spec.get("method", None)
            method = getattr(self, name, None) if name is not None else None
            args = spec.get("args", tuple())
            kwargs = spec.get("kwargs", dict())
            if callable(method) and isinstance(args, tuple) and isinstance(kwargs, dict):
                return method(*args, **kwargs)
        raise NotImplementedError("cannot play back action %r" % (spec,))
class RecordAction:
    """One entry of a recording: a target, the action to run on it and a delay.

    :param classname: class name of the target.
    :param instancename: instance name of the target.
    :param action: the action to play back, or ``None`` for a pure delay.
    :param delay_s: delay in seconds stored as ``delay`` (default ``-1.``).
    """

    @staticmethod
    def delay_only(delay_s):
        """Create an entry without target or action that only carries a delay."""
        return RecordAction("", "", None, delay_s)

    def __init__(self, classname: str, instancename: str, action: Optional["PlayAction"], delay_s=-1.):
        self.classname = classname
        self.instancename = instancename
        self.action = action
        self.delay = delay_s

    def clone(self):
        """Return a shallow copy; the ``action`` object is shared with the original."""
        return RecordAction(self.classname, self.instancename, self.action, self.delay)

    def __str__(self):
        # ``action`` is None for delay-only entries; dereferencing it
        # unconditionally used to raise AttributeError here.
        payload = None if self.action is None else self.action.action
        return "A<%s,%s,%s>" % (self.classname, self.instancename, str(payload))
class Recording:
    """A named list of recorded actions plus the item model used to filter them.

    ``play_items_model`` holds one checkable row per distinct
    (class name, instance name) pair found in ``recordings``;
    ``play_items_dict`` maps ``class name -> instance name -> item`` for the
    same rows.
    """

    def __init__(self, *args, name=None, **kwargs):
        """Create an empty recording.

        :param name: display name; defaults to ``"Recording <current timestamp>"``.
        """
        super().__init__(*args, **kwargs)
        self.name = "Recording %s" % datetime.now().strftime("%Y-%m-%d %H:%M:%S") if name is None else name
        self.recordings: List[RecordAction] = []
        self.play_items_model: QStandardItemModel = QStandardItemModel()
        self.play_items_dict = dict()
        # Check states loaded from a dump, applied by the next update_model().
        self.check_states_restore: List[bool] = []

    @property
    def check_states(self) -> List[bool]:
        """Checked flag of every model row, in row order."""
        model = self.play_items_model
        return [model.item(row).checkState() == Qt.Checked for row in range(model.rowCount())]

    @property
    def recordings_cleaned(self) -> List[RecordAction]:
        """The recorded actions whose target is checked in the model.

        The positive delays of skipped entries are not lost: they are added to
        the delay of the next kept entry, which is cloned first so the stored
        recording stays untouched. Delays after the last kept entry are dropped.
        """
        kept = []
        pending_delay = 0
        for entry in self.recordings:
            item = self.play_items_dict.get(entry.classname, {}).get(entry.instancename)
            if item is None or item.checkState() != Qt.Checked:
                # Skipped (unchecked or unknown target): carry its delay over.
                if entry.delay > 0:
                    pending_delay += entry.delay
                continue
            if pending_delay > 0:
                entry = entry.clone()
                if entry.delay > 0:
                    entry.delay += pending_delay
                else:
                    entry.delay = pending_delay
                pending_delay = 0
            kept.append(entry)
        return kept

    def update_model(self):
        """Rebuild the item model and lookup dict from ``recordings``.

        Pending ``check_states_restore`` values are applied to the rows in
        creation order; rows without a stored state are checked.
        """
        self.play_items_model.clear()
        self.play_items_dict.clear()
        # check_states stores one flag per model ROW, so one flag must be
        # consumed per newly created row. Consuming one per recorded action
        # (as before) shifted the flags once a target had several actions.
        pending_states = iter(self.check_states_restore)
        for entry in self.recordings:
            if self.play_items_dict.get(entry.classname, {}).get(entry.instancename) is not None:
                continue  # row for this target already exists
            checked = next(pending_states, True)
            add_model_entry(self.play_items_model, self.play_items_dict, entry.classname, entry.instancename,
                            checked)
        # The states now live in the model; do not re-apply them later.
        self.check_states_restore = []

    def to_dump(self):
        """Return ``[name, recordings, check_states]`` for serialisation."""
        return [
            self.name,
            self.recordings,
            self.check_states
        ]

    @staticmethod
    def from_dump(d) -> "Recording":
        """Rebuild a recording from the list produced by :meth:`to_dump`."""
        r = Recording(name=d[0])
        r.recordings = d[1]
        r.check_states_restore = d[2]
        r.update_model()
        return r
def make_item(cls, instance, dic, checked=True):
    """Create a user-checkable item for a target and store it in ``dic``.

    The label is the last dotted component of ``cls``; the instance name is
    appended as ``.<instance>`` unless it is ``""``, ``"_"`` or ``"STATIC"``
    (compared case-insensitively).

    :param cls: (dotted) class name of the target.
    :param instance: instance name of the target.
    :param dic: lookup dict; the item is stored at ``dic[cls][instance]``.
    :param checked: initial check state of the item.
    :returns: the new ``QStandardItem``.
    """
    short_name = cls.rpartition(".")[2]
    unnamed = instance.upper() in ["", "_", "STATIC"]
    label = short_name if unnamed else "%s.%s" % (short_name, instance)

    item = QStandardItem(label)
    item.setFlags(item.flags() | Qt.ItemIsUserCheckable)
    item.setCheckState(Qt.Checked if checked else Qt.Unchecked)

    if dic.get(cls, None) is None:
        dic[cls] = dict()
    dic[cls][instance] = item
    return item
def add_model_entry(model, dict_, cls, instance, checked=True):
    """Append a row for ``(cls, instance)`` to ``model`` unless ``dict_`` already has one.

    :param model: item model that receives the new row.
    :param dict_: lookup dict ``cls -> instance -> item`` kept in step with the model.
    :param cls: class name of the target.
    :param instance: instance name of the target.
    :param checked: initial check state for a newly created item.
    """
    known = dict_.get(cls, dict()).get(instance, None)
    if known is not None:
        return
    model.appendRow(make_item(cls, instance, dict_, checked))
class Recorder(QObject):
    """Registry of ``Recordable`` instances plus the record / playback state machine.

    Signals:
        finished: emitted by :meth:`stop_playback`, whether the playback ran
            to the end or was aborted.
        recording_changed: emitted whenever :meth:`set_recording` installs a
            recording.
    """

    finished = pyqtSignal()
    recording_changed = pyqtSignal()

    def __init__(self, *args, message_handler=None, **kwargs):
        super().__init__(*args, **kwargs)
        # Registered classes by class name.
        self.classes: Dict[str, type] = {}
        # Registered instances: class name -> instance name -> instances.
        self.instances: Dict[str, Dict[str, List[Recordable]]] = {}
        self.recording = None
        self.set_recording(Recording())
        # Actions still to be executed by the running playback.
        self.actions_playing: List[RecordAction] = []
        # Time of the previously recorded action (or of the recording start).
        self.last_time: Optional[datetime] = None
        self.is_recording: bool = False
        self.is_playing_back = False
        self.is_paused = False
        # Number of actions in the current playback and the summed delays of
        # the actions played so far; both are only used for ``progress``.
        self.actions_count = -1
        self.time = -1
        self.m_h = WrappedMessageHandler(message_handler, "Recorder")
        # Checkable list of registered targets that may be recorded.
        self.record_items_model: QStandardItemModel = QStandardItemModel()
        self.record_items_dict = dict()

    @property
    def progress(self) -> str:
        """Playback progress as ``"(<done>/<total>) [<time>]"``."""
        return "(%d/%d) [%s]" % (self.actions_count - len(self.actions_playing), self.actions_count,
                                 format_time(self.time))

    def set_recording(self, recording):
        """Install ``recording`` as the current one and emit ``recording_changed``."""
        self.recording = recording
        self.recording_changed.emit()

    def set_message_handler(self, message_handler):
        """Replace the message handler with a fresh wrapper around ``message_handler``."""
        self.m_h = WrappedMessageHandler(message_handler, "Recorder")

    def register_instance(self, cls: type, cls_name: str, instance: Recordable, instance_name: str, singleton: bool):
        """Register ``instance`` under ``cls_name`` / ``instance_name``.

        :raises ValueError: if ``singleton`` is true and that name pair has
            been registered before.
        """
        if self.classes.get(cls_name, None) is None:
            self.classes[cls_name] = cls
        if self.instances.get(cls_name, None) is None:
            self.instances[cls_name] = dict()
        per_class = self.instances[cls_name]
        if per_class.get(instance_name) is None:
            per_class[instance_name] = list()
        elif singleton:
            raise ValueError("Instance %s <%s> already defined!" % (instance_name, cls_name))
        add_model_entry(self.record_items_model, self.record_items_dict, cls_name, instance_name)
        per_class[instance_name].append(instance)

    def record(self, classname, instancename, action):
        """Append ``action`` to the current recording.

        Nothing happens unless a recording is running and the target is
        checked in ``record_items_model``. The stored delay is the time since
        the previous recorded action (or since the recording started).
        """
        if not self.is_recording:
            return
        try:
            if self.record_items_dict[classname][instancename].checkState() != Qt.Checked:
                return
        except (AttributeError, KeyError):
            # Unknown target: not recordable.
            return
        now_time = datetime.now()
        self.recording.recordings.append(
            RecordAction(classname, instancename, action, (now_time - self.last_time).total_seconds()))
        self.last_time = now_time
        self.m_h.info("Recording... %d" % len(self.recording.recordings))

    @pyqtSlot()
    def stop_recording(self):
        """Stop recording, rebuild the playback model and report count and size."""
        from pympler import asizeof
        self.is_recording = False
        self.recording.update_model()
        count = len(self.recording.recordings)
        self.m_h.info("%d Action%s recorded. (%s Bytes)" % (
            count, "s" if count != 1 else "",
            asizeof.asizeof(self.recording.recordings)))

    @pyqtSlot()
    def start_recording(self):
        """Start recording into a fresh, empty ``Recording``."""
        self.is_recording = True
        self.last_time = datetime.now()
        self.set_recording(Recording())
        self.m_h.info("Recording...")

    @pyqtSlot()
    def stop_playback(self, finished=False):  # TODO
        """End the playback and emit ``finished``.

        :param finished: ``True`` when the playback ran to completion,
            ``False`` (default) when it is being aborted; only the status
            message differs.
        """
        if finished:
            self.m_h.info("Playback finished. %s" % self.progress)
        else:
            self.m_h.info("Playback aborted.")
        self.is_paused = False
        self.is_playing_back = False
        self.finished.emit()

    @pyqtSlot()
    def pause_playback(self):  # TODO
        """Mark the playback as paused; a step that is waiting out its delay gives up."""
        self.is_paused = True
        self.m_h.info("Paused. %s" % self.progress)

    @pyqtSlot()
    def continue_playback(self):
        """Resume a paused playback with the next pending action."""
        self.is_paused = False
        self.queue_step()

    @pyqtSlot()
    def step_playback(self):
        """Play exactly one action immediately and stay paused.

        Starts a new playback first if none is running.
        """
        if not self.is_playing_back:
            self.is_playing_back = True
            self.set_actions()
        self.is_paused = True
        self.make_step(True, False)

    @pyqtSlot()
    def fast_playback(self):
        """Play the whole recording from the start, ignoring the recorded delays."""
        self.is_playing_back = True
        self.is_paused = False
        self.set_actions()
        self.queue_step(instant=True)
        self.m_h.info("Fast forward >>")

    @pyqtSlot()
    def start_playback(self):
        """Play the whole recording from the start, honouring the recorded delays."""
        self.is_playing_back = True
        self.is_paused = False
        self.set_actions()
        self.queue_step()
        self.m_h.info("Playing.. %s" % self.progress)

    def set_actions(self):
        """Load the filtered actions of the current recording and reset the progress counters."""
        self.actions_playing = self.recording.recordings_cleaned
        self.actions_count = len(self.actions_playing)
        self.time = 0

    def queue_step(self, instant=False):
        """Schedule the next step through ``call_async``, or finish if nothing is left."""
        if len(self.actions_playing) == 0:
            self.stop_playback(finished=True)
            return
        # NOTE(review): call_async presumably runs make_step off the calling
        # thread, which is why make_step may sleep; confirm in utils.misc.
        call_async(self.make_step, (instant,))

    def make_step(self, instant=False, queue_next=True):
        """Wait out the delay of the next action, play it and continue.

        :param instant: skip the recorded delay.
        :param queue_next: schedule the following step afterwards (``False``
            for single stepping).
        """
        if not self.actions_playing:
            # Nothing to play, e.g. single-stepping an empty recording.
            # Previously this raised IndexError on actions_playing[0].
            return self.stop_playback(finished=True)
        start_time = datetime.now()
        step: RecordAction = self.actions_playing[0]
        while not instant and step.delay > (datetime.now() - start_time).total_seconds():
            time.sleep(.005)
            # Stop or pause was requested while waiting: leave the action
            # queued so a later continue starts with it again.
            if not self.is_playing_back or self.is_paused:
                return
        self.actions_playing = self.actions_playing[1:]
        self.play_back(step)
        if len(self.actions_playing) == 0:
            return self.stop_playback(finished=True)
        if queue_next:
            self.queue_step(instant)
        # Fast playback does not report per-step progress.
        if not instant or not queue_next:
            if queue_next:
                self.m_h.info("Playing.. %s" % self.progress)
            else:
                self.m_h.info("Paused. %s" % self.progress)

    @staticmethod
    def sync_play_back(instances: List[Recordable], instance: Recordable, action):
        """Play ``action`` on ``instance``; drop the instance from ``instances`` if that fails."""
        try:
            instance.play_back(action)
        except (Exception,):
            # Deliberate best effort: an instance that cannot play the action
            # is unregistered. It may already have been removed by an earlier
            # failure, so a missing entry is not an error.
            with suppress(ValueError):
                instances.remove(instance)

    def play_back(self, recording: RecordAction):
        """Dispatch one recorded action to every instance registered for its target."""
        if recording.action is None:
            return
        if recording.delay > 0:
            self.time += recording.delay
        instances = self.instances.get(recording.classname, dict()).get(recording.instancename, list())
        if len(instances) == 0:
            self.m_h.warning(
                "No instance of %s <%s> to play back action!" % (recording.instancename, recording.classname))
            return
        from .widget import async_in_main_thread
        # Iterate over a snapshot: sync_play_back removes failing instances
        # from the live list, which would make a direct iteration skip entries.
        for target in list(instances):
            async_in_main_thread(Recorder.sync_play_back, (instances, target, recording.action))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment