Last active
October 3, 2016 22:31
-
-
Save bhuvanrawal/93c5ae6cdd020de47e0981d36d2c0785 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def multiprocess_producer_paginated(statement, queue, keyspace, num_producers, is_completed_producer, email_dictionary, logger_name=''):
    """
    Split the full murmur3 token range into `num_producers` slices and launch
    one producer process per slice so reads happen concurrently.

    :param statement: of the form - "SELECT col1,col2... from my_table where Token(x,y) >= '%d' AND token(x,y) < '%d';" where x,y are the partition key members.
    :param queue: a multiprocessing queue - multiprocessing.Queue() or multiprocessing.Manager().Queue(); the latter was observed to behave a bit faster.
    :param keyspace: keyspace of the table.
    :param num_producers: number of fetch processes to launch; depends on cluster size and how fast fetches should run. 0/None falls back to 1.
    :param is_completed_producer: shared flag - multiprocessing.Value('i', 0) - set to 1 once all producers finish, signalling queue readers. A multiprocessing.Event() works too; a JoinableQueue (task_done()) also works but is much slower.
    :param email_dictionary: a managed dict used to track progress (e.g. read by a status-mail sender).
    :param logger_name: optional logger name for clarity when this helper is used in several places.
    """
    # FIX: honour logger_name; getLogger('') returns the root logger, so the
    # default behaviour is unchanged.
    logger = logging.getLogger(logger_name)
    logger.info("Initiated Cassandra Producer")
    producer_proc_list = []
    _start = time.time()
    # FIX: was a bare py2 `print`; keep all output on the logger.
    logger.info("Start Time is - %f", _start)
    if not num_producers:  # covers both 0 and None
        num_producers = 1
    # producer count as per requirement
    workers = num_producers
    count = Counter(0)
    # Partition the token space [-2**63, 2**63) into equal slices.
    # FIX: floor division - identical in py2, avoids a float under py3.
    range_slice = 2**64 // workers
    start_range = -2**63
    for i in range(workers):
        end_range = start_range + range_slice
        # Hack for the final slice: the last valid token is 2**63 - 1.
        if end_range == 2**63:
            end_range = 2**63 - 1
        logger.info("Worker %d, Start Range : %d , End Range : %d", i, start_range, end_range)
        producer_proc = Process(target=multiprocess_row_producer,
                                args=(keyspace, statement, queue, count, start_range, end_range,
                                      email_dictionary, logger_name, True))
        # Sleep for a couple of seconds before starting the next producer;
        # don't overwhelm the cluster.
        time.sleep(2)
        start_range = start_range + range_slice
        producer_proc.daemon = False
        producer_proc.name = 'cassandra_consumer-' + logger_name + str(i)
        producer_proc.start()  # Launch the producer as a separate python process
        producer_proc_list.append(producer_proc)
        logger.info("PID of Consumer %d is %d ", i, producer_proc.pid)
    # Join the processes into the parent process before signalling completion.
    for proc in producer_proc_list:
        proc.join()
    is_completed_producer.value = 1
    logger.info("Count of bucket_content_attributes_mappings table is %d", count.value())
    logger.info("Time Taken For Counting : %d", time.time() - _start)
def multiprocess_row_producer(keyspace,query, queue, counter, start_range, end_range, email_dictionary, logger_name, check_spillover=False):
    """
    Producer-process body: run `query` over one token-range slice, pushing each
    fetched page of rows onto `queue`, and retry a failed page from its saved
    paging state.

    :param keyspace: keyspace set on the module-global cassandra session.
    :param query: statement with two '%d' placeholders for the token bounds.
    :param queue: multiprocessing queue; one list of row tuples is put per page.
    :param counter: shared Counter, updated with the row count of each page.
    :param start_range: lower token bound substituted into `query`.
    :param end_range: upper token bound substituted into `query`.
    :param email_dictionary: managed dict updated with the running total count.
    :param logger_name: suffix for the progress key written to email_dictionary.
    :param check_spillover: NOTE(review): accepted but never read in this body
        (callers always pass True) - confirm whether it can be dropped.
    """
    global cassandra_session
    logger = logging.getLogger('multiprocess')
    logger.info("Initiated Cassandra Producer")
    # Lazily (re)create the session: each producer runs in its own process.
    if cassandra_session is None or cassandra_session.is_shutdown:
        cassandra_session = connection_manager.get_cassandra_session()
    cassandra_session.set_keyspace(keyspace)
    # Substitute this worker's token-range bounds into the query template.
    cql_command = query % (start_range, end_range)
    exception_count = 0
    slice_count = 0  # NOTE(review): assigned but never used below
    previous_paging_state = None
    exception_occurred = True
    simple_statement = SimpleStatement(cql_command, fetch_size=5000)
    cassandra_session.row_factory = tuple_factory  # rows come back as plain tuples
    while True:
        try:
            if previous_paging_state is None:
                # First page: execute the paged statement from the start.
                rs = cassandra_session.execute(simple_statement)
            elif exception_occurred == True:
                # Previous iteration failed: re-execute from the last saved page.
                logger.error("Retrying query %s with Paging State %s", str(query), str(previous_paging_state))
                rs = cassandra_session.execute(cql_command, paging_state=previous_paging_state)
            # On a normal non-first iteration neither branch above runs: `rs`
            # was already advanced by fetch_next_page() at the end of the
            # previous loop pass, so current_rows is the next page.
            rows = rs.current_rows
            # Private driver attribute: saved so a failed page can be retried.
            previous_paging_state = rs.response_future._paging_state
            queue.put(rows)
            # Back-pressure: let consumers drain before fetching more pages.
            if queue.qsize() > 1000:
                logger.info("Sleeping for 10 Seconds as Queue Size Large")
                time.sleep(10)
            counter.update(len(rows))
            email_dictionary['Count Processed ' + logger_name] = str(counter.value())
            logger.info("Process : %s Fetched %d", str(multiprocessing.current_process().name), len(rows))
            logger.info("Process : %s Total Rows Fetched by all processes %d", str(multiprocessing.current_process().name), counter.value())
            sw = stopwatch.StopWatch()
            sw.start('fetch-loop-initiated')
            if not rs.has_more_pages:
                # No More Pages to fetch
                break
            # Synchronously pull the next page into rs (consumed next pass).
            rs.fetch_next_page()
            sw.stop('fetch-loop-initiated')
            # After every Fetch Sleep for 100 ms to avoid pushing limits
            time.sleep(.1)
            logger.debug("Time taken to fetch %d rows is %s", len(rs._current_rows) ,str(sw.accum))
            exception_occurred = False
            exception_count = 0
        except Exception as ex:
            exception_occurred = True
            exception_count +=1
            # Give up on this slice after 64 consecutive failures.
            if exception_count > 64:
                logger.exception("Breaking out of producer due to exception")
                break
            # In case of Time out exception or if some mis happening occurs
            # Exponential retry till 64**2 in the worst case thats more than 1 Hour
            time.sleep(exception_count**2)
            logger.exception("Exception occurred while fetching page, retrying again with same page")
            continue
    # Close cassandra session as work is completed by producer
    cassandra_session.shutdown()
# Method which is worked on by each consumer process
def queue_processor(content_queue, is_completed_producer, email_dictionary):
    """
    Consumer-process body: drain messages from content_queue until the
    producers have signalled completion AND the queue is empty.

    :param content_queue: input queue containing messages to be processed; each message is one page of rows from a cassandra producer.
    :param is_completed_producer: shared flag whose .value becomes non-zero once producers are done.
    :param email_dictionary: managed dict for tracking information (unused in this example body).
    """
    logger = logging.getLogger()
    logger.info("Cassandra Queue Processor")
    # FIX: was `is_completed_producer.value is 0` - identity comparison with an
    # int only works via CPython's small-int caching; use equality.
    while is_completed_producer.value == 0 or content_queue.qsize() > 0:
        try:
            try:
                # Get a message containing a set of rows. NOTE: `timeout` is
                # ignored when block=False - this is a pure non-blocking poll.
                message = content_queue.get(block=False, timeout=.5)
            except Queue.Empty:
                logger.info("Give Me work Ive gone jobless")
                time.sleep(.1)
                continue
            except Exception as ex:
                logger.exception("Exception while fetching from queue, retrying")
                time.sleep(1)
                continue
            logger.info("Queue Status %d ", content_queue.qsize())
            # The fetched message is one page of rows produced by a cassandra
            # producer process. Do some work on it - here, just print each row.
            for row in message:
                print(row)  # parenthesised: same output in py2, valid in py3
        except Exception as ex:
            logger.exception("Exception Occurred during processing cassandra message")
            continue
# --- Example driver: wire one paginated producer to a pool of queue consumers ---
logger = logging.getLogger()  # FIX: `logger` was used below but never defined
manager = multiprocessing.Manager()
content_queue = manager.Queue()
is_completed_producer = multiprocessing.Value('i', 0)
# FIX: email_dictionary was referenced but never created; shared progress dict.
email_dictionary = manager.dict()
producer_process = Process(target=multiprocess_producer_paginated,
                           args=("SELECT col1,col2 from my_table where token(x) >= '%d' AND token(k) < '%d' ",
                                 content_queue, "my_keyspace", 30, is_completed_producer,
                                 email_dictionary, 'my_table_producer'))
producer_process.daemon = False
producer_process.name = 'cassandra_standard_mapping_producer'
producer_process.start()
logger.info("PID of standard count creation producer is %d", producer_process.pid)
num_consumers = 10
# FIX: consumer_proc_list was appended to below but never initialised (NameError).
consumer_proc_list = []
# Boot up consumers to pick up messages pushed by cassandra producers
for i in range(0, num_consumers):
    consumer_proc = Process(target=queue_processor, args=(content_queue, is_completed_producer, email_dictionary))
    consumer_proc.daemon = False
    consumer_proc.name = 'cassandra_standard_count_middleware' + str(i)
    consumer_proc.start()  # Launch reader() as a separate python process
    logger.info("Sleeping for 10 Seconds before booting up a consumer")
    # FIX: the sleep advertised by the log line above was never performed.
    time.sleep(10)
    consumer_proc_list.append(consumer_proc)
    logger.info("PID of Cassandra standard Count Consumer %d is %d", i, consumer_proc.pid)
# Join the processes back into parent process
for consumer_proc in consumer_proc_list:
    consumer_proc.join()
producer_process.join()
# Below is the utility class used for keeping atomic counts across multiple processes
class Counter(object):
    """
    Process-safe integer counter.

    A multiprocessing Lock guards a shared Value('i'), so every read and
    write is atomic across all worker processes.
    """

    def __init__(self, initval=0):
        self.val = Value('i', initval)
        self.lock = Lock()

    def increment(self):
        # An increment is simply a delta update of one.
        self.update(1)

    def update(self, delta):
        # Acquire the cross-process lock before touching the shared value.
        with self.lock:
            self.val.value += delta

    def value(self):
        # Read under the same lock so a concurrent update is never torn.
        with self.lock:
            return self.val.value
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment