Skip to content

Instantly share code, notes, and snippets.

@jeetsukumaran
Created August 26, 2021 21:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jeetsukumaran/c7c236d244b188c6e56031d6a36cff89 to your computer and use it in GitHub Desktop.
Save jeetsukumaran/c7c236d244b188c6e56031d6a36cff89 to your computer and use it in GitHub Desktop.
#! /usr/bin/env python
###############################################################################
##
## Copyright 2014 Jeet Sukumaran.
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License along
## with this program. If not, see <http://www.gnu.org/licenses/>.
##
###############################################################################
"""
Manipulation of data in table format.
"""
import argparse
import sys
import os
import re
import math
import collections
import csv
def open_source_file(filepath):
    """
    Open ``filepath`` for reading as a CSV source.

    On Python 3, open with ``newline=''`` as required by the ``csv`` module
    so that newlines embedded in quoted fields are handled correctly.
    On Python 2, fall back to universal-newline mode (``'rU'``).
    """
    if sys.version_info >= (3, 0, 0):
        f = open(filepath, "r", newline='')
    else:
        f = open(filepath, "rU")
    return f
def open_output(filepath):
    """
    Return a writable text stream for ``filepath``.

    A ``filepath`` of ``None`` selects standard output; anything else is
    opened for (over)writing.
    """
    return sys.stdout if filepath is None else open(filepath, "w")
def open_output_file_for_csv_writer(filepath, append=False):
    """
    Return a stream suitable for a ``csv`` writer.

    ``None`` selects standard output. Otherwise the file is opened in
    append or overwrite mode: on Python 3 as text with ``newline=''``
    (per the ``csv`` module's requirements), on Python 2 as binary.
    """
    if filepath is None:
        return sys.stdout
    mode = "a" if append else "w"
    if sys.version_info >= (3, 0, 0):
        return open(filepath, mode, newline='')
    return open(filepath, mode + "b")
def get_fieldname_value_map(labels):
    """
    Parse a sequence of ``NAME:VALUE`` strings into an ordered mapping.

    The split is at the FIRST colon, so values may themselves contain
    colons (e.g. ``"when:12:30"`` -> ``{"when": "12:30"}``).  Surrounding
    whitespace on both the name and the value is stripped.

    Raises ValueError if an entry contains no colon.

    (The previous regex-based parse split at the LAST colon — corrupting
    the field name whenever the value contained one — and left trailing
    whitespace inside the captured field name.)
    """
    fieldname_value_map = collections.OrderedDict()
    if not labels:
        return fieldname_value_map
    for label in labels:
        fieldname, sep, value = label.partition(":")
        if not sep:
            raise ValueError("Cannot parse fieldname and label (format required: fieldname:value): {}".format(label))
        fieldname_value_map[fieldname.strip()] = value.strip()
    return fieldname_value_map
def process_delimiter_entry(delimiter):
    """
    Translate shell-escaped delimiter spellings into literal characters.

    Users typically type ``\\t`` or ``\\|`` on the command line; map those
    to the actual tab / pipe characters.  Anything else (including None)
    is returned unchanged.
    """
    return {r'\t': "\t", r'\|': "|"}.get(delimiter, delimiter)
def flatten_table(args):
    """
    Print each field name/value pair of every input row on its own line.

    Unless ``--no-align`` is given, field names are right-padded to the
    width of the longest field name so that values line up in a column.
    Progress messages go to stderr unless ``--quiet``.
    """
    out = open_output(args.output_filepath)
    for filepath in args.filepaths:
        if not args.quiet:
            sys.stderr.write("-- [{}]\n".format(filepath))
        src = open_source_file(filepath)
        with src:
            reader = csv.DictReader(src, delimiter=args.field_separator)
            fieldnames = reader.fieldnames
            # NOTE(review): an empty/header-less file yields fieldnames == None
            # and this max() raises — TODO confirm whether a guard (as in
            # merge_table) is wanted here.
            max_field_name_width = max([len(f) for f in fieldnames])
            for row_idx, row_data in enumerate(reader):
                if not args.quiet:
                    sys.stderr.write("-- [{}:{}]\n".format(filepath, row_idx + 1))
                # BUG FIX: was 'row_idx > args.num_rows', which processed
                # num_rows + 1 rows instead of the documented maximum.
                if args.num_rows and row_idx >= args.num_rows:
                    break
                for fieldname in fieldnames:
                    fieldvalue = row_data[fieldname]
                    if args.no_align:
                        out.write("{}: {}\n".format(fieldname, fieldvalue))
                    else:
                        out.write("{fieldname:{fieldnamewidth}}: {fieldvalue}\n".format(
                            fieldname=fieldname,
                            fieldnamewidth=max_field_name_width,
                            fieldvalue=fieldvalue))
def merge_table(args):
    """
    Concatenate the rows of all input tables into a single output table.

    The output column set is the union of all input columns (in order of
    first appearance), preceded by any ``--add-field`` columns; rows
    missing a column are filled with ``--empty-field``'s value.  Empty
    input files abort the run unless ``--ignore-invalid-files`` is given.
    """
    additional_fields = get_fieldname_value_map(args.add_field)
    table_data = []
    fieldnames = []
    # Columns injected via --add-field come first in the output order.
    fieldnames.extend(additional_fields)
    for filepath in args.filepaths:
        if not args.quiet:
            sys.stderr.write("-- [{}]".format(filepath))
        src = open_source_file(filepath)
        with src:
            reader = csv.DictReader(src, delimiter=args.field_separator)
            subfieldnames = reader.fieldnames
            if subfieldnames is None:
                # No header row could be read: file is empty or broken.
                if args.ignore_invalid_files:
                    if not args.quiet:
                        sys.stderr.write(": EMPTY (ignoring)\n")
                    continue
                else:
                    sys.exit("\nFile is invalid")
            if not args.quiet:
                sys.stderr.write("\n")
            for f in subfieldnames:
                if f not in fieldnames:
                    fieldnames.append(f)
            for row_idx, row_data in enumerate(reader):
                # BUG FIX: was 'row_idx > args.num_rows', which processed
                # num_rows + 1 rows instead of the documented maximum.
                if args.num_rows and row_idx >= args.num_rows:
                    break
                row_data.update(additional_fields)
                table_data.append(row_data)
    out = open_output_file_for_csv_writer(args.output_filepath, append=args.append)
    try:
        if args.output_delimiter is None:
            delimiter = args.field_separator
        else:
            delimiter = args.output_delimiter
        writer = csv.DictWriter(
            out,
            fieldnames=fieldnames,
            restval=args.empty_field,
            delimiter=delimiter,
            lineterminator=os.linesep,
        )
        # BUG FIX: previously tested args.no_header_row (the -H *input*
        # option), leaving --suppress-header-row with no effect.
        if not args.suppress_header_row:
            writer.writeheader()
        writer.writerows(table_data)
    finally:
        # BUG FIX: 'with out:' closed sys.stdout when no output file was given.
        if out is not sys.stdout:
            out.close()
def inspect_table(args):
    """
    Report the columns of each input table to standard output.

    By default each column is listed as '[index] name' with the index
    column right-aligned; ``--as-python-list`` additionally emits the
    names as a Python list literal with index comments.  Column indexes
    are 1-based unless ``--0-based`` was given.
    """
    out = sys.stdout
    index_offset = 0 if args.zero_based_column_indexing else 1
    for filepath in args.filepaths:
        if not args.quiet:
            sys.stderr.write("-- [{}]\n".format(filepath))
        with open_source_file(filepath) as src:
            reader = csv.DictReader(src, delimiter=args.field_separator)
            fieldnames = reader.fieldnames
            max_field_name_width = max(len(name) for name in fieldnames)
            if not args.no_list_columns:
                # Width of the "[N]" tag for the largest column index.
                nfields = len(fieldnames)
                ndigits = int(math.floor(math.log(nfields, 10)) + 1)
                max_index_field_width = ndigits + 2
                for col_idx, name in enumerate(fieldnames):
                    tag = ("[{}]".format(col_idx + index_offset)).rjust(max_index_field_width)
                    out.write("{} {}\n".format(tag, name))
            if args.as_python_list:
                out.write("fields = [\n")
                for col_idx, name in enumerate(fieldnames):
                    entry = "'{}',".format(name)
                    out.write(" {fieldname:{width}} # {field_idx}\n".format(
                        fieldname=entry,
                        field_idx=col_idx + index_offset,
                        width=max_field_name_width + 3))
                out.write("]\n")
def main():
    """
    Main CLI handler.

    Builds the 'inspect', 'merge', and 'flatten' subcommand parsers
    (sharing common source/output option groups), resolves the input
    file list (positional paths and/or --from-file), normalizes the
    delimiters, and dispatches to the selected command function.
    """
    master_parser = argparse.ArgumentParser()
    subparsers = master_parser.add_subparsers(title="Operation Commands", help="type '%(prog)s <command-name> --help' for more information.")
    # Option groups shared by multiple subcommands via 'parents='.
    source_options_parser = argparse.ArgumentParser(add_help=False)
    source_options_parser.add_argument("filepaths", metavar="FILE", type=str, nargs="*", help="Path to input data file(s).")
    source_options = source_options_parser.add_argument_group("Source Options")
    source_options.add_argument("--from-file",
            default=None,
            metavar="FILEPATH",
            help="Read list of files from FILEPATH.")
    source_options.add_argument("-F", "--field-separator",
            default="\t",
            metavar="CHAR",
            help="Delimiter separating fields in input (default: '<TAB>').")
    source_options.add_argument("-H", "--no-header-row",
            action="store_true",
            default=False,
            help="Input table(s) have no header rows.")
    table_output_options_parser = argparse.ArgumentParser(add_help=False)
    table_output_options = table_output_options_parser.add_argument_group("Output Options")
    table_output_options.add_argument("-o", "--output-filepath",
            default=None,
            help="Path to output file (default: standard output)")
    table_output_options.add_argument("-D", "--output-delimiter",
            default=None,
            metavar="CHAR",
            help="Delimiter to use to separate fields in output (default: same as input delimiter).")
    table_output_options.add_argument("--suppress-header-row",
            action="store_true",
            default=False,
            help="Do not write header row")
    table_output_options.add_argument("--append",
            action="store_true",
            default=False,
            help="Append to output file if it already exists instead of overwriting.")
    # 'inspect' command
    inspect_parser = subparsers.add_parser('inspect',
            parents=[source_options_parser],
            help="Report information about the tables.",
            )
    inspect_processing_options = inspect_parser.add_argument_group("Processing Options")
    inspect_processing_options.add_argument("-r", "--count-rows",
            action="store_true",
            default=False,
            help="Count the number of rows (not including header row).")
    inspect_processing_options.add_argument("-C", "--no-list-columns",
            action="store_true",
            default=False,
            help="Do not list the columns by name.")
    inspect_processing_options.add_argument("--0-based",
            dest="zero_based_column_indexing",
            action="store_true",
            default=False,
            help="Use 0-based column indexing.")
    inspect_processing_options.add_argument("--as-python-list",
            action="store_true",
            default=False,
            help="Show results as a Python-syntax list.")
    inspect_parser.set_defaults(func=inspect_table)
    # 'merge' command
    merge_parser = subparsers.add_parser('merge',
            parents=[source_options_parser, table_output_options_parser],
            help="Merge rows of all tables.",
            )
    merge_processing_options = merge_parser.add_argument_group("Processing Options")
    merge_processing_options.add_argument("-n", "--num-rows",
            default=None,
            metavar="NUM-ROWS",
            type=int,
            help="Maximum number of data rows to process in each file.")
    merge_processing_options.add_argument("-a", "--add-field",
            action="append",
            metavar="NAME:VALUE",
            help="Fields to insert into output (in format 'NAME:VALUE')")
    merge_processing_options.add_argument("-e", "--empty-field",
            metavar="VALUE",
            default=None,
            help="Value to use for missing fields (if not specified, a missing field will result in an error).")
    merge_processing_options.add_argument("-i", "--ignore-invalid-files",
            action="store_true",
            default=None,
            help="Ignore broken or empty files.")
    merge_parser.set_defaults(func=merge_table)
    # 'flatten' command
    flatten_parser = subparsers.add_parser('flatten',
            parents=[source_options_parser, table_output_options_parser],
            help="Print each field name-value pair as a separate row.",
            )
    flatten_processing_options = flatten_parser.add_argument_group("Processing Options")
    flatten_processing_options.add_argument("-n", "--num-rows",
            default=None,
            metavar="NUM-ROWS",
            type=int,
            help="Maximum number of data rows to process in each file.")
    flatten_processing_options.add_argument("-a", "--add-field",
            action="append",
            metavar="NAME:VALUE",
            help="Fields to append to output (in format 'NAME:VALUE')")
    flatten_processing_options.add_argument("--no-align",
            action="store_true",
            default=False,
            help="Do not align output.")
    flatten_parser.set_defaults(func=flatten_table)
    master_parser.add_argument("-q", "--quiet",
            action="store_true",
            default=False,
            help="Suppress meta-information and progress messages.")
    args = master_parser.parse_args()
    # BUG FIX: in Python 3 subcommands are optional by default, so invoking
    # the program with no command used to crash with AttributeError on
    # 'args.filepaths' / 'args.func' below.  Fail with usage help instead.
    if not hasattr(args, "func"):
        master_parser.print_help(sys.stderr)
        sys.exit(1)
    if len(args.filepaths) == 0 and args.from_file is None:
        sys.exit("Need to specify path(s) to input filepaths.")
    elif args.from_file is not None:
        # Augment the positional paths with one path per line from the
        # list file, expanding '~' and environment variables.
        with open(args.from_file, "r") as f:
            for path in f.read().split("\n"):
                if not path:
                    continue
                path = os.path.expandvars(os.path.expanduser(path))
                args.filepaths.append(path)
    args.field_separator = process_delimiter_entry(args.field_separator)
    try:
        # 'inspect' has no output options; tolerate the missing attribute.
        args.output_delimiter = process_delimiter_entry(args.output_delimiter)
    except AttributeError:
        pass
    args.func(args)

if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment