Skip to content

Instantly share code, notes, and snippets.

@Perlence
Created January 24, 2014 12:24
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Perlence/8596338 to your computer and use it in GitHub Desktop.
Save Perlence/8596338 to your computer and use it in GitHub Desktop.
Convert CSV file to SQLite table
#!/usr/bin/python
import re
import csv
import sqlite3
from contextlib import closing
TYPES = [
('INTEGER', re.compile('^[-+]?(\d+,)*\d+$')),
('REAL', re.compile('^[-+]?((\d+,)*\d+)?\.?[0-9]+([eE][-+]?[0-9]+)?$')),
('TEXT', re.compile('^.*$'))
]
def load(csvfile):
with open(csvfile) as fp:
return list(csv.reader(fp))
def analyze(rows):
header = rows.pop(0)
columns = zip(*rows)
for name, column in zip(header, columns):
name = name.strip().replace(' ', '_').replace('-', '_')
yield name, max(map(analyze_value, column))[1]
def analyze_value(value):
for priority, (type_, regexp) in enumerate(TYPES):
if regexp.match(value) is not None:
return priority, type_
def cast(rows, types):
for row in rows:
typed_row = []
for value, type_ in zip(row, types):
if type_ == 'INTEGER':
value = int(value)
if type_ == 'REAL':
value = float(value)
if type_ == 'TEXT':
value = str(value)
typed_row.append(value)
yield tuple(typed_row)
def create_table(cursor, table, column_types):
try:
create = 'CREATE TABLE {} ({})'
create = create.format(table,
', '.join(' '.join(type_)
for type_ in column_types))
cursor.execute(create)
except sqlite3.OperationalError as e:
if not 'already exists' in e.message:
raise
def insert(cursor, table, column_types, values):
insert = 'INSERT INTO {}({}) VALUES ({})'
insert = insert.format(table,
', '.join(name for name, __ in column_types),
', '.join('?' * len(column_types)))
cursor.executemany(insert, values)
def main(csvfiles, dbfile, table):
with closing(sqlite3.connect(dbfile)) as conn:
cursor = conn.cursor()
for csvfile in csvfiles:
csvdata = load(csvfile)
column_types = tuple(analyze(csvdata))
typed = cast(csvdata, column_types)
create_table(cursor, table, column_types)
insert(cursor, table, column_types, typed)
conn.commit()
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('csvfiles', metavar='CSV', nargs='+')
parser.add_argument('dbfile', metavar='DB')
parser.add_argument('table', metavar='TABLE')
args = parser.parse_args()
kwargs = dict(args._get_kwargs())
main(**kwargs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment