Skip to content

Instantly share code, notes, and snippets.

@sontek
Created May 1, 2015 22:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sontek/0f65e71401fadd1293ea to your computer and use it in GitHub Desktop.
multiprocessing cassandra v2
# Benchmark: fetch survey responses from Cassandra in parallel, one worker
# process per slice of hours, and report per-hour row counts.
import time
import uuid
from functools import partial
from cassandra.cluster import Cluster
from multiprocessing import Pool, Queue, Process, Manager
from threading import Event
from cassandra.query import dict_factory, tuple_factory
from cassandra.cluster import PagedResult
from cassandra.io.libevreactor import LibevConnection
# Wall-clock reference for the "--- N seconds ---" line printed at the end.
start_time = time.time()
# Cassandra contact points (seed nodes) used by every worker's Cluster.
nodes = ['10.128.36.179', '10.128.36.180', '10.128.36.181', '10.128.36.145']
class PagedResultHandler(object):
    """Accumulate every page of an async Cassandra query into ``self.rows``.

    Registers page/error callbacks on *future*; ``finished_event`` is set
    once the last page has arrived or an error occurred.  On failure the
    exception is stored in ``self.error`` (and printed).
    """

    def __init__(self, future, hour):
        self.error = None
        self.finished_event = Event()
        self.future = future
        self.rows = []
        self.hour = hour
        # Register callbacks LAST: the driver invokes the callback
        # immediately if the result is already available, so all instance
        # state must exist before this call (the original assigned
        # self.rows/self.hour afterwards, which could AttributeError).
        self.future.add_callbacks(
            callback=self.handle_page,
            errback=self.handle_error)

    def handle_page(self, rows):
        """Collect one page of rows and request the next page, if any."""
        self.rows.extend(rows)
        if self.future.has_more_pages:
            self.future.start_fetching_next_page()
        else:
            self.finished_event.set()

    def handle_error(self, exc):
        """Record the failure and unblock any waiter on finished_event."""
        self.error = exc
        print(exc)
        self.finished_event.set()
def get_data(hours, result_queue):
    """Count responses for each hour in *hours* and report the counts.

    Runs inside a worker process: opens its own Cluster connection, issues
    one async query per hour (all in flight concurrently), waits for every
    query to finish paging, then puts one ``(hour, row_count)`` tuple per
    hour onto *result_queue*.  The cluster is always shut down on exit so
    the worker's driver threads/connections don't leak.
    """
    cluster = Cluster(nodes, protocol_version=3, compression='lz4')
    cluster.connection_class = LibevConnection
    try:
        session = cluster.connect('surveymonkey')
        session.row_factory = tuple_factory
        prepared = session.prepare("""
SELECT *
FROM hour_response
WHERE survey_id = ?
AND hour_created = ?
""")
        # NOTE(review): survey id is hard-coded for this benchmark.
        survey_id = 100326883
        # Fire every query up front so they execute concurrently.
        handlers = [
            PagedResultHandler(
                session.execute_async(prepared, [survey_id, hour]), hour)
            for hour in hours
        ]
        for handler in handlers:
            handler.finished_event.wait()
            result_queue.put((handler.hour, len(handler.rows)))
    finally:
        cluster.shutdown()
if __name__ == '__main__':
    from pprint import pprint
    from queue import Empty

    manager = Manager()
    work_queue = manager.Queue()
    result_queue = manager.Queue()
    # NOTE(review): range(1, 24) yields the 23 hours 1..23, but the slicing
    # below assumes 24 items (step_count = 24 // 4 = 6), so the last worker
    # receives only 5 hours -- confirm the intended hour range.
    hours = range(1, 24)
    processes = []
    process_limit = 4
    step_count = 24 // process_limit

    def run():
        """Fan the hours out over a process pool, then print sorted counts."""
        pool = Pool(processes=process_limit)
        for i in range(process_limit):
            start = int(i * step_count)
            end = int(start + step_count)
            processor_hours = hours[start:end]
            pool.apply_async(get_data, [processor_hours, result_queue])
        pool.close()
        pool.join()
        print("--- %s seconds ---" % (time.time() - start_time))
        # Drain everything the workers reported; Empty means we're done.
        # (The original bare `except:` also swallowed KeyboardInterrupt and
        # real errors -- catch only the expected empty-queue signal.)
        results = []
        while True:
            try:
                results.append(result_queue.get(block=False))
            except Empty:
                break
        pprint(sorted(results, key=lambda x: x[0]))

    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment