Skip to content

Instantly share code, notes, and snippets.

@andrewxhill
Created June 19, 2013 17:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andrewxhill/5816303 to your computer and use it in GitHub Desktop.
Save andrewxhill/5816303 to your computer and use it in GitHub Desktop.
CartoDB CSV Bulkloader for Python
import csv, json
import string
import sys
import urllib
import urllib2, httplib
## CartoDB account name (becomes the subdomain of the SQL API URL built below)
account_name = "viz2"
## Account API KEY
k = "{YOUR API KEY}"
## Target table
table = 'my_cartodb_table'
## Input CSV
f = "my_input_file.csv"
## Does your CSV have a header row at the top
header = True
## Error filename; INSERT statements rejected by the API are written here
e = 'my_sql_errors.sql'
## Table columns that receive values, in the order the values are generated.
## Columns must already exist in 'table' above. Include the_geom last even though it isn't in your CSV:
## the geometry value is always appended after all other values.
## Example: the real columns in my CSV are: my_id, latitude, longitude, name, description
o_columns = 'my_id, name, description, the_geom'
## One type per CSV column, in CSV column order (the list is indexed by CSV column position,
## so the lat and lon columns ARE listed here even though they are omitted from o_columns).
## Type options are num, str, time (numeric, string, and timestamp respectively).
## lat, lon are special types: their values are not inserted on their own but are combined
## into a single CDB_LatLng(lat,lon) geometry value on insert.
## A CSV column whose type is none of the above is ignored.
o_types = 'num,lat,lon,str,str'.split(',')
## Unique index, the column in the table containing some unique id or variable
## Recommended to place an index on this value before running
## CREATE UNIQUE INDEX idx_YOURTABLENAME_followedby_YOURCOLUMNNAME ON YOURTABLENAME (YOURCOLUMNNAME)
unique_col = 'my_id'
## The CSV column number (start counting at 0) that the unique_col can be found in.
## Remember to also count columns you are skipping on the insert above.
## In the example CSV layout my_id is the first column, hence 0.
unique_col_index = 0
## How many inserts to send per request
batch_s = 25000
## SQL API endpoint for the account
url = "https://%s.cartodb.com/api/v1/sql" % account_name
def shipsingle(value):
    """Send a single-row INSERT and record the statement if the API rejects it.

    value -- one parenthesised SQL VALUES tuple as a string,
             e.g. "(1,'a','b',CDB_LatLng(1,2))".

    On an HTTP error the complete INSERT statement is appended, followed by a
    newline, to the error file named by the module-level ``e``. Other
    exceptions (for example a network failure) propagate to the caller.
    """
    b_insert = 'INSERT INTO %s (%s) ' % (table, o_columns)
    insert = b_insert + 'VALUES ' + value + ';'
    # Alternative that skips rows whose unique_col already exists in the table:
    # insert = 'WITH n ('+o_columns+') AS (VALUES ' + value + ') ' + b_insert + '(SELECT * FROM n WHERE n.'+unique_col+' NOT IN (SELECT '+unique_col+' FROM '+table+'))'
    params = {
        'api_key': k,
        'q': insert
    }
    req = urllib2.Request(url, urllib.urlencode(params))
    try:
        # Only success/failure matters; close the response right away.
        urllib2.urlopen(req).close()
    except urllib2.HTTPError:
        print(" ERROR")
        # Open the error file by name, in append mode, rather than relying on
        # a file handle owned by another function: such a handle is not in
        # scope here and referencing it would raise NameError.
        with open(e, 'a') as error_log:
            error_log.write(insert + "\n")
    return
def ship(values):
    """Insert a batch of rows with one multi-row INSERT request.

    values -- list of parenthesised SQL VALUES tuples (strings).

    If the API answers the batch with an HTTP error, every row is re-sent
    individually through shipsingle() so that only the offending rows are lost.
    """
    prefix = 'INSERT INTO %s (%s) ' % (table, o_columns)
    statement = prefix + '(VALUES ' + ','.join(values) + ');'
    # Alternative that skips rows whose unique_col already exists in the table:
    # statement = 'WITH n ('+o_columns+') AS (VALUES ' + ','.join(values) + ') ' + prefix + '(SELECT * FROM n WHERE n.'+unique_col+' NOT IN (SELECT '+unique_col+' FROM '+table+'))'
    payload = urllib.urlencode({
        'api_key': k,
        'q': statement
    })
    try:
        urllib2.urlopen(urllib2.Request(url, payload))
    except urllib2.HTTPError:
        print('\nCHECK SINGLES ')
        for single in values:
            shipsingle(single)
    return
def vacuum():
    """Ask the SQL API to run ``vacuum full`` on the target table.

    This is best effort: an HTTP error does not abort the load, but it is
    reported on stdout instead of being silently discarded.
    """
    params = {
        'api_key': k,
        'q': "vacuum full %s" % table
    }
    req = urllib2.Request(url, urllib.urlencode(params))
    print('\nVACUUM')
    try:
        # Only success/failure matters; close the response right away.
        urllib2.urlopen(req).close()
    except urllib2.HTTPError as error:
        print('VACUUM failed: HTTP %s' % error.code)
    return
def _quote(text):
    """Return text as a single-quoted SQL string literal, doubling embedded quotes."""
    return "'" + text.replace("'", "''") + "'"


def _row_to_values(data):
    """Convert one CSV row into a parenthesised SQL VALUES tuple (a string).

    Each field is rendered according to o_types at the same position; lat/lon
    fields are held back and appended last as one geometry value. Raises
    IndexError if the row has more fields than o_types has entries.
    """
    value = []
    lat = None
    lon = None
    for j, d in enumerate(data):
        kind = o_types[j]
        if kind == 'num':
            # NOTE: numeric text is passed through unquoted and unvalidated.
            value.append(d if d != '' else 'null::float')
        elif kind == 'str':
            value.append(_quote(d))
        elif kind == 'time':
            if d == '':
                value.append('null::timestamp')
            else:
                value.append(_quote(d) + '::timestamp')
        elif kind == 'lat':
            lat = d
        elif kind == 'lon':
            lon = d
        # Any other type name: the CSV column is ignored.
    if lat and lon:
        # NOTE: coordinates are passed through unquoted and unvalidated.
        value.append('CDB_LatLng(' + lat + ',' + lon + ')')
    else:
        # A typed null keeps the tuple valid SQL; an empty value would leave a
        # trailing comma and make the whole batch fail.
        value.append('null::geometry')
    return '(' + ','.join(value) + ')'


def main():
    """Load the CSV file ``f`` into ``table`` in batches of ``batch_s`` rows.

    The table is first queried for the ``unique_col`` value of the row with
    the highest cartodb_id. If one exists, CSV rows are skipped up to and
    including the row whose ``unique_col_index`` field equals that value, so
    an interrupted load can be resumed. Remaining rows are converted to SQL
    VALUES tuples and sent through ship(); vacuum() is called whenever the
    current line number is a multiple of 200000.
    """
    # The error-file handle is module-level so that code outside main() that
    # refers to error_f can reach it.
    global error_f

    params = {
        'api_key': k,
        'q': "SELECT %s FROM %s ORDER BY cartodb_id DESC LIMIT 1" % (unique_col, table)
    }
    req = urllib2.Request(url, urllib.urlencode(params))
    response = urllib2.urlopen(req)
    try:
        res = response.read()
    finally:
        response.close()

    rows = json.loads(res)['rows']
    skip = bool(rows)
    last_unique = None
    if skip:
        # The JSON value may be a number or already-decoded text; format it as
        # unicode text so it can be compared with the decoded CSV field.
        # NOTE(review): a numeric id returned as e.g. 12.0 will not match a CSV
        # field "12" -- confirm the id column type against the CSV contents.
        last_unique = u'%s' % (rows[0][unique_col],)

    with open(e, 'w') as error_f, open(f, 'rb') as csv_f:
        values = []
        for ln, data in enumerate(csv.reader(csv_f, delimiter=',')):
            sys.stdout.write("\r%s" % ln)
            sys.stdout.flush()
            # Drop the header by position, before the resume logic, so that
            # resuming never mistakes the first new data row for the header.
            if header and ln == 0:
                continue
            # Blank lines carry no data.
            if not data:
                continue
            if skip:
                # Fast-forward past rows that are already in the table.
                if data[unique_col_index].decode('utf-8') == last_unique:
                    skip = False
                continue
            values.append(_row_to_values(data))
            if len(values) == batch_s:
                ship(values)
                values = []
            if ln % 200000 == 0:
                vacuum()
        # Send whatever is left over from the last, partial batch.
        if values:
            ship(values)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment