public
Last active

UPDATED VERSION NOW AT https://github.com/rgrp/csv2sqlite [Script to load CSV to SQLite]

  • Download Gist
csv2sqlite.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
#!/usr/bin/env python
# A simple Python script to convert csv files to sqlite (with type guessing)
#
# @author: Rufus Pollock
# Placed in the Public Domain
import csv
import sqlite3
 
def convert(filepath_or_fileobj, dbpath, table='data'):
    '''Load a CSV file into a newly created table of an SQLite database.

    The first CSV row supplies the column names. Column types come from
    ``_guess_types``; values of columns guessed as ``real`` or ``integer``
    have their commas stripped before insertion.

    :param filepath_or_fileobj: path to a CSV file, or an open, seekable
        file object. The input is read twice (type guessing, then loading).
        A file opened here from a path is closed again; a file object
        supplied by the caller is left open.
    :param dbpath: database path handed to ``sqlite3.connect``.
    :param table: name of the table to create (defaults to 'data').
        NOTE(review): the table name is interpolated into the SQL unquoted,
        exactly as before, so it must be a trusted, valid SQL identifier.
    '''
    # ``basestring`` only exists on Python 2; fall back to ``str`` elsewhere.
    try:
        string_types = basestring
    except NameError:
        string_types = str

    opened_here = isinstance(filepath_or_fileobj, string_types)
    fo = open(filepath_or_fileobj) if opened_here else filepath_or_fileobj
    try:
        types = _guess_types(fo)
        # Type guessing consumed the file, so rewind before loading rows.
        fo.seek(0)
        reader = csv.reader(fo)
        headers = next(reader)

        # Double any embedded double quote so a header cannot terminate its
        # quoted identifier and corrupt the CREATE TABLE statement.
        columns = ','.join(
            '"%s" %s' % (header.replace('"', '""'), _type)
            for header, _type in zip(headers, types)
        )
        insert_tmpl = 'insert into %s values (%s)' % (
            table, ','.join(['?'] * len(headers)))

        conn = sqlite3.connect(dbpath)
        try:
            c = conn.cursor()
            c.execute('CREATE table %s (%s)' % (table, columns))
            for row in reader:
                # csv.reader yields [] for blank lines; there is nothing to
                # insert for those, so skip them.
                if not row:
                    continue
                # we need to take out commas from int and floats for sqlite
                # to recognize them properly ...
                row = [
                    x.replace(',', '') if y in ('real', 'integer') else x
                    for x, y in zip(row, types)
                ]
                c.execute(insert_tmpl, row)
            conn.commit()
            c.close()
        finally:
            # Release the database handle even when an insert fails.
            conn.close()
    finally:
        if opened_here:
            fo.close()
 
def _guess_types(fileobj, max_sample_size=100):
    '''Guess column types (as for SQLite) of CSV.

    The first row is treated as the header. For each of up to
    ``max_sample_size`` following rows, every cell is stripped, has its
    commas removed and is then tried against each candidate type. A column
    gets a type only if *every* sampled non-empty cell casts to it; empty
    cells are treated as nulls that fit any type.

    :param fileobj: read-only file object for a CSV file.
    :param max_sample_size: maximum number of data rows to inspect.
    :return: list with one of 'text', 'real' or 'integer' per header
        column; an empty list when the file has no header row. Columns for
        which no cell was sampled default to 'text'.
    '''
    reader = csv.reader(fileobj)
    # skip header (an empty file has none, so there is nothing to guess)
    headers = next(reader, None)
    if headers is None:
        return []
    # order matters
    # (order in form of type you want used in case of tie to be last)
    options = [
        ('text', str),
        ('real', float),
        ('integer', int),
        # 'date',
    ]
    # for each column: does every sampled cell so far cast to this type?
    viable = [dict((key, True) for key, _cast in options) for _ in headers]
    # for each column: was at least one cell sampled at all?
    seen = [False] * len(headers)
    for count, row in enumerate(reader):
        if count >= max_sample_size:
            break
        # zip() ignores cells beyond the header width instead of failing
        for idx, cell in zip(range(len(headers)), row):
            seen[idx] = True
            # replace ',' with '' to improve cast accuracy for ints and floats
            cell = cell.strip().replace(',', '')
            # for null cells we can assume success
            if not cell:
                continue
            for key, cast in options:
                if not viable[idx][key]:
                    continue
                try:
                    cast(cell)
                except ValueError:
                    # a single failing cell rules the type out for good
                    viable[idx][key] = False
    # we default to text for each field
    types = ['text'] * len(headers)
    for idx, column in enumerate(viable):
        if not seen[idx]:
            continue
        for key, _cast in options:
            if column[key]:
                types[idx] = key
    return types
 
 
if __name__ == '__main__':
    import sys
    # Command-line entry point: csv path, database path and an optional
    # table name. Any other argument count prints the usage text and exits
    # with status 1, rather than passing surplus arguments on to convert().
    if not 3 <= len(sys.argv) <= 4:
        print('''csv2sqlite.py {csv-file-path} {sqlite-db-path} [{table-name}]
 
Convert a csv file to a table in an sqlite database (which need not yet exist).
 
* table-name is optional and defaults to 'data'
''')
        sys.exit(1)
    convert(*sys.argv[1:])
test.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
from csv2sqlite import convert
 
def test():
    '''Simple test case: load a three-row CSV and count the stored rows.'''
    import os
    # sqlite3 is needed to read the result back and is not imported at the
    # top of this file, so bring it into scope here.
    import sqlite3
    try:
        from StringIO import StringIO  # Python 2
    except ImportError:
        from io import StringIO
    fileobj = StringIO(
        '''heading_1,heading_2,heading_3
abc,1,1.0
xyz,2,2.0
efg,3,3.0'''
    )
    dbpath = '/tmp/csv2sqlite-test-data.db'
    # start from a clean slate: convert() creates the table itself
    if os.path.exists(dbpath):
        os.remove(dbpath)
    table = 'data'
    convert(fileobj, dbpath, table)
    conn = sqlite3.connect(dbpath)
    try:
        c = conn.cursor()
        c.execute('select count(*) from %s' % table)
        # fetchone() is available on every supported Python version,
        # unlike the cursor's ``next`` method
        row = c.fetchone()
    finally:
        conn.close()
    assert row[0] == 3, row

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.