The original code for the psp library --- basicproc.py
"""Basic processing of the backup file.""" | |
# Docstrings use the conversion from numpy: | |
# https://numpydoc.readthedocs.io/en/latest/format.html | |
__all__ = [ | |
'Panel', 'Entry', 'InvalidEntryError', 'backup_get_attributes', | |
'backup_get_data', 'backup_get_description', 'data_check_duplicates', | |
'data_get_by_date', 'format_bytes', 'BYTES_TENS_UNITS', | |
'BYTES_TWOS_UNITS', | |
] | |
# XXX: Metadata of the same file? Do we have to copy-and-paste them | |
# EVERY time? | |
import base64 | |
import datetime as dt | |
import glob | |
import logging | |
import logging.handlers | |
import os | |
import re | |
import sys | |
import textwrap | |
logger = logging.getLogger(__name__) | |
logger.setLevel(logging.DEBUG) | |
# BASE_DIR = os.path.realpath(os.path.dirname(__file__))
BASE_DIR = os.getcwd()

def _get_path(p):
    """Return p appended with BASE_DIR."""
    # Similar to how `os.path.abspath` works:
    # https://docs.python.org/3/library/os.path.html#os.path.abspath
    return os.path.normpath(os.path.join(BASE_DIR, p))

# Create a file handler --- in the same directory as this script.
fh = logging.handlers.TimedRotatingFileHandler(
    os.path.splitext(__file__)[0] + '.log',
    when='d', backupCount=7,
)
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter('%(asctime)s %(name)s: '
                                  '%(levelname)-8s %(message)s'))
logger.addHandler(fh)

# Create a console handler
ch = logging.StreamHandler(sys.stderr)
ch.setLevel(logging.WARNING)
ch.setFormatter(logging.Formatter('[%(levelname)s] %(message)s'))
logger.addHandler(ch)
del ch, fh
# Inference (XXX: what for?)
#   * From one of the extensions, the type can be inferred.
#   * From one of the names, the extension can be created---sorry, inferred.
#     (aside from no extension or .txt)
#   * From one of the types, the encoding can be inferred.
_FILE_TYPES = [
    # '+' for binary, '-' for text
    # General text type
    ['-plain', ['.txt'], None],
    # General binary type.
    ['+binary', None, None],
    # Common plain text formats
    ['-markdown', ['.md', '.markdown'], ['md']],
    ['-html', ['.html'], None],
    ['-css', ['.css'], None],
    # Programming languages
    ['-python', ['.py'], None],
    ['-c', ['.c'], None],
    ['-c++', ['.cc', '.cpp'], None],
    ['-java', ['.java'], None],
    ['-javascript', ['.js'], None],
    ['-perl', ['.pl'], None],
    # LaTeX is not a type! It's a format just like XeLaTeX and LuaTeX
    # (and all of them share the same extension anyway)
    ['-tex', ['.tex', '.sty', '.cls', '.dtx'], None],
    ['-xml', ['.xml'], None],
    ['-json', ['.json'], None],
    ['-yaml', ['.yaml', '.yml'], ['yml']],
    ['+zip', ['.zip'], None],
    # Image types
    ['+png', ['.png'], None],
    ['+jpeg', ['.jpg', '.jpeg'], ['jpg']],
    ['+tiff', ['.tiff'], None],
    ['+heic', ['.heic'], None],
    # Video
    ['+mp4', ['.mp4'], None],
    ['+mov', ['.mov'], None],
    ['+wmv', ['.wmv'], None],
    ['+avi', ['.avi'], None],
    # Audio
    ['+mp3', ['.mp3'], None],
    ['+flac', ['.flac'], None],
    ['+wav', ['.wav'], None],
    ['+m4a', ['.m4a'], None],
    ['+aiff', ['.aiff'], None],
    ['+midi', ['.midi'], None],
    # Miscellaneous
    ['+pdf', ['.pdf'], None],
    ['+musescore', ['.mscz'], ['musescore_compressed']],
    ['-musescore_uncompressed', ['.mscx'], None],
]
TEXT_TYPES = set()
BINARY_TYPES = set()
TYPE_ALIASES = {}
TYPE_EXTENSIONS = {}

# Add 'elem' to the set 's', warning on duplicates (whether already in
# 's' or in the conflicting set 's2'); 'name' is used in the warnings.
def _sadd(s, s2, elem, name):
    if elem in s:
        logger.warning('duplicate %s at module initialization: %r',
                       name, elem)
        return
    elif elem in s2:
        logger.warning('duplicate %s with conflicting type at '
                       'module initialization: %r', name, elem)
        return
    s.add(elem)

def _dadd(d, key, value, name):
    if key in d:
        logger.warning('duplicate %s at module initialization: %r',
                       name, key)
    d[key] = value

for t, e, a in _FILE_TYPES:
    tt = t[1:]
    if t[0] == '-':
        _sadd(TEXT_TYPES, BINARY_TYPES, tt, 'text type')
    elif t[0] == '+':
        _sadd(BINARY_TYPES, TEXT_TYPES, tt, 'binary type')
    else:
        raise RuntimeError(f"{t!r} doesn't start with '-' or '+'")
    if a is not None:
        for alias in a:
            _dadd(TYPE_ALIASES, alias, tt, 'alias')
    if e is not None:
        for ext in e:
            _dadd(TYPE_EXTENSIONS, ext, tt, 'extension')
del _FILE_TYPES, _sadd, _dadd
del t, e, a, alias, ext
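
# Illustrative sketch (added for exposition; not in the original gist):
# how the lookup tables built above behave.  Extensions map to canonical
# type names, and aliases map to the type they stand for.
#
# >>> TYPE_EXTENSIONS['.jpg']
# 'jpeg'
# >>> TYPE_ALIASES['jpg']
# 'jpeg'
# >>> 'markdown' in TEXT_TYPES, 'png' in BINARY_TYPES
# (True, True)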
# Determine platform-specific strftime codes that might not be
# supported everywhere.
# Single digit version
_SD_H = '%-H'
_SD_I = '%-I'
_SD_d = '%-d'
# Space-padded version
_SP_H = '%k'
_SP_I = '%l'
# Space-padded day '%e' seems to work just fine, so I'm not gonna put it here.
import platform
pf = platform.system()
logger.info('detected platform: %s', pf)
if pf == 'Windows':
    _SD_H = _SP_H = '%H'
    _SD_I = _SP_I = '%I'
    _SD_d = '%d'
del platform, pf

# Exceptions...?
class InvalidEntryError(ValueError):
    pass
class Panel:
    """Panel containing entries for a single day.

    Parameters
    ----------
    panel : dict
        A dict loaded from a JSON object of the panel. In `backup.json`
        this is any of the objects within the "data" key.
        This can be extracted from a backup dict using `backup_get_data`.
    attrs : dict
        Extracted by `backup_get_attributes`, this contains information that
        acts as global options in `backup.json`.
    width : int, default 80
        Maximum length of lines for the printed text.
        Since this works by calling `textwrap.TextWrapper`, this
        breaks if trying to format a paragraph with a word longer than
        this integer.
    """
    __slots__ = (
        'date', 'entries', '_width', '_wrapper',
        '_indent',
        # lookup paths
        'paths',
        # attributes ('tz', 'rating')
        'attrs',
    )

    def __init__(self, panel, attrs, width=80):
        # Default values
        self.attrs = dict(tz=None, rating=None)
        self.paths = ['.']
        self._set(attrs)
        self.width = width
        self._indent = 0
        self._wrapper = textwrap.TextWrapper()
        self._process(panel)

    @property
    def wrapper(self):
        return self._wrapper

    @wrapper.setter
    def wrapper(self, value):
        self._wrapper = value

    @property
    def width(self):
        return self._width

    @width.setter
    def width(self, value):
        if not isinstance(value, int):
            raise TypeError(f'width must be an int, not {value!r}')
        self._width = max(1, value)

    def _set(self, dct):
        # Set attributes inherited from the outside
        # Keep track of the attributes we've checked
        remaining = set(dct.keys())
        try:
            self.attrs['tz'] = dct['tz']
        except KeyError:
            pass
        else:
            remaining.remove('tz')
        # Update lookup paths according to the top-level configurations
        if 'paths' in dct:
            paths = dct['paths']
            if not isinstance(paths, list):
                raise TypeError("'paths' should be a list, not {!r}"
                                .format(type(paths).__name__))
            if not all(isinstance(item, str) for item in paths):
                raise TypeError("'paths' should contain only str")
            # self.paths.extend(paths)
            self.paths = paths + self.paths
            remaining.remove('paths')
        if remaining:
            # Make it possible to reproduce the same exception
            raise InvalidEntryError('unrecognized keys: '
                                    + ', '.join(repr(key) for key in
                                                sorted(remaining)))
    def _process(self, panel):
        # Process the attributes set within the panel dictionary
        remaining = set(panel.keys())

        # Required field
        # --------------
        # Date
        try:
            date = panel['date']
        except KeyError:
            raise InvalidEntryError('panel must provide date')
        self.date = dt.date.fromisoformat(date)
        remaining.remove('date')

        # Optional fields
        # ---------------
        # Update lookup paths according to the panel attribute
        if 'paths' in panel:
            paths = panel['paths']
            # I copy-and-pasted this
            if not isinstance(paths, list):
                raise TypeError("'paths' should be a list, not {!r}"
                                .format(type(paths).__name__))
            if not all(isinstance(item, str) for item in paths):
                raise TypeError("'paths' should contain only str")
            # self.paths.extend(paths)
            self.paths = paths + self.paths
            remaining.remove('paths')
        if 'tz' in panel:
            self.attrs['tz'] = panel['tz']
            remaining.remove('tz')
        elif self.attrs['tz'] is None:
            raise InvalidEntryError('time zone not provided')
        if 'rating' in panel:
            self.attrs['rating'] = panel['rating']
            remaining.remove('rating')

        # Entries
        # -------
        try:
            _entries = panel['entries']
        except KeyError:
            _entries = []
        else:
            remaining.remove('entries')
        # Check them before we load in the entries
        if remaining:
            # Make it possible to reproduce the same exception
            raise InvalidEntryError('unrecognized keys: '
                                    + ', '.join(repr(key) for key in
                                                sorted(remaining)))
        self.entries = entries = []
        for entry in _entries:
            entries.append(Entry(entry, self))
    def _center_line(self, line, char=' ', rstrip=True):
        """Center 'line' with 'char' as padding to 'self.width'.

        If 'rstrip' is True, call 'str.rstrip' on every line.
        """
        if not self._indent:
            if rstrip:
                s_meth = str.rstrip
            else:
                def s_meth(s):
                    return s
            # 5% of horizontal padding
            width = self._wrapper.width
            pad = round(0.05 * width)
            if width - len(line) >= 2 * pad:
                return s_meth('{:{}^{}}'.format(line, char, width))
            else:
                self._wrapper.width -= 2 * pad
                lines = self._wrapper.wrap(line)
                assert len(lines) > 1, ('_center_line: expected more than '
                                        'one line')
                self._wrapper.width += 2 * pad
                return '\n'.join(
                    s_meth('{:{}^{}}'.format(line, char, width))
                    for line in lines
                )
        indent = self._indent
        logger.debug('i am centering a line with indent')
        logger.debug('self._indent = %r', self._indent)
        logger.debug('self._wrapper.width = %r', self._wrapper.width)
        try:
            self._indent = 0
            result = ''.join(
                ' ' * indent + line
                for line in self._center_line(line, char, rstrip)
                                .splitlines(keepends=True)
            )
        finally:
            self._indent = indent
        return result
    def _wrap_paragraph(self, par, prefix='', indent=0):
        # Internal function. Assuming that width has already been set,
        # wrap a paragraph into lines, obeying the current indentation.
        whole_indent = self._indent + indent
        indent_str = ' ' * whole_indent
        if prefix:
            try:
                self._wrapper.width -= indent + len(prefix)
                par_lines = self._wrapper.wrap(par)
            finally:
                self._wrapper.width += indent + len(prefix)
            lines = []
            for line in par_lines:
                lines.append(indent_str + prefix + line)
                prefix = ' ' * len(prefix)
            return lines
        else:
            try:
                self._wrapper.width -= indent
                par_lines = self._wrapper.wrap(par)
            finally:
                self._wrapper.width += indent
            return [indent_str + line for line in par_lines]

    def __repr__(self):
        buf = [f'<{type(self).__name__} object on {self.date:%a %b %e %Y}']
        if self.attrs['tz']:
            buf.append(f' ({self.attrs["tz"]})')
        buf.append('>')
        return ''.join(buf)
    def to_string(self, show_entry_count=False, as_timezone=None,
                  show_timezone=None):
        # show_timezone should control the time zone in the title and
        # also in each individual entry
        # to compare the time zone of the panel and the entry
        this_timezone = as_timezone or _get_tzinfo(self.attrs['tz'])
        self._wrapper.width = self.width - self._indent
        buf = []
        main_entries, insight_entries = self.get_entries()
        main_entries.sort(key=lambda e: e.date_time)
        insight_entries.sort(key=lambda e: e.date_time)
        mlen, ilen, elen = (len(main_entries), len(insight_entries),
                            len(self.entries))
        date_str = self.date.strftime(f'%A, %B {_SD_d}, %Y')
        if self.attrs['rating'] is not None:
            title = '{} {}'.format(date_str, self.attrs['rating'])
        else:
            title = date_str
        buf.append(self._center_line(title))
        buf.append('\n')
        if show_timezone:
            # XXX: How are we supposed to print the title for this
            # entire panel?  The time offset could literally change
            # midway through the day!
            if as_timezone:
                tz_string = _get_tz_string(dt.datetime.min
                                           .replace(tzinfo=as_timezone))
            else:
                tz_string = ('UTC' if self.attrs['tz'] in {'+00:00', '-00:00'}
                             else f'UTC{self.attrs["tz"]}')
            buf.append(self._center_line(f'[time zone: {tz_string}]'))
            buf.append('\n')
        if show_entry_count:
            buf.append('\n')
            wrapped = self._wrap_paragraph(
                ''.join([f'This day has {mlen} main ',
                         'entry' if mlen == 1 else 'entries',
                         f' and {ilen} insight ',
                         'entry' if ilen == 1 else 'entries',
                         '.']),
                prefix='* '
            )
            for line in wrapped:
                buf.extend([line, '\n'])
        if main_entries:
            buf.append('\n')
            not_first = False
            for ent in main_entries:
                if not_first:
                    buf.append('\n')
                show_this_timezone = bool(
                    show_timezone and
                    this_timezone != ent.date_time.tzinfo
                )
                buf.append(ent.to_string(as_timezone=as_timezone,
                                         show_timezone=show_this_timezone))
                buf.append('\n')
                not_first = True
        # Insights
        if insight_entries:
            buf.append('\n')
            if main_entries:
                buf.append('\n')
            heading = 'Insight' if ilen == 1 else 'Insights'
            buf.append(self._get_line(heading))
            buf.append(self._get_line(len(heading) * '-'))
            buf.append('\n')
            not_first = False
            for ent in insight_entries:
                if not_first:
                    buf.append('\n')
                show_this_timezone = bool(
                    show_timezone and
                    this_timezone != ent.date_time.tzinfo
                )
                buf.append(ent.to_string(as_timezone=as_timezone,
                                         show_timezone=show_this_timezone))
                buf.append('\n')
                not_first = True
        # Bound to be a spurious line feed character; remove it.
        buf.pop()
        return ''.join(buf)
    # When calling this function, don't include line feed characters!
    def _get_line(self, line):
        return (' ' * self._indent) + line + '\n'

    def get_entries(self):
        """Return a tuple of main entries and insight entries."""
        main_entries = []
        insight_entries = []
        for ent in self.entries:
            if ent.insight:
                insight_entries.append(ent)
            else:
                main_entries.append(ent)
        return main_entries, insight_entries
    @property
    def indent(self):
        return self._indent

    @indent.setter
    def indent(self, value):
        # Validate the value
        if not isinstance(value, int):
            raise TypeError(f'indent must be an int, not {value!r}')
        if value <= 0:
            self._indent = 0
            self._wrapper.width = self.width
        elif self.width <= value:
            self._indent = self.width - 1
            self._wrapper.width = 1
        else:
            self._indent = value
            self._wrapper.width = self.width - value
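
# Illustrative sketch (added for exposition; not in the original gist):
# constructing a Panel from hand-written dicts.  The field names follow
# the checks above ('date', 'tz', 'entries', and per-entry 'time'/'data');
# the real backup.json layout may carry more fields.
#
# >>> attrs = {'tz': '+08:00'}
# >>> panel_dict = {
# ...     'date': '2021-06-21',
# ...     'entries': [{'time': '08:30', 'data': 'Good morning!'}],
# ... }
# >>> panel = Panel(panel_dict, attrs, width=40)
# >>> print(panel.to_string())  # doctest: +SKIP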
class Entry:
    """An entry belonging to a panel.

    Parameters
    ----------
    entry : dict
        A dict loaded from a JSON object of the entry. In
        `backup.json` this is any of the objects in the "entries"
        key of a panel.
    panel : Panel object
        The panel that the entry to be created belongs to.
    load_file : bool, default False
        Whether to load the file specified by "input" into memory.
    strict : bool, default True
        Whether to validate the entry at the end of initialization.
    """
    __slots__ = (
        # i promoted 'insight' as that is quite an important
        # attribute imo
        'date_time', 'panel', '_wrapper', '_indent', 'insight',
        # a dictionary containing 'type', 'encoding', 'raw',
        # 'source', 'caption' (the file name)
        'data',
        # attrs like ('question')
        'attrs',
    )

    def __init__(self, entry, panel, load_file=False, strict=True):
        # load_file controls whether to read the file given by 'input'
        # right away (False may be good for, like, saving memory)
        self.panel = panel
        self.insight = False
        # Default values
        # (Encoding will be set by self._process_data)
        self.data = dict(type=None, format=None, encoding=None, caption=None)
        self.attrs = dict(question=None)
        self._wrapper = self.panel._wrapper
        self._process(entry, load_file=load_file)
        if strict:
            self._validate()

    @property
    def width(self):
        return self.panel.width

    @width.setter
    def width(self, value):
        self.panel.width = value
    def _process(self, entry, load_file):
        # Process options, optionally inheriting from the panel.
        remaining = set(entry.keys())

        # Required fields
        # ---------------
        # XXX: No validation... or is it good this way?
        if 'date-time' in entry:
            date_time = entry['date-time']
            match = re.match(r'^(.+?)[ T](.+?)$', date_time)
            if match:
                date_str, time_str = match.groups()
            else:
                raise InvalidEntryError("invalid 'date-time' field in entry")
            remaining.remove('date-time')
        elif 'time' in entry:
            time_str = entry['time']
            if 'date' in entry:
                date_str = entry['date']
                remaining.remove('date')
            else:
                date_str = self.panel.date.isoformat()
            remaining.remove('time')
        else:
            raise InvalidEntryError("entry must provide time, either through "
                                    "the key 'time' or 'date-time'")
        # Time zone optionally (will be stored as a datetime object)
        if 'tz' in entry:
            tz = entry['tz']
            remaining.remove('tz')
        else:
            tz = self.panel.attrs['tz']
            if tz is None:
                raise ValueError('time zone must be provided')
        self.date_time = dt.datetime.fromisoformat(
            '{}T{}{}'.format(date_str, time_str, tz)
        )
        # Process 'type' and one of 'data' and 'input'
        self._process_data(entry, remaining, load_file)

        # Optional attributes
        # -------------------
        if 'meta' in entry:
            meta = entry['meta']
            if not isinstance(meta, dict):
                raise TypeError("'meta' should be a dict")
            self.data['meta'] = meta.copy()
            remaining.remove('meta')
        # Process regardless... because we will have important stuff in the
        # metadata.
        # (XXX: What is the important stuff??)
        self._process_metadata()
        if 'caption' in entry:
            caption = entry['caption']
            if not isinstance(caption, str):
                raise TypeError("'caption' should be a str")
            self.data['caption'] = caption
            remaining.remove('caption')
        # Same code, copy-and-pasted from self._process_data (except
        # we do not encode question into a bytes object)
        if 'question' in entry:
            question = entry['question']
            if isinstance(question, list):
                try:
                    self.attrs['question'] = ''.join(question)
                except TypeError:
                    raise TypeError('question should be a list of str') \
                        from None
            elif isinstance(question, str):
                self.attrs['question'] = question
            else:
                raise TypeError(f"'question' should be str or list of str, "
                                f"not {type(question).__name__!r}")
            remaining.remove('question')
        if 'insight' in entry:
            self.insight = entry['insight']
            remaining.remove('insight')
        if remaining:
            # Make it possible to reproduce the same exception
            raise InvalidEntryError('unrecognized keys: '
                                    + ', '.join(repr(key) for key in
                                                sorted(remaining)))
    def _process_data(self, entry, remaining, load_file):
        if 'type-format' in entry:
            # Split only on the first dash; the format may contain dashes
            type_, fmt = entry['type-format'].split('-', 1)
            self.set_type(type_)
            self.set_format(fmt)
            remaining.remove('type-format')
        else:
            if 'type' in entry:
                self.set_type(entry['type'])
                remaining.remove('type')
            if 'format' in entry:
                self.set_format(entry['format'])
                remaining.remove('format')
        # Get default encoding (very sketchily)
        if 'encoding' in entry:
            encoding = entry['encoding']
            if not isinstance(encoding, str):
                raise TypeError(f"'encoding' in entry should be a str, "
                                f"not {type(encoding).__name__!r}")
            self.data['encoding'] = encoding
            remaining.remove('encoding')
        # We will infer the encoding later (specifically when we use
        # data-encoding or load from a file).
        # Some trivial validations
        if 'data' in entry and 'input' in entry:
            raise InvalidEntryError("only one of 'data' and 'input' can be "
                                    "specified")
        elif not ('data' in entry or 'input' in entry):
            raise InvalidEntryError("at least one of 'data' and 'input' "
                                    "should be specified")
        # Split off into two cases
        enc = self.data['encoding']
        if 'data' in entry:
            if 'data-encoding' in entry:
                # The encoding serves a different purpose for when
                # data-encoding is supplied
                data_enc = entry['data-encoding']
                self.data['raw'] = self._decode_data(entry['data'], data_enc)
                remaining.remove('data-encoding')
                # Sets the encoding (and type potentially)
                logger.debug('My current type is %r', self.data['type'])
                # Set the default type, checking for encoding provided.
                if self.data['type'] is None:
                    if enc is None or enc == 'binary':
                        self.data['type'] = 'binary'
                    else:
                        self.data['type'] = 'plain'
                self._infer_encoding_from_type()
            else:
                # We must not mock other encodings when loading from Python
                # strings (which are innately encoded with utf-8)
                if not (enc is None or enc == 'utf-8'):
                    logger.warning(
                        "'_process_data': encoding %r treated as 'utf-8'",
                        self.data['encoding']
                    )
                enc = self.data['encoding'] = 'utf-8'
                # Set the default type
                if self.data['type'] is None:
                    self.data['type'] = 'plain'
                data = entry['data']
                if isinstance(data, list):
                    try:
                        string = ''.join(data)
                    except TypeError:
                        raise TypeError("'data' should be a list of str") \
                            from None
                elif isinstance(data, str):
                    string = data
                else:
                    raise TypeError(f"'data' should be str or a list of str, "
                                    f"not {type(data).__name__!r}")
                # 'enc' is set to 'utf-8' above, so no validation needed.
                self.data['raw'] = string.encode(enc)
            remaining.remove('data')
        else:
            assert 'input' in entry, 'did the validations above fail???'
            inp = entry['input']
            if not isinstance(inp, str):
                raise TypeError(f"'input' should be a str, not "
                                f"{type(inp).__name__!r}")
            # if '.' != self.panel.paths[0]:
            #     logger.critical("'paths' does not contain '.' as the "
            #                     "zeroth item")
            candidates = []
            for path in self.panel.paths:
                # logger.debug('i am checking %r', path)
                fullpath = _get_path(path)
                # Some basic validation
                if '*' not in path:
                    if not os.path.exists(fullpath):
                        logger.warning("non-existent path in 'paths': %r",
                                       path)
                        continue
                    if not os.path.isdir(fullpath):
                        logger.warning("non-directory path in 'paths': %r",
                                       path)
                        continue
                # Tell `glob.glob` that this should be a directory
                if not fullpath.endswith(os.sep):
                    fullpath += os.sep
                for dirname in glob.iglob(fullpath):
                    # logger.debug('  - %r', dirname)
                    filepath = os.path.normpath(os.path.join(dirname, inp))
                    # Check for file existence and validity as a file
                    # (or as the file a symbolic link points to)
                    if os.path.isfile(filepath):
                        candidates.append(filepath)
            if not candidates:
                raise FileNotFoundError(f"'input': {inp!r} not found")
            filepath = candidates[0]
            if len(candidates) > 1:
                logger.warning(f"multiple files for 'input' {inp!r}; "
                               f"using the first match {filepath!r}")
            # Respect symbolic links
            self.data['source'] = os.path.abspath(filepath)
            self._infer_type_from_input(inp)
            # Set the default type, checking for encoding provided.
            if self.data['type'] is None:
                if enc is None or enc == 'binary':
                    self.data['type'] = 'binary'
                else:
                    self.data['type'] = 'plain'
            self._infer_encoding_from_type()
            if load_file:
                self.load_data()
            remaining.remove('input')
    def _process_metadata(self):
        if 'meta' not in self.data:
            return
        meta = self.data['meta']
        # Tolerance for cases when second is not provided
        time = self.date_time
        if time.second == 0 and time.microsecond == 0:
            tolerance = dt.timedelta(seconds=60)
        else:
            tolerance = dt.timedelta(0)
        if 'posted' in meta:
            posted = dt.datetime.fromisoformat(meta['posted'])
            if (posted.tzinfo is None
                    or posted.tzinfo.utcoffset(posted) is None):
                posted = posted.replace(tzinfo=time.tzinfo)
            if posted < time:
                raise InvalidEntryError('posted time of file earlier than '
                                        'the time of entry')
            meta['posted'] = posted
        else:
            meta['posted'] = time
        if 'created' in meta:
            created = dt.datetime.fromisoformat(meta['created'])
            # Check if naive: https://docs.python.org/3/library/datetime.html
            # #determining-if-an-object-is-aware-or-naive
            # (The conditions given are for aware objects, in which case
            # we just flip them around by De Morgan's law)
            if (created.tzinfo is None
                    or created.tzinfo.utcoffset(created) is None):
                created = created.replace(tzinfo=time.tzinfo)
            if created > meta['posted'] + tolerance:
                raise InvalidEntryError('creation time of file later than '
                                        'the posted time')
            meta['created'] = created
        # Description: can be any string
        meta.setdefault('desc')
        # NSFW: sexual stuff or just stuff that might scare you (like gore)
        meta.setdefault('nsfw', False)
    def _decode_data(self, data, enc):
        if enc == 'base16':
            return base64.b16decode(data)
        elif enc == 'base32':
            return base64.b32decode(data)
        elif enc == 'base64':
            return base64.b64decode(data)
        elif enc == 'ascii85':
            return base64.a85decode(data)
        elif enc == 'base85':
            return base64.b85decode(data)
        else:
            raise InvalidEntryError(f'unrecognized data encoding: '
                                    f'{enc!r}')
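
    # Illustrative sketch (added for exposition; not in the original
    # gist): what an entry carrying base64-encoded binary data could look
    # like.  The keys follow the checks in _process_data above; the data
    # value here is a made-up, truncated PNG payload.
    #
    #     {"time": "21:00", "type": "png",
    #      "data-encoding": "base64", "data": "iVBORw0KGgo..."}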
    def _infer_type_from_input(self, input_path):
        if self.data['type'] is not None:
            return
        _, ext = os.path.splitext(input_path)
        # No extension or a trailing dot.
        if len(ext) <= 1:
            return
        try:
            type_ = TYPE_EXTENSIONS[ext]
        except KeyError:
            logger.warning('_infer_type_from_input: unable to infer type '
                           'from extension %r', ext)
        else:
            # logger.debug('_infer_type_from_input: inferred type %r', type_)
            self.data['type'] = type_

    def _infer_encoding_from_type(self):
        if self.data['encoding'] is None:
            if self.data['type'] in TEXT_TYPES:
                self.data['encoding'] = 'utf-8'
            else:
                self.data['encoding'] = 'binary'

    def _infer_extension_from_type(self):
        # Either we loaded from the backup file itself or just...
        # I don't know, maybe some weird-ass file with no extension at all?
        raise NotImplementedError('i don\'t know uwu')
        # TODO: Set file extension
        #   jpg: jpg, otherwise stuff...
        #   "musescore" -> "mscz"
        #   "musescore-uncompressed" (essentially xml) -> "mscx"
    del _infer_extension_from_type
    def _validate(self):
        midnight = dt.time(tzinfo=_get_tzinfo(self.panel.attrs['tz']))
        start_of_day = dt.datetime.combine(self.panel.date, midnight)
        time = self.date_time
        if time < start_of_day:
            raise InvalidEntryError('entry earlier than 0:00 of the panel')
        if self.insight:
            if time - dt.timedelta(days=2) < start_of_day:
                raise InvalidEntryError('insight within the first 48 hours')

    def load_data(self):
        if 'source' not in self.data:
            raise ValueError("Source path not found in 'data'")
        try:
            with open(self.data['source'], 'rb') as fp:
                content = fp.read()
        except (FileNotFoundError, IsADirectoryError):
            raise  # I don't know what to do
        self.data['raw'] = content
        return content

    def get_data_size(self):
        """Get size of data without necessarily loading the
        unloaded file.
        """
        if 'raw' in self.data:
            return len(self.data['raw'])
        try:
            return os.path.getsize(self.data['source'])
        except KeyError:
            raise RuntimeError(
                "'raw' and 'source' are both undefined"
            ) from None

    def is_binary(self):
        return self.data['encoding'] == 'binary'
    # Setter and getter (for we can't determine what's important enough for
    # a property yet)
    def get_text(self):
        if self.is_binary():
            raise TypeError('cannot retrieve text of entry with binary data')
        if 'raw' not in self.data:
            self.load_data()
        return self.data['raw'].decode(self.data['encoding'])

    def set_type(self, typ):
        if typ in TYPE_ALIASES:
            typ = TYPE_ALIASES[typ]
        self.data['type'] = typ

    def set_format(self, fmt):
        self.data['format'] = fmt

    def __repr__(self):
        buf = [f'<{type(self).__name__} object at '
               f'{self.date_time:%a %b %e %H:%M %Y}']
        if self.date_time.tzinfo is not None:
            buf.append(f' ({_get_tz_string(self.date_time)})')
        buf.append('>')
        return ''.join(buf)
    def to_string(self, *, indent=2, label_insight=False, long_format=False,
                  as_timezone=None, show_timezone=None):
        # In case we were called directly and not from Panel,
        # we should definitely set the width.
        #
        # XXX: But is this too sketchy??
        self._indent = self.panel._indent
        self._wrapper.width = self.width - self._indent  # width of panel
        buf = []
        if as_timezone is None:
            as_timezone = _get_tzinfo(self.panel.attrs['tz'])
            # as_timezone is not provided... so make show_timezone
            # False by default.
            if show_timezone is None:
                show_timezone = False
        else:
            # as_timezone is provided! We want show_timezone to be
            # True by default.
            if show_timezone is None:
                show_timezone = True
        # Only convert the datetime if we're NOT showing the time zone
        if not show_timezone:
            conv_date_time = self.date_time.astimezone(as_timezone)
        else:
            conv_date_time = self.date_time
        if long_format:
            date_time_string = conv_date_time.strftime(
                '%b %e, %Y{}{}'
                .format(self._DATE_TIME_SEP, self.TIME_FORMAT_2)
            )
        else:
            date_time_string = self._get_date_time_string(conv_date_time)
        words = []
        # buf.append(' ' * self._indent)
        # buf.append(date_time_string)
        # sneak in some nice non-breaking spaces
        # words.append(date_time_string.replace(' ', '\xa0'))
        words.append(date_time_string)
        # Handle time zones
        if show_timezone:
            words.append(' [{}]'.format(_get_tz_string(self.date_time)))
        # if not (self.is_binary() or self.data['type'] == 'plain'):
        #     if label_insight and self.insight:
        #         buf.append(f' (insight, {self.data["type"]})')
        #     else:
        #         buf.append(f' ({self.data["type"]})')
        # elif label_insight and self.insight:
        #     buf.append(' (insight)')
        # Some additional stuff to be put in parentheses
        time_attrs = []
        if label_insight and self.insight:
            time_attrs.append('insight')
        if not (self.is_binary() or self.data['type'] == 'plain'):
            time_attrs.append(self.data['type'])
        if time_attrs:
            words.append(' ({})'.format(', '.join(time_attrs)))
        wrapped = self._wrap_paragraph(''.join(words))
        for line in wrapped:
            buf.extend([line, '\n'])
        # The main content
        try:
            self.indent += indent
            # Check for question
            if self.attrs['question'] is not None:
                question_prefix = '(Q) '
                wrapped = self._wrap_paragraph(self.attrs['question'],
                                               prefix=question_prefix)
                for line in wrapped:
                    buf.extend([line, '\n'])
                # Add some space after question
                buf.append('\n')
            if self.is_binary():
                data_type = self.data['type']
                data_size = format_bytes(self.get_data_size())
                if 'source' in self.data:
                    data_src = self.data['source']
                    # Make the path somewhat shorter
                    if os.path.samefile(
                        os.path.commonpath([BASE_DIR, data_src]),
                        BASE_DIR
                    ):
                        data_src = os.path.relpath(data_src, start=BASE_DIR)
                    wrapped = self._wrap_paragraph(
                        '<{} file sized {} at {!r}>'
                        .format(data_type, data_size, data_src)
                    )
                else:
                    wrapped = self._wrap_paragraph(
                        '<{} data sized {}>'
                        .format(data_type, data_size)
                    )
                for line in wrapped:
                    buf.extend([line, '\n'])
            else:
                lines = self.get_text().splitlines()
                for line in lines:
                    if not line.strip():
                        buf.append('\n')
                    else:
                        # Indent either with the question prefix (defined
                        # beforehand) or with no indent at all (default)
                        wrapped = self._wrap_paragraph(line)
                        for ln in wrapped:
                            buf.extend([ln, '\n'])
            if self.data['caption'] is not None:
                buf.append('\n')
                caption_prefix = 'Caption: '
                # Because we are indenting each paragraph, some deed must be
                # done to caption_prefix to give off an illusion of
                # indentation...
                for line in self.data['caption'].splitlines():
                    if not line.strip():
                        buf.append('\n')
                    else:
                        wrapped = self._wrap_paragraph(
                            line, prefix=caption_prefix
                        )
                        for ln in wrapped:
                            buf.extend([ln, '\n'])
                        caption_prefix = ' ' * len(caption_prefix)
        finally:
            self.indent -= indent
        buf.pop()
        return ''.join(buf)
    _DATE_TIME_SEP = ' '
    # 12-hour clock format
    TIME_FORMAT = f'{_SP_I}:%M %p'
    TIME_FORMAT_2 = f'{_SD_I}:%M %p'
    # 24-hour clock format
    # TIME_FORMAT = f'{_SP_H}:%M'
    # TIME_FORMAT_2 = f'{_SD_H}:%M'

    # Internal function used by to_string()
    def _get_date_time_string(self, date_time):
        panel_date = self.panel.date
        if panel_date.year == date_time.year:
            if panel_date == date_time.date():
                fmt = self.TIME_FORMAT_2
            else:
                fmt = '%b %e{}{}'.format(self._DATE_TIME_SEP, self.TIME_FORMAT)
        else:
            fmt = '%b %e, %Y{}{}'.format(self._DATE_TIME_SEP, self.TIME_FORMAT)
        return date_time.strftime(fmt)

    def _wrap_paragraph(self, *args, **kwargs):
        """Calls `self.panel._wrap_paragraph`."""
        # XXX: I really dislike this delegation trick
        return type(self.panel)._wrap_paragraph(self, *args, **kwargs)
    @property
    def indent(self):
        return self._indent

    @indent.setter
    def indent(self, value):
        # Validate the value
        if not isinstance(value, int):
            raise TypeError(f'indent must be an int, not {value!r}')
        if value <= 0:
            self._indent = 0
            self._wrapper.width = self.width
        elif self.width <= value:
            self._indent = self.width - 1
            self._wrapper.width = 1
        else:
            self._indent = value
            self._wrapper.width = self.width - value
# A 'backup' class is not really needed, as there is only one dictionary
# that stores all the information.  Here are just some scattered "methods."
#
# (please don't ask me why the abbreviation is 'bk' and not 'bu'...
# it just looks nicer ok)
def backup_get_description(bk) -> str:
    """Extract the description 'desc' from the backup file.

    The description should be a list of strings.
    """
    try:
        desc = bk['desc']
    except KeyError:
        return None
    if not isinstance(desc, list):
        raise TypeError("'desc' must be a list of strings")
    try:
        return ''.join(desc)
    except TypeError as exc:
        raise TypeError("Possible mingled type in 'desc' "
                        "(expected a list of strings)") from exc

def backup_get_data(bk) -> list:
    """Extract the 'data' from the backup file, if it exists."""
    try:
        data = bk['data']
    except KeyError:
        return []
    if not isinstance(data, list):
        raise TypeError("'data' must be a list (JSON array)")
    return data

def backup_get_attributes(bk) -> dict:
    """Extract attributes, i.e. key-value pairs aside from
    'data' and 'desc'.
    """
    _ignored = {'data', 'desc'}
    return {k: v for k, v in bk.items() if k not in _ignored}
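
# Illustrative sketch (added for exposition; not in the original gist):
# how a loaded backup dict splits into attributes, description, and data.
#
# >>> bk = {'tz': '+08:00', 'desc': ['My backup.'],
# ...       'data': [{'date': '2021-06-21'}]}
# >>> backup_get_attributes(bk)
# {'tz': '+08:00'}
# >>> backup_get_description(bk)
# 'My backup.'
# >>> backup_get_data(bk)
# [{'date': '2021-06-21'}]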
# Dealing with list of panels
def data_check_duplicates(data) -> set:
    """Check for duplicates and return the set of all dates."""
    dates = set()
    for panel in data:
        if 'date' in panel:
            n = len(dates)
            date_str = panel['date']  # Save for debug
            date = dt.date.fromisoformat(date_str)
            dates.add(date)
            if n == len(dates):
                raise ValueError(f'duplicate date: {date_str!r}')
        # If they don't have a date, it's not a panel...?
    return dates
def data_get_by_date(data, date, *, duplicates_ok=False) -> dict:
    """Extract a panel with a certain date.

    Parameters
    ----------
    data : list
        The data as extracted by backup_get_data().
    date : datetime.date or str object
        The date of the panel to be extracted.  Raise LookupError if it
        is not found.

    Keyword Arguments
    -----------------
    duplicates_ok : bool, default False
        If True, panels with the same date are ignored, and only the
        first match is returned.  If not, raise ValueError when multiple
        panels with the same date are found.
    """
    if isinstance(date, dt.date):
        date_str = date.isoformat()
    elif isinstance(date, str):
        # Validate the string.
        date_str = dt.date.fromisoformat(date).isoformat()
    else:
        raise TypeError(f'expected datetime.date or str object, got {date!r}')
    match = [panel for panel in data if panel['date'] == date_str]
    if not match:
        raise LookupError(f'{date!r}')
    if len(match) > 1 and not duplicates_ok:
        raise ValueError(f'duplicate date: {date_str}')
    return match[0]
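
# Illustrative sketch (added for exposition; not in the original gist):
# looking a panel up by date, with either a str or a datetime.date key.
#
# >>> data = [{'date': '2021-06-20'}, {'date': '2021-06-21'}]
# >>> data_get_by_date(data, '2021-06-21')
# {'date': '2021-06-21'}
# >>> data_get_by_date(data, dt.date(2021, 6, 20))
# {'date': '2021-06-20'}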
# Miscellaneous functions
def default_bytes_formatter(x):
    # x: float -> s: string
    if x < 100:
        return format(x, '.3g')
    else:
        return format(x, '.1f')

def format_bytes(size, unit='tens', sep=' ',
                 formatter=default_bytes_formatter):
    units = ['B']
    # Multiplier. (Think of the unit as an embodiment of the multiplier.)
    mult = 1
    if unit == 'tens':
        units.extend(BYTES_TENS_UNITS)
        # Increase by powers of 10**3 and see if it falls in the range
        # [mult, mult * 10**3).
        for suffix in units:
            if mult <= size < mult * 1000:
                return formatter(size / mult) + sep + suffix
            mult *= 1000
        # No need to worry about 'suffix' being undefined since 'units' has
        # at least one item.  (The loop has advanced 'mult' one step past
        # 'suffix', so step it back for the fallback.)
        return formatter(size / (mult // 1000)) + sep + suffix
    elif unit == 'twos':
        units.extend(BYTES_TWOS_UNITS)
        # Increase by powers of 2**10 and see if it falls in the range
        # [mult, mult * 2**10). Bit shifting is used because... computers.
        for suffix in units:
            if mult <= size < mult << 10:
                return formatter(size / mult) + sep + suffix
            mult <<= 10
        return formatter(size / (mult >> 10)) + sep + suffix
    else:
        raise ValueError("'unit' must be either 'tens' or 'twos'")
# Only up to terabyte (TB) is actually used.  Anything above that
# would be unrealistic.
BYTES_TENS_UNITS = [
    'kB', 'MB', 'GB', 'TB',
    # 'PB', 'EB', 'ZB', 'YB',
]
BYTES_TWOS_UNITS = [
    'KiB', 'MiB', 'GiB', 'TiB',
    # 'PiB', 'EiB', 'ZiB', 'YiB',
]
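
# Illustrative sketch (added for exposition; not in the original gist):
# format_bytes with both unit systems and the default formatter.
#
# >>> format_bytes(999)
# '999.0 B'
# >>> format_bytes(20_000_000)
# '20 MB'
# >>> format_bytes(2048, unit='twos')
# '2 KiB'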
# Time zone helpers
def _get_tz_string(date_time):
    """Convert a `datetime.datetime` object to a string
    (like 'UTC' or 'UTC+08:00') representing its time zone.
    """
    return date_time.tzinfo.tzname(date_time)

def _get_tzinfo(tzname):
    """Convert an offset string (like '+00:00' or '+08:00') to a
    `datetime.tzinfo` object, using the class method
    `datetime.datetime.fromisoformat`.
    """
    dummy = dt.datetime.fromisoformat(f'0001-01-01T00:00{tzname}')
    return dummy.tzinfo
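
# Illustrative sketch (added for exposition; not in the original gist):
# round-tripping an offset string through the two helpers above.
#
# >>> tz = _get_tzinfo('+08:00')
# >>> tz
# datetime.timezone(datetime.timedelta(seconds=28800))
# >>> _get_tz_string(dt.datetime(2021, 6, 21, tzinfo=tz))
# 'UTC+08:00'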
def main():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('filename', nargs='?', default='backup.json',
                        help='file path of the backup file')
    parser.add_argument('date', type=dt.date.fromisoformat,
                        help='date of the panel to load')
    parser.add_argument('--enc', '-e', metavar='ENCODING', default='utf-8',
                        help='encoding for opening the file')
    parser.add_argument('--width', '-w', type=int,
                        help='width of the printed panel')
    parser.add_argument('--check', '-c', action='count', default=0,
                        help=('further checking for duplicate entries and '
                              'proper order (-c for warning and -cc for '
                              'exception)'))
    args = parser.parse_args()
    if args.width is None:
        import shutil
        width, _ = shutil.get_terminal_size()
    else:
        width = args.width
    do_check = args.check >= 1
    do_error = args.check >= 2
    warnings = None

    def _warn_or_raise(warning, msg, cause=None):
        nonlocal warnings
        if warnings is None:
            import warnings
        if do_error:
            raise warning(msg) from cause
        else:
            warnings.warn(msg, warning, 2)
            if cause is not None:
                import traceback
                traceback.print_exception(type(cause), cause,
                                          cause.__traceback__)

    import json
    with open(args.filename, encoding=args.enc) as fp:
        bk = json.load(fp)
    attrs = backup_get_attributes(bk)
    data = backup_get_data(bk)
    if do_check:
        try:
            data_check_duplicates(data)
        except ValueError as exc:
            # len('duplicate date: ') equals 16
            dupe_date = exc.args[0][16:]
            _warn_or_raise(RuntimeWarning, f'duplicate date: {dupe_date}')
        def _format_insight(insight):
            if insight:
                return 'an insight entry'
            return 'a main entry'

        last_panel = None
        for i, panel_dict in enumerate(data):
            try:
                panel = Panel(panel_dict, attrs)
            except (ValueError, TypeError) as exc:
                panel_name = (repr(panel_dict['date'])
                              if 'date' in panel_dict else f'#{i}')
                msg = f'error occurred while loading panel {panel_name}'
                _warn_or_raise(RuntimeWarning, msg, exc)
            else:
                # Panel order
                if last_panel is not None and last_panel.date > panel.date:
                    _warn_or_raise(RuntimeWarning,
                                   f'inconsistent order in panels: '
                                   f'({panel.date} precedes '
                                   f'{last_panel.date})')
                # XXX: Comment this out
                assert last_panel is None or last_panel.date < panel.date
                # Entry order
                main_entries = []
                insight_entries = []
                # Some criteria to look for:
                #
                # 1. Ideally the entries should be main then insight.
                #
                #    Insight then main is okay... although in Perspective
                #    it's never like that.  We'll issue a different warning
                #    for that.
                #
                # 2. Main entries and insight entries should be in
                #    chronological order.  The check is similar to the
                #    check for panels above.
                #
                # To implement the 1st criterion, we use a boolean
                # 'has_switched' that is True when we see a change from
                # main entry to insight entry.
                has_switched = False
                expected_insight_value = None
                last_main_entry = None
                last_insight_entry = None
                for j, entry in enumerate(panel.entries):
                    if expected_insight_value is None:
                        expected_insight_value = entry.insight
                    # Checking main -> insight order
                    if expected_insight_value != entry.insight:
                        if has_switched:
                            expected = _format_insight(expected_insight_value)
                            got = _format_insight(entry.insight)
                            msg = (f'expected entry {j} to be {expected}, '
                                   f'got {got} (on {panel.date})')
                            _warn_or_raise(RuntimeWarning, msg)
                        else:
                            has_switched = True
                            expected_insight_value = entry.insight
                    # Checking main entry order
                    if last_main_entry is not None and not entry.insight:
                        if last_main_entry.date_time > entry.date_time:
                            msg = (f'inconsistent order in main entries '
                                   f'on {panel.date} (entry {j} precedes '
                                   f'entry {j - 1})')
                            _warn_or_raise(RuntimeWarning, msg)
                    # Checking insight entry order
                    if last_insight_entry is not None and entry.insight:
                        if last_insight_entry.date_time > entry.date_time:
                            msg = (f'inconsistent order in insight entries '
                                   f'on {panel.date} (entry {j} precedes '
                                   f'entry {j - 1})')
                            _warn_or_raise(RuntimeWarning, msg)
                    if entry.insight:
                        last_insight_entry = entry
                    else:
                        last_main_entry = entry
                last_panel = panel
    panel_dict = data_get_by_date(data, args.date, duplicates_ok=True)
    panel = Panel(panel_dict, attrs, width=width)
    print(panel.to_string())

if __name__ == '__main__':
    main()
setup.py
#!/usr/bin/env python
"""Install the basicproc module."""
from distutils.core import setup

# from os.path import dirname, join

# def read(path):
#     with open(join(dirname(__file__), path)) as fp:
#         return fp.read()

setup(
    name='basicproc',
    version='0.1.0',
    description='Basic processing for Perspective backup files',
    # long_description=read('README.md'),
    author='rapidcow',
    author_email='thegentlecow@gmail.com',
    py_modules=['basicproc'],
)
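
# Illustrative usage (added for exposition; not in the original gist):
# with this setup.py next to basicproc.py, a plain distutils install is
#
#     python setup.py install
#
# (On modern Python, `pip install .` in the same directory works as well.)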