Skip to content

Instantly share code, notes, and snippets.

@rapidcow
Last active June 21, 2022 18:08
Show Gist options
  • Save rapidcow/a0490a57965061ae06e5c43b2c97e46c to your computer and use it in GitHub Desktop.
The original code for the psp library --- basicproc.py
"""Basic processing of the backup file."""
# Docstrings use the convention from numpy:
# https://numpydoc.readthedocs.io/en/latest/format.html
__all__ = [
'Panel', 'Entry', 'InvalidEntryError', 'backup_get_attributes',
'backup_get_data', 'backup_get_description', 'data_check_duplicates',
'data_get_by_date', 'format_bytes', 'BYTES_TENS_UNITS',
'BYTES_TWOS_UNITS',
]
# XXX: Metadata of the same file? Do we have to copy-and-paste them
# EVERY time?
import base64
import datetime as dt
import glob
import logging
import logging.handlers
import os
import re
import sys
import textwrap
# Module-level logger; the handlers attached below decide what is
# actually emitted where (file gets DEBUG, console gets WARNING).
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# BASE_DIR = os.path.realpath(os.path.dirname(__file__))
BASE_DIR = os.getcwd()
def _get_path(p):
"""Return p appended with BASE_DIR."""
# Similar to how `os.path.abspath` works:
# https://docs.python.org/3/library/os.path.html#os.path.abspath
return os.path.normpath(os.path.join(BASE_DIR, p))
# Create a file handler --- in the same directory as this script.
# The log file rotates daily ('d') and at most seven old files are kept.
fh = logging.handlers.TimedRotatingFileHandler(
os.path.splitext(__file__)[0] + '.log',
when='d', backupCount=7,
)
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter('%(asctime)s %(name)s: '
'%(levelname)-8s %(message)s'))
logger.addHandler(fh)
# Create a console handler: only warnings and above reach stderr.
ch = logging.StreamHandler(sys.stderr)
ch.setLevel(logging.WARNING)
ch.setFormatter(logging.Formatter('[%(levelname)s] %(message)s'))
logger.addHandler(ch)
# The handlers live on attached to `logger`; drop the temporary names.
del ch, fh
# Inference (XXX: what for?)
# * From one of the extensions, the type can be inferred.
# * From one of the names, the extension can be created---sorry, inferred.
# (aside from no extension or .txt)
# * From one of the types, the encoding can be inferred.
_FILE_TYPES = [
    # Each item is [type, extensions, aliases]; the type is prefixed
    # with '+' for binary content and '-' for text content.
    # General text type
    ['-plain', ['.txt'], None],
    # General binary type.
    ['+binary', None, None],
    # Common plain text formats
    ['-markdown', ['.md', '.markdown'], ['md']],
    ['-html', ['.html'], None],
    ['-css', ['.css'], None],
    # Programming languages
    ['-python', ['.py'], None],
    ['-c', ['.c'], None],
    ['-c++', ['.cc', '.cpp'], None],
    ['-java', ['.java'], None],
    ['-javascript', ['.js'], None],
    ['-perl', ['.pl'], None],
    # LaTeX is not a type! It's a format just like XeLaTeX and LuaTeX
    # (and all of them share the same extension anyway)
    ['-tex', ['.tex', '.sty', '.cls', '.dtx'], None],
    ['-xml', ['.xml'], None],
    ['-json', ['.json'], None],
    ['-yaml', ['.yaml', '.yml'], ['yml']],
    ['+zip', ['.zip'], None],
    # Image types
    ['+png', ['.png'], None],
    ['+jpeg', ['.jpg', '.jpeg'], ['jpg']],
    ['+tiff', ['.tiff'], None],
    ['+heic', ['.heic'], None],
    # Video
    ['+mp4', ['.mp4'], None],
    ['+mov', ['.mov'], None],
    ['+wmv', ['.wmv'], None],
    ['+avi', ['.avi'], None],
    # Audio
    ['+mp3', ['.mp3'], None],
    ['+flac', ['.flac'], None],
    ['+wav', ['.wav'], None],
    ['+m4a', ['.m4a'], None],
    ['+aiff', ['.aiff'], None],
    ['+midi', ['.midi'], None],
    # Miscellaneous
    ['+pdf', ['.pdf'], None],
    ['+musescore', ['.mscz'], ['musescore_compressed']],
    ['-musescore_uncompressed', ['.mscx'], None]
]
# Lookup tables built from _FILE_TYPES by the loop below.
TEXT_TYPES = set()
BINARY_TYPES = set()
TYPE_ALIASES = {}
TYPE_EXTENSIONS = {}
def _sadd(s, other, elem, name):
    """Add 'elem' to set 's', warning about duplicates within 's' and
    about conflicts with the complementary set 'other'.  'name' labels
    the kind of element in the warning message.
    """
    if elem in s:
        logger.warning('duplicate %s at module initialization: %r',
                       name, elem)
        return
    elif elem in other:
        logger.warning('duplicate %s with conflicting type at '
                       'module initialization: %r', name, elem)
        return
    s.add(elem)
def _dadd(d, key, value, name):
    """Set d[key] = value, warning when 'key' is already present."""
    if key in d:
        logger.warning('duplicate %s at module initialization: %r',
                       name, key)
    d[key] = value
for t, e, a in _FILE_TYPES:
    tt = t[1:]
    if t[0] == '-':
        # BUG FIX: the 'name' labels passed to _sadd() were swapped,
        # so text types were reported as "binary type" and vice versa.
        _sadd(TEXT_TYPES, BINARY_TYPES, tt, 'text type')
    elif t[0] == '+':
        _sadd(BINARY_TYPES, TEXT_TYPES, tt, 'binary type')
    else:
        raise RuntimeError(f"{t!r} doesn't start with '-' or '+'")
    if a is not None:
        for alias in a:
            _dadd(TYPE_ALIASES, alias, tt, 'alias')
    if e is not None:
        for ext in e:
            # BUG FIX: extensions were mislabeled 'type' in warnings.
            _dadd(TYPE_EXTENSIONS, ext, tt, 'extension')
# Drop the raw table, the helpers and the loop variables (the unused
# _seen_* scratch sets of the original have been removed entirely).
del _FILE_TYPES, _sadd, _dadd
del t, e, a, alias, ext
# Work around strftime directives that do not exist on every platform.
# No-padding ("single digit") variants:
_SD_H, _SD_I, _SD_d = '%-H', '%-I', '%-d'
# Space-padded variants ('%e' for the day appears portable enough, so
# it is used literally elsewhere in this module):
_SP_H, _SP_I = '%k', '%l'
import platform
pf = platform.system()
logger.info('detected platform: %s', pf)
if pf == 'Windows':
    # Windows strftime supports neither the '%-' nor the space-padded
    # directives; fall back to the zero-padded standard forms.
    _SD_H = _SP_H = '%H'
    _SD_I = _SP_I = '%I'
    _SD_d = '%d'
del platform, pf
# Exceptions...?
class InvalidEntryError(ValueError):
    """Raised when a panel or entry dict fails validation (missing or
    unrecognized keys, inconsistent times, and the like).
    """
    pass
class Panel:
    """Panel containing entries for a single day.

    Parameters
    ----------
    panel : dict
        A dict loaded from a JSON object of the panel. In `backup.json`
        this is any of the objects within the "data" key.
        This can be extracted from a backup dict using `backup_get_data`.
    attrs : dict
        Extracted by `backup_get_attributes`, this contains information that
        acts as global options in `backup.json`.
    width : int, default 80
        Maximum length of lines for the printed text.
        Since this works by calling `textwrap.TextWrapper`, this
        breaks if trying to format a paragraph with a word longer than
        this integer.
    """
    __slots__ = (
        'date', 'entries', '_width', '_wrapper',
        '_indent',
        # lookup paths for entries loaded from files
        'paths',
        # attributes ('tz', 'rating')
        'attrs',
    )

    def __init__(self, panel, attrs, width=80):
        # Default values; _set() and _process() may override them.
        self.attrs = dict(tz=None, rating=None)
        self.paths = ['.']
        self._set(attrs)
        self.width = width
        self._indent = 0
        # Shared with every Entry that belongs to this panel.
        self._wrapper = textwrap.TextWrapper()
        self._process(panel)

    @property
    def wrapper(self):
        # The TextWrapper used for all formatting (shared with entries).
        return self._wrapper

    @wrapper.setter
    def wrapper(self, value):
        self._wrapper = value

    @property
    def width(self):
        # Maximum line width of formatted output; clamped to >= 1.
        return self._width

    @width.setter
    def width(self, value):
        if not isinstance(value, int):
            raise TypeError(f'width must be an int, not {value!r}')
        self._width = max(1, value)

    def _set(self, dct):
        """Inherit top-level (backup-wide) attributes from 'dct'."""
        # Keep track of the attributes we've checked so anything left
        # over can be reported as unrecognized.
        remaining = set(dct.keys())
        try:
            self.attrs['tz'] = dct['tz']
        except KeyError:
            pass
        else:
            remaining.remove('tz')
        # Update lookup paths according to the top-level configurations
        if 'paths' in dct:
            paths = dct['paths']
            if not isinstance(paths, list):
                # BUG FIX: `type(paths).__name__` is a str and must not
                # be called.
                raise TypeError("'paths' should be a list, not {!r}"
                                .format(type(paths).__name__))
            if not all(isinstance(item, str) for item in paths):
                raise TypeError("'paths' should contain only str")
            # Top-level paths have lower precedence than panel paths.
            self.paths = paths + self.paths
            remaining.remove('paths')
        if remaining:
            # Make it possible to reproduce the same exception
            raise InvalidEntryError('unrecognized keys: '
                                    + ', '.join(repr(key) for key in
                                                sorted(remaining)))

    def _process(self, panel):
        """Process the panel dict itself (date, paths, tz, entries)."""
        remaining = set(panel.keys())
        # Required field
        # --------------
        # Date
        try:
            date = panel['date']
        except KeyError:
            raise InvalidEntryError('panel must provide date')
        self.date = dt.date.fromisoformat(date)
        remaining.remove('date')
        # Optional fields
        # ---------------
        # Update lookup paths according to the panel attribute; the
        # panel's own paths are searched before the inherited ones.
        if 'paths' in panel:
            paths = panel['paths']
            if not isinstance(paths, list):
                # BUG FIX: same '__name__()' call bug as in _set().
                raise TypeError("'paths' should be a list, not {!r}"
                                .format(type(paths).__name__))
            # BUG FIX: originally iterated the builtin `list` instead
            # of `paths`, raising TypeError for any valid list.
            if not all(isinstance(item, str) for item in paths):
                raise TypeError("'paths' should contain only str")
            self.paths = paths + self.paths
            remaining.remove('paths')
        if 'tz' in panel:
            self.attrs['tz'] = panel['tz']
            remaining.remove('tz')
        elif self.attrs['tz'] is None:
            raise InvalidEntryError('time zone not provided')
        if 'rating' in panel:
            self.attrs['rating'] = panel['rating']
            remaining.remove('rating')
        # Entries
        # -------
        try:
            _entries = panel['entries']
        except KeyError:
            _entries = []
        else:
            remaining.remove('entries')
        # Check the keys before we load in the entries
        if remaining:
            # Make it possible to reproduce the same exception
            raise InvalidEntryError('unrecognized keys: '
                                    + ', '.join(repr(key) for key in
                                                sorted(remaining)))
        self.entries = entries = []
        for entry in _entries:
            entries.append(Entry(entry, self))

    def _center_line(self, line, char=' ', rstrip=True):
        """Center 'line' with 'char' as padding to 'self.width'.

        If 'rstrip' is True, call 'str.rstrip' on every line.
        """
        if not self._indent:
            if rstrip:
                s_meth = str.rstrip
            else:
                # Renamed the parameter from the confusingly-shadowing
                # 'self' of the original; this is an identity function.
                def s_meth(s):
                    return s
            # 5% of horizontal padding
            width = self._wrapper.width
            pad = round(0.05 * width)
            if width - len(line) >= 2 * pad:
                return s_meth('{:{}^{}}'.format(line, char, width))
            else:
                # Too long for one line: wrap within a narrower column,
                # then center each resulting line.
                self._wrapper.width -= 2 * pad
                lines = self._wrapper.wrap(line)
                assert len(lines) > 1, ('_center_line: expected more than '
                                        'one line')
                self._wrapper.width += 2 * pad
                return '\n'.join(
                    s_meth('{:{}^{}}'.format(line, char, width))
                    for line in lines
                )
        # Indented case: center with zero indent, then prepend the
        # indent to every produced line.
        indent = self._indent
        logger.debug('i am centering a line with indent')
        logger.debug('self._indent = %r', self._indent)
        logger.debug('self._wrapper.width = %r', self._wrapper.width)
        try:
            self._indent = 0
            result = ''.join(
                ' ' * indent + line
                for line in self._center_line(line, char, rstrip)
                .splitlines(keepends=True)
            )
        finally:
            self._indent = indent
        return result

    def _wrap_paragraph(self, par, prefix='', indent=0):
        """Wrap a paragraph into a list of lines, obeying the current
        indentation.  Assumes `self._wrapper.width` has been set.

        When 'prefix' is given, the first line starts with it and
        continuation lines are padded with spaces of the same length.
        """
        whole_indent = self._indent + indent
        indent_str = ' ' * whole_indent
        if prefix:
            try:
                # Temporarily narrow the wrapper to leave room for the
                # extra indent and the prefix.
                self._wrapper.width -= indent + len(prefix)
                par_lines = self._wrapper.wrap(par)
            finally:
                self._wrapper.width += indent + len(prefix)
            lines = []
            for line in par_lines:
                lines.append(indent_str + prefix + line)
                # Only the first line carries the visible prefix.
                prefix = ' ' * len(prefix)
            return lines
        else:
            try:
                self._wrapper.width -= indent
                par_lines = self._wrapper.wrap(par)
            finally:
                self._wrapper.width += indent
            return [indent_str + line for line in par_lines]

    def __repr__(self):
        buf = [f'<{type(self).__name__} object on {self.date:%a %b %e %Y}']
        if self.attrs['tz']:
            buf.append(f' ({self.attrs["tz"]})')
        buf.append('>')
        return ''.join(buf)

    def to_string(self, show_entry_count=False, as_timezone=None,
                  show_timezone=None):
        """Format this panel (title, optional counts, entries and
        insights) into a single printable string.
        """
        # show_timezone should control the time zone in the title and
        # also in each individual entry; this_timezone is used to
        # compare the time zone of the panel and the entry.
        this_timezone = as_timezone or _get_tzinfo(self.attrs['tz'])
        self._wrapper.width = self.width - self._indent
        buf = []
        main_entries, insight_entries = self.get_entries()
        main_entries.sort(key=lambda e: e.date_time)
        insight_entries.sort(key=lambda e: e.date_time)
        mlen, ilen = len(main_entries), len(insight_entries)
        date_str = self.date.strftime(f'%A, %B {_SD_d}, %Y')
        if self.attrs['rating'] is not None:
            title = '{} {}'.format(date_str, self.attrs['rating'])
        else:
            title = date_str
        buf.append(self._center_line(title))
        buf.append('\n')
        if show_timezone:
            # XXX: the offset printed in the title may not hold for
            # every entry -- a UTC offset can change within the day.
            if as_timezone:
                tz_string = _get_tz_string(dt.datetime.min
                                           .replace(tzinfo=as_timezone))
            else:
                tz_string = ('UTC' if self.attrs['tz'] in {'+00:00', '-00:00'}
                             else f'UTC{self.attrs["tz"]}')
            buf.append(self._center_line(f'[time zone: {tz_string}]'))
            buf.append('\n')
        if show_entry_count:
            buf.append('\n')
            wrapped = self._wrap_paragraph(
                ''.join([f'This day has {mlen} main ',
                         'entry' if mlen == 1 else 'entries',
                         f' and {ilen} insight ',
                         'entry' if ilen == 1 else 'entries',
                         '.']),
                prefix='* '
            )
            for line in wrapped:
                buf.extend([line, '\n'])
        if main_entries:
            buf.append('\n')
            not_first = False
            for ent in main_entries:
                if not_first:
                    buf.append('\n')
                # Only label the time zone when it differs from the
                # panel-wide one.
                show_this_timezone = bool(
                    show_timezone and
                    this_timezone != ent.date_time.tzinfo
                )
                buf.append(ent.to_string(as_timezone=as_timezone,
                                         show_timezone=show_this_timezone))
                buf.append('\n')
                not_first = True
        # Insights
        if insight_entries:
            buf.append('\n')
            if main_entries:
                buf.append('\n')
            heading = 'Insight' if ilen == 1 else 'Insights'
            buf.append(self._get_line(heading))
            buf.append(self._get_line(len(heading) * '-'))
            buf.append('\n')
            not_first = False
            for ent in insight_entries:
                if not_first:
                    buf.append('\n')
                show_this_timezone = bool(
                    show_timezone and
                    this_timezone != _get_tzinfo(self.attrs['tz'])
                )
                buf.append(ent.to_string(as_timezone=as_timezone,
                                         show_timezone=show_this_timezone))
                buf.append('\n')
                not_first = True
        # Bound to be a spurious line feed character; remove it.
        buf.pop()
        return ''.join(buf)

    # when calling this function don't include line feed characters!!
    def _get_line(self, line):
        """Return 'line' indented and terminated with a newline."""
        return (' ' * self._indent) + line + '\n'

    def get_entries(self):
        """Return a tuple of main entries and insight entries."""
        main_entries = []
        insight_entries = []
        for ent in self.entries:
            if ent.insight:
                insight_entries.append(ent)
            else:
                main_entries.append(ent)
        return main_entries, insight_entries

    @property
    def indent(self):
        # Number of spaces prepended to every formatted line.
        return self._indent

    @indent.setter
    def indent(self, value):
        # Validate the value
        if not isinstance(value, int):
            raise TypeError(f'indent must be an int, not {value!r}')
        if value <= 0:
            # BUG FIX: this used to be a bare `if` that fell through to
            # the branches below, which then overwrote _indent with the
            # negative value.  Clamp to zero instead.
            self._indent = 0
            self._wrapper.width = self.width
        elif self.width <= value:
            # Leave at least one column for the text itself.
            self._indent = self.width - 1
            self._wrapper.width = 1
        else:
            self._indent = value
            self._wrapper.width = self.width - value
class Entry:
    """An entry belonging to a panel.

    Parameters
    ----------
    entry : dict
        A dict loaded from a JSON object of the entry. In
        `backup.json` this is any of the objects in the "entries"
        key of a panel.
    panel : Panel object
        The panel that the entry to be created belongs to.
    load_file : bool, default False
        Whether to load files that are specified by "input" into memory.
    strict : bool, default True
        Whether to validate the entry at the end of initialization.
    """
    __slots__ = (
        # 'insight' is promoted to a real attribute since it is quite
        # an important one
        'date_time', 'panel', '_wrapper', '_indent', 'insight',
        # a dictionary containing 'type', 'encoding', 'raw',
        # 'source', 'caption' (the file name)
        'data',
        # attrs like ('question')
        'attrs',
    )

    def __init__(self, entry, panel, load_file=False, strict=True):
        # 'load_file' controls whether "input" files are read into
        # memory right away (deferring may be good for saving memory)
        self.panel = panel
        self.insight = False
        # Default values
        # (Encoding will be set by self._process_data)
        self.data = dict(type=None, format=None, encoding=None, caption=None)
        self.attrs = dict(question=None)
        # The wrapper is shared with the owning panel.
        self._wrapper = self.panel._wrapper
        self._process(entry, load_file=load_file)
        if strict:
            self._validate()

    @property
    def width(self):
        # Width is delegated to the owning panel.
        return self.panel.width

    @width.setter
    def width(self, value):
        self.panel.width = value

    def _process(self, entry, load_file):
        """Process the entry dict, inheriting defaults (date, tz) from
        the panel where the entry omits them.
        """
        remaining = set(entry.keys())
        # Required fields
        # ---------------
        # XXX: No validation... or is it good this way?
        if 'date-time' in entry:
            date_time = entry['date-time']
            # Split on the first blank or 'T' between date and time.
            match = re.match(r'^(.+?)[ T](.+?)$', date_time)
            if match:
                date_str, time_str = match.groups()
            else:
                raise InvalidEntryError("invalid 'date-time' field in entry")
            remaining.remove('date-time')
        elif 'time' in entry:
            time_str = entry['time']
            if 'date' in entry:
                date_str = entry['date']
                remaining.remove('date')
            else:
                # Default to the date of the owning panel.
                date_str = self.panel.date.isoformat()
            remaining.remove('time')
        else:
            raise InvalidEntryError("entry must provide time, either through "
                                    "the key 'time' or 'date-time'")
        # Time zone, optionally (folded into the datetime object)
        if 'tz' in entry:
            tz = entry['tz']
            remaining.remove('tz')
        else:
            tz = self.panel.attrs['tz']
            if tz is None:
                raise ValueError('time zone must be provided')
        self.date_time = dt.datetime.fromisoformat(
            '{}T{}{}'.format(date_str, time_str, tz)
        )
        # Process 'type' and one of 'data' and 'input'
        self._process_data(entry, remaining, load_file)
        # Optional attributes
        # -------------------
        if 'meta' in entry:
            meta = entry['meta']
            if not isinstance(meta, dict):
                raise TypeError("'meta' should be a dict")
            self.data['meta'] = meta.copy()
            remaining.remove('meta')
        # Process regardless... because we will have important stuff in
        # the metadata ('posted'/'created' normalization).
        self._process_metadata()
        if 'caption' in entry:
            caption = entry['caption']
            if not isinstance(caption, str):
                raise TypeError("'caption' should be a str")
            self.data['caption'] = caption
            remaining.remove('caption')
        # Same handling as 'data' in self._process_data (except that we
        # do not encode the question into a bytes object)
        if 'question' in entry:
            question = entry['question']
            if isinstance(question, list):
                try:
                    self.attrs['question'] = ''.join(question)
                except TypeError:
                    raise TypeError('question should be a list of str') \
                        from None
            elif isinstance(question, str):
                self.attrs['question'] = question
            else:
                raise TypeError(f"'question' should be str or list of str, "
                                f"not {type(question).__name__!r}")
            remaining.remove('question')
        if 'insight' in entry:
            self.insight = entry['insight']
            remaining.remove('insight')
        if remaining:
            # Make it possible to reproduce the same exception
            raise InvalidEntryError('unrecognized keys: '
                                    + ', '.join(repr(key) for key in
                                                sorted(remaining)))

    def _process_data(self, entry, remaining, load_file):
        """Process 'type', 'format', 'encoding' and exactly one of
        'data' (inline content) and 'input' (content from a file).
        """
        if 'type-format' in entry:
            # BUG FIX: split only at the first '-' so that format names
            # containing '-' do not blow up the unpacking.
            type_, fmt = entry['type-format'].split('-', 1)
            self.set_type(type_)
            self.set_format(fmt)
            remaining.remove('type-format')
        else:
            if 'type' in entry:
                self.set_type(entry['type'])
                remaining.remove('type')
            if 'format' in entry:
                self.set_format(entry['format'])
                remaining.remove('format')
        # Get default encoding (very sketchily)
        if 'encoding' in entry:
            encoding = entry['encoding']
            if not isinstance(encoding, str):
                raise TypeError(f"'encoding' in entry should be a str, "
                                f"not {type(encoding).__name__!r}")
            self.data['encoding'] = encoding
            remaining.remove('encoding')
        # We will infer the encoding later (specifically when we use
        # data-encoding or load from a file).
        # Some trivial validations
        if 'data' in entry and 'input' in entry:
            raise InvalidEntryError("only one of 'data' and 'input' can be "
                                    "specified")
        elif not ('data' in entry or 'input' in entry):
            raise InvalidEntryError("at least one of 'data' and 'input' "
                                    "should be specified")
        # Split off into two cases
        enc = self.data['encoding']
        if 'data' in entry:
            if 'data-encoding' in entry:
                # The encoding serves a different purpose for when
                # data-encoding is supplied
                data_enc = entry['data-encoding']
                self.data['raw'] = self._decode_data(entry['data'], data_enc)
                remaining.remove('data-encoding')
                logger.debug('My current type is %r', self.data['type'])
                # Set the default type, checking for encoding provided.
                if self.data['type'] is None:
                    if enc is None or enc == 'binary':
                        self.data['type'] = 'binary'
                    else:
                        self.data['type'] = 'plain'
                self._infer_encoding_from_type()
            else:
                # We must not mock other encodings when loading from Python
                # strings (which are innately encoded with utf-8)
                if not (enc is None or enc == 'utf-8'):
                    logger.warning(
                        "'_process_data': encoding %r treated as 'utf-8'",
                        self.data['encoding']
                    )
                enc = self.data['encoding'] = 'utf-8'
                # Set the default type
                if self.data['type'] is None:
                    self.data['type'] = 'plain'
                data = entry['data']
                if isinstance(data, list):
                    try:
                        string = ''.join(data)
                    except TypeError:
                        raise TypeError("'data' should be a list of str") \
                            from None
                elif isinstance(data, str):
                    string = data
                else:
                    raise TypeError(f"'data' should be str or a list of str, "
                                    f"not {type(data).__name__!r}")
                # 'enc' is set to 'utf-8' anyway, so no validation needed.
                self.data['raw'] = string.encode(enc)
            remaining.remove('data')
        else:
            assert 'input' in entry, 'did the validations above fail???'
            inp = entry['input']
            if not isinstance(inp, str):
                raise TypeError(f"'input' should be a str, not "
                                f"{type(inp).__name__!r}")
            # Search every lookup path of the panel for the input file.
            candidates = []
            for path in self.panel.paths:
                fullpath = _get_path(path)
                # Some basic validation (skipped for glob patterns)
                if '*' not in path:
                    if not os.path.exists(fullpath):
                        logger.warning("non-existent path in 'paths': %r",
                                       path)
                        continue
                    if not os.path.isdir(fullpath):
                        logger.warning("non-directory path in 'paths': %r",
                                       path)
                        continue
                # Tell `glob.glob` that this should be a directory
                if not fullpath.endswith(os.sep):
                    fullpath += os.sep
                for dirname in glob.iglob(fullpath):
                    filepath = os.path.normpath(os.path.join(dirname, inp))
                    # Check for file existence and validity as a file
                    # (or as the file a symbolic link points to)
                    if os.path.isfile(filepath):
                        candidates.append(filepath)
            if not candidates:
                raise FileNotFoundError(f"'input': {inp!r} not found")
            filepath = candidates[0]
            if len(candidates) > 1:
                logger.warning(f"multiple files for 'input' {inp!r}; "
                               f"using the first match {filepath!r}")
            # Respect symbolic links
            self.data['source'] = os.path.abspath(filepath)
            self._infer_type_from_input(inp)
            # Set the default type, checking for encoding provided.
            if self.data['type'] is None:
                if enc is None or enc == 'binary':
                    self.data['type'] = 'binary'
                else:
                    self.data['type'] = 'plain'
            self._infer_encoding_from_type()
            if load_file:
                self.load_data()
            remaining.remove('input')

    def _process_metadata(self):
        """Normalize 'posted' and 'created' in the metadata, filling in
        defaults and checking chronological consistency.
        """
        if 'meta' not in self.data:
            return
        meta = self.data['meta']
        # Tolerance for cases when second is not provided
        time = self.date_time
        if time.second == 0 and time.microsecond == 0:
            tolerance = dt.timedelta(seconds=60)
        else:
            tolerance = dt.timedelta(0)
        if 'posted' in meta:
            posted = dt.datetime.fromisoformat(meta['posted'])
            if (posted.tzinfo is None
                    or posted.tzinfo.utcoffset(posted) is None):
                # Naive timestamp: assume the entry's own time zone.
                posted = posted.replace(tzinfo=time.tzinfo)
            if posted < time:
                raise InvalidEntryError('posted time of file earlier than '
                                        'the time of entry')
            meta['posted'] = posted
        else:
            meta['posted'] = time
        if 'created' in meta:
            created = dt.datetime.fromisoformat(meta['created'])
            # Check if naive: https://docs.python.org/3/library/datetime.html
            # #determining-if-an-object-is-aware-or-naive
            # (The conditions given are for aware objects, in which case
            # we just flip them around by De Morgan's law)
            if (created.tzinfo is None
                    or created.tzinfo.utcoffset(created) is None):
                created = created.replace(tzinfo=time.tzinfo)
            if created > meta['posted'] + tolerance:
                raise InvalidEntryError('creation time of file later than '
                                        'the posted time')
            meta['created'] = created
        # Description: can be any string
        meta.setdefault('desc')
        # NSFW: sexual stuff or just stuff that might scare you (like gore)
        meta.setdefault('nsfw', False)

    def _decode_data(self, data, enc):
        """Decode base-N encoded 'data' (a str) into bytes."""
        if enc == 'base16':
            return base64.b16decode(data)
        elif enc == 'base32':
            return base64.b32decode(data)
        elif enc == 'base64':
            return base64.b64decode(data)
        elif enc == 'ascii85':
            return base64.a85decode(data)
        elif enc == 'base85':
            return base64.b85decode(data)
        else:
            raise InvalidEntryError(f'unrecognized data encoding: '
                                    f'{enc!r}')

    def _infer_type_from_input(self, input_path):
        """Infer the data type from the extension of 'input_path' when
        no type has been set explicitly.
        """
        if self.data['type'] is not None:
            return
        _, ext = os.path.splitext(input_path)
        # No extension or a trailing dot.
        if len(ext) <= 1:
            return
        try:
            type_ = TYPE_EXTENSIONS[ext]
        except KeyError:
            logger.warning('_infer_type_from_input: unable to infer type '
                           'from extension %r', ext)
        else:
            self.data['type'] = type_

    def _infer_encoding_from_type(self):
        """Default the encoding to utf-8 for text types, 'binary'
        otherwise; a previously set encoding is left untouched.
        """
        if self.data['encoding'] is None:
            if self.data['type'] in TEXT_TYPES:
                self.data['encoding'] = 'utf-8'
            else:
                self.data['encoding'] = 'binary'

    def _infer_extension_from_type(self):
        # Either we loaded from the backup file itself or just...
        # I don't know, maybe some weird-ass file with no extension at all?
        raise NotImplementedError('i don\'t know uwu')
        # TODO: Set file extension
        # jpg: jpg, otherwise stuff...
        # "musescore" -> "mscz"
        # "musescore-uncompressed" (essentially xml) -> "mscx"
    # Not ready yet: remove the placeholder from the class namespace.
    del _infer_extension_from_type

    def _validate(self):
        """Check that the entry time is consistent with the panel date."""
        midnight = dt.time(tzinfo=_get_tzinfo(self.panel.attrs['tz']))
        start_of_day = dt.datetime.combine(self.panel.date, midnight)
        time = self.date_time
        if time < start_of_day:
            raise InvalidEntryError('entry earlier than 0:00 of the panel')
        if self.insight:
            # Insights must come at least 48 hours into the day.
            if time - dt.timedelta(days=2) < start_of_day:
                raise InvalidEntryError('insight within the first 48 hours')

    def load_data(self):
        """Read the 'source' file into memory, cache it in data['raw']
        and return the bytes.  Propagates OSError from open()/read().
        """
        if 'source' not in self.data:
            raise ValueError("Source path not found in 'data'")
        # (The original wrapped this in a try/except that merely
        # re-raised; the bare `with` is equivalent.)
        with open(self.data['source'], 'rb') as fp:
            content = fp.read()
        self.data['raw'] = content
        return content

    def get_data_size(self):
        """Get size of data without necessarily loading the
        unloaded file.
        """
        if 'raw' in self.data:
            return len(self.data['raw'])
        try:
            return os.path.getsize(self.data['source'])
        except KeyError:
            raise RuntimeError(
                "'raw' and 'source' are both undefined"
            ) from None

    def is_binary(self):
        """Return True when the data is not text-decodable."""
        return self.data['encoding'] == 'binary'

    # Setters and getters (we can't determine yet what is important
    # enough for a property)
    def get_text(self):
        """Decode and return the data as a str, loading it on demand."""
        if self.is_binary():
            raise TypeError('cannot retrieve text of entry with binary data')
        if 'raw' not in self.data:
            self.load_data()
        return self.data['raw'].decode(self.data['encoding'])

    def set_type(self, typ):
        """Set the data type, resolving known aliases first."""
        if typ in TYPE_ALIASES:
            typ = TYPE_ALIASES[typ]
        self.data['type'] = typ

    def set_format(self, fmt):
        """Set the data format (stored as-is)."""
        self.data['format'] = fmt

    def __repr__(self):
        buf = [f'<{type(self).__name__} object at '
               f'{self.date_time:%a %b %e %H:%M %Y}']
        if self.date_time.tzinfo is not None:
            buf.append(f' ({_get_tz_string(self.date_time)})')
        buf.append('>')
        return ''.join(buf)

    def to_string(self, *, indent=2, label_insight=False, long_format=False,
                  as_timezone=None, show_timezone=None):
        """Format this entry (timestamp line, question, content and
        caption) into a printable string.
        """
        # In case we were called directly and not from Panel,
        # we should definitely set the width.
        #
        # XXX: But is this too sketchy??
        self._indent = self.panel._indent
        self._wrapper.width = self.width - self._indent  # width of panel
        buf = []
        if as_timezone is None:
            as_timezone = _get_tzinfo(self.panel.attrs['tz'])
            # as_timezone is not provided... so make show_timezone
            # False by default.
            if show_timezone is None:
                show_timezone = False
        else:
            # as_timezone is provided! We want show_timezone to be
            # True by default.
            if show_timezone is None:
                show_timezone = True
        # Only convert the datetime if we're NOT showing the time zone
        if not show_timezone:
            conv_date_time = self.date_time.astimezone(as_timezone)
        else:
            conv_date_time = self.date_time
        if long_format:
            date_time_string = conv_date_time.strftime(
                '%b %e, %Y{}{}'
                .format(self._DATE_TIME_SEP, self.TIME_FORMAT_2)
            )
        else:
            date_time_string = self._get_date_time_string(conv_date_time)
        words = []
        words.append(date_time_string)
        # Handle time zones
        if show_timezone:
            words.append(' [{}]'.format(_get_tz_string(self.date_time)))
        # Some additional stuff to be put in parentheses
        time_attrs = []
        if label_insight and self.insight:
            time_attrs.append('insight')
        if not (self.is_binary() or self.data['type'] == 'plain'):
            time_attrs.append(self.data['type'])
        if time_attrs:
            words.append(' ({})'.format(', '.join(time_attrs)))
        wrapped = self._wrap_paragraph(''.join(words))
        for line in wrapped:
            buf.extend([line, '\n'])
        # The main content
        try:
            self.indent += indent
            # Check for question
            if self.attrs['question'] is not None:
                question_prefix = '(Q) '
                wrapped = self._wrap_paragraph(self.attrs['question'],
                                               prefix=question_prefix)
                for line in wrapped:
                    buf.extend([line, '\n'])
                # Add some space after question
                buf.append('\n')
            if self.is_binary():
                data_type = self.data['type']
                data_size = format_bytes(self.get_data_size())
                if 'source' in self.data:
                    data_src = self.data['source']
                    # Make the path somewhat shorter when it lives
                    # under BASE_DIR
                    if os.path.samefile(
                        os.path.commonpath([BASE_DIR, data_src]),
                        BASE_DIR
                    ):
                        data_src = os.path.relpath(data_src, start=BASE_DIR)
                    wrapped = self._wrap_paragraph(
                        '<{} file sized {} at {!r}>'
                        .format(data_type, data_size, data_src)
                    )
                else:
                    wrapped = self._wrap_paragraph(
                        '<{} data sized {}>'
                        .format(data_type, data_size)
                    )
                for line in wrapped:
                    buf.extend([line, '\n'])
            else:
                lines = self.get_text().splitlines()
                for line in lines:
                    if not line.strip():
                        buf.append('\n')
                    else:
                        wrapped = self._wrap_paragraph(line)
                        for ln in wrapped:
                            buf.extend([ln, '\n'])
            if self.data['caption'] is not None:
                buf.append('\n')
                caption_prefix = 'Caption: '
                # Because we are indenting each paragraph, some deed must
                # be done to caption_prefix to give off an illusion of
                # indentation...
                for line in self.data['caption'].splitlines():
                    if not line.strip():
                        buf.append('\n')
                    else:
                        wrapped = self._wrap_paragraph(
                            line, prefix=caption_prefix
                        )
                        for ln in wrapped:
                            buf.extend([ln, '\n'])
                        caption_prefix = ' ' * len(caption_prefix)
        finally:
            self.indent -= indent
        # Remove the spurious trailing line feed.
        buf.pop()
        return ''.join(buf)

    # Separator between the date part and the time part
    _DATE_TIME_SEP = ' '
    # 12-hour clock format
    TIME_FORMAT = f'{_SP_I}:%M %p'
    TIME_FORMAT_2 = f'{_SD_I}:%M %p'
    # 24-hour clock format
    # TIME_FORMAT = f'{_SP_H}:%M'
    # TIME_FORMAT_2 = f'{_SD_H}:%M'

    # Internal function used by to_string()
    def _get_date_time_string(self, date_time):
        """Format 'date_time' relative to the panel's date: time only
        on the same day, month and day added within the same year, and
        the full date otherwise.
        """
        panel_date = self.panel.date
        if panel_date.year == date_time.year:
            if panel_date == date_time.date():
                fmt = self.TIME_FORMAT_2
            else:
                fmt = '%b %e{}{}'.format(self._DATE_TIME_SEP, self.TIME_FORMAT)
        else:
            fmt = '%b %e, %Y{}{}'.format(self._DATE_TIME_SEP, self.TIME_FORMAT)
        return date_time.strftime(fmt)

    def _wrap_paragraph(self, *args, **kwargs):
        """Calls `Panel._wrap_paragraph` with this entry standing in as
        'self' (the entry exposes the same `_indent` and `_wrapper`
        attributes that the panel method relies on).
        """
        return type(self.panel)._wrap_paragraph(self, *args, **kwargs)

    @property
    def indent(self):
        # Number of spaces prepended to every formatted line.
        return self._indent

    @indent.setter
    def indent(self, value):
        # Validate the value
        if not isinstance(value, int):
            raise TypeError(f'indent must be an int, not {value!r}')
        if value <= 0:
            # BUG FIX: this used to be a bare `if` that fell through to
            # the branches below, which then overwrote _indent with the
            # negative value.  Clamp to zero instead.
            self._indent = 0
            self._wrapper.width = self.width
        elif self.width <= value:
            # Leave at least one column for the text itself.
            self._indent = self.width - 1
            self._wrapper.width = 1
        else:
            self._indent = value
            self._wrapper.width = self.width - value
# A 'backup' class is not really needed, as there is only one dictionary
# that stores all the information. Here are just some scattered "methods."
#
# (please dont ask me why abbreviation is 'bk' and not 'bu'...
# it just looks nicer ok)
def backup_get_description(bk) -> str:
    """Extract and join the description 'desc' from a backup dict.

    'desc' is expected to be a list of strings; None is returned when
    the backup has no description at all.
    """
    if 'desc' not in bk:
        return None
    desc = bk['desc']
    if not isinstance(desc, list):
        raise TypeError("'desc' must be a list of strings")
    try:
        return ''.join(desc)
    except TypeError as exc:
        raise TypeError("Possible mingled type in 'desc' "
                        "(expected a list of strings)") from exc
def backup_get_data(bk) -> list:
    """Extract the 'data' list from a backup dict.

    Returns an empty list when the key is absent; raises TypeError
    when 'data' is present but not a list.
    """
    if 'data' not in bk:
        return []
    data = bk['data']
    if not isinstance(data, list):
        raise TypeError("'data' must be a list (JSON array)")
    return data
def backup_get_attributes(bk) -> dict:
    """Extract attributes, i.e. every key-value pair in the backup
    dict other than 'data' and 'desc'.
    """
    return {key: value for key, value in bk.items()
            if key not in ('data', 'desc')}
# Dealing with list of panels
def data_check_duplicates(data) -> set:
    """Check for duplicates and returns the set of all dates."""
    dates = set()
    for panel in data:
        # Objects without a date are silently skipped (are they even
        # panels...?)
        if 'date' not in panel:
            continue
        date_str = panel['date']  # keep the raw string for the error
        date = dt.date.fromisoformat(date_str)
        if date in dates:
            raise ValueError(f'duplicate date: {date_str!r}')
        dates.add(date)
    return dates
def data_get_by_date(data, date, *, duplicates_ok=False) -> dict:
    """Extract the panel with a certain date.

    Parameters
    ----------
    data : list
        The list of panel dicts as extracted by backup_get_data().
    date : datetime.date or str
        The date of the panel to be extracted.

    Keyword Arguments
    -----------------
    duplicates_ok : bool, default False
        If True and several panels share the date, the first match is
        returned.  If False, ValueError is raised in that case.

    Returns
    -------
    dict
        The (first) panel whose 'date' equals *date*.

    Raises
    ------
    TypeError
        If *date* is neither a datetime.date nor a str.
    LookupError
        If no panel has the requested date.
    ValueError
        If several panels share the date and duplicates_ok is False.
        (The docstring previously claimed LookupError here; the code
        has always raised ValueError.)
    """
    if isinstance(date, dt.date):
        date_str = date.isoformat()
    elif isinstance(date, str):
        # Round-trip through fromisoformat to validate the string.
        date_str = dt.date.fromisoformat(date).isoformat()
    else:
        raise TypeError(f'expected datetime.date or str object, got {date!r}')
    # Panels without a 'date' key can never match (robustness fix:
    # panel['date'] used to raise KeyError on such panels, while
    # data_check_duplicates tolerates them).
    match = [panel for panel in data if panel.get('date') == date_str]
    if not match:
        raise LookupError(f'{date!r}')
    if len(match) > 1 and not duplicates_ok:
        raise ValueError(f'duplicate date: {date_str}')
    # Bug fix: with duplicates_ok=True the whole match list used to be
    # returned; callers (e.g. main()) expect a single panel dict.
    return match[0]
# Miscellaneous functions
def default_bytes_formatter(x):
    """Format the scaled float *x* for display: three significant
    digits below 100, one decimal place otherwise."""
    return format(x, '.3g') if x < 100 else format(x, '.1f')


def format_bytes(size, unit='tens', sep=' ',
                 formatter=default_bytes_formatter):
    """Return *size* (a number of bytes) as a human-readable string.

    Parameters
    ----------
    size : int or float
        The number of bytes.
    unit : {'tens', 'twos'}, default 'tens'
        'tens' uses decimal units (1 kB = 1000 B); 'twos' uses binary
        units (1 KiB = 1024 B).
    sep : str, default ' '
        Separator placed between the number and the unit suffix.
    formatter : callable, default default_bytes_formatter
        Converts the scaled float value to a string.

    Raises
    ------
    ValueError
        If *unit* is neither 'tens' nor 'twos'.

    Notes
    -----
    Bug fixes relative to the original implementation:

    * sizes below 1 (e.g. 0) now print with the 'B' suffix instead of
      falling through every range test and printing as 'TB';
    * sizes at or beyond the largest unit are clamped to that unit's
      multiplier (10**15 now prints '1000.0 TB', not '1 TB', because
      the old code divided by one multiplier too many).
    """
    if unit == 'tens':
        step = 1000
        suffixes = BYTES_TENS_UNITS
    elif unit == 'twos':
        step = 1024
        suffixes = BYTES_TWOS_UNITS
    else:
        raise ValueError("'unit' must be either 'tens' or 'twos'")
    # Copy so the module-level unit tables are never mutated.
    units = ['B', *suffixes]
    # Find the smallest unit whose next multiplier exceeds *size*.
    mult = 1
    for suffix in units[:-1]:
        if size < mult * step:
            return formatter(size / mult) + sep + suffix
        mult *= step
    # Clamp anything at or beyond the largest unit to that unit.
    return formatter(size / mult) + sep + units[-1]


# Only up to terabyte (TB) is actually used.  Anything above that
# would be unrealistic, so larger sizes are clamped to TB/TiB.
BYTES_TENS_UNITS = [
    'kB', 'MB', 'GB', 'TB',
    # 'PB', 'EB', 'ZB', 'YB',
]
BYTES_TWOS_UNITS = [
    'KiB', 'MiB', 'GiB', 'TiB',
    # 'PiB', 'EiB', 'ZiB', 'YiB',
]
# Time zone helper functions
def _get_tz_string(date_time):
"""Convert a `datetime.datetime` object to a string
(like 'UTC' or 'UTC+08:00') representing its time zone.
"""
return date_time.tzinfo.tzname(date_time)
def _get_tzinfo(tzname):
"""Convert an offset string to a string (like '+00:00'
or '+08:00') to a `datetime.tzinfo` object, using the
class method `datetime.datetime.fromisoformat`.
"""
dummy = dt.datetime.fromisoformat(f'0001-01-01T00:00{tzname}')
return dummy.tzinfo
def main():
    """Command-line entry point.

    Load a backup file, optionally run consistency checks over its
    panels (``-c`` warns, ``-cc`` raises), and print the panel for the
    requested date.
    """
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('filename', nargs='?', default='backup.json',
                        help='file path of the backup file')
    parser.add_argument('date', type=dt.date.fromisoformat,
                        help='date of the panel to load')
    parser.add_argument('--enc', '-e', metavar='ENCODING', default='utf-8',
                        help='encoding for opening the file')
    parser.add_argument('--width', '-w', type=int,
                        help='width of the printed panel')
    parser.add_argument('--check', '-c', action='count', default=0,
                        help=('further checking for duplicate entries and '
                              'proper order (-c for warning and -cc for '
                              'exception)'))
    args = parser.parse_args()
    if args.width is None:
        import shutil
        width, _ = shutil.get_terminal_size()
    else:
        width = args.width
    do_check = args.check >= 1
    do_error = args.check >= 2

    def _warn_or_raise(warning, msg, cause=None):
        # Escalate to an exception with -cc; warn (and print the
        # causing traceback, if any) with -c.
        if do_error:
            raise warning(msg) from cause
        import warnings
        warnings.warn(msg, warning, 2)
        if cause is not None:
            import traceback
            traceback.print_exception(type(cause), cause,
                                      cause.__traceback__)

    import json
    with open(args.filename, encoding=args.enc) as fp:
        bk = json.load(fp)
    attrs = backup_get_attributes(bk)
    data = backup_get_data(bk)
    if do_check:
        try:
            data_check_duplicates(data)
        except ValueError as exc:
            # The exception message already reads 'duplicate date: ...',
            # so forward it verbatim (the old code sliced off and then
            # re-attached the same 16-character prefix).
            _warn_or_raise(RuntimeWarning, exc.args[0])

        def _format_insight(insight):
            return 'an insight entry' if insight else 'a main entry'

        last_panel = None
        for i, panel_dict in enumerate(data):
            try:
                panel = Panel(panel_dict, attrs)
            except (ValueError, TypeError) as exc:
                panel_name = (repr(panel_dict['date'])
                              if 'date' in panel_dict else f'#{i}')
                msg = f'error occurred while loading panel {panel_name}'
                _warn_or_raise(RuntimeWarning, msg, exc)
                continue
            # Panel order: panels should be in chronological order.
            # (Bug fixes: the warning used to interpolate panel.date
            # twice instead of naming both panels, and a leftover
            # debugging assert -- marked "XXX: Comment this out" --
            # crashed even in warn-only mode.)
            if last_panel is not None and last_panel.date > panel.date:
                _warn_or_raise(RuntimeWarning,
                               f'inconsistent order in panels: '
                               f'({panel.date} precedes '
                               f'{last_panel.date})')
            # Entry order.  Criteria:
            #
            # 1. Ideally every main entry comes before every insight
            #    entry, so the 'insight' flag should flip at most once;
            #    'has_switched' records whether it already has.
            # 2. Main entries and insight entries should each be in
            #    chronological order (same idea as the panel check).
            has_switched = False
            expected_insight_value = None
            last_main_entry = None
            last_insight_entry = None
            # NOTE: 'j' deliberately does not reuse the outer loop
            # variable 'i' (the old code shadowed it).
            for j, entry in enumerate(panel.entries):
                if expected_insight_value is None:
                    expected_insight_value = entry.insight
                # Checking main -> insight order
                if expected_insight_value != entry.insight:
                    if has_switched:
                        expected = _format_insight(expected_insight_value)
                        got = _format_insight(entry.insight)
                        msg = (f'expected entry {j} to be {expected}, '
                               f'got {got} (on {panel.date})')
                        _warn_or_raise(RuntimeWarning, msg)
                    else:
                        has_switched = True
                        expected_insight_value = entry.insight
                # Checking main entry order
                if last_main_entry is not None and not entry.insight:
                    if last_main_entry.date_time > entry.date_time:
                        msg = (f'inconsistent order in main entries '
                               f'on {panel.date} (entry {j} precedes '
                               f'entry {j - 1})')
                        _warn_or_raise(RuntimeWarning, msg)
                # Checking insight entry order
                if last_insight_entry is not None and entry.insight:
                    if last_insight_entry.date_time > entry.date_time:
                        msg = (f'inconsistent order in insight entries '
                               f'on {panel.date} (entry {j} precedes '
                               f'entry {j - 1})')
                        _warn_or_raise(RuntimeWarning, msg)
                if entry.insight:
                    last_insight_entry = entry
                else:
                    last_main_entry = entry
            last_panel = panel

    panel_dict = data_get_by_date(data, args.date, duplicates_ok=True)
    panel = Panel(panel_dict, attrs, width=width)
    print(panel.to_string())


if __name__ == '__main__':
    main()
#!/usr/bin/env python
"""Install the basicproc module."""
# NOTE(review): distutils is deprecated (PEP 632) and removed from the
# standard library in Python 3.12; consider migrating this script to
# setuptools (`from setuptools import setup`) or a pyproject.toml.
from distutils.core import setup
# from os.path import dirname, join
# def read(path):
#     with open(join(dirname(__file__), path)) as fp:
#         return fp.read()
# Package metadata: installs the single module basicproc.py.
setup(
    name='basicproc',
    version='0.1.0',
    description='Basic processing for Perspective backup files',
    # long_description=read('README.md'),
    author='rapidcow',
    author_email='thegentlecow@gmail.com',
    py_modules=['basicproc'],
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment