Skip to content

Instantly share code, notes, and snippets.

@meshulam
Forked from maddenpj/gist:5375492
Last active January 4, 2016 17:19
Show Gist options
  • Save meshulam/8652970 to your computer and use it in GitHub Desktop.
Save meshulam/8652970 to your computer and use it in GitHub Desktop.
import datetime
import dateutil.parser
import optparse
from Queue import Queue
import tempodb
from threading import Thread
class Worker(Thread):
    """Daemon thread that executes tasks pulled from a shared queue.

    Each queue item is a ``(func, args, kargs)`` tuple; the worker calls
    ``func(*args, **kargs)``. The thread starts itself on construction and
    loops forever, so it only ends when the process exits (it is a daemon).
    """

    def __init__(self, tasks):
        """Bind the worker to *tasks* (a queue of task tuples) and start it."""
        Thread.__init__(self)
        self.tasks = tasks
        # Daemon threads do not keep the interpreter alive once main() returns.
        self.daemon = True
        self.start()

    def run(self):
        """Consume tasks forever, reporting (not propagating) task errors."""
        while True:
            task = self.tasks.get()
            try:
                # Unpack inside the try so a malformed task is reported
                # like any other failure instead of killing the worker.
                func, args, kargs = task
                func(*args, **kargs)
            except Exception as e:
                # Best effort: one failed task must not stop the worker.
                print(e)
            finally:
                # Always balance the get(); otherwise Queue.join() would
                # block forever after any error that escapes the handler.
                self.tasks.task_done()
class ThreadPool:
    """Fixed-size set of Worker threads fed from one shared task queue."""

    def __init__(self, num_threads):
        """Create the queue and spawn *num_threads* workers on it.

        The queue's maxsize is also *num_threads*, so ``add_task`` blocks
        once that many tasks are waiting to be picked up.
        """
        queue = Queue(num_threads)
        self.tasks = queue
        spawned = 0
        while spawned < num_threads:
            # Workers start themselves; no reference needs to be kept.
            Worker(queue)
            spawned += 1

    def add_task(self, func, *args, **kargs):
        """Enqueue a call to ``func(*args, **kargs)`` for a worker to run."""
        task = (func, args, kargs)
        self.tasks.put(task)

    def wait_completion(self):
        """Block until every enqueued task has been marked done."""
        self.tasks.join()
def main():
    """Import a CSV file of data points into TempoDB using a thread pool.

    Each input line has the form ``series_key,timestamp,value`` where
    *timestamp* is parsed as epoch seconds and *value* as a float. Points
    are grouped into batches of at most 20 for a single series key and each
    batch is handed to the pool as one ``client.write_key`` call.

    The input is expected to be sorted by key. Unsorted input is still
    written under the correct keys, but in smaller batches, because a batch
    is flushed whenever the key changes.

    Raises:
        ValueError: if a non-blank line does not have exactly three
            comma-separated fields, or a timestamp/value is not numeric.
    """
    parser = optparse.OptionParser(usage="usage: %prog [options] -i filename", version="%prog 0.1")
    parser.add_option("-i", "--input", dest="filename", help="read data from FILENAME")
    parser.add_option("-k", "--key", dest="key", help="tempodb database key")
    parser.add_option("-s", "--secret", dest="secret", help="tempodb database secret")
    parser.add_option("-H", "--host", dest="host", default="api.tempo-db.com", help="tempodb host")
    parser.add_option("-P", "--port", dest="port", default=443, help="tempodb port")
    # NOTE(review): store_true with default=True means this flag can never
    # turn "secure" off; kept as-is to preserve the command-line interface.
    parser.add_option("-S", "--secure", action="store_true", dest="secure", default=True, help="tempodb secure")

    (options, args) = parser.parse_args()
    if not options.filename:
        parser.error("Enter a file to read from.")

    # The port arrives as a string when given on the command line.
    client = tempodb.Client(options.key, options.secret, options.host, int(options.port), options.secure)

    batch_size = 20
    # Init a Thread pool with the desired number of threads
    pool = ThreadPool(3)

    batch_key = None
    data = []
    # "with" guarantees the file is closed even if a line fails to parse.
    with open(options.filename) as source_file:
        for line in source_file:
            if not line.strip():
                # Tolerate blank lines (e.g. a trailing newline at EOF).
                continue
            series_key, timestamp, datum = line.split(',')

            # Flush when the batch is full OR the series changes, always
            # under the key the batch was collected for, so points are
            # never written to a different series than the one they name.
            if data and (len(data) >= batch_size or series_key != batch_key):
                pool.add_task(client.write_key, batch_key, data)
                data = []
            batch_key = series_key

            # Timestamps are epoch seconds; for textual dates,
            # dateutil.parser.parse(timestamp) could be used instead.
            input_date = datetime.datetime.fromtimestamp(float(timestamp))
            data.append(tempodb.DataPoint(input_date, float(datum)))

    # pick up any scraps
    if data:
        pool.add_task(client.write_key, batch_key, data)

    # Wait for completion
    pool.wait_completion()
# Run the import only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment