Skip to content

Instantly share code, notes, and snippets.

@Cologler
Last active June 26, 2018 00:07
Show Gist options
  • Save Cologler/3417beb18b5c89a68420aa8aaafec9de to your computer and use it in GitHub Desktop.
Save Cologler/3417beb18b5c89a68420aa8aaafec9de to your computer and use it in GitHub Desktop.
format output from pipe.
# -*- coding: utf-8 -*-
#
# Copyright (c) 2017~2999 - cologler <skyoflw@gmail.com>
# ----------
# format output from pipe.
# ----------
'''Usage:
python pr.py PATTERN
For example:
echo abc|python pr.py {line} => output: abc
echo abc|python pr.py {$len(line)} => output: 3 # use func
ping 10.0.0.1 -t|pr "[{time}] {line}" => print with time!
Available variables:
line # line text (raw input)
index # line index
datetime
date
time
year
month
day
hour
minute
second
Available funcs:
$len # for any kinds variables
$ms # for `datetime` and `time`, show milliseconds
$us # for `datetime` and `time`, show microseconds
$iso # for `datetime`, show `T` and `Z`
'''
import sys
import traceback
import datetime
from collections import Mapping
import re
# exprs
class IExpr:
def resolve_value(self, mapping: Mapping):
raise NotImplementedError
class VarGetterExpr(IExpr):
def __init__(self, key):
self._key = key
def resolve_value(self, mapping: Mapping):
cb = mapping.get_callback(self._key)
return cb() if cb else None
class MethodCallExpr(IExpr):
def __init__(self, method_name, obj_expr):
self._method_name = method_name
self._obj_expr = obj_expr
def resolve_value(self, mapping: Mapping):
obj = self._obj_expr.resolve_value(mapping)
if obj is not None:
if not isinstance(obj, Value): #ensure wraped
obj = Value(obj)
cb = getattr(obj, self._method_name, None)
return cb() if cb else None
def _key_clear(key):
'''remove invaild parts.'''
ret = key.strip()
if ret.startswith('(') and ret.endswith(')'):
ret = ret[1:-1]
return ret if len(key) == len(ret) else _key_clear(ret) # continue
RE_FUNC_CALL = re.compile('^\\$([a-z][a-z0-9]*)\\((.*)\\)$')
RE_VAR = re.compile('^([a-z]+)$')
def make_expr(key: str):
key = _key_clear(key)
if not key:
return None
func_call_match = RE_FUNC_CALL.match(key)
if func_call_match:
func_name, args_value = func_call_match.groups()
if func_name[0] == '_':
return None # cannot access private method
obj_expr = make_expr(args_value)
if not obj_expr:
return None # no arguments always be property
return MethodCallExpr(func_name, obj_expr)
var_match = RE_VAR.match(key)
if var_match:
return VarGetterExpr(var_match.groups()[0])
# values
class Value:
'''wrap date to hide methods.'''
def __init__(self, value):
self._value = value
def __str__(self):
return str(self._value)
def __getattr__(self, attr: str):
if len(attr) > 1:
if attr[1:].isdigit():
if attr[0] == 'l':
return lambda: str(self).ljust(int(attr[1:]))
if attr[0] == 'r':
return lambda: str(self).rjust(int(attr[1:]))
def len(self):
return len(str(self))
# datetimes
def now():
'''short for `datetime.datetime.now()`'''
return datetime.datetime.now()
class DateTime(Value):
def __init__(self, value: datetime.datetime, fmt=None):
super().__init__(value)
self._fmt = fmt or {
'sep': ' ',
'timespec': 'seconds'
}
def __str__(self):
return self._value.isoformat(**self._fmt)
def ms(self):
fmt = self._fmt.copy()
fmt['timespec'] = 'milliseconds'
return DateTime(self._value, fmt)
def us(self):
fmt = self._fmt.copy()
fmt['timespec'] = 'microseconds'
return DateTime(self._value, fmt)
def iso(self):
fmt = self._fmt.copy()
fmt['sep'] = 'T'
return DateTime(self._value, fmt)
class Time(Value):
def __init__(self, value: datetime.time, fmt=None):
super().__init__(value)
self._fmt = fmt or {
'timespec': 'seconds'
}
def __str__(self):
return self._value.isoformat(**self._fmt)
def ms(self):
fmt = self._fmt.copy()
fmt['timespec'] = 'milliseconds'
return Time(self._value, fmt)
def us(self):
fmt = self._fmt.copy()
fmt['timespec'] = 'microseconds'
return Time(self._value, fmt)
class ArgumentMapping(Mapping):
def __init__(self):
super().__init__()
self._expr_cache = {}
self._callbacks = {
'datetime': lambda: DateTime(now()),
'date': lambda: now().date(),
'time': lambda: Time(now().time()),
'year': lambda: now().date().year,
'month': lambda: now().date().month,
'day': lambda: now().date().day,
'hour': lambda: now().time().hour,
'minute': lambda: now().time().minute,
'second': lambda: now().time().second,
}
def __getitem__(self, key):
expr = self._expr_cache.get(key)
if not expr:
self._expr_cache[key] = expr = make_expr(key) or key
if expr == key: # build fail.
return key
value = expr.resolve_value(self)
return str(value) if value is not None else key
def __iter__(self):
return iter(self._callbacks)
def __len__(self):
return len(self._callbacks)
def set_value(self, key, value):
self._callbacks[key] = lambda: value
def get_callback(self, key):
return self._callbacks.get(key)
def main(argv=None):
if argv is None:
argv = sys.argv
try:
if sys.stdin.isatty():
print('No input from pipe')
print(__doc__)
exit(1)
if len(argv) != 2:
print('Require argument PATTERN')
print(__doc__)
exit(2)
pattern = argv[1]
am = ArgumentMapping()
try:
for index, line in enumerate(sys.stdin):
line = line.rstrip()
am.set_value('index', index)
am.set_value('line', line)
am.set_value('$len(line)', len(line))
print(pattern.format_map(am))
except KeyboardInterrupt:
pass
except Exception: # pylint: disable=W0703
traceback.print_exc()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment