Last active
November 4, 2019 22:50
-
-
Save leimao/f1d41ecedc04c34a28eb23fb4e5c95a0 to your computer and use it in GitHub Desktop.
Python Parallel Computing Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
import multiprocessing | |
import time | |
import random | |
class ParallelCounter(object):
    """Count in parallel using either threads or processes.

    A ParallelCounter instance has ``num_counters`` counters and
    ``num_workers`` workers.  Every worker contributes to every counter;
    the amount each worker adds to a counter is determined by its
    corresponding entry in ``work_loads``.
    """

    def __init__(self, num_counters, num_workers, work_loads):
        """Initialize counters and validate the work-load list.

        Args:
            num_counters: Number of independent counters to maintain.
            num_workers: Number of parallel workers.
            work_loads: Per-worker contribution; must have ``num_workers`` entries.

        Raises:
            ValueError: If ``len(work_loads) != num_workers``.
        """
        # Validate explicitly rather than via assert, which is stripped under -O.
        if num_workers != len(work_loads):
            raise ValueError(
                "Expected {} work loads, got {}".format(num_workers, len(work_loads)))
        self.num_counters = num_counters
        self.num_workers = num_workers
        self.work_loads = work_loads
        self.counts = [0] * self.num_counters

    def multithread_count(self):
        """Fill all counters using one thread per worker, one counter at a time."""
        lock = threading.Lock()
        for counter_id in range(self.num_counters):
            threads = [
                threading.Thread(target=self.worker_thread,
                                 args=(worker_id, counter_id, lock))
                for worker_id in range(self.num_workers)
            ]
            for thread in threads:
                thread.start()
            # Wait for every worker before moving on to the next counter.
            for thread in threads:
                thread.join()

    def multiprocess_count(self):
        """Fill all counters using a process pool (map/reduce style)."""
        # Map: each worker process counts all counters independently.
        # The context manager guarantees the pool's worker processes are
        # released even if map() raises (the original leaked the pool).
        with multiprocessing.Pool(processes=self.num_workers) as pool:
            results = pool.map(self.worker_process, range(self.num_workers))
        # Reduce: merge per-worker partial counts into the shared totals.
        for result in results:
            for i in range(len(result)):
                self.counts[i] += result[i]

    def worker_thread(self, worker_id, counter_id, lock):
        """Accumulate this worker's load locally, then add it to one counter.

        The shared list is touched only once, under the lock, to keep
        contention low; taking the lock per increment would serialize the
        threads and defeat the point of the demo.
        """
        count = 0
        for _ in range(self.work_loads[worker_id]):
            count += 1
            # Simulated work time; also minimizes the relative cost of locking.
            time.sleep(0.001)
        # Guard the shared counts list against concurrent read-modify-write.
        with lock:
            self.counts[counter_id] += count

    def worker_process(self, worker_id):
        """Return this worker's contribution to every counter.

        Runs in a child process, so it accumulates into a private list and
        returns it; the parent merges the results in multiprocess_count().
        """
        counts = [0] * self.num_counters
        for i in range(self.num_counters):
            for _ in range(self.work_loads[worker_id]):
                counts[i] += 1
                time.sleep(0.001)
        return counts

    def get_counts(self):
        """Return the current counter values."""
        return self.counts

    def reset_counts(self):
        """Reset all counters to zero."""
        self.counts = [0] * self.num_counters
def _time_and_check(count_fn, parallel_counter, expected_counts):
    """Time one counting run, report whether the counts match, return elapsed seconds."""
    start_time = time.time()
    count_fn()
    elapsed = time.time() - start_time
    counts = parallel_counter.get_counts()
    print("Time elapsed: {:.2f}".format(elapsed))
    if counts != expected_counts:
        print("Counts are not correct!")
        print("Got counts:")
        print(counts)
    else:
        print("Counts are correct!")
    return elapsed


def measure_counter_time(num_counters, num_workers, work_loads):
    """Run the multithread and multiprocess counters and time both.

    Args:
        num_counters: Number of counters the ParallelCounter maintains.
        num_workers: Number of parallel workers.
        work_loads: Per-worker work load; must have num_workers entries.

    Returns:
        Tuple (multithread_time_elapsed, multiprocess_time_elapsed) in seconds.
    """
    expected_counts = [sum(work_loads)] * num_counters
    print("Expected counts:")
    print(expected_counts)
    print("Using {} workers".format(num_workers))
    print("Using work loads {}".format(work_loads))
    print("Total work loads {}".format(sum(work_loads)))
    parallel_counter = ParallelCounter(num_counters=num_counters, num_workers=num_workers, work_loads=work_loads)
    # Fixed typo in the original message ("mounter" -> "counter").
    print("Counting multithread counter ...")
    multithread_time_elapsed = _time_and_check(
        parallel_counter.multithread_count, parallel_counter, expected_counts)
    # Reset so the multiprocess run starts from zero.
    parallel_counter.reset_counts()
    print("Counting Using multiprocess Counter ...")
    multiprocess_time_elapsed = _time_and_check(
        parallel_counter.multiprocess_count, parallel_counter, expected_counts)
    return multithread_time_elapsed, multiprocess_time_elapsed
def generate_random_work_loads(num_workers, random_seed=None):
    """Return a list of ``num_workers`` random work loads, each in [1, 100].

    Args:
        num_workers: How many work loads to generate.
        random_seed: Optional seed for reproducible results; when None the
            global random state is left untouched.
    """
    if random_seed is not None:
        random.seed(random_seed)
    work_loads = []
    for _ in range(num_workers):
        work_loads.append(random.randint(1, 100))
    return work_loads
def main():
    """Compare single- vs multi-worker counting on random and uniform loads.

    Runs two experiments: one with randomly drawn per-worker work loads and
    one with identical work loads.  When the loads differ, the workers rarely
    finish at the same time, so the parallel speedup often falls short of the
    ideal; with equal loads the maximum efficiency gain is usually observed.
    """
    seed = None
    counters = 5
    workers = 10
    random_loads = generate_random_work_loads(num_workers=workers, random_seed=seed)
    uniform_loads = [100] * workers
    separator = "-" * 75

    print(separator)
    # First pass: uneven (random) loads; second pass: equal (fixed) loads.
    for loads in (random_loads, uniform_loads):
        # Baseline: one worker doing the entire combined load.
        single_thread_time, single_process_time = measure_counter_time(
            num_counters=counters, num_workers=1, work_loads=[sum(loads)])
        print(separator)
        multi_thread_time, multi_process_time = measure_counter_time(
            num_counters=counters, num_workers=workers, work_loads=loads)
        print(separator)
        print("Multi-thread counter is {:.2f} faster than single-thread counter".format(single_thread_time / multi_thread_time))
        print("Multi-process counter is {:.2f} faster than single-process counter".format(single_process_time / multi_process_time))
        print(separator)
# Run the benchmark demo only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment