Last active
August 25, 2019 16:15
-
-
Save adeutscher/b07de04d322f2316c0af5ddeff51e534 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
''' | |
Output desktop notifications when useful hotplug events happen. | |
''' | |
from __future__ import print_function # Python3 printing in Python2 | |
import pyudev # For detecting events | |
# Basic includes | |
import getopt, os, sys | |
from subprocess import Popen as popen | |
# Default option values, used when the matching flag is absent from the
# command line.
DEFAULT_DEBUG = False
DEFAULT_NO_GUI = False

# Keys under which parsed options are stored in the args dictionary.
# TITLE_DEBUG is also the name of the '--debug' long option.
TITLE_DEBUG = 'debug'
TITLE_NO_GUI = 'nogui'
#
# Common Colours and Message Functions
###
def _print_message(header_colour, header_text, message):
    '''
    Print one tagged line to standard output, shaped as
    'HEADER[script name]: message'.

    header_colour: colour code applied to the header text.
    header_text: tag shown in front of the script name (e.g. 'Error').
    message: text printed after the tag.
    '''
    header = colour_text(header_text, header_colour)
    # The script name is taken from argv[0] and always shown in green.
    script_name = colour_text(os.path.basename(sys.argv[0]), COLOUR_GREEN)
    print('%s[%s]: %s' % (header, script_name, message))
def colour_text(text, colour=None):
    '''
    Wrap text in a colour code followed by the reset code (COLOUR_OFF).

    text: value to wrap; it is formatted with '%s'.
    colour: colour code to apply. Any false value (None, '') selects
            COLOUR_BOLD instead.
    '''
    prefix = colour or COLOUR_BOLD
    return '%s%s%s' % (prefix, text, COLOUR_OFF)
def enable_colours(force=False):
    '''
    Define the module-level COLOUR_* names.

    The names hold ANSI escape sequences when standard output is a terminal
    (or when force is true), and empty strings otherwise.

    force: when true, use the escape sequences without checking whether
           standard output is a terminal.
    '''
    codes = {
        'COLOUR_PURPLE': '\033[1;35m',
        'COLOUR_RED': '\033[1;91m',
        'COLOUR_GREEN': '\033[1;92m',
        'COLOUR_YELLOW': '\033[1;93m',
        'COLOUR_BLUE': '\033[1;94m',
        'COLOUR_BOLD': '\033[1m',
        'COLOUR_OFF': '\033[0m',
    }
    if not (force or sys.stdout.isatty()):
        # Not writing to a terminal: blank every code.
        codes = dict.fromkeys(codes, '')
    globals().update(codes)


# Define the colour names once at import time so the message helpers can use them.
enable_colours()
# Running total of messages reported through print_error().
error_count = 0


def print_error(message):
    '''
    Print message under a red 'Error' header and add one to the
    module-level error_count.
    '''
    global error_count
    error_count = error_count + 1
    _print_message(header_colour=COLOUR_RED, header_text='Error', message=message)
def print_debug(message):
    '''Print message under a yellow 'DEBUG' header.'''
    _print_message(header_colour=COLOUR_YELLOW, header_text='DEBUG', message=message)
def print_exception(e, msg=None):
    '''
    Report an exception through print_error().

    e: the exception instance; its type name and str() form are printed.
    msg: optional context, shown in parentheses after the type name.
    '''
    context = ' (%s)' % msg if msg else ''
    type_name = colour_text(type(e).__name__, COLOUR_RED)
    print_error('Unexpected %s%s: %s' % (type_name, context, str(e)))
def print_notice(message):
    '''Print message under a blue 'Notice' header.'''
    _print_message(header_colour=COLOUR_BLUE, header_text='Notice', message=message)
def print_usage(message):
    '''Print message under a purple 'Usage' header.'''
    _print_message(header_colour=COLOUR_PURPLE, header_text='Usage', message=message)
# Script Functions
###
def colour_value(text):
    '''
    Convert a value to a string and colour it for terminal output.

    Strings starting with '/' (paths) are coloured green; anything else
    gets colour_text()'s default colour. An empty string is returned as-is.
    '''
    value = str(text)
    if not value:
        return value
    # Passing None lets colour_text() pick its default colour.
    colour = COLOUR_GREEN if value.startswith('/') else None
    return colour_text(value, colour)
def desktop_notify(header, contents):
    '''
    Show a desktop notification by running the 'notify-send' command.

    Does nothing when the no-GUI option is set in the module-level args.
    Otherwise it waits for notify-send to exit before returning.

    header: notification title.
    contents: notification body.
    '''
    if args.get(TITLE_NO_GUI, DEFAULT_NO_GUI):
        return
    try:
        popen(['notify-send', header, contents]).communicate()
    except OSError as e:
        # notify-send could not be started (most likely it is not installed).
        # Terminal output still works, so report the problem and keep
        # monitoring instead of letting the exception end the event loop.
        print_exception(e, 'running notify-send')
def hexit(exit_code=0):
    '''
    Print the help menu and terminate the script.

    exit_code: process exit status to terminate with.
    Raises SystemExit (through sys.exit) and never returns.
    '''
    print_usage('Usage: ./%s [-h] [-n] [--debug]' % os.path.basename(sys.argv[0]))
    print_usage('-h: Display this help menu and exit.')
    print_usage('-n: Do not attempt to print to GUI with desktop-notify')
    print_usage('--debug: Print a debug printout for any hotplug event. Useful for developing new features.')
    # sys.exit rather than the exit() builtin: the builtin is injected by the
    # site module for interactive use and is not guaranteed to exist.
    sys.exit(exit_code)
def process_args(raw_args):
    '''
    Parse command-line arguments into an options dictionary.

    raw_args: list of argument strings (without the program name).
    Returns a dict that contains TITLE_DEBUG and/or TITLE_NO_GUI set to True
    for the flags that were given; flags not given are left out.

    Prints the help menu and exits with status 1 on an invalid option, or
    with status 0 when '-h' is given.
    '''
    parsed = {}
    try:
        # Positional operands are accepted by gnu_getopt but not used.
        output_options, _ = getopt.gnu_getopt(raw_args, 'hn', [TITLE_DEBUG])
    except getopt.GetoptError as e:
        # Only option-parsing failures are expected here; anything else is a
        # programming error and should surface with its traceback.
        print_error('Error parsing arguments: %s' % str(e))
        hexit(1)
    for option, _ in output_options:
        if option == '--%s' % TITLE_DEBUG:
            parsed[TITLE_DEBUG] = True
        elif option == '-h':
            hexit()
        elif option == '-n':
            parsed[TITLE_NO_GUI] = True
    return parsed
def _debug_event(device):
    '''Print a one-line dump of an event's main attributes (debug mode).'''
    # A list of pairs keeps the field order fixed on every Python version.
    raw_values = [
        ('subsystem', device.subsystem),
        ('action', device.action),
        ('device_type', device.device_type),
        ('driver', device.driver),
        ('sys_name', device.sys_name),
        ('sys_path', device.sys_path),
        ('device_path', device.device_path),
        ('device_node', device.device_node),
        ('device_number', device.device_number),
    ]
    print_debug('EVENT: (%s)' % ', '.join(['%s: %s' % (k, colour_value(v)) for k, v in raw_values]))


def _handle_tty_event(device):
    '''Report a terminal device (e.g. a serial adapter or Arduino) being added or removed.'''
    if device.action == 'add':
        print_notice('New TTY device added: %s' % colour_value(device.device_node))
        desktop_notify('New TTY', 'Path: %s' % device.device_node)
    elif device.action == 'remove':
        print_notice('TTY removed: %s' % colour_value(device.device_node))
        desktop_notify('Removed TTY', 'Path: %s' % device.device_node)


def _handle_block_event(device, gui):
    '''Report storage events: newly added disks (with their child nodes) and removals.'''
    if device.action == 'add':
        if device.device_type != 'disk':
            # Only devices of type 'disk' are announced on add.
            return
        child_nodes = [d.device_node for d in device.children]
        child_wording = 'Child' if len(child_nodes) == 1 else 'Children'
        content_terminal = 'New disk added: %s' % colour_value(device.device_node)
        content_gui = 'New disk: %s' % device.device_node
        if child_nodes:
            content_terminal += ' (%s: %s)' % (child_wording, ', '.join([colour_value(c) for c in child_nodes]))
            if gui:
                content_gui += ' (%s: %s)' % (child_wording, ', '.join(child_nodes))
        print_notice(content_terminal)
        if gui:
            desktop_notify('Disk Added', content_gui)
    elif device.action == 'remove':
        # device_type may be None; fall back to a generic word so that
        # capitalize() below cannot fail.
        device_type = device.device_type or 'device'
        print_notice('Removed %s: %s' % (device_type, colour_value(device.device_node)))
        desktop_notify('%s removed' % device_type.capitalize(), 'Path: %s' % device.device_node)


def _handle_net_event(device):
    '''Report a network interface being added or removed.'''
    # Note: Adding/removing interfaces from a bridge does NOT trigger a hotplug event.
    if_name = device.sys_name
    if device.device_type == 'wlan':
        if_wording = 'wireless'
    else:
        # Interfaces with no device type are described as 'ethernet'.
        if_wording = device.device_type or 'ethernet'
    verb = {'add': 'New', 'remove': 'Removed'}.get(device.action)
    if verb is None:
        return
    print_notice('%s %s interface: %s' % (verb, if_wording, colour_value(if_name)))
    desktop_notify('%s %s interface' % (verb, if_wording), '%s device name: %s' % (if_wording.capitalize(), if_name))


def run():
    '''
    Listen for udev hotplug events and report them until interrupted.

    Reads the module-level args for the debug and no-GUI options. Outside
    debug mode the monitor is limited to the block, net, tty and usb
    subsystems; in debug mode every event is received and dumped.
    '''
    debug = args.get(TITLE_DEBUG, DEFAULT_DEBUG)
    gui = not args.get(TITLE_NO_GUI, DEFAULT_NO_GUI)
    context = pyudev.Context()
    monitor = pyudev.Monitor.from_netlink(context)
    if not debug:
        # Filters stack: each call adds a subsystem rather than replacing the
        # previous one. 'usb' events pass the filter, but no handler below
        # reports them.
        for subsystem in ('block', 'net', 'tty', 'usb'):
            monitor.filter_by(subsystem=subsystem)
    for device in iter(monitor.poll, None):
        if debug:
            _debug_event(device)
        # Events are grouped by subsystem.
        if device.subsystem == 'tty':
            _handle_tty_event(device)
        elif device.subsystem == 'block':
            _handle_block_event(device, gui)
        elif device.subsystem == 'net':
            _handle_net_event(device)
if __name__ == '__main__':
    # Parsed options live in a module-level name; run() and desktop_notify()
    # read them from there.
    args = process_args(sys.argv[1:])
    try:
        run()
    except KeyboardInterrupt:
        print('')  # Print new line so that prompt happens on fresh line.
        # 130 is the conventional shell status for termination by Ctrl-C
        # (128 + SIGINT). sys.exit is used because the exit() builtin comes
        # from the site module and is not guaranteed to exist.
        sys.exit(130)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment