Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Sample script in Multiprocessing
import time
import datetime
import os
import pandas as pd
from multiprocessing import Process, Queue, current_process
def do_something(fromprocess):
    """Simulate one unit of work and report which process ran it.

    Sleeps for one second, prints the current process name and OS pid,
    and returns a fixed marker string.

    Args:
        fromprocess: Value supplied by the caller; this function does not
            read it.

    Returns:
        The literal string ``"msg"``.
    """
    time.sleep(1)
    worker_name = current_process().name
    worker_pid = os.getpid()
    report = "Operation Ended for Process:{}, process Id:{}".format(
        worker_name, worker_pid
    )
    print(report)
    return "msg"
def submit_job(df, list_items, q, fromprocess):
    """Process one batch of categories and put the collected results on ``q``.

    For every category in ``list_items`` the rows of ``df`` whose
    ``'MinorCategory'`` equals that category are selected, their
    ``'FilePath'`` values are narrowed to those ending in ``.png``, and
    ``do_something`` is called once. Every result other than the sentinel
    ``'error_occured'`` is kept.

    Args:
        df: Table indexed by the ``'MinorCategory'`` and ``'FilePath'``
            column names (presumably a pandas DataFrame - confirm with caller).
        list_items: Iterable of category values to handle.
        q: Object with a ``put`` method that receives the result list once.
        fromprocess: Passed through unchanged to ``do_something``.

    Returns:
        None. The list of results is delivered through ``q.put``.
    """
    collected = []
    for category in list_items:
        category_rows = df[df['MinorCategory'] == category]
        file_paths = category_rows['FilePath'].values.tolist()
        # The filtered paths are not consumed yet; the filtering is kept so
        # the same lookups (and any errors they raise) still happen.
        _png_paths = [path for path in file_paths if path.endswith('.png')]
        outcome = do_something(fromprocess)
        if outcome != 'error_occured':
            collected.append(outcome)
    q.put(collected)
def get_n_process(as_batches, processes, df, q):
    """Start one worker per leading batch and return the workers plus the rest.

    The first ``processes`` entries of ``as_batches`` are each handed to a
    new ``Process`` running ``submit_job``; the worker's position in this
    round is passed as its last argument.

    Args:
        as_batches: Sequence of batches; indexed from 0 to ``processes - 1``.
        processes: Number of workers to start in this round.
        df: Passed through to every worker.
        q: Passed through to every worker for result delivery.

    Returns:
        A ``(workers, remaining)`` tuple: the list of started ``Process``
        objects and ``as_batches`` with the first ``processes`` entries
        sliced off.
    """
    started = []
    for slot in range(processes):
        worker = Process(
            target=submit_job,
            args=(df, as_batches[slot], q, slot),
        )
        worker.start()
        started.append(worker)
    return started, as_batches[processes:]
def make_batches(items, batch_size):
    """Split ``items`` into consecutive slices of at most ``batch_size``.

    Unlike a ``while start <= end`` loop, this never yields a trailing empty
    batch when ``len(items)`` is an exact multiple of ``batch_size`` (or 0).

    Args:
        items: Sliceable sequence to partition.
        batch_size: Maximum number of items per batch; must be positive.

    Returns:
        A list of slices of ``items``, in order; empty when ``items`` is empty.
    """
    return [
        items[lo:lo + batch_size]
        for lo in range(0, len(items), batch_size)
    ]


def collect_results(workers, q):
    """Drain one result list per worker from ``q``, then join the workers.

    The queue is read *before* joining: a process that has put data on a
    multiprocessing queue may not exit until that data is consumed, so
    joining first can deadlock.

    Args:
        workers: Started ``Process`` objects, each expected to put a single
            list on ``q`` (as ``submit_job`` does).
        q: The queue the workers write to.

    Returns:
        A flat list with the items of every result list received. Workers
        that exited without putting anything contribute nothing.
    """
    from queue import Empty  # local import: only needed by this helper

    results = []
    pending = len(workers)
    while pending:
        # Sample liveness before the blocking read: if nobody was alive
        # beforehand and the read still times out, nothing more can arrive
        # (e.g. a worker crashed before its put), so stop waiting.
        any_alive = any(worker.is_alive() for worker in workers)
        try:
            results.extend(q.get(timeout=1))
        except Empty:
            if not any_alive:
                break
        else:
            pending -= 1
    for worker in workers:
        worker.join()
    return results


def main():
    """Run the batch job: read the CSV, fan out categories, log each round.

    Categories are grouped ten per batch and at most eight batches are
    processed concurrently per round. After every round a progress line is
    appended to ``logs.txt``.

    Returns:
        The flat list of results gathered from all workers.
    """
    categories_per_batch = 10  # assign 10 categories to each process
    max_workers = 8

    # program start
    start = time.perf_counter()
    filepath = "F:/multiview_CNN/multiviewclassification/data/processed/train.csv"
    df = pd.read_csv(filepath, sep=",")
    # get unique categories
    minor_categories = list(df.MinorCategory.value_counts().to_dict().keys())
    # queue results
    q = Queue()

    print("Started creating batch out of the records...")
    as_batches = make_batches(minor_categories, categories_per_batch)
    print("Finished creating batches!")
    print("Total number of Batches found:{}".format(len(as_batches)))

    results = []
    while as_batches:
        # never start more workers than there are batches left
        n_process = min(len(as_batches), max_workers)
        print("For this it Requries {} Process".format(n_process))
        process_obj_inlist, as_batches = get_n_process(as_batches, n_process, df, q)
        results.extend(collect_results(process_obj_inlist, q))
        # the with-statement closes the file; no explicit close() needed
        with open("logs.txt", "a") as f:
            f.write("\n")
            f.write("Log Recording at: {timestamp}, Remaining N = {remaining} yet to be processed".format(
                timestamp=datetime.datetime.now(),
                remaining=len(as_batches)
            ))

    finish = time.perf_counter()
    print("Time took to execute the program:{}".format(round(finish - start, 2)))
    return results


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment