Created
January 24, 2014 12:24
-
-
Save Perlence/8596338 to your computer and use it in GitHub Desktop.
Convert CSV file to SQLite table
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import re | |
import csv | |
import sqlite3 | |
from contextlib import closing | |
TYPES = [ | |
('INTEGER', re.compile('^[-+]?(\d+,)*\d+$')), | |
('REAL', re.compile('^[-+]?((\d+,)*\d+)?\.?[0-9]+([eE][-+]?[0-9]+)?$')), | |
('TEXT', re.compile('^.*$')) | |
] | |
def load(csvfile): | |
with open(csvfile) as fp: | |
return list(csv.reader(fp)) | |
def analyze(rows): | |
header = rows.pop(0) | |
columns = zip(*rows) | |
for name, column in zip(header, columns): | |
name = name.strip().replace(' ', '_').replace('-', '_') | |
yield name, max(map(analyze_value, column))[1] | |
def analyze_value(value): | |
for priority, (type_, regexp) in enumerate(TYPES): | |
if regexp.match(value) is not None: | |
return priority, type_ | |
def cast(rows, types): | |
for row in rows: | |
typed_row = [] | |
for value, type_ in zip(row, types): | |
if type_ == 'INTEGER': | |
value = int(value) | |
if type_ == 'REAL': | |
value = float(value) | |
if type_ == 'TEXT': | |
value = str(value) | |
typed_row.append(value) | |
yield tuple(typed_row) | |
def create_table(cursor, table, column_types): | |
try: | |
create = 'CREATE TABLE {} ({})' | |
create = create.format(table, | |
', '.join(' '.join(type_) | |
for type_ in column_types)) | |
cursor.execute(create) | |
except sqlite3.OperationalError as e: | |
if not 'already exists' in e.message: | |
raise | |
def insert(cursor, table, column_types, values): | |
insert = 'INSERT INTO {}({}) VALUES ({})' | |
insert = insert.format(table, | |
', '.join(name for name, __ in column_types), | |
', '.join('?' * len(column_types))) | |
cursor.executemany(insert, values) | |
def main(csvfiles, dbfile, table): | |
with closing(sqlite3.connect(dbfile)) as conn: | |
cursor = conn.cursor() | |
for csvfile in csvfiles: | |
csvdata = load(csvfile) | |
column_types = tuple(analyze(csvdata)) | |
typed = cast(csvdata, column_types) | |
create_table(cursor, table, column_types) | |
insert(cursor, table, column_types, typed) | |
conn.commit() | |
if __name__ == '__main__': | |
import argparse | |
parser = argparse.ArgumentParser() | |
parser.add_argument('csvfiles', metavar='CSV', nargs='+') | |
parser.add_argument('dbfile', metavar='DB') | |
parser.add_argument('table', metavar='TABLE') | |
args = parser.parse_args() | |
kwargs = dict(args._get_kwargs()) | |
main(**kwargs) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment