Last active
November 22, 2021 11:47
-
-
Save GhostofGoes/e049e1cad17428194a3d8adaaaa7b392 to your computer and use it in GitHub Desktop.
Various useful snippets of Python code I've picked up over the years. Python 3 is assumed unless stated otherwise.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Sources | |
* https://github.com/GhostofGoes/ADLES | |
""" | |
import logging | |
import logging.handlers | |
import sys | |
import os | |
# Credit to: http://stackoverflow.com/a/15707426/2214380 | |
def time_execution(func):
    """
    Function decorator to time the execution of a function and log to debug.

    The elapsed time is measured with ``timeit.default_timer`` and logged at
    DEBUG level through the ``logging`` module once ``func`` has returned.
    If ``func`` raises, the exception propagates and nothing is logged.

    :param func: The function to time execution of
    :return: The decorated function
    :rtype: func
    """
    from functools import wraps
    from timeit import default_timer

    # wraps() copies __name__, __doc__ and friends onto the wrapper so the
    # decorated function keeps its identity for introspection and debugging.
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = default_timer()
        ret = func(*args, **kwargs)
        end_time = default_timer()
        logging.debug("Elapsed time for %s: %f seconds", func.__name__,
                      float(end_time - start_time))
        return ret
    return wrapper
def read_json(filename):
    """
    Load a JSON document from disk.

    Failures are logged rather than raised: a ``ValueError`` while reading or
    decoding is logged as an error, any other exception as critical.

    :param str filename: Path to JSON file to read
    :return: Contents of the JSON file, or None if it could not be loaded
    :rtype: dict or None
    """
    import json

    contents = None
    try:
        with open(filename) as handle:
            contents = json.load(handle)
    except ValueError as err:
        logging.error("Syntax Error in JSON file '%s': %s",
                      filename, str(err))
    except Exception as err:
        logging.critical("Could not open JSON file '%s': %s",
                         filename, str(err))
    return contents
def user_input(prompt, obj_name, func):
    """
    Keep asking the user for a name until ``func`` resolves it to an object.

    Each answer is passed to ``func``; a falsy result prints a retry message
    and asks again, a truthy result is logged (via its ``name`` attribute)
    and returned.

    :param str prompt: Prompt to show the user on every attempt
    :param str obj_name: Name of the type of the object that we seek
    :param func: Callable taking the entered name and returning the object,
        or a falsy value when nothing matches
    :return: The discovered object and the name the user typed for it
    :rtype: tuple(object, str)
    """
    while True:
        answer = str(input(prompt))
        match = func(answer)
        if not match:
            print("Couldn't find a %s with name %s. Perhaps try another? "
                  % (obj_name, answer))
            continue
        logging.info("Found %s: %s", obj_name, match.name)
        return match, answer
# NOTE: python-humanfriendly provides this functionality in a much cleaner and more extensible way
# Based on: http://code.activestate.com/recipes/577058/ | |
def ask_question(question, default="no"):
    """
    Prompts user to answer a yes/no question.

    Example prompt shown for ``ask_question("Do you like the color yellow?")``::

        Do you like the color yellow? [y/N]

    The answer is case-insensitive and surrounding whitespace is ignored.
    An empty answer selects ``default`` unless ``default`` is None, in which
    case the user is asked again.

    :param str question: Question to ask
    :param default: Answer assumed on empty input: "yes", "no", or None to
        require an explicit answer
    :type default: str or None
    :return: True for yes, False for no
    :rtype: bool
    :raises ValueError: If ``default`` is not "yes", "no" or None
    """
    valid = {"yes": True, "y": True, "ye": True,
             "no": False, "n": False}
    if default is None:
        prompt = " [y/n] "
    elif default == "yes":
        prompt = " [Y/n] "
    elif default == "no":
        prompt = " [y/N] "
    else:
        # Interpolate the message here: exceptions do not apply
        # logging-style lazy formatting to their arguments.
        raise ValueError("Invalid default answer: '%s'" % (default,))
    while True:
        choice = str(input(question + prompt)).strip().lower()
        if default is not None and choice == '':
            return valid[default]
        elif choice in valid:
            return valid[choice]
        else:
            print("Please respond with 'yes' or 'no' or 'y' or 'n'")
def setup_logging(filename, colors=True, console_verbose=False,
                  server=('localhost', 514)):
    """
    Configures the logging interface used by everything for output.

    Three destinations are attached to the root logger, in this order:
    the log file (via ``logging.basicConfig``, DEBUG level), a SysLog
    handler (DEBUG level), and finally a console handler writing to stdout
    (DEBUG if ``console_verbose`` else INFO). System information is logged
    at DEBUG level before the console handler is attached.

    NOTE(review): this function imports ``adles`` for its version string, so
    it presumably only works where that package is installed - confirm.

    :param str filename: Name of file that logs should be saved to
    :param bool colors: Color the terminal output (uses ``colorlog`` when it
        can be imported, otherwise falls back to the plain formatter)
    :param bool console_verbose: Print DEBUG logs to terminal
    :param server: SysLog server to forward logs to
    :type server: tuple(str, int)
    """
    # Append two newlines to the log file to separate logs from previous runs
    with open(filename, 'a') as logfile:
        logfile.write(2 * '\n')

    # Format log output so it's human readable yet verbose
    base_format = "%(asctime)s %(levelname)-8s %(name)-7s %(message)s"
    time_format = "%H:%M:%S"  # %Y-%m-%d
    formatter = logging.Formatter(fmt=base_format, datefmt=time_format)

    # Configures the base logger to append to a file
    logging.basicConfig(level=logging.DEBUG, format=base_format,
                        datefmt=time_format, filename=filename, filemode='a')

    # Get the global root logger
    # Handlers added to this will propagate to all loggers
    logger = logging.root

    # Configure logging to a SysLog server
    syslog = logging.handlers.SysLogHandler(address=server)
    syslog.setLevel(logging.DEBUG)
    syslog.setFormatter(formatter)
    logger.addHandler(syslog)
    logging.debug("Configured logging to SysLog server %s:%s",
                  server[0], str(server[1]))

    # Record system information to aid in auditing and debugging
    # We do this before configuring console output to reduce verbosity:
    # the console handler is not attached yet, so these records only reach
    # the handlers configured above.
    from getpass import getuser
    from os import getcwd
    from platform import python_version, system, release, node
    from datetime import date
    from adles import __version__ as adles_version
    logging.debug("Initialized logging, saving logs to %s", filename)
    logging.debug("Date %s", str(date.today()))
    logging.debug("OS %s", str(system() + " " + release()))
    logging.debug("Hostname %s", str(node()))
    logging.debug("Username %s", str(getuser()))
    logging.debug("Directory %s", str(getcwd()))
    logging.debug("Python version %s", str(python_version()))
    logging.debug("Adles version %s", str(adles_version))

    # If any of the libraries we're using have warnings, capture them
    logging.captureWarnings(capture=True)

    # Configure console output
    console = logging.StreamHandler(stream=sys.stdout)
    if colors:  # Colored console output
        try:
            from colorlog import ColoredFormatter
            # Rebinding ``formatter`` only affects the console handler below;
            # the syslog handler already holds the plain formatter.
            formatter = ColoredFormatter(fmt="%(log_color)s" + base_format,
                                         datefmt=time_format, reset=True)
            logging.debug("Configured COLORED console logging output")
        except ImportError:
            # ``formatter`` is left as the plain one created above
            logging.error("Colorlog is not installed. "
                          "Using STANDARD console output...")
    else:  # Bland console output
        logging.debug("Configured STANDARD console logging output")
    console.setFormatter(formatter)
    console.setLevel((logging.DEBUG if console_verbose else logging.INFO))
    logger.addHandler(console)
# PyYAML Reference: http://pyyaml.org/wiki/PyYAMLDocumentation | |
from yaml import load, YAMLError | |
try: # Attempt to use C-based YAML parser if it's available | |
from yaml import CLoader as Loader | |
except ImportError: # Fallback to using pure Python YAML parser | |
from yaml import Loader | |
def _load_yaml_stream(stream, filename):
    """Parse an open YAML stream; log the problem and return None on a YAML error."""
    try:
        # SECURITY NOTE(review): the full (C)Loader can construct arbitrary
        # Python objects. If these files may come from untrusted sources,
        # consider switching to the safe loader.
        return load(stream, Loader=Loader)
    except YAMLError as exc:
        logging.critical("Could not parse YAML file %s", filename)
        if hasattr(exc, 'problem_mark'):
            # Tell user exactly where the syntax error is
            mark = exc.problem_mark
            logging.error("Error position: (%s:%s)",
                          mark.line + 1, mark.column + 1)
        else:
            logging.error("Error: %s", exc)
        return None


def parse_yaml(filename):
    """
    Parses a YAML file and returns a nested dictionary containing its contents.

    Passing ``'-'`` as the filename reads the document from stdin instead.
    Parse errors and a missing file are logged and reported as None rather
    than raised.

    :param str filename: Name of YAML file to parse, or '-' for stdin
    :return: Parsed file contents
    :rtype: dict or None
    """
    if filename == '-':
        # Deliberately not wrapped in ``with``: that would close sys.stdin
        # and break any later interactive prompt in the same process.
        return _load_yaml_stream(sys.stdin, filename)
    try:
        with open(filename) as f:
            return _load_yaml_stream(f, filename)
    except FileNotFoundError:
        logging.critical("Could not find YAML file for parsing: %s", filename)
        return None
# Class-based command line scripts. | |
from abc import ABC, abstractmethod | |
from distutils.version import StrictVersion | |
import functools | |
import logging | |
@functools.total_ordering
class Script(ABC):
    """Base class for all CLI scripts.

    Subclasses set ``name`` and ``__version__`` and implement ``run``.
    Two scripts are equal when both name and version match; ordering
    compares versions only (parsed with ``StrictVersion``), and the
    remaining comparison operators are filled in by ``total_ordering``.
    """

    __version__ = '0.1.0'
    name = ''

    def __init__(self):
        """Create the script's logger and log its identity and a help banner."""
        self._log = logging.getLogger(self.name)
        self._log.debug("Script name %s", self.name)
        self._log.debug("Script version %s", self.__version__)
        # NOTE(review): __url__ and __email__ are not defined in this block;
        # presumably they are package metadata imported elsewhere. Confirm
        # they are in scope, otherwise this call raises NameError.
        self._log.info(
            '\n***** YOU RUN THIS SCRIPT AT YOUR OWN RISK *****\n'
            '\n** Help and Documentation **'
            '\n+ "<script> --help": flags, arguments, and usage'
            '\n+ Read the latest documentation : https://adles.readthedocs.io'
            '\n+ Open an issue on GitHub : %s'
            '\n+ Email the script author : %s'
            '\n', __url__, __email__)

    @classmethod
    def get_ver(cls):
        """Return the capitalized script name followed by its version."""
        return cls.name.capitalize() + ' ' + cls.__version__

    @abstractmethod
    def run(self):
        """Execute the script. Must be implemented by subclasses."""
        pass

    def __str__(self):
        # A subclass without a docstring has __doc__ == None, and returning
        # None from __str__ raises TypeError, so fall back to the repr.
        return self.__doc__ or repr(self)

    def __repr__(self):
        return self.get_ver()

    def __hash__(self):
        return hash(repr(self))

    def __eq__(self, other):
        # Let Python try the reflected comparison (and finally fall back to
        # identity) instead of raising AttributeError on foreign types.
        if not isinstance(other, Script):
            return NotImplemented
        return self.name == other.name \
            and self.__version__ == other.__version__

    def __gt__(self, other):
        if not isinstance(other, Script):
            return NotImplemented
        return StrictVersion(self.__version__) > StrictVersion(other.__version__)
# TQDM handler for the logging module | |
try:
    import tqdm
    TQDM = True

    class TqdmHandler(logging.StreamHandler):
        """Logging handler that emits records through ``tqdm.tqdm.write``.

        Intended for use while progress bars are being displayed.
        """

        def __init__(self, level=logging.NOTSET):
            """
            :param int level: Minimum level of records handled
            """
            # StreamHandler's only constructor argument is ``stream``, so the
            # level has to be applied separately. Zero-argument super() also
            # avoids the infinite recursion super(self.__class__, self)
            # causes as soon as this class is subclassed.
            super().__init__()
            self.setLevel(level)

        def emit(self, record):
            """Format ``record`` and write it out via tqdm."""
            try:
                msg = self.format(record)
                tqdm.tqdm.write(msg)
                self.flush()
            except Exception:
                # KeyboardInterrupt/SystemExit are not Exception subclasses,
                # so they still propagate untouched.
                self.handleError(record)
except ImportError:
    # tqdm is optional; callers can check the TQDM flag before using it
    TQDM = False
def pad(value, length=2):
    """
    Left-pads a value with zeros until it is at least ``length`` characters.

    Values already ``length`` characters or longer are returned unpadded.

    >>> pad(5)
    '05'
    >>> pad(9, 3)
    '009'

    :param int value: integer value to pad
    :param int length: Length to pad to
    :return: string of padded value
    :rtype: str
    """
    # Build the "0>N" spec (fill with '0', right-align, width N) up front
    spec = "0>{}".format(length)
    return format(value, spec)
# Source: https://stackoverflow.com/a/37573701/2214380 | |
def download_file(url, filename, extension="", chunk_size=8192):
    """
    Download a file from the Internet, showing a tqdm progress bar.

    The response body is streamed to ``filename + extension`` in chunks.
    The progress bar total comes from the ``content-length`` header when
    the server sends one.

    :param str url: URL to retrieve from
    :param str filename: Name of the file to be created
    :param str extension: Extension of the file
    :param int chunk_size: Number of bytes to read per chunk
    """
    # Imported locally so the names are actually in scope: ``requests`` is
    # never imported at module level and the module-level ``tqdm`` name is
    # the module, which is not callable.
    import requests
    from tqdm import tqdm as progress_bar

    download = requests.get(url, stream=True)
    try:
        total_size = int(download.headers.get("content-length", 0))
        with open(filename + extension, "wb") as handle, \
                progress_bar(total=total_size or None,
                             unit='B', unit_scale=True) as progress:
            for data in download.iter_content(chunk_size=chunk_size):
                handle.write(data)
                # Advance by bytes written, not by number of chunks
                progress.update(len(data))
    finally:
        # Release the streamed connection even if writing fails
        download.close()
# http://stackoverflow.com/questions/1094841/ | |
def sizeof_fmt(num):
    """
    Renders a byte count as a short human-readable string.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (TB) is reached, then printed with one decimal place.

    >>> sizeof_fmt(512)
    '512.0bytes'
    >>> sizeof_fmt(2048)
    '2.0KB'

    :param float num: Robot-readable file size in bytes
    :return: Human-readable file size
    :rtype: str
    """
    units = ('bytes', 'KB', 'MB', 'GB', 'TB')
    position = 0
    last = len(units) - 1
    # Step up one unit at a time; anything beyond GB is reported in TB
    while position < last and not num < 1024.0:
        num /= 1024.0
        position += 1
    return "%3.1f%s" % (num, units[position])
# Up to date list of network/MAC methods here: https://gist.github.com/GhostofGoes/0a8e82930e75afcefbd879a825ba4c26 | |
# Get MAC address of remote hosts | |
def arpreq_ip(ip):
    # type: (str) -> Optional[str]
    """Look up the MAC address of a remote host using the ``arpreq`` package.

    :param str ip: IP address of the remote host to query
    :return: Whatever ``arpreq.arpreq`` returns for ``ip``
    """
    import arpreq
    # Query the requested address rather than a hard-coded one
    return arpreq.arpreq(ip)
def scapy_ip(ip):
    # type: (str) -> str
    """Resolve the MAC address of a remote host via scapy's ``getmacbyip``.

    Requires root permissions on POSIX platforms.
    Windows does not have this limitation."""
    from scapy.layers.l2 import getmacbyip as resolve_mac
    mac_address = resolve_mac(ip)
    return mac_address
# Get the MAC address of a local interface
def psutil_iface(iface):
    # type: (str) -> Optional[str]
    """Return the link-layer address psutil reports for ``iface``.

    :param str iface: Name of the local interface
    :return: Address of the first ``AF_LINK`` entry, or None when the
        interface is unknown or has no such entry
    """
    import psutil
    interfaces = psutil.net_if_addrs()
    if iface not in interfaces:
        return None
    for entry in interfaces[iface]:
        if entry.family == psutil.AF_LINK:
            return entry.address
    return None
def netifaces_iface(iface):
    # type: (str) -> str
    """Return the 'addr' of the first ``AF_LINK`` entry netifaces reports for ``iface``.

    :param str iface: Name of the local interface
    """
    import netifaces
    link_entries = netifaces.ifaddresses(iface)[netifaces.AF_LINK]
    first_entry = link_entries[0]
    return first_entry['addr']
def scapy_iface(iface):
    # type: (str) -> str
    """Return the MAC address of a local interface using scapy.

    When ``WINDOWS`` is truthy, the list from ``get_windows_if_list`` is
    searched first and the 'mac' of the first entry whose name, netid,
    description or win_index contains ``iface`` is returned. In every other
    case, including no match on Windows, the result of ``get_if_hwaddr`` is
    returned.

    NOTE(review): ``WINDOWS`` is not defined in this block; presumably a
    module-level platform flag defined elsewhere - confirm.

    :param str iface: Name (or, on Windows, any identifying substring) of
        the interface
    """
    from scapy.layers.l2 import get_if_hwaddr
    if WINDOWS:
        from scapy.arch.windows import get_windows_if_list
        interfaces = get_windows_if_list()
        for i in interfaces:
            # ``in`` is evaluated against each field value of the entry
            if any(iface in i[x] for x in
                   ['name', 'netid', 'description', 'win_index']):
                return i['mac']
    # WARNING: Do not put an 'else' here!
    # The fallthrough lets a failed Windows lookup still try get_if_hwaddr.
    return get_if_hwaddr(iface)
# Determine the default interface for a system | |
def netifaces_default_gateway():
    # type: () -> str
    """Return element 1 of the first 'default' gateway entry from netifaces.

    NOTE(review): element 1 is presumably the interface name, per the
    section heading above this function - confirm against netifaces docs.
    """
    import netifaces
    default_gateways = netifaces.gateways()['default']
    first_gateway = list(default_gateways.values())[0]
    return first_gateway[1]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment