Skip to content

Instantly share code, notes, and snippets.

@pypeach
Created July 23, 2018 13:34
Show Gist options
  • Save pypeach/62e7c9cfc4b6be8e62f031bdec3f6a27 to your computer and use it in GitHub Desktop.
Save pypeach/62e7c9cfc4b6be8e62f031bdec3f6a27 to your computer and use it in GitHub Desktop.
マルチスレッド処理を行うサンプルアプリケーションです
# coding:utf-8
import concurrent
import logging
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from time import sleep
from app.app_logic_base import AppLogicBase
from app.enum.gender_type import GenderType
from app.sql import employees
from app.util import date_helper
"""
マルチスレッドでDBアクセスするサンプルアプリケーションです
"""
__author__ = "t.ebinuma"
__version__ = "1.0"
__date__ = "27 May 2018"
class ThreadPoolExecutorOperateDb(AppLogicBase):
    """Sample application logic that registers employees records from a thread pool.

    Instance attributes set in ``__init__``:
        logger: module-named logger used by every method of this class.
        default_insert_count: number of insert tasks submitted when the
            caller does not pass ``max_insert_count``.
        default_max_workers: thread pool size used when the caller does not
            pass ``max_workers``.
        default_timeout: duration threshold in seconds; a task whose measured
            duration exceeds it is reported in the log after it completes.
    """

    def __init__(self):
        super().__init__()
        # Configure the logger.
        self.logger = logging.getLogger(__name__)
        self.default_insert_count = 10
        self.default_max_workers = 5
        self.default_timeout = 30

    def insert(self, emp_no):
        """Register one employees record and measure how long it took.

        Args:
            emp_no: value stored under the ``emp_no`` key of the parameter
                dict handed to ``employees.insert``.

        Returns:
            Elapsed wall-clock time in whole seconds (int). The value is
            returned whether or not the registration succeeded.

        Any exception raised while building or registering the record is
        logged with its traceback and is not re-raised.
        """
        # Truncate to whole seconds so the result keeps the original
        # one-second granularity, but subtract real datetimes: subtracting
        # "%Y%m%d%H%M%S" integers breaks on every minute/hour/day rollover.
        start_time = datetime.now().replace(microsecond=0)
        try:
            # Register a record in employees.
            self.logger.debug("emp_no=%s", emp_no)
            gender_type = GenderType.get_random_choice()
            param = {
                'emp_no': emp_no,
                'first_name': 'first_name',
                'last_name': 'last_name',
                'gender': gender_type.value,
                'birth_date': date_helper.convert_string_to_date("19950101", date_helper.format_ymd),
                'hire_date': date_helper.convert_string_to_date("20160517", date_helper.format_ymd),
            }
            employees.insert(param)
            # The sleep is deliberate: it makes the speed difference between
            # single-threaded and multi-threaded execution easy to observe.
            sleep(1)
        except Exception as e:
            # Best-effort: keep going, but make the failure visible
            # (ERROR level with traceback) instead of hiding it at DEBUG.
            self.logger.exception("レコード登録エラー:%s", e)
        end_time = datetime.now().replace(microsecond=0)
        return int((end_time - start_time).total_seconds())

    def thread_pool_executor_insert(self, max_workers=None, max_insert_count=None):
        """Register employees records concurrently using a thread pool.

        Args:
            max_workers: thread pool size; ``None`` selects
                ``self.default_max_workers``.
            max_insert_count: number of ``insert`` tasks to submit, one per
                ``emp_no`` in ``range(max_insert_count)``; ``None`` selects
                ``self.default_insert_count``.

        Tasks are not cancelled when they run long. After each task
        completes, its measured duration is compared with
        ``self.default_timeout`` and an over-long task is reported in the log.
        """
        if max_workers is None:
            max_workers = self.default_max_workers
        if max_insert_count is None:
            max_insert_count = self.default_insert_count
        # Run the inserts on a thread pool.
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            pending = [executor.submit(self.insert, emp_no) for emp_no in range(max_insert_count)]
            for future in futures.as_completed(pending):
                try:
                    interval = future.result()
                except TimeoutError as e:
                    self.logger.info("TimeoutError:%s", e)
                    continue
                # Report tasks whose duration exceeded the threshold (seconds).
                if interval > self.default_timeout:
                    self.logger.info("TimeoutError:%s", "処理時間は{}秒です".format(interval))
def main():
    """Run the sample: build the application object and insert with its defaults."""
    app = ThreadPoolExecutorOperateDb()
    workers = app.default_max_workers
    insert_count = app.default_insert_count
    app.thread_pool_executor_insert(max_workers=workers, max_insert_count=insert_count)


# Execute the sample only when this file is run as a script.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment