Skip to content

Instantly share code, notes, and snippets.

@hlawrenz
Created June 6, 2014 14:47
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hlawrenz/1ef3490cb9aa021297a0 to your computer and use it in GitHub Desktop.
Save hlawrenz/1ef3490cb9aa021297a0 to your computer and use it in GitHub Desktop.
A bunch of csv related python scripts.
#!/usr/bin/env python
import argparse
import csv
import sys
from collections import OrderedDict
import simplejson as json
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Convert csv files to json.")
parser.add_argument("-p", "--pretty",
action='store_true',
help="Output should be pretty printed.")
parser.add_argument("--headers",
action='store_true',
help="Each list item should be a dictionary with the headers as keys.")
parser.add_argument("--order",
default=None,
help="Order for dictionary keys.")
parser.add_argument("-e", "--empty",
default=None,
help="Extra empty keys for dictionary. Implies -h or --headers.")
parser.add_argument("--empty-value",
default="",
help="Extra empty keys for dictionary. Implies -h or --headers.")
parser.add_argument("-o", "--output",
type=argparse.FileType("wb"),
default=sys.stdout,
help="Output file. Standard out will be used if not specified.")
parser.add_argument("input",
nargs="*",
type=argparse.FileType("rb"),
default=[sys.stdin],
help="Excel file to convert.")
args = parser.parse_args()
if args.order:
args.order = args.order.split(",")
else:
args.order = []
if args.empty:
args.empty = args.empty.split(",")
else:
args.empty = []
if len(args.empty):
args.headers = True
records = []
row_idx = 0
for csv_file in args.input:
reader = csv.reader(csv_file)
headers = dict()
for row in reader:
if args.headers:
if reader.line_num == 1:
headers = row
else:
record = OrderedDict()
for k in args.order:
record[k] = ""
for i, v in enumerate(row):
record[headers[i]] = v
for empty in args.empty:
record[empty] = args.empty_value
records.append(record)
else:
records.append(row)
json.dump(records, args.output, indent=' ')
#!/usr/bin/env python
import argparse
import csv
import sys
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Output csv with only specified columns from input csv file.")
parser.add_argument("-o", "--output",
type=argparse.FileType("wb"),
default=sys.stdout,
help="Output file. Standard out will be used if not specified.")
parser.add_argument("-i", "--input",
type=argparse.FileType("rb"),
default=sys.stdin,
help="Input file. Standard in will be used if not specified.")
parser.add_argument("--indialect",
default="excel",
help="Dialect of the input file.")
parser.add_argument("--outdialect",
default="excel",
help="Dialect of the output file.")
parser.add_argument("columns",
type=str,
nargs='+',
help="Columns to output.")
args = parser.parse_args()
writer = csv.writer(args.output, dialect=args.outdialect)
reader = csv.reader(args.input, dialect=args.indialect)
for row in reader:
new_row = []
for x in args.columns:
if x.startswith("x:"):
new_row.append(x[2:])
elif x.isdigit():
new_row.append(row[int(x)])
else:
exit("Bad column specifier: {}".format(x))
writer.writerow(new_row)
#!/usr/bin/env python
import argparse
import csv
import logging
import sys
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Count unique values in a column.")
parser.add_argument("-o", "--output",
type=argparse.FileType("wb"),
default=sys.stdout,
help="Output file. Standard out will be used if not specified.")
parser.add_argument("--indialect",
default="excel",
help="Dialect of the input file.")
parser.add_argument("--outdialect",
default="excel",
help="Dialect of the output file.")
parser.add_argument("--top",
default="excel",
help="Dialect of the output file.")
parser.add_argument("target",
type=int,
help="Target column.")
parser.add_argument("inputs",
type=argparse.FileType("rb"),
nargs='+',
default=sys.stdin,
help="CSV File from which to read.")
args = parser.parse_args()
counts = {}
writer = csv.writer(args.output, dialect=args.outdialect)
for input_file in args.inputs:
reader = csv.reader(input_file, dialect=args.indialect)
for row in reader:
try:
target_value = row[args.target]
except IndexError:
logging.warning("Skipping line. Target column doesn't exist on line {} of {}".format(reader.line_num,
input_file))
continue
value = target_value.strip()
if value in counts:
counts[value] += 1
else:
counts[value] = 1
sorted_counts = sorted([(k, v) for k, v in counts.iteritems()], key=lambda item: item[1])
for i in sorted_counts:
print '{} {}'.format(*i)
#!/usr/bin/env python
import argparse
import logging
import re
import sys
import unicodecsv as csv
def csvgrep(args):
matcher = re.compile(args.pattern)
writer = csv.writer(args.output, dialect=args.outdialect)
for input_file in args.inputs:
reader = csv.reader(input_file, dialect=args.indialect, encoding=args.encoding)
for row in reader:
try:
target_value = row[args.target]
except IndexError:
logging.warning("Skipping line. Target column doesn't exist on line {} of {}".format(reader.line_num,
input_file))
continue
if matcher.match(target_value):
writer.writerow(row)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Find rows in csv file that match pattern in target column.")
parser.add_argument("-o", "--output",
type=argparse.FileType("wb"),
default=sys.stdout,
help="Output file. Standard out will be used if not specified.")
parser.add_argument("--indialect",
default="excel",
help="Dialect of the input file.")
parser.add_argument("--encoding",
default="utf-8",
help="Encoding of the input file.")
parser.add_argument("--outdialect",
default="excel",
help="Dialect of the output file.")
parser.add_argument("target",
type=int,
help="Target column.")
parser.add_argument("pattern",
help="Pattern to match.")
parser.add_argument("inputs",
type=argparse.FileType("rb"),
nargs='*',
default=sys.stdin,
help="CSV File from which to read.")
args = parser.parse_args()
csvgrep(args)
#!/usr/bin/env python
from __future__ import print_function
from mmap import mmap, ACCESS_READ
import argparse
import csv
import datetime
import os
import sys
import xlrd
def warning(*objs):
print("WARNING: ", *objs, file=sys.stderr)
def csv_name(xl_name, sheet_name):
xl_name = os.path.splitext(os.path.basename(xl_name))[0]
fn = "{}_{}".format(xl_name, sheet_name)
fn = "".join([c for c in fn if c.isalnum() or c == '-' or c == '_']).rstrip()
return "{}.csv".format(fn)
def xl_rows(sh, date_format):
rn = 0
while rn < sheet.nrows:
values = sheet.row_values(rn)
# Cell Types: 0=Empty, 1=Text, 2=Number, 3=Date, 4=Boolean, 5=Error, 6=Blank
for idx, cell_type in enumerate(sh.row_types(rn)):
if cell_type == 3:
dt = datetime.datetime(*xlrd.xldate_as_tuple(values[idx], wb.datemode))
values[idx] = dt.strftime(date_format)
elif cell_type == 2 and int(values[idx]) == values[idx]:
values[idx] = int(values[idx])
rn += 1
yield values
def write_sheet(sheet, output, dialect, date_format):
writer = csv.writer(output, dialect=dialect)
for row in xl_rows(sheet, date_format):
writer.writerow(row)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Convert excel files to csv")
parser.add_argument("-o", "--output",
help="Output file. Standard out will be used if not specified.")
parser.add_argument("-d", "--dateformat",
default="%Y-%m-%d",
help="Format used for dates.")
parser.add_argument("-s", "--sheet",
type=int,
default=0,
help="Sheet you would like to read from.")
parser.add_argument("-a", "--all-sheets",
action="store_true",
help="Output all sheets. Will create a file for each sheet.")
parser.add_argument("--outdialect",
default="excel",
help="Dialect of the output file.")
parser.add_argument("input",
help="Excel file to convert.")
args = parser.parse_args()
if not os.path.exists(args.input):
sys.exit("Input file not found!")
if os.path.getsize(args.input) < 1:
sys.exit("Input file is empty!")
with open(args.input, "rb") as in_fh:
wb = xlrd.open_workbook(file_contents=mmap(in_fh.fileno(), 0, access=ACCESS_READ))
if args.all_sheets:
for sheet in wb.sheets():
with open(csv_name(args.input, sheet.name), "wt") as out_fh:
write_sheet(sheet, out_fh, args.outdialect, args.dateformat)
else:
if wb.nsheets > 1:
warning("There is more than one sheet in this file!")
sheet = wb.sheet_by_index(args.sheet)
if args.output:
with open(args.output, "rb") as out_fh:
write_sheet(sheet, out_fh, args.outdialect, args.dateformat)
else:
write_sheet(sheet, sys.stdout, args.outdialect, args.dateformat)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment