Skip to content

Instantly share code, notes, and snippets.

@ArtemGr
Last active November 2, 2022 10:59
Show Gist options
  • Save ArtemGr/bf91613a021a536c7ce16cdba9168604 to your computer and use it in GitHub Desktop.
Save ArtemGr/bf91613a021a536c7ce16cdba9168604 to your computer and use it in GitHub Desktop.
llog
/__pycache__
/llog
#!/usr/bin/env python
# Install with
#
# python -mpip install --user --upgrade git+https://gist.github.com/bf91613a021a536c7ce16cdba9168604.git
import datetime
import math
import os
import sys
from inspect import currentframe, getframeinfo
def stdout2utf8():
    '''
    Switch `sys.stdout` to UTF-8, so that non-ASCII text can be printed.

    https://stackoverflow.com/questions/3597480/how-to-make-python-3-print-utf8

    Does nothing when the current `sys.stdout` has no `reconfigure` method
    (for example when it was replaced with an `io.StringIO`).
    '''
    # Alternative approach: sys.stdout = open(1, 'w', encoding='utf-8', closefd=False)
    reconfigure = getattr(sys.stdout, 'reconfigure', None)
    if reconfigure is not None:
        reconfigure(encoding='utf-8')
# rindex that does not throw
def rindexʹ(s, ss):
    '''
    Return the highest index at which `ss` occurs in `s`, or -1 when it does not occur.

    This is what `rfind` already does: it is `rindex` without the `ValueError`.
    '''
    return s.rfind(ss)
def log(*args, sep=None, end=None, file=None, flush=False, back=1):
    '''
    `print` the `args`, prefixed with the current time ("HH:MM:SS") and with the
    "name:line]" location of the caller.

    `sep`, `end`, `file` and `flush` are handed to `print` as is.

    `back` is the number of stack frames to walk up from `log` itself in order to find
    the location: 1 (the default) is the immediate caller, 2 is the caller's caller.
    The walk stops at the outermost frame when `back` is larger than the stack.

    "name" is the file name of the frame, without the directories and without ".py".
    '''
    frame = currentframe()
    for _ in range(back):
        # Stay on the outermost frame instead of failing on `None.f_back`.
        if frame is None or frame.f_back is None:
            break
        frame = frame.f_back
    if frame is None:
        # `currentframe` returns `None` on interpreters without stack frame support.
        location = "?:0]"
    else:
        # `context=0`: only the file name is needed, do not load the source lines.
        path = getframeinfo(frame, context=0).filename
        # Drop the directories, whichever slash they are separated with.
        name = path[max(path.rfind("/"), path.rfind("\\")) + 1:]
        if name.endswith(".py"):
            name = name[:-3]
        location = f"{name}:{frame.f_lineno}]"
    stamp = datetime.datetime.now().strftime("%H:%M:%S")
    print(stamp, location, *args, sep=sep, end=end, file=file, flush=flush)
# The last line printed by `logc`, per "name:line" call site.
LOGC_PAST = {}


def logc(line):
    '''
    Only log the `line` when it changes.

    The previous line is remembered in `LOGC_PAST` per call site ("name:line" of the caller),
    and the new `line` is printed, prefixed with the time and the call site,
    unless it is equal to the remembered one.

    Returns `True` if the line was printed and `False` if it was skipped.
    '''
    frame = currentframe()
    caller = frame.f_back if frame is not None else None
    if caller is None:
        # No stack frame support in this interpreter.
        loc = "?:0"
    else:
        # `context=0`: only the file name is needed, do not load the source lines.
        path = getframeinfo(caller, context=0).filename
        # Drop the directories, whichever slash they are separated with.
        name = path[max(path.rfind("/"), path.rfind("\\")) + 1:]
        if name.endswith(".py"):
            name = name[:-3]
        loc = f"{name}:{caller.f_lineno}"
    # Membership test rather than `.get`: a call site that was never logged
    # must not pass for one that has logged `None`.
    if loc in LOGC_PAST and LOGC_PAST[loc] == line:
        return False
    tim = datetime.datetime.now().strftime("%H:%M:%S")
    print(f"{tim} {loc}] {line}")
    LOGC_PAST[loc] = line
    return True
def floorʹ(v, extra=0):
    '''
    Drop more decimal places depending on whether the integer is large.

    The number of decimal places kept grows as the magnitude of `v` shrinks:
    0 from 10 up, 1 from 1 up, 2 from .1 up, and so on down to 6 below .0001.
    `extra` is added to that number.

    With zero decimal places the value is converted with `int` (truncated towards zero),
    otherwise it is floored at the chosen decimal place and returned as a float.
    `None`, NaN and the infinities are returned as they are.
    '''
    if v is None:
        return None
    # There are no digits to drop in these, and `int(inf)` would raise `OverflowError`.
    if math.isnan(v) or math.isinf(v):
        return v
    av = abs(v)
    # The index of the first bound reached is the number of decimal places to keep.
    for decimal_places, bound in enumerate((10, 1, .1, .01, .001, .0001)):
        if bound <= av:
            break
    else:
        # See `format_float_positional` about printing these
        decimal_places = 6
    decimal_places += extra
    if not decimal_places:
        return int(v)
    r = 10**decimal_places
    return math.floor(v * r) / r
def floorᵃ(a, extra=0):
    '''Apply `floorʹ`, with the given `extra`, to every item of `a` and return the results as a list.'''
    return [floorʹ(item, extra) for item in a]
def bits2bedstead(ch):
    '''
    Row-major bits, 2x3, to [Bedstead](https://i.imgur.com/f3myFgM.png)

    `ch` is a six-bit number: 0..31 map to U+EE00..U+EE1F and 32..63 to U+EE40..U+EE5F.
    Anything out of the 0..63 range gives U+EE00, the mosaic of zero bits.

    cf. https://youtu.be/5yoWxctJsYo graphics with teletext; Windows font rendering glitch
    cf. https://github.com/saitoha/PySixel full-pixel color graphics on SIXEL-supported terminals
    cf. https://stackoverflow.com/q/37288421/257568 termplotlib plotext uniplot terminalplot
    '''
    # Negative numbers are treated like the numbers above six bits,
    # lest they produce a code point below the mosaic range.
    if ch < 0 or 0b111111 < ch:
        return chr(0xEE00)
    if 32 <= ch:
        return chr(0xEE40 + ch - 32)
    return chr(0xEE00 + ch)
def bedstead2bits(ch):
    '''
    [Bedstead](https://i.imgur.com/f3myFgM.png) to row-major bits, 2x3

    U+EE40..U+EE5F give 32..63, U+EE00..U+EE3F give the offset from U+EE00,
    and everything else (including a string that is not exactly one character long) gives 0.
    '''
    code = ord(ch) if len(ch) == 1 else 0
    if 0xEE40 <= code <= 0xEE5F:
        return code - 0xEE40 + 32
    if 0xEE00 <= code < 0xEE40:
        return code - 0xEE00
    # Below or past G1
    return 0
def replace_str(string, position, characters):
    '''
    Overwrite a part of the `string` with the `characters`, starting at the `position`.

    Returns the new string; as many characters of the original as there are in `characters` are skipped.
    '''
    # cf. https://stackoverflow.com/a/67361875/257568 fast replacement
    head = string[:position]
    tail = string[position + len(characters):]
    return "{}{}{}".format(head, characters, tail)
class BedsteadMap:
    '''
    Maps data points to the 2x3 mosaic "pixels" of a grid of characters.

    `a` is the grid, a list of rows of characters: every character is two pixels wide and three pixels high.
    `xs` and `ys` are the data, their minimums and maximums define the ranges being mapped.
    `wofs` and `hofs` are offsets, in characters, added to the character coordinates.
    `scale`, when `True`, stretches the horizontal axis across the full width of the grid;
    otherwise both axes share the vertical coefficient.
    '''

    def __init__(self, a, xs, ys, wofs=0, hofs=0, scale=True):
        self.wofs = int(wofs)
        self.hofs = int(hofs)
        self.xsmin = float(min(xs))
        self.xsmax = float(max(xs))
        self.ysmin = float(min(ys))
        self.ysmax = float(max(ys))
        # Grid size in characters: (columns, rows)
        self.wh = (len(a[0]), len(a))
        # Height in pixels
        self.height = int(self.wh[1] * 3)
        ε = 0.00006103515625  # f16: smallest positive normal number
        # Pixels per data unit; ε keeps a flat line from dividing by zero.
        self.coeff = float(self.height / max(ε, self.ysmax - self.ysmin))
        self.scale = 1.0
        x_span = self.xsmax - self.xsmin
        # With all the `xs` equal there is nothing to stretch (and nothing to divide by):
        # every point lands in the first pixel column whatever the scale.
        if isinstance(scale, bool) and scale and x_span:
            self.scale = (self.ysmax - self.ysmin) / x_span * (self.wh[0] * 2 / self.height)
        # Width in pixels
        self.width = min(self.wh[0] * 2, int(x_span * self.coeff * self.scale + 1))

    def zip(self, xs, ys):
        '''
        Walk the `xs` and `ys` in lockstep, yielding for every point a tuple of

        * `bit`: the bit of the pixel in its character, `1 << (bx + by * 2)`
        * `bx`, `by`: the pixel coordinates inside of the character
        * `px`, `py`: the pixel coordinates in the grid, `py` growing downwards from `ysmax`
        * `ax`, `ay`: the character coordinates (offsets applied), clamped to the grid
        '''
        for (x, y) in zip(xs, ys):
            px = min(self.width - 1, int((float(x) - self.xsmin) * self.coeff * self.scale))
            py = min(self.height - 1, int((self.ysmax - float(y)) * self.coeff))
            ax = max(0, min(self.wh[0] - 1, px // 2 + self.wofs))
            ay = max(0, min(self.wh[1] - 1, py // 3 + self.hofs))
            bx = px % 2
            by = py % 3
            bit = 1 << (bx + by * 2)
            yield bit, bx, by, px, py, ax, ay
def plot(a, xs, ys, wofs=0, hofs=0, scale=True):
    '''
    Draw the points (`xs`, `ys`) into the grid of characters `a`, in place,
    by switching on the corresponding bits of the Bedstead mosaics.

    Returns the `BedsteadMap` used to map the points.

    Example:

        import shutil
        wh = shutil.get_terminal_size((111, 11))
        a = [[' ' for x in range(wh.columns)] for y in range(3)]
        plot(a, [1, 2, 3, 4, 5, 6, 7, 8, 9], [1, 2, 3, 4, 5, 4, 3, 2, 1])
        for y in a: print(''.join(y).rstrip())
    '''
    bmap = BedsteadMap(a, xs, ys, wofs, hofs, scale)
    for bit, _bx, _by, _px, _py, column, row in bmap.zip(xs, ys):
        # Add the bit to whatever mosaic is already in the cell.
        a[row][column] = bits2bedstead(bedstead2bits(a[row][column]) | bit)
    return bmap
def has_color():
    '''
    Whether to emit colors: when [CLICOLOR_FORCE](https://bixense.com/clicolors/)
    is set to anything but "0", or else when `sys.stdout` is a terminal (`isatty`).
    '''
    if os.getenv('CLICOLOR_FORCE', '0') != '0':
        return True
    return sys.stdout.isatty()
def c(color: int):
    '''
    The escape sequence selecting the 8-bit foreground `color`,
    or an empty string unless :func:`has_color`

    https://en.wikipedia.org/wiki/ANSI_escape_code#8-bit
    https://www.ditig.com/256-colors-cheat-sheet
    '''
    if not has_color():
        return ''
    return f"\033[38;5;{color}m"
def esc0():
    '''The `\\033[0m` escape sequence, or an empty string unless :func:`has_color`'''
    if not has_color():
        return ''
    return '\033[0m'
if __name__ == '__main__':
    import shutil

    # clean the cache for library users
    shutil.rmtree('__pycache__', ignore_errors=True)

    # Every six-bit mosaic: the number, the character, and the number decoded back from it.
    for bits in range(64):
        mosaic = bits2bedstead(bits)
        log(bits, mosaic, bedstead2bits(mosaic))

    # A sample plot, three rows of characters high and as wide as the terminal.
    terminal = shutil.get_terminal_size((111, 11))
    canvas = [[' '] * terminal.columns for _ in range(3)]
    sample_xs = [.1, .15, .2, .3, .4, .5, .6, .7]
    sample_ys = [.1, .14, .15, .2, .25, .3, .25, .2]
    plot(canvas, sample_xs, sample_ys, scale=False)
    rows = [''.join(row).rstrip() for row in canvas]
    print('\n'.join(rows))
#!/usr/bin/env python
import sys
def dump(path, size=False):
    '''
    Print the content of the state database at `path`:
    as YAML if the `yaml` module is installed, as JSON otherwise.

    With `size` the sizes of the values are printed instead of the values
    (the flag is passed to `State.dump`).
    '''
    from llog import state
    pv = state.State(path).dump(size=size)
    # Only the import is guarded: a failure while dumping or printing must not be
    # mistaken for a missing `yaml` and answered with a second, JSON, attempt.
    try:
        import yaml
    except ModuleNotFoundError:
        import json
        text = json.dumps(pv, ensure_ascii=False, indent=2)
    else:
        # `allow_unicode` keeps non-ASCII text readable, like `ensure_ascii=False` does for JSON.
        text = yaml.dump(pv, allow_unicode=True)
    print(text)
if __name__ == '__main__':
    # Command line interface: `python -mllog [--size] --dump=state.mdb`
    args = sys.argv[1:]  # without the program name
    # The flag is looked up first, for it to work on either side of `--dump`.
    size = '--size' in args
    for arg in args:
        if arg.startswith('--dump='):
            dump(arg[len('--dump='):], size)
            # `sys.exit` rather than `exit`: the latter is only there when the `site` module is loaded.
            sys.exit(0)
    # Nothing to dump: print the usage.
    print('Command line examples:')
    print()
    print(' python -mllog [--size] --dump=state.mdb')
class fragile(object):
    '''
    Fragile `with`, https://stackoverflow.com/a/23665658/257568

    Wraps a context manager in order to break out of its `with` block early:
    raising `fragile.Commit` leaves the block as if it has finished normally,
    raising `fragile.Rollback` lets the wrapped manager see the exception, then swallows it.
    Subclasses of the two exceptions are treated alike.
    ```
    with fragile(open(path)) as f:
        print('before condition')
        if condition:
            raise fragile.Rollback
        print('after condition')
    ```
    '''

    class Rollback(Exception):
        '''break out of the `with` statement with a failure'''

    class Commit(Exception):
        '''break out of the `with` statement with a success'''

    def __init__(self, value):
        # The wrapped context manager
        self.value = value

    def __enter__(self):
        return self.value.__enter__()

    def __exit__(self, etype, value, traceback):
        # `etype` is `None` on a normal exit, which `issubclass` would not accept.
        if etype is not None and issubclass(etype, self.Commit):
            # The wrapped manager is told that there was no exception.
            self.value.__exit__(None, None, None)
            return True  # consume the exception
        error = self.value.__exit__(etype, value, traceback)
        if etype is not None and issubclass(etype, self.Rollback):
            return True  # consume the exception
        # For any other exception the wrapped manager decides whether to suppress it.
        return error
#!/usr/bin/env python
import re
from llog import log
# Rust code of the functions can be found at
# https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=8f7bd61f2c992fb9a3225c11d0b4aaec
# cf. https://github.com/ArtemGr/gstuff.rs/blob/5bb23a4ead69b1b7370e657f9c19061e75b6dd4a/lines.rs#L211
# The bytes that need escaping
_esc = re.compile(rb'[\x01\x00\n\r",]')

# Escape sequences: the byte 1 followed by the number of the escaped byte
_ESCAPES = {
    b'\x01': b'\x01\x01',
    b'\x00': b'\x01\x02',
    b'\n': b'\x01\x03',
    b'\r': b'\x01\x04',
    b'"': b'\x01\x05',
    b',': b'\x01\x06',
}


def _bcb(ma):
    '''`re.sub` callback: the escape sequence for the matched byte, or the byte itself if there is none'''
    matched = ma[0]
    return _ESCAPES.get(matched, matched)


def csesc(fr: bytes):
    '''escape 0, 10, 13, 34 (double quote) and 44 (comma), as well as the escape byte 1 itself'''
    return _esc.sub(_bcb, fr)
# The escaped bytes, in the order of their escape codes (1 to 6)
_unesc = [1, 0, 10, 13, 34, 44]


def csunesc(fr: bytes):
    '''
    unescape for csesc

    Returns a `bytearray`. A byte 1 at the very end, with no code after it, is dropped.
    Raises `ValueError` when the byte 1 is followed by a code that is not in 1..6.
    '''
    result = bytearray()
    encoded = False
    for code in fr:
        if encoded:
            encoded = False
            # Code 0 would wrap to the last entry through the negative index, check the range.
            if not 1 <= code <= len(_unesc):
                raise ValueError(f"csunesc: unknown escape code {code}")
            result.append(_unesc[code - 1])
        elif code == 1:
            encoded = True
        else:
            result.append(code)
    return result
if __name__ == '__main__':
    import timeit

    import numpy as np

    # Pairs of a text and its escaped form
    samples = [
        ("Привет, Юля!", "Привет\u0001\u0006 Юля!"),
        ("0:\x00,10:\x0a,13:\x0d,34:\"",
         "0:\u0001\u0002\u0001\u000610:\u0001\u0003\u0001\u000613:\u0001\u0004\u0001\u000634:\u0001\u0005"),
    ]

    hello = samples[0][0].encode('utf-8')
    hesc = samples[0][1].encode('utf-8')

    def _bench():
        assert csesc(hello) == hesc

    # Average time of a `csesc` call
    runs = 99
    t = timeit.timeit(_bench, number=runs) / runs
    log('csesc', np.format_float_positional(t, trim='-'), 'o/s')

    # Unescaping gives the text back…
    for text, escaped in samples:
        assert csunesc(escaped.encode('utf-8')) == text.encode('utf-8')
    # …and escaping gives the escaped form.
    for text, escaped in samples:
        assert csesc(text.encode('utf-8')) == escaped.encode('utf-8')
from setuptools import setup
import os
import shutil
if __name__ == "__main__":
    # The modules live flat in the gist, copy them into a directory to make the “llog” package.
    # `exist_ok` instead of checking first: no window between the check and the creation.
    os.makedirs('llog', exist_ok=True)
    for module in ('__init__.py', '__main__.py', 'state.py', 'control.py', 'lines.py'):
        shutil.copyfile(module, f'llog/{module}')
    setup(
        name='llog',
        version='2.0.3',
        description='Log with line number. Bedstead plots. State.',
        author='Artemciy',
        author_email='artemciy@gmail.com',
        packages=['llog'],  # Same as name
        # NB: “state.py” requires “lmdb” and “cbor2”, but one doesn't have to use it,
        # they are *optional* so to speak.
        # ⌥ use square bracket dependencies hence
        install_requires=[],  # External packages as dependencies
    )
#!/usr/bin/env python
from datetime import timezone
import cbor2
import lmdb
# Python module: https://lmdb.readthedocs.io/en/release/
# Binary serialization for the fields: https://docs.python.org/3/library/struct.html
# https://docs.python.org/3/reference/compound_stmts.html#with
# https://docs.python.org/3/reference/datamodel.html#context-managers
# https://docs.python.org/3/reference/datamodel.html?emulating-container-types#emulating-container-types
# A short example at https://stackoverflow.com/a/23976949/257568
# https://docs.python.org/3/library/stdtypes.html#bytes-and-bytearray-operations
def str2byt(sob):
    '''Encode `sob` (with the default encoding of `str.encode`) if it is a `str`, return anything else as is.'''
    return sob.encode() if isinstance(sob, str) else sob
class TranIter:
    '''
    Iterator over the keys under a cursor, decoded from UTF-8.

    The cursor is advanced with `next` before every key,
    and it is closed when exhausted or when the iterator is deleted.
    '''

    def __init__(self, cur):
        self.cur = cur

    def __iter__(self):
        # An iterator is expected to be iterable itself: `for key in iter(st)`, `list(iter(st))`
        return self

    def __next__(self):
        if not self.cur:
            raise StopIteration
        if self.cur.next():
            return str(self.cur.key(), 'utf-8')
        # Exhausted: release the cursor right away and remember that it is gone.
        self.cur.close()
        self.cur = None
        raise StopIteration

    def __del__(self):
        # `getattr`: the attribute is missing if `__init__` did not get to run.
        cur = getattr(self, 'cur', None)
        if cur:
            cur.close()
class Tran:
    '''
    A transaction over one named database of the environment, used as a context manager:
    committed on a clean exit from the `with` block and aborted on an exception.

    Keys are `str` (encoded with `str2byt`) or `bytes`, values are serialized with CBOR.
    '''

    def __init__(self, env, dbn=b'default', write=True):
        self.env = env
        self.dbn = str2byt(dbn)
        self.write = write

    def __enter__(self):
        self.txn = self.env.begin(write=self.write, buffers=True)
        try:
            self.db = self.env.open_db(self.dbn, txn=self.txn)
        except BaseException:
            # `__exit__` is not invoked when `__enter__` fails, do not leave the transaction open.
            self.txn.abort()
            raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            self.txn.commit()
        else:
            self.txn.abort()

    def clear(self, drop=False):
        '''remove all the keys, and with `drop` the database itself'''
        self.txn.drop(self.db, delete=drop)

    def __delitem__(self, key):
        if not self.txn.delete(str2byt(key), db=self.db):
            raise KeyError(f"No such key: “{key}”")

    def __setitem__(self, key, value):
        cv = cbor2.dumps(value, timezone=timezone.utc, value_sharing=True, string_referencing=True)
        self.txn.put(str2byt(key), cv, db=self.db)

    async def amem(self, key, vf):
        '''
        memoize the key, invoking `vf` to generate a missing value
        ```
        with state.begin() as st:
            foo = await st.amem('foo', lambda: return_coroutine())
        ```
        '''
        cur = self.txn.cursor(self.db)
        # `finally`: the cursor is closed even if `vf` or the serialization fails.
        try:
            key = str2byt(key)
            if cur.set_key(key):
                return cbor2.loads(cur.value())
            pv = await vf()
            cv = cbor2.dumps(pv, timezone=timezone.utc, value_sharing=True, string_referencing=True)
            cur.put(key, cv)
            return pv
        finally:
            cur.close()

    def mem(self, key, vf):
        '''memoize the key, invoking `vf` to generate a missing value'''
        cur = self.txn.cursor(self.db)
        # `finally`: the cursor is closed even if `vf` or the serialization fails.
        try:
            key = str2byt(key)
            if cur.set_key(key):
                return cbor2.loads(cur.value())
            pv = vf()
            cv = cbor2.dumps(pv, timezone=timezone.utc, value_sharing=True, string_referencing=True)
            cur.put(key, cv)
            return pv
        finally:
            cur.close()

    def get(self, key, default=None):
        '''the value under the `key`, or the `default` if there is no such key'''
        mv = self.txn.get(str2byt(key), db=self.db)
        if mv is None:
            return default
        # https://docs.python.org/3/library/stdtypes.html#memoryview
        try:
            return cbor2.loads(mv)
        finally:
            mv.release()

    def __getitem__(self, key):
        # NB: a missing key gives `None` here, not a `KeyError`.
        return self.get(key)

    def __contains__(self, key):
        cur = self.txn.cursor(self.db)
        try:
            return cur.set_key(str2byt(key))
        finally:
            cur.close()

    def __iter__(self):
        '''
        ```
        with state.begin('lemmings') as st:
            lemmings = {name: st[name] for name in st}
        ```
        '''
        return TranIter(self.txn.cursor(self.db))

    def items(self):
        '''Note that value is a copy, save it back with assignment to persist.'''
        cur = self.txn.cursor(self.db)
        # `finally`: the cursor is closed as well when the generator is abandoned half-way.
        try:
            while cur.next():
                key = str(cur.key(), 'utf-8')
                val = cbor2.loads(cur.value())
                yield (key, val)
        finally:
            cur.close()

    def save(self, key: str, empty_collection):
        '''Context Manager persisting the referenced collection on a clean `__exit__`'''
        return Saviour(self, key, empty_collection)
class Saviour:
    '''
    Context Manager for a state-hosted collection (like `dict`), saving it back on a clean `__exit__`

    `st` is the transaction (anything with `get` and item assignment), `key` is where the collection lives,
    `empty_collection` is what to start from when there is nothing under the `key` yet.
    '''

    def __init__(self, st: 'Tran', key: str, empty_collection):
        self.st = st
        self.key = key
        self.empty_collection = empty_collection

    def __enter__(self):
        self.collection = self.st.get(self.key, self.empty_collection)
        return self.collection

    def __exit__(self, exc_type, exc_val, exc_tb):
        # A half-modified collection is not persisted when the `with` body has failed.
        if exc_type is None:
            self.st[self.key] = self.collection
class StateSaviour:
    '''
    Context Manager over a collection (like `dict`) kept in the `state` under the `key`:
    the collection is loaded on `__enter__` (`empty_collection` standing in for a missing one)
    and is stored back on `__exit__`, unless the `with` body has raised.
    '''

    def __init__(self, state, key: str, empty_collection):
        self.state, self.key, self.empty_collection = state, key, empty_collection

    def __enter__(self):
        loaded = self.state.get(self.key, self.empty_collection)
        self.collection = loaded
        return loaded

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            # The body has failed, the collection is not persisted.
            return
        self.state[self.key] = self.collection
class State:
    '''
    Key-value state kept in an LMDB environment at `path`, with the values serialized as CBOR.

    The methods working with a single key run each in a transaction of its own
    over the “default” database; use `begin` to group several operations into one transaction
    or to work with another database.
    '''

    def __init__(self, path, max_dbs=64):
        self.env = lmdb.Environment(path,
                                    metasync=False,
                                    sync=False,
                                    map_async=True,
                                    writemap=True,
                                    max_dbs=max_dbs)

    def begin(self, dbn=b'default', write=True):
        '''a transaction (`Tran`) over the database `dbn`, to be used in a `with`'''
        return Tran(self.env, dbn, write)

    def cur2(self, callback, dbn=b'default', write=True):
        '''run `callback` on a cursor'''
        with self.begin(dbn, write) as st:
            cur = st.txn.cursor(st.db)
            # `finally`: the cursor is closed even if the callback fails.
            try:
                callback(st, cur)
            finally:
                cur.close()

    def __setitem__(self, key, value):
        with self.begin() as st:
            st[key] = value

    async def amem(self, key, vf):
        '''memoize the key, invoking coroutine-returning `vf` to generate a missing value'''
        # NB: the transaction stays open while `vf` is awaited.
        with self.begin() as st:
            return await st.amem(key, vf)

    def mem(self, key, vf):
        '''memoize the key, invoking `vf` to generate a missing value'''
        with self.begin() as st:
            return st.mem(key, vf)

    def get(self, key, default=None):
        '''the value under the `key`, or the `default` if there is no such key'''
        with self.begin() as st:
            return st.get(key, default)

    def __getitem__(self, key):
        with self.begin() as st:
            return st[key]

    def __contains__(self, key):
        with self.begin() as st:
            return key in st

    def save(self, key: str, empty_collection):
        '''Context Manager persisting the referenced collection on a clean `__exit__`'''
        return StateSaviour(self, key, empty_collection)

    def dump(self, size=False, cbor=False):
        '''
        All the databases of the environment as a `dict` of `dict`s: database name → key → value.

        With `size` the values are replaced with their serialized lengths,
        with `cbor` they are left serialized, otherwise they are decoded from CBOR.

        python -c "import json; from llog.state import *; print(json.dumps(State('state.mdb').dump(), indent=2))"
        python -c "import yaml; from llog.state import *; print(yaml.dump(State('state.mdb').dump()))"
        python -mllog --dump=state.mdb
        atexit.register(lambda: open('state.yaml', 'w', encoding='utf-8').write(
        yaml.dump(State('perpetual.mdb').dump(), allow_unicode=True, width=99)))
        '''
        databases = {}
        with self.env.begin(write=False) as txn:
            # The keys of the unnamed database are taken for the names of the databases.
            for dbn, _ in txn.cursor().iternext():
                databases[dbn.decode()] = {}
            for dbn in databases:
                db = self.env.open_db(dbn.encode(), txn=txn)
                entries = {}
                for key, value in txn.cursor(db=db).iternext():
                    if size:
                        entries[key.decode()] = len(value)
                    elif cbor:
                        entries[key.decode()] = value
                    else:
                        entries[key.decode()] = cbor2.loads(value)
                databases[dbn] = entries
        return databases
if __name__ == '__main__':
    import os
    import shutil
    import tempfile
    import time

    from llog import log

    # clean the cache for library users
    shutil.rmtree('__pycache__', ignore_errors=True)

    tmp = tempfile.gettempdir()
    log(f"Going to “{tmp}”…")
    os.chdir(tmp)

    # to start with a clean slate
    #shutil.rmtree('test-state.mdb')

    # The instance number changes every ten minutes.
    instance = int(time.time() / 600)

    state = State('test-state.mdb')

    # Example of extending the database size
    # which helps with “MDB_MAP_FULL: Environment mapsize limit reached”
    # https://lmdb.readthedocs.io/en/release/#lmdb.Environment.set_mapsize
    state.env.set_mapsize(11 * 1024 * 1024)

    # Several operations in a single transaction
    with state.begin() as st:
        if st['instance'] != instance:
            log(f"Instance {instance} is new, scrapping the state…")
            st.clear()
            st['instance'] = instance
        tim = st.mem('tim', lambda: int(time.time()))

    # Every operation in a transaction of its own
    state['foo'] = 'bar'
    bar = state.mem('bar', lambda: 40 + 2)

    # A counter in a separate database
    with state.begin('count') as ct:
        previous = ct['cnt']
        ct['cnt'] = 0 if previous is None else previous + 1
        assert 'cnt' in ct

    # A collection saved back when the `with` is over: in a transaction…
    with state.begin() as st:
        with st.save('dick', {}) as dicst:
            dicst['tim'] = int(time.time())
        assert 'dick' in st

    # …and directly in the state
    with state.save('dick', {}) as dicst:
        dicst['sec'] = dicst['tim'] % 60

    log(state.dump())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment