Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Pythonでconcurrent.futuresを使った並列タスク実行
import time
from concurrent.futures import ThreadPoolExecutor
from logging import StreamHandler, Formatter, INFO, getLogger
def init_logger():
    """Configure the root logger to emit INFO records through a stream handler.

    Each record is rendered as ``[timestamp] [thread name] message`` so the
    output shows which thread produced every line.
    """
    handler = StreamHandler()
    handler.setLevel(INFO)
    handler.setFormatter(Formatter("[%(asctime)s] [%(threadName)s] %(message)s"))

    root = getLogger()
    root.addHandler(handler)
    root.setLevel(INFO)
def task(v, delay=1.0):
    """Simulate a slow unit of work, logging when it starts and ends.

    Args:
        v: Label identifying this task in the log output.
        delay: Seconds to sleep between the two log lines (default 1.0).
    """
    log = getLogger()
    log.info("%s start", v)
    time.sleep(delay)
    log.info("%s end", v)
def main():
    """Submit five tasks to a two-worker thread pool and log the progress.

    ``submit`` only schedules the work, so "submit end" is logged while the
    tasks are still running. Leaving the ``with`` block waits for all of
    them, so "main end" is logged last.
    """
    init_logger()
    getLogger().info("main start")
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
        for i in range(5):
            # Fire-and-forget: the returned Future is deliberately not kept.
            executor.submit(task, i)
        getLogger().info("submit end")
    getLogger().info("main end")
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
import time
from concurrent.futures import ThreadPoolExecutor
from logging import StreamHandler, Formatter, INFO, getLogger
def init_logger():
    """Configure the root logger to emit INFO records through a stream handler.

    Each record is rendered as ``[timestamp] [thread name] message`` so the
    output shows which thread produced every line.
    """
    handler = StreamHandler()
    handler.setLevel(INFO)
    handler.setFormatter(Formatter("[%(asctime)s] [%(threadName)s] %(message)s"))

    root = getLogger()
    root.addHandler(handler)
    root.setLevel(INFO)
def task(v, delay=1.0):
    """Simulate a slow computation and return ``v * 2``.

    Args:
        v: Input value; also used as the task label in the log output.
        delay: Seconds to sleep between the two log lines (default 1.0).

    Returns:
        ``v * 2``.
    """
    log = getLogger()
    log.info("%s start", v)
    time.sleep(delay)
    log.info("%s end", v)
    return v * 2
def main():
    """Submit five tasks to a two-worker thread pool and log their results.

    The futures returned by ``submit`` are kept so that the return values
    can be collected, in submission order, via ``Future.result()``.
    """
    init_logger()
    getLogger().info("main start")
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
        futures = [executor.submit(task, i) for i in range(5)]
        # Logged right after scheduling; the tasks are still running here.
        getLogger().info("submit end")
        # result() blocks until the corresponding task has finished.
        getLogger().info([f.result() for f in futures])
    getLogger().info("main end")
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
import time
from concurrent.futures import ThreadPoolExecutor
from logging import StreamHandler, Formatter, INFO, getLogger
def init_logger():
    """Configure the root logger to emit INFO records through a stream handler.

    Each record is rendered as ``[timestamp] [thread name] message`` so the
    output shows which thread produced every line.
    """
    handler = StreamHandler()
    handler.setLevel(INFO)
    handler.setFormatter(Formatter("[%(asctime)s] [%(threadName)s] %(message)s"))

    root = getLogger()
    root.addHandler(handler)
    root.setLevel(INFO)
def task(v, delay=1.0):
    """Simulate a slow computation and return ``v * 2``.

    Args:
        v: Input value; also used as the task label in the log output.
        delay: Seconds to sleep between the two log lines (default 1.0).

    Returns:
        ``v * 2``.
    """
    log = getLogger()
    log.info("%s start", v)
    time.sleep(delay)
    log.info("%s end", v)
    return v * 2
def main():
    """Run five tasks through ``Executor.map`` on a two-worker thread pool.

    ``map`` schedules every call up front and returns an iterator, so
    "map end" is logged before the tasks finish; materializing the iterator
    with ``list`` blocks until all results are available, in input order.
    """
    init_logger()
    getLogger().info("main start")
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
        results = executor.map(task, range(5))
        getLogger().info("map end")
        # Consume the lazy result iterator while the pool is still open.
        getLogger().info(list(results))
    getLogger().info("main end")
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
import sys
import time
from concurrent.futures.process import ProcessPoolExecutor
from logging import StreamHandler, Formatter, INFO, getLogger
from random import random
from time import time
def init_logger():
    """Configure the root logger to emit INFO records through a stream handler.

    Each record is rendered as ``[timestamp] [thread name] message``.
    """
    handler = StreamHandler()
    handler.setLevel(INFO)
    handler.setFormatter(Formatter("[%(asctime)s] [%(threadName)s] %(message)s"))

    root = getLogger()
    root.addHandler(handler)
    root.setLevel(INFO)
def task(params):
    """Apply ``a = a ** a`` repeatedly as a CPU-bound workload.

    The arguments arrive packed in one tuple because ``Executor.map`` passes
    a single item from the iterable to each call.

    Args:
        params: A ``(v, num_calc)`` pair: the starting value and the number
            of times to apply the power step.

    Returns:
        The resulting float; ``float(v)`` unchanged when ``num_calc`` is 0.
    """
    v, num_calc = params
    a = float(v)
    for _ in range(num_calc):
        a = pow(a, a)
    return a
def main():
    """Time a batch of CPU-bound tasks run on a process pool.

    Expects exactly four integer command-line arguments: ``max_workers``,
    ``chunk_size``, ``num_tasks`` and ``num_calc``. Logs the sum of all task
    results followed by the elapsed time in seconds.
    """
    init_logger()
    if len(sys.argv) != 5:
        # Usage errors go to stderr so they never mix with regular output.
        print(
            "usage: 05_process.py max_workers chunk_size num_tasks num_calc",
            file=sys.stderr,
        )
        sys.exit(1)
    max_workers, chunk_size, num_tasks, num_calc = map(int, sys.argv[1:])

    # `time` is the function imported via `from time import time`.
    start = time()
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        params = ((random(), num_calc) for _ in range(num_tasks))
        results = executor.map(task, params, chunksize=chunk_size)
        # Consume the lazy result iterator while the pool is still alive.
        total = sum(results)
    getLogger().info(total)
    getLogger().info("{:.3f}".format(time() - start))
# The guard also keeps pool worker processes that re-import this module
# (spawn start method) from running main() again.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment