Skip to content

Instantly share code, notes, and snippets.

@meshulam
Forked from ajcronk/batch-import.py
Last active August 29, 2015 14:02
Show Gist options
  • Save meshulam/423ac91cde1e9bec6511 to your computer and use it in GitHub Desktop.
Save meshulam/423ac91cde1e9bec6511 to your computer and use it in GitHub Desktop.
import dateutil.parser
import optparse
from Queue import Queue
from tempodb.client import Client
from tempodb.protocol import DataPoint
from threading import Thread
class Worker(Thread):
    """Thread executing tasks from a given tasks queue.

    The worker marks itself as a daemon and starts running as soon as it
    is constructed.

    Args:
        tasks: queue-like object whose ``get()`` yields
            ``(func, args, kargs)`` tuples and which provides
            ``task_done()``.
    """

    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        """Pull tasks off the queue forever, calling each one in turn."""
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                # A failing task must not kill the worker: report it and
                # carry on with the next task.
                print(e)
            finally:
                # Always acknowledge the task. If this were skipped (e.g. the
                # task raised something that is not an Exception subclass),
                # a join() on the queue would block forever.
                self.tasks.task_done()
class ThreadPool:
    """A group of worker threads fed from one shared task queue."""

    def __init__(self, num_threads):
        # The queue is created with a maximum size equal to the number of
        # workers, so add_task() blocks once that many tasks are waiting.
        task_queue = Queue(num_threads)
        self.tasks = task_queue
        for _worker_index in range(num_threads):
            # Each Worker starts itself on construction.
            Worker(task_queue)

    def add_task(self, func, *args, **kargs):
        """Enqueue ``func`` to be invoked as ``func(*args, **kargs)``."""
        task = (func, args, kargs)
        self.tasks.put(task)

    def wait_completion(self):
        """Block until every task put on the queue has been marked done."""
        self.tasks.join()
def main():
    """Read sensor readings from a CSV file and write them to TempoDB.

    Each non-blank input line must have four comma-separated fields:
    ``timestamp, temperature, solar_radiation, humidity``. Readings are
    accumulated into three per-series batches and handed to a small thread
    pool, which calls ``client.write_data`` for each batch.

    Command-line options:
        -i / --input     file to read from (required)
        -k / --key       tempodb database key
        -s / --secret    tempodb database secret
        -U / --base-url  tempodb base url

    Raises:
        ValueError: if a non-blank line does not split into exactly four
            fields, or a reading cannot be converted to ``float``.
    """
    # This script assumes that the input file is sorted by key
    parser = optparse.OptionParser(usage="usage: %prog [options] filename", version="%prog 0.1")
    parser.add_option("-i", "--input", dest="filename", help="read data from FILENAME")
    parser.add_option("-k", "--key", dest="key", help="tempodb database key")
    parser.add_option("-s", "--secret", dest="secret", help="tempodb database secret")
    parser.add_option("-U", "--base-url", dest="url", default="https://api.tempo-db.com/v1/", help="tempodb base url")
    (options, args) = parser.parse_args()
    if not options.filename:
        parser.error("Enter a file to read from.")

    client = Client(options.key, options.key, options.secret, base_url=options.url)

    temperature_key = "thermostat.1.temperature"
    solar_radiation_key = "thermostat.1.solar_radiation"
    humidity_key = "thermostat.1.humidity"

    # Number of readings collected per series before a write is queued.
    batch_size = 20
    temperature_data = []
    solar_radiation_data = []
    humidity_data = []

    # Init a Thread pool with the desired number of threads
    pool = ThreadPool(3)

    # The with-block closes the file even if a line fails to parse.
    with open(options.filename) as source_file:
        for line in source_file:
            if not line.strip():
                # Tolerate blank lines (e.g. a trailing newline at EOF).
                continue
            timestamp, temperature, solar_radiation, humidity = line.split(',')

            # Hand off a full batch before adding the current reading.
            if len(temperature_data) >= batch_size:
                pool.add_task(client.write_data, temperature_key, temperature_data)
                pool.add_task(client.write_data, solar_radiation_key, solar_radiation_data)
                pool.add_task(client.write_data, humidity_key, humidity_data)
                # Rebind all three names to fresh lists: the queued lists now
                # belong to the worker threads and must not be appended to,
                # and every series must start the next batch empty.
                temperature_data = []
                solar_radiation_data = []
                humidity_data = []

            input_date = dateutil.parser.parse(timestamp)
            temperature_data.append(DataPoint.from_data(input_date, float(temperature)))
            solar_radiation_data.append(DataPoint.from_data(input_date, float(solar_radiation)))
            humidity_data.append(DataPoint.from_data(input_date, float(humidity)))

    # pick up any scraps
    if temperature_data:
        pool.add_task(client.write_data, temperature_key, temperature_data)
        pool.add_task(client.write_data, solar_radiation_key, solar_radiation_data)
        pool.add_task(client.write_data, humidity_key, humidity_data)

    # Wait for completion
    pool.wait_completion()
# Run the import only when this file is executed as a script.
if __name__ == "__main__":
    main()
2012-04-10T19:43:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:44:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:45:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:46:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:47:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:48:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:49:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:50:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:51:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:52:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:53:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:54:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:55:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T19:56:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T19:57:17.000+0600, 49.133, 940.7, 24.45
2012-04-10T19:58:17.000+0600, 50.021, 923.3, 24.87
2012-04-10T19:59:17.000+0600, 52.113, 950.3, 25.23
2012-04-10T20:00:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T20:01:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T20:02:17.000+0600, 47.234, 923.8, 25.01
2012-04-10T20:03:17.000+0600, 47.234, 923.8, 25.01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment