@rishibarve
Created August 30, 2018 06:54
Multiprocessing helper for Python with a checkpointing feature: completed work is saved partway through, so it can be skipped after a failure. Helpful for processing a large number of small files in an environment where failures are common.
import multiprocessing


# split a list into evenly sized chunks
def chunks(l, n):
    return [l[i:i + n] for i in range(0, len(l), n)]


def do_job(job_id, data_slice):
    processed = []
    for i, item in enumerate(data_slice):
        # checkpoint the work done so far in case of failure
        # (all workers append to the same monitor.txt)
        if i % 1000 == 0 and processed:
            with open('monitor.txt', 'a') as outfile:
                outfile.write("\n".join(processed) + "\n")
            processed = []
        # do the actual job here
        print("job", job_id, item)
        # record the item as processed
        processed.append(item)
    # flush whatever is left after the last checkpoint
    if processed:
        with open('monitor.txt', 'a') as outfile:
            outfile.write("\n".join(processed) + "\n")


def dispatch_jobs(data, job_number):
    total = len(data)
    # ceiling division so the data is split into at most job_number slices
    chunk_size = -(-total // job_number)
    slices = chunks(data, chunk_size)
    jobs = []
    for i, s in enumerate(slices):
        j = multiprocessing.Process(target=do_job, args=(i, s))
        jobs.append(j)
    for j in jobs:
        j.start()
    # wait for all workers to finish
    for j in jobs:
        j.join()


if __name__ == "__main__":
    data = ['0'] * 10000
    dispatch_jobs(data, 3)
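
The gist writes completed items to monitor.txt but does not show the skip step itself. A minimal sketch of how a rerun could use that file, assuming each item is a unique identifier such as a file path and each line of monitor.txt is one processed item; the load_done helper is illustrative, not part of the original:

import os

def load_done(path='monitor.txt'):
    # read back the checkpoint file written by do_job, if it exists
    if not os.path.exists(path):
        return set()
    with open(path) as f:
        return set(line.strip() for line in f if line.strip())

# on a rerun, drop items that were already processed before dispatching
done = load_done()
remaining = [item for item in data if item not in done]
dispatch_jobs(remaining, 3)

Because every worker appends to the same monitor.txt, lines from different processes may interleave; giving each job its own checkpoint file (e.g. monitor_0.txt, monitor_1.txt, ...) avoids that if it becomes a problem.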