Skip to content

Instantly share code, notes, and snippets.

@IaroslavR
Last active October 16, 2015 11:11
Show Gist options
  • Save IaroslavR/c68e1945d9df677d6e7d to your computer and use it in GitHub Desktop.
Save IaroslavR/c68e1945d9df677d6e7d to your computer and use it in GitHub Desktop.
Simple multiprocessing for Python 2.7.x with ^C handling in threads
import argparse
import datetime
import multiprocessing as mp
import os
import logging
import Queue
import time
import subprocess
import random
import shutil
import sys
NAME = 'test'
def setup_logger(logger_name, log_file=None, log_file_mode='w', level=logging.INFO):
logger = logging.getLogger(logger_name)
if not logger.handlers:
formatter = logging.Formatter(
fmt='%(asctime)s %(levelname)s: %(message)s',
datefmt='%H:%M:%S %d/%m'
)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
if log_file:
file_handler = logging.FileHandler(log_file, log_file_mode, encoding='utf8')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.setLevel(level)
return logger
def execute(cmd_parts, variables=None):
zero_div = 5 / random.randint(0, 10) # rise Error for test proposes
if variables:
env_vars = dict(os.environ, **variables)
else:
env_vars = os.environ
process = subprocess.Popen(cmd_parts, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env_vars)
response, error_message = process.communicate()
if process.returncode:
raise ValueError(process.returncode, error_message)
return response
def worker(thread_id, tasks, results):
log.info("Starting thread: %s", thread_id)
dir_name = 'temp_for_{}'.format(thread_id)
os.makedirs(dir_name)
try:
while not tasks.empty():
data = tasks.get()
log.info("Run stress in %s thread for %s", thread_id, data)
response = execute(['stress', '--cpu', '1', '-t', str(data)])
results.put(response)
time.sleep(4)
except KeyboardInterrupt:
log.info('Interrupt signal received in thread %s.', thread_id)
except Queue.Empty:
log.info("All done in thread %s", thread_id)
except Exception:
log.error("Unexpected error in %s", thread_id, exc_info=True)
finally:
log.info(" Cleaning up in thread %s", thread_id)
shutil.rmtree(dir_name)
if __name__ == '__main__':
start_time = time.time()
parser = argparse.ArgumentParser(description=NAME)
parser.add_argument(
'--level',
choices=['FATAL', 'CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'],
nargs='?',
const='INFO', default='INFO',
help='Set logging level. Default level - INFO'
)
args = parser.parse_args()
log = setup_logger(NAME, log_file=NAME + '.log', level=args.level)
manager = mp.Manager()
tasks, results = manager.Queue(), manager.Queue() # http://stackoverflow.com/q/33152171/4249707
for _ in xrange(20):
tasks.put(random.randint(1, 15))
workers = [mp.Process(target=worker, args=(i, tasks, results)) for i in xrange(mp.cpu_count())]
for worker in workers:
worker.start()
try:
for worker in workers:
worker.join()
except KeyboardInterrupt:
log.info('Interrupt signal received in main. Cleaning up main')
finally:
while not results.empty():
log.info(results.get())
execution_time = time.time() - start_time
log.info('Execution time: %s', datetime.timedelta(seconds=execution_time))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment