Skip to content

Instantly share code, notes, and snippets.

@sontek
Created April 22, 2015 07:34
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sontek/542f13307ef9679c0094 to your computer and use it in GitHub Desktop.
multiprocess cassandra
# Standard library
import time
import uuid
from multiprocessing import Pool, Queue, Process, Manager
from threading import Event

# Third-party: DataStax Cassandra driver
# (duplicate `from cassandra.cluster import Cluster` removed, names merged)
from cassandra.cluster import Cluster, PagedResult
from cassandra.query import dict_factory

# Wall-clock start for the timing report printed at the end of the script.
start_time = time.time()
class PagedResultHandler(object):
    """Collect every page of an async Cassandra query via driver callbacks.

    The driver invokes handle_page / handle_error on its own I/O thread.
    finished_event is set once the last page has arrived or an error occurs;
    callers block on it with `finished_event.wait()`.
    """

    def __init__(self, future):
        """Register page/error callbacks on *future* (a ResponseFuture).

        BUGFIX: self.rows must be assigned BEFORE add_callbacks — the driver
        may deliver the first page from its I/O thread immediately, and the
        original ordering raced with handle_page reading self.rows.
        """
        self.error = None
        self.finished_event = Event()
        self.rows = []
        self.future = future
        self.future.add_callbacks(
            callback=self.handle_page,
            errback=self.handle_error)

    def handle_page(self, rows):
        """Accumulate one page; request the next page or signal completion."""
        self.rows += rows
        if self.future.has_more_pages:
            self.future.start_fetching_next_page()
        else:
            self.finished_event.set()

    def handle_error(self, exc):
        """Record the failure and unblock any waiter on finished_event."""
        self.error = exc
        print(exc)
        self.finished_event.set()
def get_data(hours, result_queue):
    """Worker entry point: fetch survey responses for each hour in *hours*
    and put each hour's row list onto *result_queue*.

    Opens its own Cluster connection — driver sessions cannot be shared
    across multiprocessing workers.  All hour queries are issued
    concurrently via execute_async before waiting on any of them.
    """
    id_ = uuid.uuid4()  # tag log lines so interleaved worker output is readable
    print('get data, id %s, %s' % (id_, hours))
    c = Cluster(['127.0.0.1'])
    s = c.connect('test')
    s.row_factory = dict_factory  # rows come back as dicts
    prepared = s.prepare("""
        SELECT *
        FROM response
        WHERE survey_id = 100326883
        AND hour_created = ?
    """)
    handlers = []
    # Fire off every hour's query first so they run concurrently on the
    # driver's I/O threads; only then block on each result in turn.
    for hour in list(hours):
        future = s.execute_async(prepared, [hour])
        handlers.append(PagedResultHandler(future))
    for h in handlers:
        h.finished_event.wait()
        result_queue.put(h.rows)
    # Release driver threads/connections before the worker process exits
    # (the original leaked the Cluster; it also had an unused `rows` local).
    c.shutdown()
    print('worker %s done!=====================' % id_)
# --- Driver: fan hour ranges out across a pool of worker processes -------
# A Manager-backed queue is required so worker processes can push results
# back to the parent across process boundaries.
manager = Manager()
work_queue = manager.Queue()    # created but never used in this script
result_queue = manager.Queue()  # workers put one row-list per hour here
hours = range(1, 24)            # NOTE: 23 hours (1..23) — TODO confirm a full day wasn't intended
processes = []                  # kept from original; unused
process_limit = 4
step_count = 24 // process_limit  # hours handled per worker
pool = Pool(processes=process_limit)
from functools import partial  # kept from original (unused)
for i in range(0, process_limit):
    # Slice this worker's contiguous span of hours.
    start = int(i * step_count)
    end = int(start + step_count)
    processor_hours = hours[start:end]
    pool.apply_async(get_data, [processor_hours, result_queue])
pool.close()
pool.join()

# Drain everything the workers produced.  Catch queue.Empty specifically
# (the original used a bare `except:`, which would also hide real errors).
import queue
results = []
while True:
    try:
        results += result_queue.get(block=False)
    except queue.Empty:
        break
print("--- %s seconds ---" % (time.time() - start_time))
print(len(results))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment