@vadimkantorov
Last active January 10, 2024 17:40
Filter a subset of columns and of rows from a CSV file
# -*- coding: utf-8 -*-
#
# Usage:
# python csvsubset.py mycsv.csv --ignorecase --grep "donat" --key "First Name" "Last Name" "Email" "Mailing Postal Code" "Mailing City" --output-encoding=utf-16le > mycsvsubset.csv
import sys
import csv
import argparse
import re
parser = argparse.ArgumentParser()
parser.add_argument('input_path')
parser.add_argument('--key', nargs = '+', default = ['First Name', 'Last Name', 'Email'], help = 'column names to keep, matched against the header case-insensitively')
parser.add_argument('--grep', help = 'regex; a row is kept if it matches at least one of its fields')
parser.add_argument('--input-delimiter', default = ',', choices = [',', ';', '\t'])
parser.add_argument('--input-encoding', default = 'utf-8-sig', help = 'utf-8-sig is used to remove BOM')
parser.add_argument('--output-delimiter', default = ',', choices = [',', ';', '\t'])
parser.add_argument('--output-encoding', default = 'utf-8', help = 'to have a BOM written (which helps Excel detect the encoding), use --output-encoding=utf-8-sig or utf-16; utf-16le does not write a BOM')
parser.add_argument('--ignorecase', action = 'store_true', help = 'make --grep case-insensitive (key matching is always case-insensitive)')
args = parser.parse_args()
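# col(rows, fieldname) returns the header name in rows[0] that equals fieldname case-insensitively, or None if there is none
col = lambda rows, fieldname: ([k for k in rows[0] if (k or '').lower() == (fieldname or '').lower()] + [None])[0] if rows else None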
print('#', args, file = sys.stderr)
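# newline = '' lets the csv module handle line endings itself, as the csv docs recommend
with open(args.input_path, encoding = args.input_encoding, newline = '') as f:
    reader = csv.DictReader(f, delimiter = args.input_delimiter)
    rows = list(reader)
    fieldnames = reader.fieldnames or []
# resolve each requested key to the actual header name; drop keys that match no column
key = [k for k in (col(rows, name) for name in args.key) if k is not None]
print('# key:', key, file = sys.stderr)
print('# sorted(fieldnames):', sorted(fieldnames), file = sys.stderr)
if key and args.grep:
    # keep only rows where the regex matches at least one field
    flags = re.IGNORECASE if args.ignorecase else 0
    rows = [d for d in rows if any(re.search(args.grep, str(v), flags = flags) for v in d.values())]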
# newline = '' stops the text layer from translating the csv writer's own '\r\n' line endings
sys.stdout.reconfigure(encoding = args.output_encoding, newline = '')
writer = csv.DictWriter(sys.stdout, fieldnames = key, delimiter = args.output_delimiter, extrasaction = 'ignore')
writer.writeheader()
writer.writerows(rows)
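
The choice of --output-encoding mostly matters for whether a byte order mark (BOM) ends up at the start of the output. As a minimal sketch, separate from the script above, the snippet below checks which of Python's standard codecs prepend a BOM when encoding a string. The helper name `writes_bom` and the sample header text are hypothetical, chosen only for illustration; the check covers the codecs themselves, not how a particular spreadsheet program reacts to them.

```python
import codecs

# All byte order marks the codecs tested below could emit.
BOMS = (codecs.BOM_UTF8, codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE)

def writes_bom(encoding: str) -> bool:
    """Return True if encoding a string with this codec prepends a BOM."""
    return 'First Name,Email\r\n'.encode(encoding).startswith(BOMS)

for name in ('utf-8', 'utf-8-sig', 'utf-16', 'utf-16le'):
    print(name, writes_bom(name))
```

Under this check, 'utf-8-sig' and 'utf-16' emit a BOM while 'utf-8' and 'utf-16le' do not, so an explicit-endianness codec such as utf-16le is not enough on its own when a BOM is wanted.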