Skip to content

Instantly share code, notes, and snippets.

@sontek
Created April 22, 2015 07:34
multiprocess cassandra
import time
import uuid
from cassandra.cluster import Cluster
from multiprocessing import Pool, Queue, Process, Manager
from threading import Event
from cassandra.cluster import Cluster
from cassandra.query import dict_factory
from cassandra.cluster import PagedResult
# Wall-clock reference for the whole run; the elapsed time since this point
# is printed at the end of the script.
start_time = time.time()
class PagedResultHandler(object):
    """Accumulate every page of an asynchronous query into ``rows``.

    The handler registers itself on the given future, appends each delivered
    page to ``rows``, requests the next page while more are available, and
    sets ``finished_event`` once the last page has arrived or an error was
    reported.

    Attributes:
        error: the exception passed to the errback, or ``None``.
        finished_event: ``threading.Event`` set when paging is complete
            (successfully or not).
        rows: list of all rows received so far.
        future: the future object the callbacks were registered on.
    """

    def __init__(self, future):
        """Initialise state and register the paging callbacks on *future*.

        :param future: object providing ``add_callbacks(callback=...,
            errback=...)``, ``has_more_pages`` and
            ``start_fetching_next_page()``.
        """
        self.error = None
        self.finished_event = Event()
        # All state must exist *before* the callbacks are registered: a
        # future that has already completed may invoke the callback
        # synchronously from inside add_callbacks().
        self.rows = []
        self.future = future
        self.future.add_callbacks(
            callback=self.handle_page,
            errback=self.handle_error)

    def handle_page(self, rows):
        """Store one page of *rows* and either fetch the next page or finish."""
        self.rows.extend(rows)
        if self.future.has_more_pages:
            self.future.start_fetching_next_page()
        else:
            self.finished_event.set()

    def handle_error(self, exc):
        """Record and print *exc*, then release anyone waiting on the event."""
        self.error = exc
        print(exc)
        # Set the event on failure too, so waiters never block forever.
        self.finished_event.set()
def get_data(hours, result_queue):
    """Fetch the rows for each hour in *hours* and push them onto a queue.

    One asynchronous query is issued per hour; all queries are started
    before any is waited on, so they run concurrently. For every hour, in
    submission order, the list of rows collected by its handler is put on
    *result_queue* as a single item.

    :param hours: iterable of values bound to ``hour_created`` in the query.
    :param result_queue: object with a ``put(item)`` method that receives one
        list of rows per hour.
    """
    id_ = uuid.uuid4()
    print('get data, id %s, %s' % (id_, hours))
    c = Cluster(['127.0.0.1'])
    try:
        s = c.connect('test')
        s.row_factory = dict_factory
        prepared = s.prepare("""
SELECT *
FROM response
WHERE survey_id = 100326883
AND hour_created = ?
""")
        # Start every query first so that they are all in flight together.
        handlers = [
            PagedResultHandler(s.execute_async(prepared, [hour]))
            for hour in hours
        ]
        for h in handlers:
            h.finished_event.wait()
            # A failed query has already been printed by its handler; the
            # rows received before the failure are still forwarded.
            result_queue.put(h.rows)
    finally:
        # Always release the cluster's connections, even when connecting,
        # preparing or querying raised.
        c.shutdown()
    print('worker %s done!=====================' % id_)
from functools import partial  # kept from the original script; unused below

try:  # Python 3
    from queue import Empty
except ImportError:  # Python 2
    from Queue import Empty


def chunk_hours(hours, parts):
    """Split *hours* into at most *parts* contiguous, near-equal chunks.

    Every element of *hours* ends up in exactly one chunk, whatever the
    length of *hours* or the value of *parts*.

    :param hours: iterable of hour values.
    :param parts: maximum number of chunks to produce; must be >= 1.
    :returns: list of non-empty lists, in the original order.
    :raises ValueError: if *parts* is smaller than 1.
    """
    if parts < 1:
        raise ValueError('parts must be at least 1')
    hours = list(hours)
    size = -(-len(hours) // parts)  # ceiling division
    if size == 0:
        return []
    return [hours[i:i + size] for i in range(0, len(hours), size)]


def main():
    """Fan the hourly queries out over a process pool and report the totals.

    Prints the elapsed wall-clock time and the number of rows collected.
    """
    manager = Manager()
    result_queue = manager.Queue()
    # NOTE(review): range(1, 24) yields 1..23, so hour 0 is never queried.
    # Confirm against the data whether range(24) was intended.
    hours = range(1, 24)
    process_limit = 4

    pool = Pool(processes=process_limit)
    pending = [
        pool.apply_async(get_data, [processor_hours, result_queue])
        for processor_hours in chunk_hours(hours, process_limit)
    ]
    pool.close()
    pool.join()

    # apply_async stores worker exceptions on its result object; surface
    # them here instead of losing them silently.
    for job in pending:
        try:
            job.get()
        except Exception as exc:
            print('worker failed: %r' % (exc,))

    # All workers have exited, so the queue can be drained without blocking.
    results = []
    while True:
        try:
            results += result_queue.get(block=False)
        except Empty:
            break
    print("--- %s seconds ---" % (time.time() - start_time))
    print(len(results))


# The guard keeps child processes from re-running the driver when the module
# is re-imported by multiprocessing (spawn start method).
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment