Skip to content

Instantly share code, notes, and snippets.

@tag1216
Created April 2, 2017 12:19
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save tag1216/40b75346fd4ffdbfba22a55905094b0e to your computer and use it in GitHub Desktop.
Save tag1216/40b75346fd4ffdbfba22a55905094b0e to your computer and use it in GitHub Desktop.
Pythonでconcurrent.futuresを使った並列タスク実行
import time
from concurrent.futures import ThreadPoolExecutor
from logging import StreamHandler, Formatter, INFO, getLogger
def init_logger():
    """Configure the root logger to print INFO records with the thread name.

    A ``StreamHandler`` using the format
    ``[time] [thread name] message`` is attached to the root logger and the
    root level is set to INFO.

    The function is idempotent: calling it again does not attach a second
    handler, which would otherwise make every record appear twice.
    """
    logger = getLogger()
    logger.setLevel(INFO)

    # A handler installed by an earlier call is recognised by this marker.
    if any(getattr(h, "_init_logger_owned", False) for h in logger.handlers):
        return

    handler = StreamHandler()
    handler.setLevel(INFO)
    handler.setFormatter(Formatter("[%(asctime)s] [%(threadName)s] %(message)s"))
    handler._init_logger_owned = True
    logger.addHandler(handler)
def task(v, delay=1.0):
    """Simulate a slow unit of work, logging when it starts and ends.

    Args:
        v: Value identifying this task in the log output.
        delay: Seconds to sleep between the two log records. Defaults to
            1.0, the duration the demo originally hard-coded.
    """
    logger = getLogger()
    logger.info("%s start", v)
    time.sleep(delay)
    logger.info("%s end", v)
def main():
    """Run five tasks on a two-thread pool without using their return values.

    The futures are still kept so that an exception raised inside a task is
    re-raised here instead of being silently dropped.
    """
    init_logger()
    logger = getLogger()
    logger.info("main start")
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
        futures = [executor.submit(task, i) for i in range(5)]
        # submit() only queues the work, so this is logged before tasks finish.
        logger.info("submit end")
    # Leaving the with-block waits for all tasks; result() re-raises any
    # exception a task ended with.
    for future in futures:
        future.result()
    logger.info("main end")


if __name__ == "__main__":
    main()
import time
from concurrent.futures import ThreadPoolExecutor
from logging import StreamHandler, Formatter, INFO, getLogger
def init_logger():
    """Configure the root logger to print INFO records with the thread name.

    A ``StreamHandler`` using the format
    ``[time] [thread name] message`` is attached to the root logger and the
    root level is set to INFO.

    The function is idempotent: calling it again does not attach a second
    handler, which would otherwise make every record appear twice.
    """
    logger = getLogger()
    logger.setLevel(INFO)

    # A handler installed by an earlier call is recognised by this marker.
    if any(getattr(h, "_init_logger_owned", False) for h in logger.handlers):
        return

    handler = StreamHandler()
    handler.setLevel(INFO)
    handler.setFormatter(Formatter("[%(asctime)s] [%(threadName)s] %(message)s"))
    handler._init_logger_owned = True
    logger.addHandler(handler)
def task(v, delay=1.0):
    """Simulate a slow unit of work and return twice its input.

    Args:
        v: Input value; also used to identify the task in the log output.
        delay: Seconds to sleep between the two log records. Defaults to
            1.0, the duration the demo originally hard-coded.

    Returns:
        ``v * 2``.
    """
    logger = getLogger()
    logger.info("%s start", v)
    time.sleep(delay)
    logger.info("%s end", v)
    return v * 2
def main():
    """Run five tasks on a two-thread pool and log their results in order.

    Each ``submit`` call returns a future; the results are read from the
    futures in submission order once all tasks have been queued.
    """
    init_logger()
    logger = getLogger()
    logger.info("main start")
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
        futures = [executor.submit(task, i) for i in range(5)]
        # submit() only queues the work, so this is logged before tasks finish.
        logger.info("submit end")
        # result() blocks until the corresponding task has completed.
        logger.info([f.result() for f in futures])
    logger.info("main end")


if __name__ == "__main__":
    main()
import time
from concurrent.futures import ThreadPoolExecutor
from logging import StreamHandler, Formatter, INFO, getLogger
def init_logger():
    """Configure the root logger to print INFO records with the thread name.

    A ``StreamHandler`` using the format
    ``[time] [thread name] message`` is attached to the root logger and the
    root level is set to INFO.

    The function is idempotent: calling it again does not attach a second
    handler, which would otherwise make every record appear twice.
    """
    logger = getLogger()
    logger.setLevel(INFO)

    # A handler installed by an earlier call is recognised by this marker.
    if any(getattr(h, "_init_logger_owned", False) for h in logger.handlers):
        return

    handler = StreamHandler()
    handler.setLevel(INFO)
    handler.setFormatter(Formatter("[%(asctime)s] [%(threadName)s] %(message)s"))
    handler._init_logger_owned = True
    logger.addHandler(handler)
def task(v, delay=1.0):
    """Simulate a slow unit of work and return twice its input.

    Args:
        v: Input value; also used to identify the task in the log output.
        delay: Seconds to sleep between the two log records. Defaults to
            1.0, the duration the demo originally hard-coded.

    Returns:
        ``v * 2``.
    """
    logger = getLogger()
    logger.info("%s start", v)
    time.sleep(delay)
    logger.info("%s end", v)
    return v * 2
def main():
    """Run five tasks on a two-thread pool via ``Executor.map``.

    ``map`` schedules every call up front and returns an iterator that
    yields the results in input order.
    """
    init_logger()
    logger = getLogger()
    logger.info("main start")
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
        results = executor.map(task, range(5))
        # map() returns immediately; the tasks are still running at this point.
        logger.info("map end")
        # Consuming the iterator blocks until each result is available.
        logger.info(list(results))
    logger.info("main end")


if __name__ == "__main__":
    main()
import sys
import time
from concurrent.futures.process import ProcessPoolExecutor
from logging import StreamHandler, Formatter, INFO, getLogger
from random import random
from time import time
def init_logger():
    """Configure the root logger to print INFO records with the thread name.

    A ``StreamHandler`` using the format
    ``[time] [thread name] message`` is attached to the root logger and the
    root level is set to INFO.

    The function is idempotent: calling it again does not attach a second
    handler, which would otherwise make every record appear twice.
    """
    logger = getLogger()
    logger.setLevel(INFO)

    # A handler installed by an earlier call is recognised by this marker.
    if any(getattr(h, "_init_logger_owned", False) for h in logger.handlers):
        return

    handler = StreamHandler()
    handler.setLevel(INFO)
    handler.setFormatter(Formatter("[%(asctime)s] [%(threadName)s] %(message)s"))
    handler._init_logger_owned = True
    logger.addHandler(handler)
def task(params):
    """Do CPU-bound busy work by repeatedly raising a number to its own power.

    Args:
        params: A ``(v, num_calc)`` pair packed into one argument, where
            ``v`` is the starting value (converted to ``float``) and
            ``num_calc`` is how many times ``a = a ** a`` is applied.

    Returns:
        The resulting float; ``float(v)`` unchanged when ``num_calc`` is 0.
    """
    v, num_calc = params
    a = float(v)
    for _ in range(num_calc):
        a = pow(a, a)
    return a
def main():
    """Time a batch of CPU-bound tasks on a process pool.

    Expects four integer command-line arguments: ``max_workers``,
    ``chunk_size``, ``num_tasks`` and ``num_calc``. Logs the sum of all task
    results followed by the elapsed wall-clock time in seconds.

    Prints the usage line and exits with status 1 when the argument count is
    wrong or an argument is not an integer.
    """
    init_logger()
    usage = "usage: 05_process.py max_workers chunk_size num_tasks num_calc"
    if len(sys.argv) != 5:
        print(usage)
        sys.exit(1)
    try:
        max_workers, chunk_size, num_tasks, num_calc = map(int, sys.argv[1:])
    except ValueError:
        # Non-integer input gets the usage message rather than a traceback.
        print(usage)
        sys.exit(1)

    logger = getLogger()
    start = time()
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        params = ((random(), num_calc) for _ in range(num_tasks))
        results = executor.map(task, params, chunksize=chunk_size)
        # Summing consumes the iterator and therefore waits for every task.
        logger.info(sum(results))
    logger.info("{:.3f}".format(time() - start))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment