Last active
August 29, 2015 14:06
-
-
Save flying-sheep/4ad4b6374c492f9bc707 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Repeatedly runs a command and checks its output via a Python regex, | |
displaying a progress bar in KDE’s notification area. | |
The regex must have the named groups “i” and “max”, which determine the achieved progress. | |
""" | |
from PyQt4.QtCore import QObject, QTimer | |
from PyKDE4.kdecore import KJob, KCmdLineArgs, KCmdLineOptions, KAboutData, KLocalizedString, ki18n | |
from PyKDE4.kio import KIO | |
from PyKDE4.kdeui import KApplication, KNotification | |
import sys | |
import re | |
from subprocess import CalledProcessError, check_output | |
from inspect import getargspec | |
from collections import namedtuple | |
def shellrepr(s): | |
"""Returns a lazily shell-like escaped version of a string (i.e. without quotes unless necessary)""" | |
r = repr(s) | |
stripped_r = r[1:-1] | |
if re.search(r'''[\s"'!#]''', stripped_r): | |
return r | |
else: | |
return stripped_r | |
def function_defaults(f): | |
"""Returns a named tuple for easy function defaults access""" | |
spec = getargspec(f) | |
args = spec.args[-len(spec.defaults):] | |
cls = namedtuple(f.__name__ + '_defaults', args) | |
return cls(*spec.defaults) | |
class RunJob(KJob): | |
"""A KJob able to run a command repeatedly | |
""" | |
def __init__(self, command, regex=r'(?P<i>\d+)\s*(?:/|or)\s*(?P<max>\d+)', max=None, interval=1000, parent=QObject()): | |
KJob.__init__(self, parent) | |
self.setCapabilities(KJob.Capabilities(KJob.Killable)) | |
self.command = [command] if isinstance(command, str) else command | |
self.regex = re.compile(regex) | |
self.max = max | |
self.interval = interval | |
if 'i' not in self.regex.groupindex: | |
raise ValueError('The regex parameter needs to have an “i” named group') | |
if max is None and 'max' not in self.regex.groupindex: | |
raise ValueError('The regex parameter needs to have an “max” named group or the max parameter has to be specified') | |
def start(self): | |
KIO.getJobTracker().registerJob(self) | |
QTimer().singleShot(0, self.tick) | |
def tick(self): | |
try: | |
if self.error() or self.isSuspended(): | |
return | |
except RuntimeError: | |
#if this class is killed before, a RuntimeError will raise | |
return | |
try: | |
i, max = self.run_and_parse() | |
except Exception as e: | |
self.process_error(str(e)) | |
return | |
self.emit_description() | |
KJob.setPercent(self, i / max * 100) | |
if i >= max: | |
self.emitResult() | |
return | |
QTimer().singleShot(self.interval, self.tick) | |
def run_and_parse(self): | |
output = self.run_command() | |
print(output) | |
match = self.regex.search(output) | |
if not match: | |
self.process_error(output) | |
max = self.max if self.max is not None else int(match.group('max')) | |
return int(match.group('i')), max | |
def run_command(self): | |
try: | |
print(self.command) | |
return check_output(self.command).decode() | |
except CalledProcessError as e: | |
return e.output.decode() | |
def process_error(self, msg): | |
print('Error', msg) | |
self.setError(KJob.UserDefinedError) | |
self.setErrorText(msg) | |
self.emit_description() | |
self.emitResult() | |
def emit_description(self): | |
if self.error(): | |
title = 'Error {}'.format(self.error()) | |
status = ('Error', self.errorText()) | |
else: | |
title = 'Watching command with interval {s.interval}'.format(s=self) | |
status = ('Progress', '{s.i} / {s.max}'.format(s=self)) | |
self.description.emit(self, title, ('Command', ' '.join([shellrepr(c) for c in self.command])), status) | |
def doSuspend(self): | |
#returns False for signaling that this class supports to suspend | |
return False | |
def doResume(self): | |
#return False for signaling that this class supports resuming | |
return False | |
def doKill(self): | |
#return True for signaling that we support killing | |
return True | |
RUNJOB_DEFAULTS = function_defaults(RunJob.__init__) | |
if __name__ == '__main__': | |
appName = 'kprogress' | |
catalog = '' | |
programName = ki18n(b'kprogress') | |
version = '1.0' | |
description = ki18n(__doc__.encode()) | |
license = KAboutData.License_GPL_V3 | |
copyright = ki18n('ⓒ 2014 Philipp A.'.encode()) | |
text = ki18n(b'none') | |
homePage = 'techbase.kde.org' | |
bugEmail = 'trueflyingsheep@gmail.com' | |
aboutData = KAboutData(appName, catalog, programName, version, description, license, copyright, text, homePage, bugEmail) | |
KCmdLineArgs.init(sys.argv, aboutData) | |
options = KCmdLineOptions() | |
options.add('r').add('regex <regex>', ki18n('Regex to fit on output. Must have named groups “i” and “max”.'.encode()), RUNJOB_DEFAULTS.regex) | |
options.add('i').add('interval <number>', ki18n(b'Interval in ms between command executions.'), str(RUNJOB_DEFAULTS.interval)) | |
options.add('!+command...', ki18n(b'Command with optional arguments to execute.')) | |
KCmdLineArgs.addCmdLineOptions(options) | |
args = KCmdLineArgs.parsedArgs() | |
command = [args.arg(i) for i in range(args.count())] | |
regex = args.getOption('regex') | |
interval = int(args.getOption('interval')) | |
print(command, regex, interval, sep='\n') | |
app = KApplication() | |
job = RunJob(command, regex, interval, app) | |
@job.result.connect | |
def finished(job): | |
if job.error() not in {KJob.NoError, KJob.KilledJobError}: | |
#TODO: why does this do nothing? | |
KNotification.event(job.errorText(), job.errorText()) | |
sys.exit() | |
job.start() | |
app.exec() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment