CartoDB CSV Bulkloader for Python
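A Python 2 script that bulk-loads a local CSV into an existing CartoDB table through the SQL API. It converts rows into batched multi-row INSERT statements, builds the_geom from latitude/longitude columns with CDB_LatLng, retries failed batches row by row (logging bad statements to an error file), and can resume an interrupted load from the last row already present in the table.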
import csv, json
import sys
import urllib
import urllib2
## CartoDB account name
account_name = "viz2"
## Account API key
k = "{YOUR API KEY}"
## Target table
table = 'my_cartodb_table'
## Input CSV
f = "my_input_file.csv"
## Does your CSV have a header row at the top?
header = True
## Error filename; any INSERT that fails is logged here for later inspection
e = 'my_sql_errors.sql'
## Columns to insert, in insert order. Leave out any CSV columns that you do not want in your table.
## All columns must already exist in 'table' above. Include the_geom last even though it isn't in
## your CSV; it is built from the lat/lon columns on insert.
## The real columns in this example CSV are: my_id, latitude, longitude, name, description
o_columns = 'my_id, name, description, the_geom'
## Column types, one per CSV column, in CSV order.
## Type options are num, str, time (numeric, string, and timestamp respectively).
## lat and lon are special types that mark the latitude and longitude columns,
## which are converted to a the_geom geometry on insert instead of being inserted directly.
o_types = 'num,lat,lon,str,str'.split(',')
## Unique column: the CSV column containing a unique id, used to resume an interrupted load.
## Recommended: place a unique index on this column before running, e.g.
## CREATE UNIQUE INDEX idx_YOURTABLENAME_YOURCOLUMNNAME ON YOURTABLENAME (YOURCOLUMNNAME)
unique_col = 'my_id'
## The 0-based position of unique_col in the CSV. Remember to also count any CSV columns
## you are skipping on the insert above (my_id is the first column in the example CSV).
unique_col_index = 0
## How many inserts to send per request
batch_s = 25000

url = "https://%s.cartodb.com/api/v1/sql" % account_name
def shipsingle(value):
    b_insert = 'INSERT INTO %s (%s) ' % (table, o_columns)
    insert = b_insert + 'VALUES ' + value + ';'
    ## Alternative insert that skips rows whose unique_col value is already in the table:
    # insert = 'WITH n ('+o_columns+') AS (VALUES ' + value + ') ' + b_insert + '(SELECT * FROM n WHERE n.'+unique_col+' NOT IN (SELECT '+unique_col+' FROM '+table+'))'
    params = {
        'api_key': k,
        'q': insert
    }
    req = urllib2.Request(url, urllib.urlencode(params))
    try:
        response = urllib2.urlopen(req)
    except urllib2.HTTPError:
        print " ERROR"
        error_f.write(insert)
        error_f.write("\n")
    return
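
## Ship a batch of VALUES tuples as a single multi-row INSERT; if the whole
## batch is rejected, fall back to shipsingle() to isolate the bad rows.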
def ship(values):
    b_insert = 'INSERT INTO %s (%s) ' % (table, o_columns)
    insert = b_insert + '(VALUES ' + ','.join(values) + ');'
    ## Alternative insert that skips rows whose unique_col value is already in the table:
    # insert = 'WITH n ('+o_columns+') AS (VALUES ' + ','.join(values) + ') ' + b_insert + '(SELECT * FROM n WHERE n.'+unique_col+' NOT IN (SELECT '+unique_col+' FROM '+table+'))'
    params = {
        'api_key': k,
        'q': insert
    }
    req = urllib2.Request(url, urllib.urlencode(params))
    try:
        response = urllib2.urlopen(req)
    except urllib2.HTTPError:
        ## The whole batch failed; retry row by row to isolate the bad inserts
        print '\nCHECK SINGLES '
        for value in values:
            shipsingle(value)
    return
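
## Reclaim space in the target table between large batches. Note that
## VACUUM FULL takes an exclusive lock on the table while it runs.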
def vacuum():
    params = {
        'api_key': k,
        'q': "VACUUM FULL %s" % table
    }
    req = urllib2.Request(url, urllib.urlencode(params))
    try:
        print '\nVACUUM'
        response = urllib2.urlopen(req)
    except urllib2.HTTPError:
        pass
    return
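
## Read the CSV, skip past any rows already present in the table, convert each
## remaining row to a SQL VALUES tuple, and ship the tuples in batches.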
def main():
    global header
    global error_f

    ## Fetch the most recently inserted unique_col value so an interrupted
    ## load can resume where it left off
    params = {
        'api_key': k,
        'q': "SELECT %s FROM %s ORDER BY cartodb_id DESC LIMIT 1" % (unique_col, table)
    }
    req = urllib2.Request(url, urllib.urlencode(params))
    response = urllib2.urlopen(req)
    res = response.read()
    if len(json.loads(res)['rows']) > 0:
        skip = True
        ## Coerce to unicode so it compares cleanly with the decoded CSV value below
        last_unique = unicode(json.loads(res)['rows'][0][unique_col])
    else:
        skip = False

    error_f = open(e, 'w')
    rowreader = csv.reader(open(f, 'rb'), delimiter=',')
    values = []
    i = 0
    for ln, data in enumerate(rowreader):
        sys.stdout.write("\r%s" % ln)
        sys.stdout.flush()
        if skip:
            ## Fast-forward past rows that are already in the table
            if data[unique_col_index].decode('utf-8') == last_unique:
                skip = False
            continue
        if header:
            header = False
            continue
        value = []
        j = 0
        lat = None
        lon = None
        for d in data:
            if o_types[j] == 'num':
                if d == '':
                    value.append('null::float')
                else:
                    value.append(d)
            elif o_types[j] == 'str':
                ## Double any single quotes so the value is safe inside SQL quotes
                value.append("'" + d.replace("'", "''") + "'")
            elif o_types[j] == 'time':
                if d == '':
                    value.append('null::timestamp')
                else:
                    value.append("'" + d + "'::timestamp")
            elif o_types[j] == 'lat':
                lat = d
            elif o_types[j] == 'lon':
                lon = d
            j += 1
        if lat and lon:
            value.append('CDB_LatLng(' + lat + ',' + lon + ')')
        else:
            ## No coordinates in this row; insert a NULL geometry rather than an empty value
            value.append('null')
        values.append('(' + ','.join(value) + ')')
        i += 1
        if i == batch_s:
            ship(values)
            i = 0
            values = []
            if ln % 200000 == 0:
                vacuum()
    if i > 0:
        ship(values)

if __name__ == "__main__":
    main()
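
As a one-time pre-flight step, the unique index recommended in the configuration comments can be created through the same SQL API. The sketch below is not part of the original script; it reuses the url, k, table, and unique_col settings from above, and the create_unique_index helper name and idx_<table>_<column> index name are just illustrative choices.

import urllib
import urllib2

def create_unique_index():
    ## Build the recommended index statement, e.g.
    ## CREATE UNIQUE INDEX idx_my_cartodb_table_my_id ON my_cartodb_table (my_id)
    q = "CREATE UNIQUE INDEX idx_%s_%s ON %s (%s)" % (table, unique_col, table, unique_col)
    req = urllib2.Request(url, urllib.urlencode({'api_key': k, 'q': q}))
    try:
        urllib2.urlopen(req)
    except urllib2.HTTPError as error:
        ## Most likely the index already exists; show the API's response
        print error.read()

With the settings at the top filled in, the loader runs under a plain Python 2 interpreter and prints a running count of CSV lines as it ships each batch.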