Skip to content

Instantly share code, notes, and snippets.

@konnov
Last active November 11, 2022 16:25
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save konnov/5774100 to your computer and use it in GitHub Desktop.
Save konnov/5774100 to your computer and use it in GitHub Desktop.
Simple CSV tools that help in filtering CSV data and creating plots
# Submodule names exported by a star-import of this package.
# NOTE(review): presumably this line is the package's __init__.py; the file
# boundary is not visible in this listing -- confirm.
__all__ = ['util', 'csvgrep']
#!/usr/bin/python
#
# Translate a CSV table (with a header row) into a SQLite table.
#
# Igor Konnov, 2016
import argparse
import csv
import getopt
import re
import sqlite3
from sys import stdout
import sys
from util import *
class DataError(Exception):
    """Error type for problems with the CSV data.

    Derives from Exception (not BaseException) so that ordinary
    ``except Exception`` handlers see it like any other runtime error.
    No raise site is visible in this script.
    """

    def __init__(self, msg):
        Exception.__init__(self, msg)
def use():
    """Print the command-line synopsis to stdout and exit with status 1."""
    synopsis = "Use: %s [--sniff-dialect] dbfile table_name <csv" % sys.argv[0]
    # the synopsis line is followed by one empty line
    sys.stdout.write(synopsis + "\n\n")
    sys.exit(1)
def export(sniff, dbfilename, table_name, infile):
    """Load a CSV stream into a newly created SQLite table.

    The first CSV row is the header: one ``text`` column is created per
    header cell.  Every following row is inserted as is.  All rows are
    committed together; on any error the transaction is rolled back and the
    error is re-raised.

    sniff       -- if true, read through SniffingCsvReader instead of csv.reader
    dbfilename  -- path handed to sqlite3.connect
    table_name  -- name of the table to create; it is pasted into the SQL text
                   unquoted, so it must be a valid SQL table name
    infile      -- the open CSV input
    """
    def quote_column(name):
        # identifiers cannot be bound as '?' parameters, so quote them by
        # hand; a backtick inside the name is escaped by doubling it
        return '`' + name.replace('`', '``') + '`'

    if sniff:
        reader = SniffingCsvReader(infile)
    else:
        reader = csv.reader(infile)

    conn = sqlite3.connect(dbfilename)
    try:
        cursor = conn.cursor()
        insert_sql = None
        for row_arr in reader:
            if insert_sql is None:
                # the first row is the header
                cols = [quote_column(f) + ' text' for f in row_arr]
                cursor.execute("CREATE TABLE %s (%s)"
                               % (table_name, ",".join(cols)))
                # the INSERT text depends only on the header width, so it is
                # built once and reused for every data row
                placeholders = ",".join(["?"] * len(row_arr))
                insert_sql = ("INSERT INTO %s VALUES (%s)"
                              % (table_name, placeholders))
            else:
                cursor.execute(insert_sql, row_arr)
        conn.commit()
    except Exception:
        conn.rollback()
        raise  # bare raise keeps the original traceback
    finally:
        conn.close()
if __name__ == "__main__":
    # Entry point: parse the options, then load stdin into the given table.
    try:
        opts, args = getopt.getopt(sys.argv[1:], "h", ["help", "sniff-dialect"])
    except getopt.GetoptError as err:
        # print help information and exit:
        print(str(err))  # will print something like "option -a not recognized"
        use()

    sniff = False
    for o, a in opts:
        if o in ("-h", "--help"):
            use()
        elif o == "--sniff-dialect":
            # compared with ==, because `o in ("--sniff-dialect")` is a
            # substring test on a plain string, not a tuple lookup
            sniff = True
        else:
            # raised explicitly so the check also survives `python -O`
            raise AssertionError("unexpected option")

    if len(args) < 2:
        use()
    export(sniff, args[0], args[1], sys.stdin)
#!/usr/bin/python
#
# A filter on CSV that is similar to SELECT name1, .., name_k FROM CSV in SQL.
#
# Igor Konnov, 2013-2014
import argparse
import csv
import getopt
import re
from sys import stdout
import sys
from util import *
class DataError(Exception):
    """Raised when the CSV input lacks a requested column.

    Derives from Exception (not BaseException) so that ordinary
    ``except Exception`` handlers see it like any other runtime error.
    """

    def __init__(self, msg):
        Exception.__init__(self, msg)
def use():
    """Print the command-line synopsis to stdout and exit with status 1."""
    program = sys.argv[0]
    lines = ["Use: %s [--sniff-dialect] field1 ... field_k <csv" % program, ""]
    for line in lines:
        sys.stdout.write(line + "\n")
    sys.exit(1)
def parse(args):
    """Return the positional arguments (the requested column names).

    When no argument is given, use() is called instead, which exits.
    """
    if len(args) >= 1:
        return args
    use()
def cut_csv(sniff, target_fields, infile):
    """Print the selected columns of a CSV stream to stdout.

    The first input row is the header.  Output consists of a header row with
    the names in `target_fields`, followed by one row per input row holding
    those columns in the requested order.  Rows are written with csv.writer,
    so cells containing commas, quotes or line breaks are quoted properly
    (the header included).

    A data row that is shorter than the header gets "" for each missing
    column; the first ten such cases are reported on stderr and the total is
    reported at the end.

    Raises DataError when a requested column is not in the header.
    """
    if sniff:
        reader = SniffingCsvReader(infile)
    else:
        reader = csv.reader(infile)

    # "\n" keeps the line ending that the print-based output used to have
    writer = csv.writer(sys.stdout, lineterminator="\n")
    fields = None
    lineno = 0
    errno = 0
    for row_arr in reader:
        lineno += 1
        if not fields:
            fields = row_arr  # the first row is the header
            for f in target_fields:
                if f not in fields:
                    raise DataError("Column %s not found" % f)
            writer.writerow(target_fields)
            continue

        row = {}
        for i, k in enumerate(fields):
            if i < len(row_arr):
                row[k] = row_arr[i]
            else:
                errno += 1
                row[k] = ""
                if errno <= 10:  # do not flood stderr on badly broken input
                    sys.stderr.write(
                        "ERROR: column %d not found on line %d\n" % (i, lineno))
        writer.writerow([row[f] for f in target_fields])

    if errno > 0:
        sys.stderr.write("%d errors occured\n" % errno)
if __name__ == "__main__":
    # Entry point: parse the options, then cut the requested columns of stdin.
    try:
        opts, args = getopt.getopt(sys.argv[1:], "h", ["help", "sniff-dialect"])
    except getopt.GetoptError as err:
        # print help information and exit:
        print(str(err))  # will print something like "option -a not recognized"
        use()

    sniff = False
    for o, a in opts:
        if o in ("-h", "--help"):
            use()
        elif o == "--sniff-dialect":
            # compared with ==, because `o in ("--sniff-dialect")` is a
            # substring test on a plain string, not a tuple lookup
            sniff = True
        else:
            # raised explicitly so the check also survives `python -O`
            raise AssertionError("unexpected option")

    fields = parse(args)
    cut_csv(sniff, fields, sys.stdin)
#!/usr/bin/python
#
# A filter on CSV that is similar to SELECT * FROM CSV WHERE... in SQL.
#
# For instance, csvgrep "param~=.*F=3.*" -a "foo=bar" <my.csv
#
# Igor Konnov, 2013
import argparse
import csv
import getopt
import re
from sys import stdout
import sys
from util import *
class ArgError(Exception):
    """Raised when the filter expression on the command line is malformed.

    Derives from Exception (not BaseException) so that ordinary
    ``except Exception`` handlers see it like any other runtime error.
    """

    def __init__(self, msg):
        Exception.__init__(self, msg)
class DataError(Exception):
    """Raised when a filter refers to a column that a row does not have.

    Derives from Exception (not BaseException) so that ordinary
    ``except Exception`` handlers see it like any other runtime error.
    """

    def __init__(self, msg):
        Exception.__init__(self, msg)
# Kinds of expression nodes: two leaf tests (exact match, regex match) and
# three boolean connectives.
EQ, RE = "EQ", "RE"
AND, OR, NOT = "AND", "OR", "NOT"
class Expr:
    """A node of the filter expression tree.

    kind -- one of the node kind constants (EQ, RE, AND, OR, NOT)
    ops  -- the operands: sub-expressions for AND/OR/NOT, and a
            [column, value-or-compiled-regex] pair for EQ/RE
    """

    def __init__(self, kind, ops):
        self.kind, self.ops = kind, ops
# XXX: a simple recursive parser, I don't have time to think about it
class Parser:
    """Recursive-descent parser for a filter given as a list of tokens.

    Grammar, as implemented by the methods below:

        and_expr := or_expr ("-a" or_expr)*
        or_expr  := not_expr ("-o" not_expr)*
        not_expr := "!"* atom
        atom     := "k=v" | "k~=regex"

    Note that "-o" therefore binds tighter than "-a".
    """

    def __init__(self, args):
        self.args = args  # the complete token list
        self.buf = args   # the tokens that are still to be read

    def parse(self):
        """Parse every token; raise ArgError if some tokens are left over."""
        tree = self.parse_and()
        if self.is_eof():
            return tree
        raise ArgError("Expected end, found: " + " ".join(self.buf))

    def _parse_chain(self, flag, parse_operand, kind):
        # Read `operand (flag operand)*`; a lone operand is returned as is,
        # several operands are wrapped into one node of the given kind.
        operands = [parse_operand()]
        while self.top() == flag:
            self.consume(flag)
            operands.append(parse_operand())
        if len(operands) == 1:
            return operands[0]
        return Expr(kind, operands)

    def parse_and(self):
        """Parse operands separated by "-a"."""
        return self._parse_chain("-a", self.parse_or, AND)

    def parse_or(self):
        """Parse operands separated by "-o"."""
        return self._parse_chain("-o", self.parse_not, OR)

    def parse_not(self):
        """Parse an atom preceded by any number of "!" tokens."""
        bangs = 0
        while self.top() == "!":
            self.consume("!")
            bangs += 1
        atom = self.parse_eq_re()
        # an even number of negations cancels out
        if bangs % 2 == 1:
            return Expr(NOT, [atom])
        return atom

    def parse_eq_re(self):
        """Parse one "k~=regex" or "k=v" token; "~=" is looked for first."""
        tok = self.get()
        if "~=" in tok:
            key, _, pattern = tok.partition("~=")
            return Expr(RE, [key, re.compile(pattern)])
        if "=" in tok:
            key, _, val = tok.partition("=")
            return Expr(EQ, [key, val])
        raise ArgError("Expected k=v or k~=re, found %s" % tok)

    def is_eof(self):
        """Tell whether all tokens have been read."""
        return self.buf == []

    def top(self):
        """Return the next token without reading it, or None at the end."""
        return None if self.buf == [] else self.buf[0]

    def get(self):
        """Read and return the next token; raise ArgError at the end."""
        if self.buf == []:
            raise ArgError("No arguments")
        # slicing builds a new list, so the caller's list is never modified
        tok, self.buf = self.buf[0], self.buf[1:]
        return tok

    def put(self, tok):
        """Push a token back to the front of the unread tokens."""
        self.buf = [tok] + self.buf

    def consume(self, expected):
        """Read the next token, which must be equal to `expected`."""
        found = self.buf[0]
        if found != expected:
            raise ArgError("Expected %s, found %s" % (expected, found))
        self.buf = self.buf[1:]
def use():
    """Print the command-line synopsis to stdout and exit with status 1."""
    synopsis = ("Use: %s [--sniff-dialect] [!] k1=v1 -a k2~=v2 -o k3=v3 <csv"
                % sys.argv[0])
    # the synopsis line is followed by one empty line
    sys.stdout.write(synopsis + "\n")
    sys.stdout.write("\n")
    sys.exit(1)
def parse(args):
    """Turn the positional arguments into a filter expression tree.

    Without arguments, or when the Parser rejects them with ArgError (the
    message is printed first), use() is called, which exits.
    """
    if not args:
        use()
    try:
        expr = Parser(args).parse()
    except ArgError as problem:
        sys.stdout.write("Error: " + str(problem) + "\n")
        use()
    return expr
def row_matches(expr, row):
    """Evaluate a filter expression on one row and return a bool.

    expr -- an expression node with `kind` and `ops` attributes
    row  -- a dict that maps column names to cell values

    AND and OR stop at the first operand that decides the result.  The RE
    test uses `match`, i.e. the pattern is anchored at the start of the cell.
    An unrecognized kind matches nothing.

    Raises DataError when an EQ or RE test names a column missing from row.
    """
    kind = expr.kind
    if kind == AND:
        return all(row_matches(op, row) for op in expr.ops)
    if kind == OR:
        return any(row_matches(op, row) for op in expr.ops)
    if kind == NOT:
        return not row_matches(expr.ops[0], row)
    if kind == EQ or kind == RE:
        name, operand = expr.ops
        try:
            cell = row[name]
        except KeyError:
            raise DataError("Column %s not found" % name)
        if kind == EQ:
            return cell == operand
        # turn the match object (or None) into a real bool, so that every
        # branch returns the same type
        return operand.match(cell) is not None
    return False
def filter_csv(sniff, expr, infile):
    """Print the header and the rows of a CSV stream that match a filter.

    The first input row is the header and is always printed.  Each following
    row is turned into a {column: cell} dict and printed when
    row_matches(expr, row) holds.  Rows are written with csv.writer, so cells
    containing commas, quotes or line breaks are quoted properly (the header
    included).

    In a row shorter than the header the missing cells count as "" for the
    filter; the row itself is printed unchanged.  A DataError raised by the
    filter is reported on stderr with the record number, and the row is
    skipped.
    """
    if sniff:
        reader = SniffingCsvReader(infile)
    else:
        reader = csv.reader(infile)

    # "\n" keeps the line ending that the print-based output used to have
    writer = csv.writer(sys.stdout, lineterminator="\n")
    fields = None
    for line, row_arr in enumerate(reader, 1):
        if not fields:
            fields = row_arr  # the first row is the header
            writer.writerow(fields)
            continue

        # pad short rows, so that every header column has a value
        padded = row_arr + [""] * (len(fields) - len(row_arr))
        row = dict(zip(fields, padded))
        try:
            if row_matches(expr, row):
                writer.writerow(row_arr)
        except DataError as e:
            sys.stderr.write("line %d: %s\n" % (line, str(e)))
if __name__ == "__main__":
    # Entry point: parse the options and the filter, then filter stdin.
    try:
        opts, args = getopt.getopt(sys.argv[1:], "h", ["help", "sniff-dialect"])
    except getopt.GetoptError as err:
        # print help information and exit:
        print(str(err))  # will print something like "option -a not recognized"
        use()

    sniff = False
    for o, a in opts:
        if o in ("-h", "--help"):
            use()
        elif o == "--sniff-dialect":
            # compared with ==, because `o in ("--sniff-dialect")` is a
            # substring test on a plain string, not a tuple lookup
            sniff = True
        else:
            # raised explicitly so the check also survives `python -O`
            raise AssertionError("unexpected option")

    expr = parse(args)
    filter_csv(sniff, expr, sys.stdin)
#!/usr/bin/python
#
# Get two files on input: original and mask.
# Every non-empty cell in the mask overwrites the cell in the original.
#
# This can be used, when one wants to replace cells with values
# 'timeout', 'out of memory'.
#
# Igor Konnov, 2014
import argparse
import csv
import getopt
import re
from sys import stdout
import sys
from util import *
class DataError(Exception):
    """Error type for problems with the CSV data.

    Derives from Exception (not BaseException) so that ordinary
    ``except Exception`` handlers see it like any other runtime error.
    No raise site is visible in this script.
    """

    def __init__(self, msg):
        Exception.__init__(self, msg)
def use():
    """Print the command-line synopsis to stdout and exit with status 1."""
    synopsis = "Use: %s [--sniff-dialect] original.csv mask.csv" % sys.argv[0]
    # the synopsis line is followed by one empty line
    sys.stdout.write("%s\n\n" % synopsis)
    sys.exit(1)
def parse(args):
    """Return the two positional arguments: original file and mask file.

    Any other number of arguments ends in use(), which exits.  Surplus
    arguments are rejected here, because the result is unpacked into exactly
    two names by the caller.
    """
    if len(args) != 2:
        use()
    else:
        return args
def merge_csv(sniff, inp, mask):
    """Overlay the cells of a mask CSV file onto an original CSV file.

    Both files are read in lockstep and the result goes to stdout in the
    dialect of the original's reader.  The first row is copied from the
    original untouched.  In every later row, a mask cell that is not blank
    replaces the cell at the same position; mask cells beyond the width of
    the original row are ignored, and once the mask runs out the remaining
    rows are copied unchanged.

    sniff -- if true, read through SniffingCsvReader instead of csv.reader
    inp   -- path of the original CSV file
    mask  -- path of the mask CSV file
    """
    # `with` closes both files, also when reading or writing fails
    with open(inp, 'r') as inpf, open(mask, 'r') as maskf:
        if sniff:
            inpr = SniffingCsvReader(inpf)
            maskr = SniffingCsvReader(maskf)
        else:
            inpr = csv.reader(inpf)
            maskr = csv.reader(maskf)

        writer = csv.writer(sys.stdout, dialect=inpr.dialect)
        for lineno, row in enumerate(inpr):
            try:
                mask_row = next(maskr)
            except StopIteration:
                mask_row = []  # the mask is shorter than the original

            if lineno != 0:  # the header row is never masked
                for i, cell in enumerate(mask_row[:len(row)]):
                    if cell.strip() != "":
                        row[i] = cell
            writer.writerow(row)
if __name__ == "__main__":
    # Entry point: parse the options, then apply the mask to the original.
    try:
        opts, args = getopt.getopt(sys.argv[1:], "h", ["help", "sniff-dialect"])
    except getopt.GetoptError as err:
        # print help information and exit:
        print(str(err))  # will print something like "option -a not recognized"
        use()

    sniff = False
    for o, a in opts:
        if o in ("-h", "--help"):
            use()
        elif o == "--sniff-dialect":
            # compared with ==, because `o in ("--sniff-dialect")` is a
            # substring test on a plain string, not a tuple lookup
            sniff = True
        else:
            # raised explicitly so the check also survives `python -O`
            raise AssertionError("unexpected option")

    orig, mask = parse(args)
    merge_csv(sniff, orig, mask)
#!/usr/bin/python
#
# Join CSV files similar to UNIX paste. If one file is longer, then the other
# file is padded.
#
# Igor Konnov, 2014
import argparse
import getopt
import csv
import re
from sys import stdout
import sys
from util import *
class DataError(Exception):
    """Error type for problems with the CSV data.

    Derives from Exception (not BaseException) so that ordinary
    ``except Exception`` handlers see it like any other runtime error.
    No raise site is visible in this script.
    """

    def __init__(self, msg):
        Exception.__init__(self, msg)
def use():
    """Print the command-line synopsis to stdout and exit with status 1."""
    template = "Use: %s [--sniff-dialect] file1.csv prefix1 file2.csv prefix2 ..."
    # the synopsis line is followed by one empty line
    sys.stdout.write(template % sys.argv[0] + "\n\n")
    sys.exit(1)
def parse(args):
    """Split alternating `file prefix file prefix ...` arguments.

    Returns the pair (files, prefixes).  An odd number of arguments ends in
    use(), which exits.
    """
    if len(args) % 2 == 0:
        files = args[0::2]     # arguments at even positions
        prefixes = args[1::2]  # arguments at odd positions
        return (files, prefixes)
    use()
def merge_csv(sniff, files, prefixes):
    """Paste several CSV files side by side and write the result to stdout.

    Output row k is the concatenation of row k of every file.  In the first
    row (the headers) each cell of file i becomes "<prefixes[i]>:<cell>".
    A file that has run out of rows contributes as many empty cells as its
    last row had.  Output stops when all files are exhausted.  The writer
    uses the dialect of the last file's reader.

    sniff    -- if true, read through SniffingCsvReader instead of csv.reader
    files    -- paths of the CSV files
    prefixes -- one header prefix per file, in the same order
    """
    handles = []
    try:
        readers = []
        dialect = csv.excel  # the csv default, used when `files` is empty
        for f in files:
            infile = open(f, 'r')
            handles.append(infile)
            if sniff:
                reader = SniffingCsvReader(infile)
            else:
                reader = csv.reader(infile)
            dialect = reader.dialect
            readers.append(reader)

        widths = [0] * len(readers)  # width of the last row seen per file
        writer = csv.writer(sys.stdout, dialect=dialect)
        lineno = 0
        while True:
            has_more = False
            row = []
            for i, reader in enumerate(readers):
                try:
                    row_arr = next(reader)
                except (StopIteration, IOError):
                    # As before, a read error is handled like the end of the
                    # file: pad with empty cells.
                    row += [''] * widths[i]
                    continue
                if lineno == 0:
                    row_arr = [prefixes[i] + ":" + name for name in row_arr]
                has_more = True
                row += row_arr
                widths[i] = len(row_arr)
            if not has_more:
                break
            writer.writerow(row)
            lineno += 1
    finally:
        # release every file that was opened, also on errors
        for handle in handles:
            handle.close()
if __name__ == "__main__":
    # Entry point: parse the options, then paste the given files together.
    try:
        opts, args = getopt.getopt(sys.argv[1:], "h", ["help", "sniff-dialect"])
    except getopt.GetoptError as err:
        # print help information and exit:
        print(str(err))  # will print something like "option -a not recognized"
        use()

    sniff = False
    for o, a in opts:
        if o in ("-h", "--help"):
            use()
        elif o == "--sniff-dialect":
            # compared with ==, because `o in ("--sniff-dialect")` is a
            # substring test on a plain string, not a tuple lookup
            sniff = True
        else:
            # raised explicitly so the check also survives `python -O`
            raise AssertionError("unexpected option")

    files, prefixes = parse(args)
    merge_csv(sniff, files, prefixes)
#!/usr/bin/python
#
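# Replace the expressions of the form $(column@expr) with the (only) value
# matching the query: SELECT column FROM <table_name> WHERE expr.
# Afterwards, replace the expressions of the form [query] with the (only)
# value returned by running query as it is written.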
#
# Igor Konnov, 2016
import argparse
import csv
import getopt
import re
import sqlite3
from sys import stdout
import sys
from util import *
class DataError(Exception):
    """Error type for problems with the CSV data.

    Derives from Exception (not BaseException) so that ordinary
    ``except Exception`` handlers see it like any other runtime error.
    No raise site is visible in this script.
    """

    def __init__(self, msg):
        Exception.__init__(self, msg)
def use():
    """Print the command-line synopsis to stdout and exit with status 1."""
    for line in ("Use: %s [--sniff-dialect] dbfile table_name <csv" % sys.argv[0],
                 ""):
        sys.stdout.write(line + "\n")
    sys.exit(1)
def sub_special(cursor, table_name, text):
    """Expand every `$(column@condition)` in text by a database lookup.

    Each occurrence is replaced with the result of
    ``SELECT column FROM table_name WHERE condition``: the value itself when
    exactly one row comes back, 'no result!' when none does, and
    '>1 results!' when there are several.  Text around the occurrences is
    kept as is.

    WARNING: the query is assembled from the cell text verbatim, which is the
    purpose of the tool.  Run it on trusted input only.
    """
    pieces = []
    before = 0  # index of the first character that is not yet copied
    for m in re.finditer(r"\$\((.*?)@(.*?)\)", text):
        pieces.append(text[before:m.start()])
        cursor.execute('SELECT %s FROM %s WHERE %s'
                       % (m.group(1).strip(), table_name, m.group(2).strip()))
        found = cursor.fetchone()
        if found is None:
            value = 'no result!'
        elif cursor.fetchone() is not None:
            value = '>1 results!'
        else:
            value = found[0]  # a one-element tuple
        pieces.append(str(value))
        # m.end() already points past the match; adding one here would drop
        # the character that follows the expression
        before = m.end()
    pieces.append(text[before:])
    return "".join(pieces)
def sub_general(cursor, text):
    """Expand every `[query]` in text by running the query.

    Each occurrence is replaced with the result of executing the bracketed
    text as SQL: the first column of the row when exactly one row comes back,
    'no result!' when none does, and '>1 results!' when there are several.
    Text around the occurrences is kept as is.

    WARNING: the bracketed text is executed verbatim, which is the purpose of
    the tool.  Run it on trusted input only.
    """
    pieces = []
    before = 0  # index of the first character that is not yet copied
    for m in re.finditer(r"\[(.*?)\]", text):
        pieces.append(text[before:m.start()])
        cursor.execute(m.group(1))
        found = cursor.fetchone()
        if found is None:
            value = 'no result!'
        elif cursor.fetchone() is not None:
            value = '>1 results!'
        else:
            value = found[0]  # a one-element tuple
        pieces.append(str(value))
        # m.end() already points past the match; adding one here would drop
        # the character that follows the expression
        before = m.end()
    pieces.append(text[before:])
    return "".join(pieces)
def export(sniff, dbfilename, table_name, infile):
    """Copy a CSV stream to stdout, expanding the database lookups in it.

    Every cell is passed through sub_special (with table_name) and then
    through sub_general, both working on a cursor of the SQLite database
    dbfilename.  Rows are written with csv.writer, so an expanded value that
    contains a comma, a quote or a line break is quoted instead of corrupting
    the row.

    The connection is committed after the last row, rolled back if the run
    is cut short, and closed in either case.
    """
    if sniff:
        reader = SniffingCsvReader(infile)
    else:
        reader = csv.reader(infile)

    # "\n" keeps the line ending that the hand-written output used to have
    writer = csv.writer(sys.stdout, lineterminator="\n")
    conn = sqlite3.connect(dbfilename)
    cursor = conn.cursor()
    committed = False
    try:
        for row_arr in reader:
            writer.writerow([
                sub_general(cursor, sub_special(cursor, table_name, v))
                for v in row_arr
            ])
        conn.commit()
        committed = True
    finally:
        if not committed:
            conn.rollback()
        conn.close()
if __name__ == "__main__":
    # Entry point: parse the options, then expand the lookups found in stdin.
    try:
        opts, args = getopt.getopt(sys.argv[1:], "h", ["help", "sniff-dialect"])
    except getopt.GetoptError as err:
        # print help information and exit:
        print(str(err))  # will print something like "option -a not recognized"
        use()

    sniff = False
    for o, a in opts:
        if o in ("-h", "--help"):
            use()
        elif o == "--sniff-dialect":
            # compared with ==, because `o in ("--sniff-dialect")` is a
            # substring test on a plain string, not a tuple lookup
            sniff = True
        else:
            # raised explicitly so the check also survives `python -O`
            raise AssertionError("unexpected option")

    if len(args) < 2:
        use()
    export(sniff, args[0], args[1], sys.stdin)
#!/usr/bin/python
#
# Replace the value of a column using a regular expression.
# If an optional regular expression on the whole line is given,
# only the lines matching that expression are changed.
#
# For instance, csvsub param 'N=([0-9]+);.*' '\1'
#
# Igor Konnov, 2013-2014
import argparse
import csv
import getopt
import re
from sys import stdout
import sys
from util import *
class DataError(Exception):
    """Error type for problems with the CSV data.

    Derives from Exception (not BaseException) so that ordinary
    ``except Exception`` handlers see it like any other runtime error.
    No raise site is visible in this script.
    """

    def __init__(self, msg):
        Exception.__init__(self, msg)
def use():
    """Print the command-line synopsis to stdout and exit with status 1."""
    synopsis = ("Use: %s [--sniff-dialect] field field_regex replacement [line_regex] <csv"
                % sys.argv[0])
    # the synopsis line is followed by one empty line
    sys.stdout.write(synopsis + "\n\n")
    sys.exit(1)
def parse(args):
    """Return (field, regex, subst, line_regex) from the positional arguments.

    The first three arguments are required; with fewer, use() is called,
    which exits.  A missing fourth argument yields "" for line_regex.
    """
    if len(args) < 3:
        use()
    field, regex, subst = args[0:3]
    line_regex = args[3] if len(args) > 3 else ""
    return field, regex, subst, line_regex
def sub(sniff, field, regex, subst, line_regex, infile):
    """Rewrite one column of a CSV stream with a regex substitution.

    The first input row is the header and is copied unchanged.  In every
    other row the cell below the header `field` is replaced with
    re.sub(regex, subst, cell).  When line_regex is not "", only rows whose
    cells, joined with ",", match it from the start are rewritten; all other
    rows are copied unchanged.  Rows shorter than the header are accepted:
    only the cells that exist are looked at.

    Output goes to stdout in the dialect of the reader.
    """
    if sniff:
        reader = SniffingCsvReader(infile)
    else:
        reader = csv.reader(infile)

    # doublequote is forced, so that an embedded quote is always doubled
    # inside a quoted cell, whatever the (possibly sniffed) dialect says
    writer = csv.writer(sys.stdout, dialect=reader.dialect, doublequote=True)
    field_re = re.compile(regex)  # compiled once, not once per cell
    lre = re.compile(line_regex) if line_regex != "" else None

    fields = None
    for row_arr in reader:
        if not fields:
            fields = row_arr  # the first row is the header
        elif lre is None or lre.match(",".join(row_arr)):
            # the slice stops at the end of a row shorter than the header
            for i, f in enumerate(fields[:len(row_arr)]):
                if f == field:
                    row_arr[i] = field_re.sub(subst, row_arr[i])
        writer.writerow(row_arr)
if __name__ == "__main__":
    # Entry point: parse the options, then run the substitution on stdin.
    try:
        opts, args = getopt.getopt(sys.argv[1:], "h", ["help", "sniff-dialect"])
    except getopt.GetoptError as err:
        # print help information and exit:
        print(str(err))  # will print something like "option -a not recognized"
        use()

    sniff = False
    for o, a in opts:
        if o in ("-h", "--help"):
            use()
        elif o == "--sniff-dialect":
            # compared with ==, because `o in ("--sniff-dialect")` is a
            # substring test on a plain string, not a tuple lookup
            sniff = True
        else:
            # raised explicitly so the check also survives `python -O`
            raise AssertionError("unexpected option")

    field, regex, subst, line_regex = parse(args)
    sub(sniff, field, regex, subst, line_regex, sys.stdin)
#!/usr/bin/python
#
# Replace the value of a column using python 'eval'.
# The current value of the column is passed into variable '_1'.
# The value of a column 'x' can be accessed as r['x'].
# The script imports all functions from math, so you can use
# them with _1.
#
# Warning: this might be extremely insecure, as one can write
# virtually anything in 'eval'. Don't give root permissions to
# this script.
#
# Example: csvsubeval memory 'float(_1) / 1024'
#
# Igor Konnov, 2014
import argparse
import csv
import getopt
import io
import math
import re
from sys import stdout
import sys
from util import *
def use():
    """Print the help text to stdout and exit with status 1."""
    help_lines = [
        "Use: {} [--sniff-dialect] [-i|--include-empty] field 'expr(_1)' <csv".format(sys.argv[0]),
        "",
        " You can refer to the current version of the column as _1,",
        " or to a value of a column x as r[x].",
        " Note that empty fields are ignored by default.",
        "",
        " When -i or --include-empty is set, empty fields are not ignored",
        "",
    ]
    for line in help_lines:
        sys.stdout.write(line + "\n")
    sys.exit(1)
def parse():
    """Parse sys.argv and return (field, expression, include_empty, sniff).

    -h/--help, an unknown option, or fewer than two positional arguments end
    in use(), which prints the help text and exits.
    """
    try:
        opts, args = getopt.getopt(sys.argv[1:], "hi",
                                   ["help", "include-empty", "sniff-dialect"])
    except getopt.GetoptError as err:
        # print help information and exit:
        print(str(err))  # will print something like "option -a not recognized"
        use()  # exits, so nothing after this call would ever run

    include_empty = False
    sniff = False
    for o, a in opts:
        if o in ("-h", "--help"):
            use()
        elif o in ("-i", "--include-empty"):
            include_empty = True
        elif o == "--sniff-dialect":
            # compared with ==, because `o in ("--sniff-dialect")` is a
            # substring test on a plain string, not a tuple lookup
            sniff = True
        else:
            # raised explicitly so the check also survives `python -O`
            raise AssertionError("unexpected option")

    if len(args) < 2:
        use()
    return args[0], args[1], include_empty, sniff
def sub(sniff, field, replacement, include_empty, infile):
    """Rewrite one column of a CSV stream with an evaluated expression.

    The first input row is the header and is copied unchanged.  In every
    other row the cell below the header `field` is replaced with the value
    of the Python expression `replacement`, which sees:

      _1  -- the current text of the cell
      r   -- a dict {column: '"<cell>"'}, i.e. each cell of the row wrapped
             in double quotes, with backslashes doubled
      the builtins and every name of the math module

    Blank cells are left alone unless include_empty is true.  A ValueError
    raised by the expression is reported on stderr and the cell is kept.
    Rows shorter than the header are accepted: missing cells are "" in `r`
    and the row is written with its own length.

    WARNING: `replacement` goes to eval() as is.  Never pass an expression
    that comes from an untrusted source.
    """
    # The globals of the expression: the names of math.  eval() adds the
    # builtins by itself, and math names shadow builtins of the same name.
    # Built once, because it does not depend on the cell.
    glob = dict(vars(math))

    def sub_field(f, val, row):
        if f != field or not (include_empty or val.strip() != ''):
            return val
        try:
            return eval(replacement, glob, {"_1": val, "r": row})
        except ValueError as e:
            sys.stderr.write("Error on %s=%s: %s\n" % (f, val, str(e)))
            return val

    def escape(s):
        return s.replace('\\', '\\\\')

    if sniff:
        reader = SniffingCsvReader(infile)
    else:
        reader = csv.reader(infile)

    # doublequote is forced, so that an embedded quote is always doubled
    # inside a quoted cell, whatever the (possibly sniffed) dialect says
    writer = csv.writer(sys.stdout, dialect=reader.dialect, doublequote=True)
    fields = None
    for row_arr in reader:
        if not fields:
            fields = row_arr  # the first row is the header
        else:
            # pad short rows, so that every header column has an entry in r
            cells = row_arr + [""] * (len(fields) - len(row_arr))
            row = {f: '"%s"' % escape(cells[i]) for i, f in enumerate(fields)}
            for i, f in enumerate(fields[:len(row_arr)]):
                row_arr[i] = sub_field(f, row_arr[i], row)
        writer.writerow(row_arr)
if __name__ == "__main__":
    # Entry point: read the settings from the command line, then rewrite the
    # CSV that arrives on stdin.
    field, replacement, include_empty, sniff = parse()
    sub(
        sniff,
        field,
        replacement,
        include_empty,
        sys.stdin,
    )
import csv
import io
import sys
class SniffingCsvReader:
    """A CSV reader that guesses the dialect from the start of the input.

    Up to 8192 bytes are peeked (not consumed) from the file descriptor of
    `infile` and handed to csv.Sniffer.  If the sniffer gives up, a note is
    written to stderr and the 'excel' dialect is used.  The chosen dialect
    is available as the `dialect` attribute.

    Instances are iterators over the rows, on Python 2 and on Python 3.
    Unlike csv.reader, `line_num` is a method here, not an attribute.
    """

    def __init__(self, infile):
        # closefd=False: the descriptor stays owned by `infile`, so it is
        # closed once, by its owner.  io.open in 'rb' mode with buffering
        # already returns a buffered reader that supports peek().
        rawfile = io.open(infile.fileno(), mode='rb', buffering=8192,
                          closefd=False)
        sample = rawfile.peek(8192)
        if isinstance(sample, str):
            # Python 2: the csv module works on byte strings directly
            stream = rawfile
        else:
            # Python 3: the csv module needs text, so decode the sample and
            # read the rows through a text wrapper
            encoding = getattr(infile, 'encoding', None) or 'utf-8'
            sample = sample.decode(encoding, 'replace')
            stream = io.TextIOWrapper(rawfile, encoding=encoding, newline='')

        try:
            self.dialect = csv.Sniffer().sniff(sample)
        except csv.Error as e:
            sys.stderr.write("CSV dialect unknown: " + str(e) + "\n")
            sys.stderr.write("Falling back to the default (excel)\n")
            self.dialect = csv.get_dialect('excel')
        self._reader = csv.reader(stream, dialect=self.dialect)

    def __iter__(self):
        return self

    def next(self):
        """Return the next row; raise StopIteration at the end."""
        return next(self._reader)

    # the name of the iterator protocol method on Python 3
    __next__ = next

    def line_num(self):
        """Return the line_num counter of the wrapped csv reader."""
        return self._reader.line_num
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment