Skip to content

Instantly share code, notes, and snippets.

@shinkou
Last active September 2, 2023 02:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shinkou/f4d39866dbdffec1b740065619de1e12 to your computer and use it in GitHub Desktop.
Save shinkou/f4d39866dbdffec1b740065619de1e12 to your computer and use it in GitHub Desktop.
A snippet which crops from CSVs with a very small memory footprint
#!/usr/bin/env python3
# vim: fileencoding=utf-8 ff=unix hls lcs=tab\:>. list noet sw=4 ts=4 tw=76
#
# How to use:
#
# Scenario 1 - show only the first 16 entries
# $ lcsv.py --limit 16 my_data.csv
#
# Scenario 2 - convert CSV which uses pipe characters "|" back to commas
# $ lcsv.py --output-delimiter=, --delimiter='|' my_data.csv
#
# Scenario 3 - hide the "password" column
# $ lcsv.py --hide password -- my_accesses.csv
#
# Scenario 4 - show only the "gender" and "age" columns
# $ lcsv.py --show gender age -- employees.csv
#
from argparse import ArgumentParser
import csv
import io
QTLVL = {
'all': csv.QUOTE_ALL
, 'minimal': csv.QUOTE_MINIMAL
, 'nonnumeric': csv.QUOTE_NONNUMERIC
, 'none': csv.QUOTE_NONE
}
def escstr(s):
return s.encode('raw_unicode_escape').decode('unicode_escape')
def get_args():
parser = ArgumentParser(description='le CSV cropper')
parser.add_argument(
'files'
, metavar='FILE'
, type=str
, nargs='+'
, help='CSV file to read data from'
)
parser.add_argument(
'--delimiter'
, metavar='CHAR'
, type=escstr
, default=','
, help='delimiter'
)
parser.add_argument(
'--doublequote'
, action='store_true'
, help='doublequote'
)
parser.add_argument(
'--eol'
, metavar='STR'
, type=escstr
, default='\r\n'
, help='end of line string (default: CRLF)'
)
parser.add_argument(
'--escchar'
, metavar='CHAR'
, type=escstr
, help='escape character'
)
parser.add_argument(
'--limit'
, metavar='N'
, type=int
, default=0
, help='max number of rows to return'
)
parser.add_argument(
'--offset'
, metavar='N'
, type=int
, default=0
, help='number of rows to skip from the beginning'
)
parser.add_argument(
'--output-delimiter'
, metavar='CHAR'
, type=escstr
, help='output delimiter (only if different from input)'
)
parser.add_argument(
'--output-doublequote'
, action='store_true'
, help='output doublequote (only if different from input)'
)
parser.add_argument(
'--output-eol'
, metavar='STR'
, type=escstr
, help='output end of line string (only if different from input)'
)
parser.add_argument(
'--output-escchar'
, metavar='CHAR'
, type=escstr
, help='output escape character (only if different from input)'
)
parser.add_argument(
'--output-quotechar'
, metavar='CHAR'
, type=escstr
, help='output quotechar (only if different from input)'
)
parser.add_argument(
'--output-quoting'
, choices=QTLVL.keys()
, help='output quoting level (only if different from input)'
)
parser.add_argument(
'--quotechar'
, metavar='CHAR'
, type=escstr
, help='quotechar'
)
parser.add_argument(
'--quoting'
, choices=QTLVL.keys()
, help='quoting level'
)
group = parser.add_mutually_exclusive_group()
group.add_argument(
'--show'
, metavar='COLUMN'
, nargs='*'
, help='column names to show (cannot be used with --hide)'
)
group.add_argument(
'--hide'
, metavar='COLUMN'
, nargs='*'
, help='column names to hide (cannot be used with --only-show)'
)
return parser.parse_args()
def row2csvstr(dlmt, dblqt, eol, escchr, qtchr, qt):
def renderrow(r):
with io.StringIO() as strio:
csv2console_writer = csv.writer(
strio
, delimiter=dlmt
, doublequote=dblqt
, escapechar=escchr
, lineterminator=eol
, quotechar=qtchr
, quoting=QTLVL[
qt if qt is not None else
'none' if qtchr is None else 'minimal'
]
)
csv2console_writer.writerow(r)
strio.seek(0)
return strio.read().rstrip("\r\n")
return renderrow
def main():
args = get_args()
fn_renderrow = row2csvstr(
args.output_delimiter or args.delimiter
, args.output_doublequote or args.doublequote
, args.output_eol or args.eol
, args.output_escchar or args.escchar
, args.output_quotechar or args.quotechar
, args.output_quoting or args.quoting
)
for fpath in args.files:
with open(fpath, 'r') as f:
reader = csv.reader(
f
, delimiter=args.delimiter
, doublequote=args.doublequote
, escapechar=args.escchar
, lineterminator=args.eol
, quotechar=args.quotechar
, quoting=QTLVL[
args.quoting if args.quoting is not None else
'none' if args.quotechar is None else 'minimal'
]
)
headers = next(reader)
if args.show:
idxs_show = [
headers.index(col)
for col in args.show if col in headers
]
fn_showhide = lambda r: [
r[idx]
for idx in range(0, len(r)) if idx in idxs_show
]
elif args.hide:
idxs_hide = [
headers.index(col)
for col in args.hide if col in headers
]
fn_showhide = lambda r: [
r[idx]
for idx in range(0, len(r)) if idx not in idxs_hide
]
else:
fn_showhide = lambda r: r
for i in range(0, args.offset):
next(reader)
print(fn_renderrow(fn_showhide(headers)))
i = 1
for r in reader:
if 0 < args.limit and args.limit < i:
break
print(fn_renderrow(fn_showhide(r)))
i += 1
if '__main__' == __name__:
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment