Skip to content

Instantly share code, notes, and snippets.

@jg75
Last active May 16, 2019 15:39
Show Gist options
  • Save jg75/7d82a82ce0a46b7b4da18af771c7d652 to your computer and use it in GitHub Desktop.
Save jg75/7d82a82ce0a46b7b4da18af771c7d652 to your computer and use it in GitHub Desktop.
threading and multiprocessing, two great tastes that taste great together
"""Threading vs. Multiprocessing thread pools example."""
import argparse
import logging
import multiprocessing.pool
import random
import threading
import time
# Root logger setup: report INFO and above, stamping every record with the
# time, the logger name and the name of the emitting thread.
logging.basicConfig(
    format="[%(asctime)s]:%(name)s:%(threadName)s:%(message)s",
    level=logging.INFO,
)
class Example:
    """Base class for the thread pool examples.

    Stores the number of work items to generate and a logger named after
    the concrete class, and provides the shared data generator and worker.
    """

    def __init__(self, count):
        """Remember ``count`` and create a logger named after the class."""
        self.count = count
        self.logger = logging.getLogger(self.__class__.__name__)

    def random_data(self, low=2, high=10, count=None):
        """Return a list of random integers between ``low`` and ``high``.

        Both bounds are inclusive. When ``count`` is ``None`` the list has
        ``self.count`` entries; any other value, including ``0``, is used
        as given.
        """
        # Compare against None explicitly: a plain truthiness test would
        # treat an explicit count of 0 as "not given" and fall back to
        # self.count instead of producing an empty list.
        size = self.count if count is None else count
        data = [random.randint(low, high) for _ in range(size)]
        self.logger.info(f"Random Data:{data}")
        return data

    def worker(self, amount):
        """Sleep for ``amount`` seconds, logging before and after.

        Returns ``amount`` unchanged.
        """
        self.logger.info("Started")
        time.sleep(amount)
        self.logger.info(amount)
        return amount

    def start(self):
        """Start the thread pool; the base implementation does nothing."""
class ThreadingPool(Example):
    """Example thread pool using threading."""

    def start(self):
        """Run one worker thread per random value and wait for them all."""
        data = self.random_data()
        # Create exactly one thread per data item. Deriving the indices from
        # threading.active_count() would skip leading items whenever any
        # thread other than the main one happens to be alive.
        workers = [
            threading.Thread(target=self.worker, kwargs={"amount": amount})
            for amount in data
        ]
        for thread in workers:
            thread.start()
        for thread in workers:
            thread.join()
        self.logger.info(f"Completed:{data}")
class MultiprocessingPool(Example):
    """Example thread pool using multiprocessing."""

    def start(self):
        """Map the worker over the random data using a ThreadPool.

        The pool gets one thread per requested item so that every worker
        can run concurrently, and is released even when a worker raises.
        """
        data = self.random_data()
        # ThreadPool rejects sizes below 1, so clamp for a zero/negative count.
        size = max(1, self.count)
        with multiprocessing.pool.ThreadPool(size) as pool:
            # map() blocks until every result is available, so leaving the
            # ``with`` block afterwards does not cut any work short.
            results = pool.map(self.worker, data)
        self.logger.info(f"Completed:{results}")
class MultiprocessingThreadPool(Example):
    """Example that fans batches out to a ThreadPool, one thread per item.

    ``self.count`` is the number of batches; each batch is handled by one
    pool task, which in turn starts a plain thread for every item in it.
    """

    def __init__(self, processor_threads=8):
        """Use ``processor_threads`` as the number of batches."""
        super().__init__(processor_threads)

    def get_data(self, count):
        """Split ``count`` random values into ``self.count`` batches.

        Returns a list of ``self.count`` lists. Batch sizes differ by at
        most one, with the first ``count % self.count`` batches holding the
        extra item, so the sizes add up to ``count``.
        """
        base, extra = divmod(count, self.count)
        batches = []
        for index in range(self.count):
            size = base + 1 if index < extra else base
            # Build empty batches directly rather than relying on how
            # random_data() treats a zero count.
            batches.append(self.random_data(count=size) if size else [])
        return batches

    def queue(self, data):
        """Run one worker thread per item of ``data`` and wait for them all.

        Returns ``data`` unchanged.
        """
        workers = [
            threading.Thread(target=self.worker, kwargs={"amount": amount})
            for amount in data
        ]
        for thread in workers:
            thread.start()
        for thread in workers:
            thread.join()
        self.logger.info(f"Completed:{data}")
        return data

    def start(self, count):
        """Generate ``count`` values in batches and process them in a pool."""
        data = self.get_data(count)
        # One pool thread per batch; ThreadPool rejects sizes below 1.
        size = max(1, self.count)
        with multiprocessing.pool.ThreadPool(size) as pool:
            # map() blocks until every batch has finished.
            results = pool.map(self.queue, data)
        self.logger.info(f"Completed:{results}")
def threads(count):
    """Build a ThreadingPool for ``count`` items and run it."""
    ThreadingPool(count).start()
def processes(count):
    """Build a MultiprocessingPool for ``count`` items and run it."""
    MultiprocessingPool(count).start()
def process_threads(count):
    """Build a default MultiprocessingThreadPool and run it on ``count`` items."""
    MultiprocessingThreadPool().start(count)
def parse_args():
    """Build the command-line parser and parse the process arguments.

    The parser requires a sub-command (stored as ``command``) and an
    integer ``count``; each sub-command sets ``func`` to the function that
    runs the matching example.
    """
    parser = argparse.ArgumentParser()
    commands = parser.add_subparsers(help="Thread pool type")
    commands.dest = "command"
    commands.required = True
    parser.add_argument("count", type=int, help="Thread count")

    # Sub-command name -> function invoked with the parsed count.
    handlers = (
        ("threads", threads),
        ("processes", processes),
        ("process_threads", process_threads),
    )
    for name, handler in handlers:
        commands.add_parser(name).set_defaults(func=handler)

    return parser.parse_args()
def main():
    """Parse the command line and run the selected example."""
    args = parse_args()
    handler = args.func
    handler(args.count)


# Only run the examples when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment