Skip to content

Instantly share code, notes, and snippets.

@gmalmquist
Last active August 11, 2023 13:14
Show Gist options
  • Save gmalmquist/f9df224881b503bd03a3 to your computer and use it in GitHub Desktop.
Save gmalmquist/f9df224881b503bd03a3 to your computer and use it in GitHub Desktop.
Command-line stream editing using python.
#!/usr/bin/env python
# coding=utf-8
from __future__ import print_function
from collections import defaultdict
import datetime
import io
import json
import locale
import os
import re
import sys
from textwrap import dedent
date = datetime.datetime.strptime
def union(lists):
ls = []
for sublist in lists:
ls.extend(sublist)
return ls
def intersection(*args):
result = None
for arg in args:
if result is None:
result = set(arg)
else:
result = result.intersection(set(arg))
return result
def underindent(lines):
def ui(line):
l = len(line)
line = line.lstrip()
return '{}{}'.format('_'*(l - len(line)), line)
return map(ui, lines)
def t2sort(lines, pos=-1, df='%b %d, %Y %I:%M %p'):
def fixlines(lines):
cline = ''
for line in lines:
if re.search('[0-9]+:[0-9]+ [ap]m*', line.strip()):
if len(cline) > 0: cline += ' '
yield cline + line
cline = ''
else:
if len(cline) > 0: cline += ' '
cline += line
if cline:
yield cline
lines = fixlines(lines)
lines = [l.strip() for l in lines if re.search('[0-9]+ [ap]m*$', l.strip())]
lines = [l for l in lines if l]
lines = sorted(lines, key=lambda l: date(l.split('\t')[pos], df))
return lines
def after(s, p, right=False):
if p not in s:
return s
try:
i = s.rfind(p) if right else s.find(p)
return s[i+len(p):]
except: # Assume is list.
for i,e in enumerate(s):
if e == p:
break
else:
i = -1
if i >= 0:
return s[i+1:]
return s
def before(s, p, right=False):
if p not in s:
return s
try:
i = s.rfind(p) if right else s.find(p)
if i < 0: i = len(s)
return s[:i]
except: # Assume is list.
for i,e in enumerate(s):
if e == p:
return s[:i]
return s
def aftf(p):
return lambda s: after(s, p)
def beff(p):
return lambda s: before(s, p)
def strip(s):
return s.strip()
def indent(s, spaces=4):
return ' '*4+s
def block_indent(s, spaces=4):
return '\n'.join([indent(line, spaces) for line in dedent(s).split('\n')])
def numbered(path):
h = open(path, 'r')
lines = h.readlines()
h.close()
return ''.join(['%s:%00d: %s' % (path, i+1, s) for (i,s) in enumerate(lines)])
def abs_listdir(path):
if not os.path.exists(path) or not os.path.isdir(path):
return []
return [os.path.join(path, name) for name in os.listdir(path)]
def call_me_maybe(value, old_value):
if not value:
return value
if callable(value):
return value(old_value)
if isinstance(value, tuple) and all(callable(v) for v in value):
for c in value:
old_value = c(old_value)
return old_value
return value
def counts(ls):
cts = defaultdict(int)
for item in ls:
cts[item] += 1
return ['{}: {}'.format(key, value) for key, value in sorted(cts.items(), key=lambda x: -x[1])]
def unique(ls):
seen = set()
results = []
for item in ls:
if item not in seen:
seen.add(item)
results.append(item)
return results
def hist(key=None):
if key is None:
key = lambda x: x
def hist(ls):
cts = defaultdict(int)
for item in ls:
cts[key(item)] += 1
max_len = max(len('key'), max(map(len, map(str, cts.keys()))))
max_val_len = max(len('counts'), max(map(len, map(str, cts.values()))))
wall_vertical = '║'
wall_horizontal = '═'
wall_both = '╬'
col_fmt = '{v} {col:<' + str(max_len) + '}'
val_fmt = '{val:<' + str(max_val_len) + '} {v}'
rows = [(col_fmt + ' {v} ' + val_fmt).format(col=k, v=wall_vertical, val=val) for k, val in cts.items()]
return [
'╔' + (wall_horizontal*(max_len + 2)) + '╦' + (wall_horizontal*(max_val_len + 2)) + '╗',
('%s %s %s' % (col_fmt, wall_vertical, val_fmt)).format(col='key', v=wall_vertical, val='counts'),
'{}{}{}{}{}'.format('╠', wall_horizontal*(2+max_len), wall_both, wall_horizontal*(2+len('counts')), '╣'),
] + rows + [
'╚' + (wall_horizontal*(max_len + 2)) + '╩' + (wall_horizontal*(max_val_len + 2)) + '╝',
]
return hist
def alphabet(i):
try:
i = int(i)
except:
return ''
result = []
while i > 0:
result.append((i-1) % 26)
i = int((i-1) / 26)
return ''.join(chr(ord('A') + x) for x in reversed(result))
def reindex_from(x):
return lambda lines: reindex(lines, x)
def reindex(lines, start_index=0):
pattern = '@Index[(].*?[)]'
index = start_index - 1
for line in lines:
if not re.search(pattern, line):
yield line
continue
index += 1
yield re.sub(pattern, '@Index(%s)' % index, line)
def hrnum(x):
locale.setlocale(locale.LC_ALL, 'en_US')
return locale.format('%f', float(str(x).strip()), grouping=True)
def main():
lines = []
while '-help' not in sys.argv[1:] and '-h' not in sys.argv[1:]:
try:
line = raw_input()
except: break
if line is None: break
while '\n' in line:
n = line.find('\n')
lines.append(line[:n])
line = line[n+1:]
lines.append(line)
flags = set()
args = []
for a in sys.argv[1:]:
if re.match('^[-][a-zA-Z]+$', a) is not None:
flags.update(a[1:])
else:
args.append(a)
# Useful Constants
NL = '\n'
TB = '\t'
CN = ':'
BS = '\\'
FS = '/'
SR = '*'
SP = ' '
SQ = "'"
DQ = '"'
SC = ';'
CC = ':'
if 'h' in flags or 'help' in flags:
print('Flags:')
print(' -s: Convert all lines to single NL-joined string s, and process it.')
print(' -f: Return lines filtered by predicate over variable s.')
print(' -g: Like -s, but override the lines variable instead and print out NL.join(lines)')
print(' : default behavior is to run command once per line, with lines[i] = s.')
return
command = 's = ' + ' '.join(args)
if 's' in flags:
s = '\n'.join(lines)
old_s = s
exec(command)
s = call_me_maybe(s, old_s)
print(s)
elif 'f' in flags:
nlines = []
for i,s in enumerate(lines):
line = s
exec(command)
s = call_me_maybe(s, line)
if s: nlines.append(line)
print('\n'.join(nlines))
elif 'g' in flags:
s = '\n'.join(lines)
old_lines = lines
command = 'lines = ' + ' '.join(args)
exec(command)
lines = call_me_maybe(lines, old_lines)
s = '\n'.join(lines)
print(s)
else:
for i,s in enumerate(lines):
old_s = s
exec(command)
s = call_me_maybe(s, old_s)
lines[i] = str(s)
print('\n'.join(lines))
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment