Skip to content

Instantly share code, notes, and snippets.

@GhostofGoes
Last active November 22, 2021 11:47
Show Gist options
  • Save GhostofGoes/e049e1cad17428194a3d8adaaaa7b392 to your computer and use it in GitHub Desktop.
Save GhostofGoes/e049e1cad17428194a3d8adaaaa7b392 to your computer and use it in GitHub Desktop.
Various useful snippets of Python code I've picked up over the years. Python 3 is assumed unless stated otherwise.
#!/usr/bin/env python3
"""
Sources
* https://github.com/GhostofGoes/ADLES
"""
import logging
import logging.handlers
import sys
import os
# Credit to: http://stackoverflow.com/a/15707426/2214380
def time_execution(func):
"""
Function decorator to time the execution of a function and log to debug.
:param func: The function to time execution of
:return: The decorated function
:rtype: func
"""
from timeit import default_timer
def wrapper(*args, **kwargs):
start_time = default_timer()
ret = func(*args, **kwargs)
end_time = default_timer()
logging.debug("Elapsed time for %s: %f seconds", func.__name__,
float(end_time - start_time))
return ret
return wrapper
def read_json(filename):
"""
Reads input from a JSON file and returns the contents.
:param str filename: Path to JSON file to read
:return: Contents of the JSON file
:rtype: dict or None
"""
from json import load
try:
with open(filename) as json_file:
return load(fp=json_file)
except ValueError as message:
logging.error("Syntax Error in JSON file '%s': %s",
filename, str(message))
return None
except Exception as message:
logging.critical("Could not open JSON file '%s': %s",
filename, str(message))
return None
def user_input(prompt, obj_name, func):
"""
Continually prompts a user for input until the specified object is found.
:param str prompt: Prompt to bother user with
:param str obj_name: Name of the type of the object that we seek
:param func: The function that shalt be called to discover the object
:return: The discovered object and it's human name
:rtype: tuple(vimtype, str)
"""
while True:
item_name = str(input(prompt))
item = func(item_name)
if item:
logging.info("Found %s: %s", obj_name, item.name)
return item, item_name
else:
print("Couldn't find a %s with name %s. Perhaps try another? "
% (obj_name, item_name))
# NOTE: python-humanfriendly provides this functionality in a much cleaner and extensible way
# Based on: http://code.activestate.com/recipes/577058/
def ask_question(question, default="no"):
"""
Prompts user to answer a question.
>>> ask_question("Do you like the color yellow?")
Do you like the color yellow? [y/N]
:param str question: Question to ask
:param str default: No
:return: True/False
:rtype: bool
"""
valid = {"yes": True, "y": True, "ye": True,
"no": False, "n": False}
choice = ''
if default is None:
prompt = " [y/n] "
elif default == "yes":
prompt = " [Y/n] "
elif default == "no":
prompt = " [y/N] "
else:
raise ValueError("Invalid default answer: '%s'", default)
while True:
choice = str(input(question + prompt)).lower()
if default is not None and choice == '':
return valid[default]
elif choice in valid:
return valid[choice]
else:
print("Please respond with 'yes' or 'no' or 'y' or 'n'")
def setup_logging(filename, colors=True, console_verbose=False,
server=('localhost', 514)):
"""
Configures the logging interface used by everything for output.
:param str filename: Name of file that logs should be saved to
:param bool colors: Color the terminal output
:param bool console_verbose: Print DEBUG logs to terminal
:param server: SysLog server to forward logs to
:type server: tuple(str, int)
"""
# Prepend spaces to separate logs from previous runs
with open(filename, 'a') as logfile:
logfile.write(2 * '\n')
# Format log output so it's human readable yet verbose
base_format = "%(asctime)s %(levelname)-8s %(name)-7s %(message)s"
time_format = "%H:%M:%S" # %Y-%m-%d
formatter = logging.Formatter(fmt=base_format, datefmt=time_format)
# Configures the base logger to append to a file
logging.basicConfig(level=logging.DEBUG, format=base_format,
datefmt=time_format, filename=filename, filemode='a')
# Get the global root logger
# Handlers added to this will propagate to all loggers
logger = logging.root
# Configure logging to a SysLog server
syslog = logging.handlers.SysLogHandler(address=server)
syslog.setLevel(logging.DEBUG)
syslog.setFormatter(formatter)
logger.addHandler(syslog)
logging.debug("Configured logging to SysLog server %s:%s",
server[0], str(server[1]))
# Record system information to aid in auditing and debugging
# We do this before configuring console output to reduce verbosity
from getpass import getuser
from os import getcwd
from platform import python_version, system, release, node
from datetime import date
from adles import __version__ as adles_version
logging.debug("Initialized logging, saving logs to %s", filename)
logging.debug("Date %s", str(date.today()))
logging.debug("OS %s", str(system() + " " + release()))
logging.debug("Hostname %s", str(node()))
logging.debug("Username %s", str(getuser()))
logging.debug("Directory %s", str(getcwd()))
logging.debug("Python version %s", str(python_version()))
logging.debug("Adles version %s", str(adles_version))
# If any of the libraries we're using have warnings, capture them
logging.captureWarnings(capture=True)
# Configure console output
console = logging.StreamHandler(stream=sys.stdout)
if colors: # Colored console output
try:
from colorlog import ColoredFormatter
formatter = ColoredFormatter(fmt="%(log_color)s" + base_format,
datefmt=time_format, reset=True)
logging.debug("Configured COLORED console logging output")
except ImportError:
logging.error("Colorlog is not installed. "
"Using STANDARD console output...")
else: # Bland console output
logging.debug("Configured STANDARD console logging output")
console.setFormatter(formatter)
console.setLevel((logging.DEBUG if console_verbose else logging.INFO))
logger.addHandler(console)
# PyYAML Reference: http://pyyaml.org/wiki/PyYAMLDocumentation
from yaml import load, YAMLError
try: # Attempt to use C-based YAML parser if it's available
from yaml import CLoader as Loader
except ImportError: # Fallback to using pure Python YAML parser
from yaml import Loader
def parse_yaml(filename):
"""
Parses a YAML file and returns a nested dictionary containing its contents.
:param str filename: Name of YAML file to parse
:return: Parsed file contents
:rtype: dict or None
"""
try:
# Enables use of stdin if '-' is specified
with sys.stdin if filename == '-' else open(filename) as f:
try:
# Parses the YAML file into a dict
return load(f, Loader=Loader)
except YAMLError as exc:
logging.critical("Could not parse YAML file %s", filename)
if hasattr(exc, 'problem_mark'):
# Tell user exactly where the syntax error is
mark = exc.problem_mark
logging.error("Error position: (%s:%s)",
mark.line + 1, mark.column + 1)
else:
logging.error("Error: %s", exc)
return None
except FileNotFoundError:
logging.critical("Could not find YAML file for parsing: %s", filename)
return None
# Class-based command line scripts.
from abc import ABC, abstractmethod
from distutils.version import StrictVersion
import functools
import logging
@functools.total_ordering
class Script(ABC):
"""Base class for all CLI scripts."""
__version__ = '0.1.0'
name = ''
def __init__(self):
self._log = logging.getLogger(self.name)
self._log.debug("Script name %s", self.name)
self._log.debug("Script version %s", self.__version__)
self._log.info(
'\n***** YOU RUN THIS SCRIPT AT YOUR OWN RISK *****\n'
'\n** Help and Documentation **'
'\n+ "<script> --help": flags, arguments, and usage'
'\n+ Read the latest documentation : https://adles.readthedocs.io'
'\n+ Open an issue on GitHub : %s'
'\n+ Email the script author : %s'
'\n', __url__, __email__)
@classmethod
def get_ver(cls):
return cls.name.capitalize() + ' ' + cls.__version__
@abstractmethod
def run(self):
pass
def __str__(self):
return self.__doc__
def __repr__(self):
return self.get_ver()
def __hash__(self):
return hash(repr(self))
def __eq__(self, other):
return self.name == other.name \
and self.__version__ == other.__version__
def __gt__(self, other):
return StrictVersion(self.__version__) > StrictVersion(other.__version__)
# TQDM handler for the logging module
try:
import tqdm
TQDM = True
class TqdmHandler(logging.StreamHandler):
def __init__(self, level=logging.NOTSET):
super(self.__class__, self).__init__(level)
def emit(self, record):
try:
msg = self.format(record)
tqdm.tqdm.write(msg)
self.flush()
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
except ImportError:
TQDM = False
def pad(value, length=2):
"""
Adds leading and trailing zeros to value ("pads" the value).
>>> pad(5)
05
>>> pad(9, 3)
009
:param int value: integer value to pad
:param int length: Length to pad to
:return: string of padded value
:rtype: str
"""
return "{0:0>{width}}".format(value, width=length)
# Source: https://stackoverflow.com/a/37573701/2214380
def download_file(url, filename, extension=""):
"""
Download a file from the Internet.
:param str url: URL to retrieve from
:param str filename: Name of the file to be created
:param str extension: Extension of the file
"""
download = requests.get(url, stream=True)
total_size = int(download.headers.get("content-length", 0))
with open(filename + extension, "wb") as handle:
for data in tqdm(download.iter_content(), total=total_size,
unit='B', unit_scale=True):
handle.write(data)
# http://stackoverflow.com/questions/1094841/
def sizeof_fmt(num):
"""
Generates the human-readable version of a file size.
>>> sizeof_fmt(512)
512bytes
>>> sizeof_fmt(2048)
2KB
:param float num: Robot-readable file size in bytes
:return: Human-readable file size
:rtype: str
"""
for item in ['bytes', 'KB', 'MB', 'GB']:
if num < 1024.0:
return "%3.1f%s" % (num, item)
num /= 1024.0
return "%3.1f%s" % (num, 'TB')
# Up to date list of network/MAC methods here: https://gist.github.com/GhostofGoes/0a8e82930e75afcefbd879a825ba4c26
# Get MAC address of remote hosts
def arpreq_ip(ip):
# type: (str) -> Optional[str]
import arpreq
return arpreq.arpreq('192.168.1.1')
def scapy_ip(ip):
# type: (str) -> str
"""Requires root permissions on POSIX platforms.
Windows does not have this limitation."""
from scapy.layers.l2 import getmacbyip
return getmacbyip(ip)
# Get MAC address of a local interfaces
def psutil_iface(iface):
# type: (str) -> Optional[str]
import psutil
nics = psutil.net_if_addrs()
if iface in nics:
nic = nics[iface]
for i in nic:
if i.family == psutil.AF_LINK:
return i.address
def netifaces_iface(iface):
# type: (str) -> str
import netifaces
return netifaces.ifaddresses(iface)[netifaces.AF_LINK][0]['addr']
def scapy_iface(iface):
# type: (str) -> str
from scapy.layers.l2 import get_if_hwaddr
if WINDOWS:
from scapy.arch.windows import get_windows_if_list
interfaces = get_windows_if_list()
for i in interfaces:
if any(iface in i[x] for x in
['name', 'netid', 'description', 'win_index']):
return i['mac']
# WARNING: Do not put an 'else' here!
return get_if_hwaddr(iface)
# Determine the default interface for a system
def netifaces_default_gateway():
# type: () -> str
import netifaces
return list(netifaces.gateways()['default'].values())[0][1]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment