Created
May 1, 2015 22:44
-
-
Save sontek/0f65e71401fadd1293ea to your computer and use it in GitHub Desktop.
Multiprocessing + Cassandra Python driver example, v2 — parallel paged-query benchmark
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import uuid | |
from functools import partial | |
from cassandra.cluster import Cluster | |
from multiprocessing import Pool, Queue, Process, Manager | |
from threading import Event | |
from cassandra.query import dict_factory, tuple_factory | |
from cassandra.cluster import PagedResult | |
from cassandra.io.libevreactor import LibevConnection | |
# Wall-clock start used by run() at the bottom of the file to report total elapsed time.
start_time = time.time()
# Cassandra contact points; the driver discovers the rest of the ring from these.
nodes = ['10.128.36.179', '10.128.36.180', '10.128.36.181', '10.128.36.145']
class PagedResultHandler(object):
    """Collects every page of an async, paged Cassandra query.

    Registers page/error callbacks on *future* and accumulates all rows
    into ``self.rows``; ``finished_event`` is set once the final page
    (or an error) has arrived, so callers can block on it.
    """

    def __init__(self, future, hour):
        self.error = None
        self.finished_event = Event()
        self.rows = []
        self.hour = hour
        self.future = future
        # State above MUST be initialized before add_callbacks(): the
        # driver may invoke handle_page synchronously if the first page
        # is already available, which would otherwise touch attributes
        # that do not exist yet (AttributeError).
        self.future.add_callbacks(
            callback=self.handle_page,
            errback=self.handle_error)

    def handle_page(self, rows):
        """Append one page of rows and request the next until exhausted."""
        self.rows += rows
        if self.future.has_more_pages:
            self.future.start_fetching_next_page()
        else:
            self.finished_event.set()

    def handle_error(self, exc):
        """Record the failure and unblock any waiter."""
        self.error = exc
        print(exc)
        self.finished_event.set()
def get_data(hours, result_queue):
    """Count rows per hour for one survey and report via *result_queue*.

    Runs inside a worker process: it opens its own Cluster connection
    (driver sessions must not be shared across fork), issues one async
    paged query per hour in *hours* concurrently, then waits on each
    handler and puts ``(hour, row_count)`` on *result_queue*.
    """
    cluster = Cluster(nodes, protocol_version=3, compression='lz4')
    # Connection class must be configured before connect().
    cluster.connection_class = LibevConnection
    try:
        session = cluster.connect('surveymonkey')
        session.row_factory = tuple_factory
        prepared = session.prepare("""
        SELECT *
        FROM hour_response
        WHERE survey_id = ?
        AND hour_created = ?
        """)
        survey_id = 100326883  # NOTE(review): hard-coded target survey id
        # Fire all queries first so they page in parallel, then collect.
        handlers = []
        for hour in hours:
            future = session.execute_async(prepared, [survey_id, hour])
            handlers.append(PagedResultHandler(future, hour))
        for handler in handlers:
            handler.finished_event.wait()
            result_queue.put((handler.hour, len(handler.rows)))
    finally:
        # Original leaked the cluster; shut it down so worker processes
        # do not accumulate connections and driver threads.
        cluster.shutdown()
if __name__ == '__main__':
    from queue import Empty

    manager = Manager()
    # Managed queue is picklable, so workers can push results back.
    result_queue = manager.Queue()

    hours = range(1, 24)  # hours of the day, 1..23
    process_limit = 4
    # Ceiling division over the actual hour count (original hard-coded 24),
    # so every hour lands in exactly one worker's slice.
    step_count = -(-len(hours) // process_limit)

    def run():
        """Fan hour slices out over a process pool and print the counts."""
        pool = Pool(processes=process_limit)
        for i in range(process_limit):
            start = i * step_count
            chunk = hours[start:start + step_count]
            if chunk:
                pool.apply_async(get_data, [chunk, result_queue])
        pool.close()
        pool.join()
        print("--- %s seconds ---" % (time.time() - start_time))

        results = []
        while True:
            try:
                results.append(result_queue.get(block=False))
            except Empty:
                # Queue drained — every worker result collected.
                # (Original used a bare `except:`, which hides real errors.)
                break
        from pprint import pprint
        pprint(sorted(results, key=lambda x: x[0]))

    run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment