Skip to content

Instantly share code, notes, and snippets.

@AlmightyOatmeal
Last active February 9, 2024 16:26
Show Gist options
  • Save AlmightyOatmeal/ce0592ca6759d55b73acb59d07dee1af to your computer and use it in GitHub Desktop.
Save AlmightyOatmeal/ce0592ca6759d55b73acb59d07dee1af to your computer and use it in GitHub Desktop.
mod_jamie
"""Welcome to the tomorrow of yesterday.
"""
import logging
import urllib3
# Disable `InsecureRequestWarning: Unverified HTTPS request is being made to host '...' ...` messages.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# ^ Alternatively, one can set the environmental variable:
# PYTHONWARNINGS="ignore:Unverified HTTPS request"
urllib3.disable_warnings()
#
# Add trace-level logging.
#
# Custom level sits just below DEBUG so TRACE messages are filtered out
# unless the logger is configured more verbosely than DEBUG.
log_level_trace = logging.DEBUG - 5
logging.addLevelName(log_level_trace, 'TRACE')


def trace(self, message, *args, **kws):
    """Trace-level logging method that will be appended to the `logging.Logger()` class."""
    # Yes, logger takes its '*args' as 'args'.
    self._log(log_level_trace, message, args, **kws)


# Attach the new method so every Logger instance gains `.trace(...)`.
logging.Logger.trace = trace

# BUG FIX: the original did `logging = logging.getLogger(__name__)`, which
# rebound the module name `logging` to a Logger instance (shadowing the
# stdlib module) and then had to re-import `logging` to recover. Bind the
# logger to `logger` instead.
logger = logging.getLogger(__name__)
def to_csv(data, header=None):
    """Quick-n-Dirty CSV blob generator.

    :param data: List of dictionaries.
    :type data: list
    :param header: (optional) List of strings of header columns to use in specific order when generating a CSV.
    :type header: list or None
    :return: Concatenated CSV string blob
    :rtype: str
    """
    columns = []
    if header:
        # Honor the caller-supplied column order, dropping duplicates.
        for name in header:
            if name not in columns:
                columns.append(name)
    else:
        # Pre-scan every row so the header covers ALL keys from ALL dicts,
        # since individual rows may carry different key sets.
        for row in data:
            for key in row:
                if key not in columns:
                    columns.append(key)
    # First line is the quoted header row.
    lines = ['"{}"'.format('","'.join(columns))]
    for row in data:
        # Missing values become empty strings (rather than the literal "None"),
        # and embedded double quotes are swapped for single quotes so they
        # cannot collide with the CSV-level quoting.
        values = [str(row.get(col, '')).replace('"', "'") for col in columns]
        lines.append('"{}"'.format('","'.join(values)))
    # Join all rows into a single newline-separated blob.
    return '\n'.join(lines)
# NOTE(review): this gist appears to be several modules concatenated, so
# `logging` is re-imported and the module logger re-bound at each seam.
import logging
logger = logging.getLogger(__name__)
def list_slicer(data, size):
    """List slicer.

    :param data: List of values to be sliced.
    :type data: list
    :param size: How many items each list should contain.
    :type size: int
    :return: Generator providing a list of items at, or less, than the specified size.
    :rtype: list
    """
    # Step through the list in `size`-wide strides; the final slice simply
    # carries whatever remains.
    offsets = range(0, len(data), size)
    for offset in offsets:
        yield data[offset:offset + size]
def list_dedupe(data):
    """Deduplicate a list using hashtables.

    Insertion order of dict keys is preserved, so the first occurrence of
    each value keeps its position.

    :param data: List to be deduplicated.
    :type data: list
    :return: List free of duplicates.
    :rtype: list
    """
    seen = {}
    for item in data:
        # setdefault only inserts on first sighting; later duplicates no-op.
        seen.setdefault(item, None)
    return list(seen)
# NOTE(review): another gist concatenation seam — redundant re-import.
import logging
logger = logging.getLogger(__name__)
def round_decimal(value, decimal_places):
    """Rounds decimal values to given number of decimal places.

    :param value: Numeric value to round.
    :type value: float or int
    :param decimal_places: Number of decimal places to round to.
    :type decimal_places: int
    :return: Rounded value.
    :rtype: float
    """
    # Format-based rounding (same semantics as the f-string form) — the
    # original author measured this slightly faster than round().
    formatted = format(value, f'.{decimal_places}f')
    return float(formatted)
# Imports for the JSON helper section below (gist concatenation seam).
import datetime
import decimal
import json
import logging
import re
logger = logging.getLogger(__name__)
class CustomJSONEncoder(json.JSONEncoder):
    """Custom JSON encoder that does things that shouldn't need to be done."""

    def default(self, obj):
        """Serialize types the stock encoder rejects, then defer to JSONEncoder.

        :param obj: Object to serialize.
        :type obj: object
        :return: json.JSONEncoder.default() object.
        :rtype: instance
        """
        # Date/time objects serialize as ISO-8601 strings.
        if isinstance(obj, (datetime.datetime, datetime.time, datetime.date)):
            return obj.isoformat()
        # Decimals become int or float depending on whether they carry a
        # fractional part.
        if isinstance(obj, decimal.Decimal):
            text = str(obj)
            return float(text) if '.' in text else int(text)
        # Anything iterable (sets, generators, ...) is materialized as a list;
        # non-iterables fall through to the base class, which raises TypeError.
        try:
            items = iter(obj)
        except TypeError:
            return json.JSONEncoder.default(self, obj)
        return list(items)
def json_string_hook(obj):
    """JSON deserializer helper to ensure values are converted to strings instead of native datatypes due
    to data inconsistencies.

    Current behavior:
      - Convert all non-iterable values to strings.
      - Exclude values where the key contains the word 'date'.

    :param obj: json.loads() dict
    :type obj: dict
    :return: Updated dictionary
    :rtype: dict
    """
    result = {}
    for key, value in dict(obj).items():
        # Keys mentioning 'date' keep their native value, as does anything
        # iterable (strings, lists, dicts); everything else is stringified.
        if 'date' in str(key).lower() or hasattr(value, '__iter__'):
            result[key] = value
        else:
            result[key] = str(value)
    return result
def json_pretty(data, encoder=CustomJSONEncoder):
    """Converts Python dict or list/set/array objects to a pretty-printed JSON string.

    :param data: Python iter object like dict, list, set, array, tuple, etc.
    :type data: dict, list, set, array, tuple
    :param encoder: (optional) Custom JSON encoder class that's an extension of `json.JSONEncoder`.
        (default: CustomJSONEncoder)
    :type encoder: json.JSONEncoder
    :return: Pretty-printed JSON string.
    :rtype: str
    """
    # Stable key order + 4-space indent for human-readable output.
    options = {
        'sort_keys': True,
        'indent': 4,
        'separators': (',', ': '),
        'ensure_ascii': True,
        'cls': encoder,
    }
    return json.dumps(data, **options)
def json_min(data, encoder=CustomJSONEncoder):
    """Converts Python dict or list/set/array objects to a minified JSON string.

    :param data: Python iter object like dict, list, set, array, tuple, etc.
    :type data: dict, list, set, array, tuple
    :param encoder: (optional) Custom JSON encoder class that's an extension of `json.JSONEncoder`.
        (default: CustomJSONEncoder)
    :type encoder: json.JSONEncoder
    :return: Minified JSON string.
    :rtype: str
    """
    # No spaces after separators -> smallest possible output.
    compact_separators = (',', ':')
    return json.dumps(data, separators=compact_separators, cls=encoder)
def fix_broken_json(input_str):
    """Fix broken JSON quotes by escaping stray double quotes inside string values.

    A double quote is treated as closing a string only when the next
    non-whitespace character is structurally plausible given the previous
    structural character; otherwise it is escaped with a backslash.

    :param input_str: Broken JSON string.
    :type input_str: str
    :return: Fixed JSON string
    :rtype: str
    """
    # NOTE: these constants could live at module level so they aren't rebuilt
    # and recompiled on every call; kept here for organization.
    # For each structural character, the set of characters that may
    # legitimately follow a closing quote.
    expected_chars = {
        "[": (",", "]"),
        "]": ("[", ","),
        "{": (":",),
        "}": (",", "{", "]"),
        ":": (",", "}"),
        ",": (":", "{", "}", "[", "]"),
    }
    double_quote = '"'
    # Backslash needs to be escaped otherwise Python thinks it's escaping the single quote.
    backslash = '\\'
    regex_nonwhite = re.compile(r'\S')

    output_str = ''
    in_string = False
    prev = None
    prev_nonwhite_nonquote = None
    # Iterate over the string, character by character, with position.
    for char_pos, char in enumerate(input_str):
        # BUG FIX: compare with == / != instead of `is` / `is not` — identity
        # comparison of one-char strings only worked via CPython interning.
        if char == double_quote and prev != backslash:
            if in_string:
                # If the next non-whitespace character is an expected structural
                # one, we have exited the quoted string; otherwise escape the quote.
                match = regex_nonwhite.search(input_str, pos=char_pos + 1)
                # BUG FIX: guard against a quote with nothing after it —
                # the original called .group() on None and raised
                # AttributeError; treat end-of-input as closing the string.
                nonwhite_char = match.group() if match else None
                if nonwhite_char is None or nonwhite_char in expected_chars.get(prev_nonwhite_nonquote, ''):
                    in_string = False
                else:
                    output_str += backslash
            else:
                in_string = True
        elif not in_string and char.strip():
            # Track the previous non-whitespace, non-quote character.
            prev_nonwhite_nonquote = char
        # Every input character is copied through (possibly after an escape).
        output_str += char
        prev = char
    return output_str
def sub_json_parser(obj):
    """Try to parse JSON values from a dictionary or list of dictionaries.

    NOTE: This does not recursively go through and try to parse; this is
    currently setup to only do the root level for things such as JSON stored
    in a database table.

    :param obj: Dictionary or list of dictionaries.
    :type obj: dict or list
    :return: dict or list
    :rtype: dict or list
    """
    # A list/set/tuple: parse each element independently. A new list is built
    # because the input cannot be mutated while iterating (and set/tuple input
    # intentionally comes back as a list, matching the original behavior).
    if isinstance(obj, (list, set, tuple)):
        return [sub_json_parser(item) for item in obj]
    if isinstance(obj, dict):
        # Build a new dict rather than editing the one being iterated.
        new_obj = {}
        for key, value in obj.items():
            # Only string values containing '{' look like candidate JSON;
            # everything else is kept verbatim.
            if not isinstance(value, str) or '{' not in value:
                new_obj[key] = value
                continue
            # Best-effort parsing: failures deliberately fall back rather
            # than propagate, so broad excepts are intentional here.
            try:
                new_obj[key] = json.loads(value)
            except Exception:
                # The JSON may merely have unescaped quotes; try repairing it.
                try:
                    new_obj[key] = json.loads(fix_broken_json(value))
                except Exception:
                    # Lazy %-formatting: the message is only built when
                    # DEBUG logging is actually enabled.
                    logger.debug('Unable to fix broken json key=%s, value=%s', key, value)
                    # Unfixable: keep the original string.
                    new_obj[key] = value
        return new_obj
    # Not a list, set, tuple, or dict: return the object untouched.
    return obj
# Imports for the XML/LXML helper section below (gist concatenation seam).
import logging
import re
from lxml import etree
logger = logging.getLogger(__name__)
def lxml_to_dict_with_arrays(element):
    """Converts a lxml.objectify.ObjectifiedElement object to dictionary for JSON conversion.

    Repeated child tags are collapsed into lists so the result round-trips
    cleanly to JSON.

    :param element: LXML objectified element.
    :type element: lxml.objectify.ObjectifiedElement
    :return: Dictionary
    :rtype: dict
    """
    # <CAN BE MOVED OUT OF FUNCTION>
    # For the sake of convenience it's within the function but can be moved out so the regex isn't compiled repeatedly.
    # Matches the `{namespace-uri}` prefix lxml puts on qualified tag names.
    regex_namespace = re.compile('{.*}')
    # </CAN BE MOVED OUT OF FUNCTION>
    # Make sure that we are iterating over the root of a document so we can get the root's children; maybe try .iter() instead?
    if hasattr(element, 'getroot'):
        element = element.getroot()
    result = {}
    # NOTE(review): getchildren() is deprecated in modern lxml/ElementTree —
    # consider list(element) instead; confirm the targeted lxml version.
    if not element.getchildren():
        # Leaf element: map its namespace-free tag straight to its text.
        tag = regex_namespace.sub('', element.tag)
        result[tag] = element.text
    else:
        for elem in element.getchildren():
            # Recurse into the child first, then merge its dict under this element's tag.
            sub_dict = lxml_to_dict_with_arrays(elem)
            tag = regex_namespace.sub('', element.tag)
            sub_tag = regex_namespace.sub('', elem.tag)
            # NOTE(review): truthiness check — an existing-but-empty dict/list
            # value falls into the else branch and is overwritten; confirm intended.
            if result.get(tag):
                # If the same child tag appears more than once, convert it to a list.
                if sub_tag in result[tag]:
                    # No need to redundantly have the sub element tag name in there since it's already at the root
                    # of the array
                    if sub_tag in sub_dict:
                        sub_dict = sub_dict[sub_tag]
                    # Append the child dictionary to the existing array or convert the existing structure to a list
                    # containing the old dictionary AND the new child dictionary.
                    if isinstance(result[tag][sub_tag], list):
                        result[tag][sub_tag].append(sub_dict)
                    else:
                        result[tag][sub_tag] = [result[tag][sub_tag], sub_dict]
                else:
                    # If no child already exists, append the child dictionary to an existing list or update the
                    # existing dictionary accordingly.
                    if isinstance(result[tag], list):
                        result[tag].append(sub_dict)
                    else:
                        result[tag].update(sub_dict)
            else:
                result[tag] = sub_dict
    return result
def xml_pretty(data, indent_level=4):
    """Converts LXML object to a pretty-printed XML string.

    :param data: Object to be prettified.
    :type data: lxml.objectify.ObjectifiedElement
    :param indent_level: (optional) How many spaces should elements be indented (default: 4)
    :type indent_level: int
    :return: Pretty-printed string, oooooh-aaaaahh.
    :rtype: str
    """
    # Re-indent the tree in place, then serialize with the XML declaration.
    indent_str = ' ' * indent_level
    etree.indent(data, space=indent_str)
    serialized = etree.tostring(
        data,
        pretty_print=True,
        method='xml',
        xml_declaration=True,
        encoding='utf-8',
    )
    return serialized.decode('utf-8')
def xml_minify(data):
    """Converts LXML object to a minified XML string.

    :param data: Object to be minified.
    :type data: lxml.objectify.ObjectifiedElement
    :return: Minified string.
    :rtype: str
    """
    # Serialize without pretty-printing, keeping the XML declaration.
    serialized = etree.tostring(
        data,
        pretty_print=False,
        method='xml',
        xml_declaration=True,
        encoding='utf-8',
    )
    return serialized.decode('utf-8')
def xml_remove_namespaces(data):
    """Remove namespaces from elements.

    :param data: Parsed document to be cleaned.
    :type data: lxml.objectify.ObjectifiedElement
    :return: Cleaned document (modified in place and returned).
    :rtype: lxml.objectify.ObjectifiedElement
    """
    # `iter()` replaces the deprecated `getiterator()` and walks every element.
    for elem in data.iter():
        # Skip comments and processing instructions because they do not have names.
        if isinstance(elem, (etree._Comment, etree._ProcessingInstruction)):
            continue
        # Remove the `{namespace-uri}` prefix from the element's name.
        elem.tag = etree.QName(elem).localname
        # BUG FIX: snapshot the attribute names with list() before mutating —
        # the original deleted keys from `elem.attrib` while iterating it,
        # which can skip entries or raise on mapping implementations that
        # forbid mutation during iteration.
        for attr_name in list(elem.attrib):
            local_attr_name = etree.QName(attr_name).localname
            if attr_name != local_attr_name:
                # Re-key the attribute under its namespace-free name.
                attr_value = elem.attrib[attr_name]
                del elem.attrib[attr_name]
                elem.attrib[local_attr_name] = attr_value
    # Finally clean up now-unused namespace declarations.
    etree.cleanup_namespaces(data)
    return data
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment