Skip to content

Instantly share code, notes, and snippets.

@leonsas
Forked from ajcronk/batch-import.py
Last active December 20, 2015 03:29
Show Gist options
  • Save leonsas/6063955 to your computer and use it in GitHub Desktop.
Save leonsas/6063955 to your computer and use it in GitHub Desktop.
import dateutil.parser
import optparse
from Queue import Queue
import tempodb
from threading import Thread
class Worker(Thread):
    """Thread executing tasks from a given tasks queue.

    Each queue item is a ``(func, args, kargs)`` tuple; the worker calls
    ``func(*args, **kargs)`` and then marks the item as done. The thread is
    a daemon and starts itself as soon as it is constructed.
    """

    def __init__(self, tasks):
        """Attach to the queue ``tasks`` and start running immediately."""
        Thread.__init__(self)
        self.tasks = tasks
        # Daemon threads do not keep the interpreter alive at exit.
        self.daemon = True
        self.start()

    def run(self):
        """Consume and execute tasks from the queue forever."""
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                # Best effort: report the failure and keep serving tasks.
                print(e)
            finally:
                # Always acknowledge the task, even if it raised something
                # that is not an Exception; otherwise a join() on the queue
                # would wait forever for this item.
                self.tasks.task_done()
class ThreadPool:
    """A fixed number of worker threads fed from one shared task queue."""

    def __init__(self, num_threads):
        """Create the task queue and spawn ``num_threads`` workers on it."""
        # The queue's maxsize is the worker count, so put() blocks once that
        # many tasks are already waiting.
        self.tasks = Queue(num_threads)
        for _worker_index in range(num_threads):
            # Each Worker starts itself in its constructor.
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Queue a call to ``func(*args, **kargs)`` for a worker to run."""
        task = (func, args, kargs)
        self.tasks.put(task)

    def wait_completion(self):
        """Block until every queued task has been marked as done."""
        self.tasks.join()
def main():
    """Read thermostat readings from a CSV file and queue them for TempoDB.

    Each non-blank input line must hold four comma-separated fields:
    timestamp, temperature, solar radiation and humidity. Readings are
    collected into three per-series lists and handed to the thread pool in
    batches of 20 lines, one ``client.write_key`` task per series. Any
    leftover partial batch is queued at the end, and the function returns
    only after the pool reports all tasks complete.

    Relies on a module-level ``pool`` object, which this file creates in the
    ``__main__`` guard before calling ``main()``.

    Raises:
        ValueError: if a line does not split into exactly four fields or a
            value cannot be parsed as a number or date.
    """
    # NOTE(review): the original comment said "This script assumes that the
    # input file is sorted by key", but rows carry no key column; presumably
    # this means sorted by timestamp. Confirm.
    parser = optparse.OptionParser(usage="usage: %prog [options] filename", version="%prog 0.1")
    parser.add_option("-i", "--input", dest="filename", help="read data from FILENAME")
    parser.add_option("-k", "--key", dest="key", help="tempodb database key")
    parser.add_option("-s", "--secret", dest="secret", help="tempodb database secret")
    parser.add_option("-H", "--host", dest="host", default="api.tempo-db.com", help="tempodb host")
    parser.add_option("-P", "--port", dest="port", default=443, help="tempodb port")
    # NOTE(review): store_true with default=True means this flag can never
    # turn "secure" off; left as-is to keep the command line unchanged.
    parser.add_option("-S", "--secure", action="store_true", dest="secure", default=True, help="tempodb secure")

    (options, args) = parser.parse_args()
    if not options.filename:
        parser.error("Enter a file to read from.")

    client = tempodb.Client(options.key, options.secret, options.host, int(options.port), options.secure)

    temperature_key = "thermostat.1.temperature"
    solar_radiation_key = "thermostat.1.solar_radiation"
    humidity_key = "thermostat.1.humidity"

    batch_size = 20
    temperature_data = []
    solar_radiation_data = []
    humidity_data = []

    # The with-block closes the file even if a line fails to parse.
    with open(options.filename) as source_file:
        for line in source_file:
            if not line.strip():
                # Skip blank lines (e.g. a trailing newline at end of file).
                continue
            timestamp, temperature, solar_radiation, humidity = line.split(',')

            # Flush a full batch before adding the current line. The three
            # lists grow in lockstep, so one length check covers them all.
            if len(temperature_data) >= batch_size:
                pool.add_task(client.write_key, temperature_key, temperature_data)
                pool.add_task(client.write_key, solar_radiation_key, solar_radiation_data)
                pool.add_task(client.write_key, humidity_key, humidity_data)
                # Rebind all three names to fresh lists so the queued batches
                # are never appended to again and no point is sent twice.
                temperature_data = []
                solar_radiation_data = []
                humidity_data = []

            input_date = dateutil.parser.parse(timestamp)
            temperature_data.append(tempodb.DataPoint(input_date, float(temperature)))
            solar_radiation_data.append(tempodb.DataPoint(input_date, float(solar_radiation)))
            humidity_data.append(tempodb.DataPoint(input_date, float(humidity)))

    # pick up any scraps
    if temperature_data:
        pool.add_task(client.write_key, temperature_key, temperature_data)
        pool.add_task(client.write_key, solar_radiation_key, solar_radiation_data)
        pool.add_task(client.write_key, humidity_key, humidity_data)

    # Wait for completion
    pool.wait_completion()
if __name__ == '__main__':
    # main() looks up `pool` as a module-level name, so the pool has to be
    # created here, before main() is called. Three worker threads are used.
    pool = ThreadPool(num_threads=3)
    main()
2012-04-10T19:43:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:44:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:45:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:46:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:47:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:48:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:49:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:50:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:51:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:52:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:53:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:54:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:55:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:56:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:57:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:58:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:59:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T20:00:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T20:01:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T20:02:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T20:03:17.000+0600, 47.234, 923.8, 25.01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment