Skip to content

Instantly share code, notes, and snippets.

@iandanforth
Forked from ajcronk/batch-import.py
Created November 29, 2012 21:40
Show Gist options
  • Save iandanforth/4172099 to your computer and use it in GitHub Desktop.
Save iandanforth/4172099 to your computer and use it in GitHub Desktop.
Python CSV batch import example
import dateutil.parser
import optparse
import csv
import tempodb
from threading import Thread
from Queue import Queue
class Worker(Thread):
    """Thread executing tasks from a given tasks queue.

    Every queue item is expected to be a ``(func, args, kargs)`` tuple and
    is executed as ``func(*args, **kargs)``.  The worker marks itself as a
    daemon thread and starts running as soon as it is constructed.
    """

    def __init__(self, tasks):
        """Remember the shared queue and start the thread immediately.

        tasks -- queue object providing ``get()`` and ``task_done()``.
        """
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        """Pull tasks off the queue and execute them, forever.

        Failures inside a task are printed and do not stop the worker.
        """
        while True:
            task = self.tasks.get()
            try:
                # Unpack inside the try block so that a malformed queue item
                # is reported like any other task failure instead of killing
                # the thread.
                func, args, kargs = task
                func(*args, **kargs)
            except Exception as e:
                # Best effort: report the problem and keep serving tasks.
                print(e)
            finally:
                # Always acknowledge the item.  If this were skipped (e.g. a
                # task raising SystemExit), join() on the queue would block
                # forever waiting for a task that will never be marked done.
                self.tasks.task_done()
class ThreadPool:
    """Fixed-size set of Worker threads fed from one shared, bounded queue."""

    def __init__(self, num_threads):
        """Create a queue of capacity ``num_threads`` and start that many workers."""
        self.tasks = Queue(num_threads)
        for _worker_index in range(num_threads):
            # Each Worker starts itself on construction; no handle is kept.
            Worker(self.tasks)

    def add_task(self, func, *call_args, **call_kwargs):
        """Enqueue ``func`` together with the arguments it should be called with."""
        task = (func, call_args, call_kwargs)
        self.tasks.put(task)

    def wait_completion(self):
        """Block until every task that was enqueued has been marked as done."""
        self.tasks.join()
def main():
    """Read a CSV file and write its columns to TempoDB in batches.

    The first column of every row is parsed as a timestamp.  Every following
    column ``i`` is converted to a float and buffered as a data point for the
    series named ``series.<i>``.  Whenever a buffer reaches the batch size it
    is handed to the thread pool, which calls ``client.write_key`` with the
    series name and the buffered points.  Buffers that are still partially
    filled at the end of the file are sent as well, and the function returns
    only after the pool reports that all queued writes have completed.

    Command-line options are parsed with optparse; a missing input filename
    is reported through ``parser.error``.
    """
    # This script assumes that the input file is sorted by key
    parser = optparse.OptionParser(usage="usage: %prog [options] filename",
                                   version="%prog 0.1")
    parser.add_option("-i", "--input",
                      dest="filename",
                      help="read data from FILENAME")
    parser.add_option("-k", "--key", dest="key", help="tempodb database key")
    parser.add_option("-s", "--secret",
                      dest="secret",
                      help="tempodb database secret")
    parser.add_option("-H", "--host",
                      dest="host",
                      default="api.tempo-db.com",
                      help="tempodb host")
    parser.add_option("-P", "--port",
                      dest="port",
                      default=443,
                      help="tempodb port")
    # NOTE: with action="store_true" and default=True this flag is always
    # True; it is kept unchanged so existing command lines keep working.
    parser.add_option("-S",
                      "--secure",
                      action="store_true",
                      dest="secure",
                      default=True,
                      help="tempodb secure")

    options, _ = parser.parse_args()
    if not options.filename:
        parser.error("Enter a file to read from.")

    ##########################################################################

    # Create a connection to TempoDB
    client = tempodb.Client(options.key,
                            options.secret,
                            options.host,
                            int(options.port),
                            options.secure)

    # Init a Thread pool with the desired number of threads
    pool = ThreadPool(3)

    # Define how many records to send in each batch
    batch_size = 60

    # Per-series buffers, created on demand.  This avoids peeking at the
    # first line (which crashed on an empty file) and copes with rows that
    # have more columns than the first one.
    series = {}

    # Begin parsing and enqueuing data
    with open(options.filename) as source_file:
        for line in csv.reader(source_file):
            if not line:
                # csv.reader yields an empty list for blank lines.
                continue

            # The first column holds the timestamp shared by the whole row.
            input_date = dateutil.parser.parse(line[0])

            for i, value in enumerate(line[1:], 1):
                series_name = 'series.' + str(i)
                points = series.setdefault(series_name, [])

                # Append first, then flush.  Flushing *instead of* appending
                # would silently drop the value that triggered the flush.
                points.append(tempodb.DataPoint(input_date, float(value)))
                if len(points) >= batch_size:
                    pool.add_task(client.write_key, series_name, points)
                    # Rebind to a fresh list; the queued batch keeps the old one.
                    series[series_name] = []

    # Pick up any scraps
    for series_name, data in series.items():
        if data:
            pool.add_task(client.write_key, series_name, data)

    # Wait for completion
    pool.wait_completion()
# Run the import only when this file is executed as a script.
if __name__ == "__main__":
    main()
2012-04-10T19:43:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:44:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:45:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:46:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:47:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:48:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:49:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:50:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:51:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:52:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:53:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:54:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:55:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:56:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:57:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:58:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:59:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T20:00:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T20:01:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T20:02:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T20:03:17.000+0600, 47.234, 923.8, 25.01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment