Created
April 22, 2015 07:34
multiprocess cassandra
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import uuid | |
from cassandra.cluster import Cluster | |
from multiprocessing import Pool, Queue, Process, Manager | |
from threading import Event | |
from cassandra.cluster import Cluster | |
from cassandra.query import dict_factory | |
from cassandra.cluster import PagedResult | |
# Wall-clock timestamp captured when the module starts executing; the final
# report at the bottom of the script subtracts it to print elapsed seconds.
start_time = time.time()
class PagedResultHandler(object):
    """Accumulate every page of an asynchronous query into a single list.

    The handler registers itself on a response future and keeps requesting
    the next page until no more pages remain.  Callers wait on
    ``finished_event`` and then read ``rows`` (and ``error``).

    Attributes are plain instance fields:
        error: the exception passed to the errback, or ``None``.
        finished_event: ``threading.Event`` set once paging ends or fails.
        future: the response future being consumed.
        rows: rows received so far, in arrival order.
    """

    def __init__(self, future):
        """Attach to *future* and start collecting its pages.

        Args:
            future: object providing ``add_callbacks(callback=, errback=)``,
                ``has_more_pages`` and ``start_fetching_next_page()``.
        """
        self.error = None
        self.finished_event = Event()
        self.future = future
        # Every attribute must exist *before* the callbacks are registered:
        # when the future has already completed, the callback can run
        # synchronously inside add_callbacks(), and handle_page() would fail
        # on a missing ``rows`` attribute.
        self.rows = []
        self.future.add_callbacks(
            callback=self.handle_page,
            errback=self.handle_error)

    def handle_page(self, rows):
        """Store one page of *rows* and request the next page, if any."""
        self.rows.extend(rows)
        if self.future.has_more_pages:
            self.future.start_fetching_next_page()
        else:
            # Last page received: release anyone waiting on the event.
            self.finished_event.set()

    def handle_error(self, exc):
        """Record *exc*, report it, and unblock waiters.

        Rows collected before the failure stay available in ``rows``.
        """
        self.error = exc
        print(exc)
        self.finished_event.set()
def get_data(hours, result_queue):
    """Query the ``response`` table for each hour and queue the rows.

    Opens its own cluster connection, issues one asynchronous query per entry
    in *hours*, waits for every query to finish paging, and puts one list of
    rows per hour onto *result_queue*.

    Args:
        hours: iterable of values bound to the ``hour_created`` placeholder.
        result_queue: object with a ``put`` method that receives each
            per-hour list of rows.
    """
    id_ = uuid.uuid4()
    print('get data, id %s, %s' % (id_, hours))
    c = Cluster(['127.0.0.1'])
    try:
        s = c.connect('test')
        s.row_factory = dict_factory
        prepared = s.prepare("""
            SELECT *
            FROM response
            WHERE survey_id = 100326883
            AND hour_created = ?
        """)
        # Launch every query first so they run concurrently, then wait.
        handlers = [
            PagedResultHandler(s.execute_async(prepared, [hour]))
            for hour in hours
        ]
        for h in handlers:
            h.finished_event.wait()
            # A failed query has already been printed by the handler; whatever
            # rows arrived before the failure are still forwarded.
            result_queue.put(h.rows)
    finally:
        # Always release the driver's connections and threads, even when
        # connecting or preparing the statement fails.
        c.shutdown()
    print('worker %s done!=====================' % id_)
from functools import partial


def main():
    """Fan the hourly queries out over a process pool and report totals.

    Splits ``hours`` into contiguous chunks, runs ``get_data`` on each chunk
    in a worker process, gathers every row the workers queued, and prints the
    elapsed time followed by the number of rows collected.
    """
    # Local import keeps this block self-contained; the fallback covers
    # Python 2, where the module is named ``Queue``.
    try:
        import queue
    except ImportError:
        import Queue as queue

    manager = Manager()
    result_queue = manager.Queue()
    hours = range(1, 24)
    process_limit = 4
    # Ceiling division so every hour lands in some chunk; never below 1 so
    # the range() step stays valid for an empty ``hours``.
    step_count = max(1, -(-len(hours) // process_limit))

    pool = Pool(processes=process_limit)
    pending = [
        pool.apply_async(
            get_data, [hours[start:start + step_count], result_queue])
        for start in range(0, len(hours), step_count)
    ]
    pool.close()
    pool.join()

    # apply_async() hides worker exceptions unless the result is fetched, so
    # report failures instead of silently producing a short row count.
    for task in pending:
        try:
            task.get()
        except Exception as exc:
            print('worker failed: %r' % (exc,))

    # All workers have exited, so a non-blocking drain sees every queued item.
    results = []
    while True:
        try:
            results += result_queue.get(block=False)
        except queue.Empty:
            break

    print("--- %s seconds ---" % (time.time() - start_time))
    print(len(results))


# Guard the entry point: start methods that re-import this module in each
# child process would otherwise re-run the whole script recursively.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment