@leimao
Last active November 4, 2019 22:50
Python Parallel Computing Example
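This gist compares multithreading (threading) with multiprocessing (multiprocessing.Pool) on the same counting task. Every increment sleeps for one millisecond to simulate work; because time.sleep releases CPython's global interpreter lock, the thread-based version can also run its workers concurrently.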
import threading
import multiprocessing
import time
import random


class ParallelCounter(object):
    """
    A ParallelCounter instance has num_counters counters and num_workers workers.
    Each worker contributes to every counter. The amount of work a worker
    contributes to each counter is determined by its entry in work_loads.
    """

    def __init__(self, num_counters, num_workers, work_loads):
        self.num_counters = num_counters
        self.num_workers = num_workers
        self.work_loads = work_loads
        self.counts = [0] * self.num_counters
        assert self.num_workers == len(work_loads)

    def multithread_count(self):
        lock = threading.Lock()
        for i in range(self.num_counters):
            threads = [threading.Thread(target=self.worker_thread, args=(j, i, lock)) for j in range(self.num_workers)]
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()

    def multiprocess_count(self):
        # Map: each worker process returns its own list of partial counts.
        # Note: pool.map pickles the bound method along with the instance,
        # which works here because all instance attributes are picklable.
        pool = multiprocessing.Pool(processes=self.num_workers)
        results = pool.map(self.worker_process, range(self.num_workers))
        pool.close()
        pool.join()
        # Reduce: merge the partial counts into the shared counters.
        for result in results:
            for i in range(len(result)):
                self.counts[i] += result[i]

    def worker_thread(self, worker_id, counter_id, lock):
        count = 0
        for _ in range(self.work_loads[worker_id]):
            count += 1
            # Sleep briefly to simulate work and to minimize the relative cost of the lock.
            time.sleep(0.001)
        # Use a lock to prevent multiple threads from writing to shared memory simultaneously.
        # Acquire it sparingly, since frequent locking slows a multithreaded program down significantly.
        with lock:
            self.counts[counter_id] += count

    def worker_process(self, worker_id):
        counts = [0] * self.num_counters
        for i in range(self.num_counters):
            for _ in range(self.work_loads[worker_id]):
                counts[i] += 1
                time.sleep(0.001)
        return counts

    def get_counts(self):
        return self.counts

    def reset_counts(self):
        self.counts = [0] * self.num_counters


def measure_counter_time(num_counters, num_workers, work_loads):
    expected_counts = [sum(work_loads)] * num_counters
    print("Expected counts:")
    print(expected_counts)
    print("Using {} workers".format(num_workers))
    print("Using work loads {}".format(work_loads))
    print("Total work load {}".format(sum(work_loads)))
    parallel_counter = ParallelCounter(num_counters=num_counters, num_workers=num_workers, work_loads=work_loads)

    print("Counting using multithread counter ...")
    start_time = time.time()
    parallel_counter.multithread_count()
    end_time = time.time()
    multithread_time_elapsed = end_time - start_time
    counts = parallel_counter.get_counts()
    print("Time elapsed: {:.2f} seconds".format(multithread_time_elapsed))
    if counts != expected_counts:
        print("Counts are not correct!")
        print("Got counts:")
        print(counts)
    else:
        print("Counts are correct!")
    parallel_counter.reset_counts()

    print("Counting using multiprocess counter ...")
    start_time = time.time()
    parallel_counter.multiprocess_count()
    end_time = time.time()
    multiprocess_time_elapsed = end_time - start_time
    counts = parallel_counter.get_counts()
    print("Time elapsed: {:.2f} seconds".format(multiprocess_time_elapsed))
    if counts != expected_counts:
        print("Counts are not correct!")
        print("Got counts:")
        print(counts)
    else:
        print("Counts are correct!")

    return multithread_time_elapsed, multiprocess_time_elapsed


def generate_random_work_loads(num_workers, random_seed=None):
    if random_seed is not None:
        random.seed(random_seed)
    return [random.randint(1, 100) for _ in range(num_workers)]


def main():
    random_seed = None
    num_counters = 5
    num_workers = 10
    random_work_loads = generate_random_work_loads(num_workers=num_workers, random_seed=random_seed)
    fixed_work_loads = [100] * num_workers

    print("-" * 75)
    singlethread_time_elapsed, singleprocess_time_elapsed = measure_counter_time(num_counters=num_counters, num_workers=1, work_loads=[sum(random_work_loads)])
    print("-" * 75)
    multithread_time_elapsed, multiprocess_time_elapsed = measure_counter_time(num_counters=num_counters, num_workers=num_workers, work_loads=random_work_loads)
    print("-" * 75)
    print("Multi-thread counter is {:.2f}x faster than single-thread counter".format(singlethread_time_elapsed / multithread_time_elapsed))
    print("Multi-process counter is {:.2f}x faster than single-process counter".format(singleprocess_time_elapsed / multiprocess_time_elapsed))
    print("-" * 75)

    # Because the work loads differ among workers, the workers rarely finish at the same time.
    # That is why the multithread or multiprocess program sometimes does not improve efficiency
    # as much as expected. If the work loads are the same for every worker, the maximum speedup
    # can usually be observed.
    singlethread_time_elapsed, singleprocess_time_elapsed = measure_counter_time(num_counters=num_counters, num_workers=1, work_loads=[sum(fixed_work_loads)])
    print("-" * 75)
    multithread_time_elapsed, multiprocess_time_elapsed = measure_counter_time(num_counters=num_counters, num_workers=num_workers, work_loads=fixed_work_loads)
    print("-" * 75)
    print("Multi-thread counter is {:.2f}x faster than single-thread counter".format(singlethread_time_elapsed / multithread_time_elapsed))
    print("Multi-process counter is {:.2f}x faster than single-process counter".format(singleprocess_time_elapsed / multiprocess_time_elapsed))
    print("-" * 75)


if __name__ == "__main__":
    main()
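
As an aside, the same map/reduce pattern can be written with the standard library's concurrent.futures module, which manages the pool for you. The sketch below is not part of the original gist; simulated_work is a hypothetical stand-in for ParallelCounter.worker_process, reduced to a single counter for brevity.

import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def simulated_work(work_load):
    # Hypothetical stand-in for ParallelCounter.worker_process (single counter).
    count = 0
    for _ in range(work_load):
        count += 1
        time.sleep(0.001)
    return count


def parallel_count(work_loads, executor_cls):
    # Map each work load to a worker, then reduce the partial counts.
    with executor_cls(max_workers=len(work_loads)) as executor:
        return sum(executor.map(simulated_work, work_loads))


if __name__ == "__main__":
    work_loads = [100] * 10
    print(parallel_count(work_loads, ThreadPoolExecutor))   # thread-based
    print(parallel_count(work_loads, ProcessPoolExecutor))  # process-based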