Created
August 21, 2011 18:12
-
-
Save kachayev/1160938 to your computer and use it in GitHub Desktop.
How to create amazing script for custom keyboard shortcut with Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import os | |
import time | |
import errno | |
import xmlrpclib | |
import socket | |
class Package(object): | |
'''Install python packages with using setup.py install''' | |
def __init__(self, directory): | |
self.directory = directory | |
def install(self): | |
'''Call setup.py install from given directory''' | |
cmd = 'python setup.py install' | |
try: | |
os.setuid(Command.ROOT) | |
except OSError: | |
# Operation not permitted | |
cmd = Command.sudo(cmd) | |
try: | |
os.chdir(self.directory) | |
return os.system(cmd) | |
except OSError: | |
return Command.ERROR | |
class Notify(object): | |
''' | |
Provide facilities to throw message to user desktop | |
even from background script (notification will be shown | |
in top right corner) | |
For correct processing, package libnotify should be installed | |
''' | |
def __init__(self, title='Hot Key', icon=None, time=500): | |
self.title = title | |
self.icon = icon | |
self.time = time | |
def send(self, message): | |
''' | |
Send message to user with using notify-send package | |
or just echo as fallback variant | |
''' | |
os.system(self.sender(self.title, message, self.icon, self.time)) | |
@property | |
def sender(self): | |
'''Return one of several sender function according to system facilities''' | |
def notify(*args): | |
return 'notify-send "%s" "%s" -i %s -t %d' % tuple(args) | |
def echo(*args): | |
return 'echo "%s"\n"%s"' % (args[0], args[1]) | |
if Command.SUCCESS == os.system('notify-send --version'): | |
return notify | |
return echo | |
class Command(object): | |
'''Command executor''' | |
ROOT = 0 | |
SUCCESS = 0 | |
ERROR = 1 | |
@staticmethod | |
def sudo(cmd): | |
'''Prepare command for runnig with root permissions''' | |
return 'gksudo %s' % cmd | |
@staticmethod | |
def gearoscope(*args): | |
''' | |
Rebuild gearoscope env by reinstalling | |
all necessary packages and restarting | |
group of daemons running under supervisord | |
''' | |
# Install Gearoscope package | |
if Command.SUCCESS == Package('/var/gearoscope/gearoscope').install(): | |
yield 'Gearoscope server is rebuild', Command.SUCCESS | |
else: | |
yield 'Error during Gearoscope application reinstall', Command.ERROR | |
# Copy Sonar daemon configuration file to /etc directory | |
try: | |
os.system(Command.sudo('cp %s %s' | |
% ('/var/gearoscope/gearoscope/config/sonar.conf', | |
'/etc/gearoscope/'))) | |
yield 'Sonar settings file is copied', Command.SUCCESS | |
except (IOError, OSError), exc: | |
yield 'Error during configuration file copy:\n%s' % exc.strerror, Command.ERROR | |
# Install Sonar package | |
if Command.SUCCESS == Package('/var/gearoscope/sonar').install(): | |
yield 'Sonar package reinstalled', Command.SUCCESS | |
else: | |
yield 'Error during Sonar pachage reinstall', Command.ERROR | |
# Connect to supervisor daemon via XML-RPC interface | |
try: | |
conn = xmlrpclib.ServerProxy('http://127.0.0.1:9001') | |
yield 'Connected to supervisor XML-RPC interface', Command.SUCCESS | |
# Stop process group | |
if conn.supervisor.stopProcessGroup('gearoscope'): | |
yield 'All processes in Gearoscope group succesfully stopped', Command.SUCCESS | |
else: | |
# We shouldn't stop working here, | |
# cause of too many reasons of process' stopping failed | |
yield 'Error during stoping Gearoscope group of processs', Command.SUCCESS | |
# Start process group | |
if conn.supervisor.startProcessGroup('gearoscope'): | |
yield 'All processes in Gearoscope group succesfully started', Command.SUCCESS | |
else: | |
yield 'Error during sratring Gearoscope group of processes', Command.ERROR | |
yield 'All process are restarted', Command.SUCCESS | |
except socket.error: | |
yield 'Connection to supervisor is failed', Command.ERROR | |
Notify(title='Gearoscope', | |
icon='/home/kachayev/gearoscope.png', | |
time=7500).send('Succesfully rebuild and restarted') | |
class FileLock(object): | |
""" | |
Source: http://www.evanfosmark.com/2009/01/cross-platform-file-locking-support-in-python/ | |
Revised and updated. | |
A file locking mechanism that has context-manager support so | |
you can use it in a with statement. This should be relatively cross | |
compatible as it doesn't rely on msvcrt or fcntl for the locking. | |
""" | |
class Locked(Exception): | |
pass | |
def __init__(self, file_name, timeout=10, delay=.05): | |
""" | |
Prepare the file locker. Specify the file to lock and optionally | |
the maximum timeout and the delay between each attempt to lock. | |
""" | |
self.is_locked = False | |
self.lockfile = os.path.join(os.getcwd(), "%s.lock" % file_name) | |
self.file_name = file_name | |
self.timeout = timeout | |
self.delay = delay | |
def acquire(self): | |
""" | |
Acquire the lock, if possible. If the lock is in use, it check again | |
every `wait` seconds. It does this until it either gets the lock or | |
exceeds `timeout` number of seconds, in which case it throws | |
an exception. | |
""" | |
start_time = time.time() | |
while True: | |
try: | |
self.fd = os.open(self.lockfile, os.O_CREAT|os.O_EXCL|os.O_RDWR) | |
break; | |
except OSError as e: | |
if e.errno != errno.EEXIST: | |
raise | |
if (time.time() - start_time) >= self.timeout: | |
raise FileLock.Locked("Timeout occured.") | |
time.sleep(self.delay) | |
self.is_locked = True | |
def release(self): | |
""" | |
Get rid of the lock by deleting the lockfile. | |
When working in a `with` statement, this gets automatically | |
called at the end. | |
""" | |
if self.is_locked: | |
os.close(self.fd) | |
os.unlink(self.lockfile) | |
self.is_locked = False | |
def __enter__(self): | |
""" | |
Activated when used in the with statement. | |
Should automatically acquire a lock to be used in the with block. | |
""" | |
if not self.is_locked: | |
self.acquire() | |
return self | |
def __exit__(self, type, value, traceback): | |
""" | |
Activated at the end of the with statement. | |
It automatically releases the lock if it isn't locked. | |
""" | |
if self.is_locked: | |
self.release() | |
def __del__(self): | |
""" | |
Make sure that the FileLock instance doesn't leave a lockfile | |
lying around. | |
""" | |
self.release() | |
def error_shutdown(msg, code=Command.ERROR): | |
Notify(title='Gearoscope error', | |
icon='/home/kachayev/error.png', | |
time=5000).send(msg) | |
sys.exit(code) | |
if __name__ == '__main__': | |
try: | |
with FileLock('/tmp/shortcut.%s.lock' % sys.argv[1]) as lock: | |
cmd = getattr(Command, sys.argv[1]) | |
for msg, code in cmd(sys.argv[2:]): | |
print msg | |
if code != Command.SUCCESS: | |
error_shutdown(msg) | |
print 'Succefully done' | |
sys.exit(Command.SUCCESS) | |
except FileLock.Locked: | |
error_shutdown('Already running') | |
except IndexError: | |
error_shutdown('You should provide command name as first argument') | |
except AttributeError: | |
error_shutdown('Unknown command given') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Remove unnecessary
shutil
import.