Skip to content

Instantly share code, notes, and snippets.

@ReallyLiri
Created July 11, 2019 08:52
Show Gist options
  • Save ReallyLiri/54612a92fb7613fa316b5af7fd5f7679 to your computer and use it in GitHub Desktop.
Save ReallyLiri/54612a92fb7613fa316b5af7fd5f7679 to your computer and use it in GitHub Desktop.
Simple map-reduce with multiprocessing
import logging
import pandas as pd
import queue
from multiprocessing import Process, Queue
from time import sleep
DEFAULT_TIMEOUT_SEC = 60*10
def _chunks(l, n):
"""Yield successive n chunks from l."""
return (l[i::n] for i in range(n))
def _worker_job(inputs, output_queue, inner_worker):
try:
logging.info("Worker started")
result = inner_worker(inputs)
except:
output_queue.put(None)
logging.exception("worker failed")
else:
output_queue.put(result)
logging.info("Worker completed")
def map_reduce(workers_count, inputs, mapper, reducer, timeout_sec=None):
    """Map-reduce ``inputs`` across ``workers_count`` forked processes.

    mapper: callable taking a list (a round-robin portion of ``inputs``) and
        returning a partial result; must be picklable (a top-level function)
        since it is shipped to child processes.
    reducer: callable combining two partial results into one; runs only in
        the parent process.
    timeout_sec: maximum seconds to wait for each worker's result; any falsy
        value (including 0) falls back to DEFAULT_TIMEOUT_SEC.

    Returns the reduced result, or None if no worker produced a result.
    """
    logging.info("Map-Reducing %d inputs using %d workers", len(inputs), workers_count)
    divided_inputs = list(_chunks(inputs, workers_count))
    output_queue = Queue()
    if not timeout_sec:
        timeout_sec = DEFAULT_TIMEOUT_SEC
    processes = []
    for inputs_portion in divided_inputs:  # fixed typo: was `inputs_ration`
        process = Process(target=_worker_job, args=(inputs_portion, output_queue, mapper))
        processes.append(process)
        process.start()
    final_result = None
    for i in range(workers_count):
        try:
            # Single bounded wait. The original wrapped this in a
            # `while True: ... except Empty: sleep(5)` retry loop, which
            # made timeout_sec meaningless: a worker killed before putting
            # its sentinel would block the parent forever.
            single_result = output_queue.get(timeout=timeout_sec)
        except queue.Empty:
            logging.warning("Timed out waiting for result %d after %d sec", i, timeout_sec)
            continue
        if single_result is None:
            # Worker hit an exception and reported the None sentinel.
            logging.warning("No result from worker %d", i)
            continue
        if final_result is None:
            final_result = single_result
        else:
            final_result = reducer(final_result, single_result)
    for process in processes:
        process.join()
    return final_result
def map_reduce_dataframes(workers_count, inputs, df_from_inputs_mapper, output_file_path=None, timeout_sec=None):
    """Map-reduce ``inputs`` into one DataFrame, optionally saving to Excel.

    df_from_inputs_mapper: callable taking a list of inputs and returning a
        pandas DataFrame; partial frames are combined with ``pd.concat``.
    output_file_path: when given, the final frame is written there as Excel.
    timeout_sec: forwarded to ``map_reduce``.

    Returns the concatenated DataFrame, or None if no worker produced one.
    """
    final_df = map_reduce(
        workers_count, inputs,
        mapper=df_from_inputs_mapper,
        reducer=lambda df1, df2: pd.concat([df1, df2]),
        timeout_sec=timeout_sec,
    )
    if final_df is None:
        # All workers failed or timed out; previously this crashed with
        # AttributeError on `final_df.shape` below.
        logging.warning("Map-reduce produced no DataFrame")
        return None
    if output_file_path:
        # `encoding` kwarg dropped: it was deprecated and then removed from
        # DataFrame.to_excel in pandas 2.0, so passing it raises TypeError.
        final_df.to_excel(output_file_path)
        logging.info("Completed writing %d lines to %s", final_df.shape[0], output_file_path)
    else:
        # Lazy %-formatting instead of eager str.format in the log call.
        logging.info("finish map-reduce df: %d", final_df.shape[0])
    return final_df
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment