Skip to content

Instantly share code, notes, and snippets.

@haginara
Last active September 27, 2019 23:07
Show Gist options
Save haginara/59e92a6bf6b45547013ecfd754763cc6 to your computer and use it in GitHub Desktop.
zee-cut script with python3
#!/usr/bin/env python3
import os
import sys
import csv
import json
import logging
import pprint
import time
from datetime import datetime
from argparse import ArgumentParser
# Log records are emitted as the bare message only (no level or timestamp
# prefix); the threshold is INFO, so the logger.debug() calls in this file
# are silent by default.
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger(__name__)

# Command-line interface.
parser = ArgumentParser()
parser.add_argument(
    "-c",
    "--print_header",
    action="store_true",
    help="Include the first format header block in the output.",
)
# NOTE(review): -C, -D, -u and -U are accepted (each consumes one value) but
# no code visible in this file reads them.
parser.add_argument("-C")
parser.add_argument(
    "-d",
    "--time_column",
    action="append",
    default=[],
    help="Convert time values into human-readable format.",
)
parser.add_argument("-D")
parser.add_argument(
    "-f", "--fields", action="store_true", help="print field header line"
)
parser.add_argument(
    "-F",
    "--sep",
    type=str,
    help="<ofs> Sets a different output field separator character.",
)
parser.add_argument("-a", "--all", action="store_true", help="print all fields")
parser.add_argument(
    "-n", "--no", action="append", default=[], help="print *except* these fields"
)
parser.add_argument("-u")
parser.add_argument("-U")
parser.add_argument("filename")

# Parsed at import time because main() reads the module-level ``options``.
# parse_known_args() collects unrecognised arguments in ``args`` instead of
# rejecting them.
options, args = parser.parse_known_args(sys.argv[1:])
def convert_sep(sep_str):
    r"""Decode an escaped separator such as ``\x09`` into its character.

    The digits following ``\x`` are interpreted as a hexadecimal code point,
    so ``\x09`` becomes a tab and ``\x2c`` becomes a comma.

    Args:
        sep_str: Text of the form ``\xHH``.

    Returns:
        The single character with that code point.

    Raises:
        ValueError: If the text left after removing ``\x`` is not valid
            hexadecimal.
    """
    # Base 16 is required: parsing the digits as decimal rejects values such
    # as "2c" and silently maps "20" to chr(20) instead of a space.
    return chr(int(sep_str.replace("\\x", ""), 16))
def convert_datetime(datetime_str, timeformat="%Y-%m-%d %H:%M:%S"):
    """Render an epoch-seconds value as a formatted local-time string.

    Args:
        datetime_str: Seconds since the epoch; anything ``float()`` accepts.
        timeformat: ``strftime`` pattern applied to the result.

    Returns:
        The formatted timestamp. ``datetime.fromtimestamp`` is called without
        a time zone, so the result is in the machine's local time.

    Raises:
        ValueError: If ``datetime_str`` cannot be parsed as a float.
    """
    epoch_seconds = float(datetime_str)
    local_moment = datetime.fromtimestamp(epoch_seconds)
    return datetime.strftime(local_moment, timeformat)
def get_meta(filename) -> dict:
    """Read the leading header lines of a log file into a dict.

    The first eight lines are consumed positionally and stored under the keys
    ``separator``, ``set_separator``, ``empty_field``, ``unset_field``,
    ``path``, ``open``, ``fields`` and ``types``, in that order. For each
    line, the leading ``#<key>`` tag and the delimiter that follows it are
    removed. A remaining value that starts with a backslash-x escape is
    decoded with ``convert_sep``.

    Args:
        filename: Path of the log file to read.

    Returns:
        A dict with the eight keys above. ``fields`` and ``types`` are lists
        obtained by splitting on the decoded separator; every other value is
        a string.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If the separator ends up empty (for example when the file
            has no header), because ``str.split`` rejects an empty separator.
    """
    # Header tags, in the order they are expected at the top of the file.
    keys = [
        "separator",
        "set_separator",
        "empty_field",
        "unset_field",
        "path",
        "open",
        "fields",
        "types",
    ]
    meta = {}
    with open(filename, "r") as f:
        for key in keys:
            # readline() returns "" past end of file, so a short file yields
            # empty values here rather than raising.
            line = f.readline()
            tag = f"#{key}"
            # Remove the tag only where it actually leads the line, so the
            # same text occurring inside the value is left intact.
            value = line[len(tag):] if line.startswith(tag) else line
            # After the first line, tag and value are delimited by the log's
            # own separator. Drop it explicitly: strip() alone only removes
            # whitespace, which would leave e.g. a leading comma in place.
            separator = meta.get("separator")
            if separator and value.startswith(separator):
                value = value[len(separator):]
            value = value.strip()
            if value.startswith("\\x"):
                value = convert_sep(value)
            meta[key] = value
    meta["fields"] = meta["fields"].split(meta["separator"])
    meta["types"] = meta["types"].split(meta["separator"])
    return meta
def find_timecol(line: str, lp):
    """Placeholder with no implementation; always returns ``None``.

    Both arguments are accepted and ignored.
    """
    return None
def follow_zeeklog(
    filename,
    meta,
    fields=False,
    sep=None,
    all_column=False,
    columns=None,
    excludes=None,
    time_columns=None,
):
    """Print selected columns of every data row in a log file.

    Args:
        filename: Path of the log file to read.
        meta: Header dict as returned by ``get_meta``; ``meta["fields"]`` and
            ``meta["separator"]`` are read and never modified.
        fields: When true, print one line naming the output columns first.
        sep: Output field separator; falls back to the input separator.
        all_column: When true, output every field regardless of ``columns``.
        columns: Field names to output. When this and ``time_columns`` are
            both empty, every field is output.
        excludes: Field names to drop from the output. Names that are not
            selected are ignored.
        time_columns: Field names whose values are passed through
            ``convert_datetime``. They are placed ahead of ``columns`` in the
            output.

    Raises:
        OSError: If the file cannot be opened.
        KeyError: If a selected column is absent from a row (unknown name, or
            a row with fewer values than there are fields).
    """
    time_columns = list(time_columns or [])
    excludes = list(excludes or [])
    field_names = meta["fields"]
    in_sep = meta["separator"]
    out_sep = sep if sep else in_sep

    selected = time_columns + list(columns or [])
    if not selected or all_column:
        selected = field_names

    unknown = [name for name in excludes if name not in selected]
    if unknown:
        logger.debug("Ignoring excluded columns that are not selected: %s", unknown)
    # Build a new list instead of removing in place: ``selected`` may be the
    # very list stored in ``meta["fields"]``, and shrinking that one would
    # pair every row value with the wrong field name below.
    dropped = set(excludes)
    selected = [name for name in selected if name not in dropped]
    logger.debug("Columns: %s, time_columns: %s", selected, time_columns)

    if fields:
        # Name exactly the columns that are printed so header and rows agree.
        print(out_sep.join(selected))

    debug_rows = logger.isEnabledFor(logging.DEBUG)
    with open(filename, "r") as f:
        # Iterate the file lazily rather than loading every line up front.
        for line in f:
            # Remove only the line terminator; a full strip() would also
            # discard trailing separators that delimit empty final values.
            line = line.rstrip("\r\n")
            if not line or line.startswith("#"):
                continue
            row = dict(zip(field_names, line.split(in_sep)))
            for tf in time_columns:
                raw = row.get(tf)
                if raw is None:
                    continue
                try:
                    row[tf] = convert_datetime(raw)
                except ValueError:
                    # Not a number (e.g. an unset-field marker); keep as is.
                    pass
            if debug_rows:
                logger.debug(pprint.pformat(row, indent=2))
            print(out_sep.join(row[column] for column in selected))
def main():
    """Script entry point.

    Reads the header of the file named on the command line, prints the
    parsed header dict when ``--print_header`` is set, then prints the log
    rows according to the remaining command-line options.
    """
    log_path = options.filename
    logger.debug(f"Load: {log_path}")
    header = get_meta(log_path)
    if options.print_header:
        print(header)

    # Map command-line options onto follow_zeeklog's keyword parameters.
    cut_settings = {
        "fields": options.fields,
        "sep": options.sep,
        "all_column": options.all,
        "excludes": options.no,
        "time_columns": options.time_column,
    }
    follow_zeeklog(log_path, header, **cut_settings)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment