Skip to content

Instantly share code, notes, and snippets.

@lypro09539
Created December 28, 2020 13:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lypro09539/fd10fa057964d42ff25ee9cea9cdc47f to your computer and use it in GitHub Desktop.
Save lypro09539/fd10fa057964d42ff25ee9cea9cdc47f to your computer and use it in GitHub Desktop.
139行加了个sleep 10,配合229的sleep10 和 64行的KEEP_ALIVE_TIME=6 ,可以“模拟”特殊调度顺序。预期结果:第二次加的任务f2,一直不会执行
"""
史上最强的python线程池。
最智能的可自动实时调节线程数量的线程池。此线程池和官方concurrent.futures的线程池 是鸭子类关系,所以可以一键替换类名 或者 import as来替换类名。
对比官方线程池,有4个创新功能或改进。
1、主要是不仅能扩大,还可自动缩小(官方内置的ThreadpoolExecutor不具备此功能,此概念是什么意思和目的,可以百度java ThreadpoolExecutor的KeepAliveTime参数的介绍),
2、非常节制的开启多线程,例如实例化一个最大100线程数目的pool,每隔2秒submit一个函数任务,而函数每次只需要1秒就能完成,实际上只需要调节增加到1个线程就可以,不需要慢慢增加到100个线程
官方的线程池不够智能,会一直增加到最大线程数目,此线程池则不会。
3、线程池任务的queue队列,修改为有界队列
4、此线程池运行函数出错时候,直接显示线程错误,官方的线程池则不会显示错误,例如函数中写1/0,任然不现实错误。
"""
import os
import atexit
import queue
import sys
import threading
import time
import weakref
from nb_log import LoggerMixin, nb_print, LoggerLevelSetterMixin, LogManager
_shutdown = False
_threads_queues = weakref.WeakKeyDictionary()
def _python_exit():
global _shutdown
_shutdown = True
items = list(_threads_queues.items())
for t, q in items:
q.put(None)
for t, q in items:
t.join()
atexit.register(_python_exit)
class _WorkItem(LoggerMixin):
def __init__(self, fn, args, kwargs):
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
# noinspection PyBroadException
try:
self.fn(*self.args, **self.kwargs)
except BaseException as exc:
self.logger.exception(f'函数 {self.fn.__name__} 中发生错误,错误原因是 {type(exc)} {exc} ')
def __str__(self):
return f'{(self.fn.__name__, self.args, self.kwargs)}'
class ThreadPoolExecutorShrinkAble(LoggerMixin, LoggerLevelSetterMixin):
# 为了和官方自带的THredpoolexecutor保持完全一致的鸭子类,参数设置成死的,不然用户传参了。
MIN_WORKERS = 5
KEEP_ALIVE_TIME = 6
def __init__(self, max_workers=None, thread_name_prefix=''):
"""
最好需要兼容官方concurren.futures.ThreadPoolExecutor 和改版的BoundedThreadPoolExecutor,入参名字和个数保持了一致。
:param max_workers:
:param thread_name_prefix:
"""
self._max_workers = max_workers or 4
self._thread_name_prefix = thread_name_prefix
self.work_queue = queue.Queue(max_workers)
# self._threads = set()
self._threads = weakref.WeakSet()
self._lock_compute_threads_free_count = threading.Lock()
self.threads_free_count = 0
self._shutdown = False
self._shutdown_lock = threading.Lock()
def _change_threads_free_count(self, change_num):
with self._lock_compute_threads_free_count:
self.threads_free_count += change_num
def submit(self, func, *args, **kwargs):
with self._shutdown_lock:
if self._shutdown:
raise RuntimeError('不能添加新的任务到线程池')
self._adjust_thread_count()
self.work_queue.put(_WorkItem(func, args, kwargs))
def _adjust_thread_count(self):
# if len(self._threads) < self._threads_num:
# self.logger.debug(
# (self.threads_free_count, len(self._threads), len(_threads_queues), get_current_threads_num()))
if self.threads_free_count < self.MIN_WORKERS and len(self._threads) < self._max_workers:
# t = threading.Thread(target=_work,
# args=(self._work_queue,self))
t = _CustomThread(self).set_log_level(self.logger.level)
t.setDaemon(True)
t.start()
self._threads.add(t)
_threads_queues[t] = self.work_queue
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown = True
self.work_queue.put(None)
if wait:
for t in self._threads:
t.join()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown(wait=True)
return False
# 两个名字都可以,兼容以前的老名字(中文意思是 自定义线程池),但新名字更能表达意思(可缩小线程池)。
CustomThreadpoolExecutor = ThreadPoolExecutorShrinkAble
# noinspection PyProtectedMember
class _CustomThread(threading.Thread, LoggerMixin, LoggerLevelSetterMixin):
def __init__(self, executorx: ThreadPoolExecutorShrinkAble):
super().__init__()
self._executorx = executorx
def _remove_thread(self, stop_resson=''):
# noinspection PyUnresolvedReferences
self.logger.info(f'停止线程 {self._ident}, 触发条件是 {stop_resson} ')
self.logger.info(f'sleep模拟调度')
time.sleep(10)
self._executorx._change_threads_free_count(-1)
self._executorx._threads.remove(self)
_threads_queues.pop(self)
# noinspection PyProtectedMember
def run(self):
# noinspection PyUnresolvedReferences
self.logger.debug(f'新启动线程 {self._ident} ')
self._executorx._change_threads_free_count(1)
while True:
try:
work_item = self._executorx.work_queue.get(block=True, timeout=self._executorx.KEEP_ALIVE_TIME)
except queue.Empty:
# continue
# self._remove_thread()
# break
if self._executorx.threads_free_count > self._executorx.MIN_WORKERS:
self._remove_thread(
f'当前线程超过 {self._executorx.KEEP_ALIVE_TIME} 秒没有任务,线程池中不在工作状态中的线程数量是 '
f'{self._executorx.threads_free_count},超过了指定的数量 {self._executorx.MIN_WORKERS}')
break # 退出while 1,即是结束。这里才是决定线程结束销毁,_remove_thread只是个名字而已,不是由那个来销毁线程。
else:
continue
# nb_print(work_item)
if work_item is not None:
self._executorx._change_threads_free_count(-1)
work_item.run()
del work_item
self._executorx._change_threads_free_count(1)
continue
if _shutdown or self._executorx._shutdown:
self._executorx.work_queue.put(None)
break
process_name_set = set()
logger_show_current_threads_num = LogManager('show_current_threads_num').get_logger_and_add_handlers(
formatter_template=5, log_filename='show_current_threads_num.log', do_not_use_color_handler=False)
def show_current_threads_num(sleep_time=600, process_name='', block=False, daemon=True, work_queue=None):
process_name = sys.argv[0] if process_name == '' else process_name
def _show_current_threads_num():
while True:
# logger_show_current_threads_num.info(f'{process_name} 进程 的 并发数量是 --> {threading.active_count()}')
# nb_print(f' {process_name} {os.getpid()} 进程 的 线程数量是 --> {threading.active_count()}')
logger_show_current_threads_num.info(
f' {process_name} {os.getpid()} 进程 的 线程数量是 --> {threading.active_count()}, work_queue剩余任务数->{work_queue.qsize()}')
time.sleep(sleep_time)
if process_name not in process_name_set:
if block:
_show_current_threads_num()
else:
t = threading.Thread(target=_show_current_threads_num, daemon=daemon)
t.start()
process_name_set.add(process_name)
def get_current_threads_num():
return threading.active_count()
if __name__ == '__main__':
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutorShrinkAble(10)
# pool = ThreadPoolExecutor(200) # 测试对比官方自带
show_current_threads_num(sleep_time=5, work_queue=pool.work_queue)
def f1(a):
time.sleep(0.2) # 可修改这个数字测试多线程数量调节功能。
nb_print(f'{a} 。。。。。。。')
# raise Exception('抛个错误测试') # 官方的不会显示函数出错你,你还以为你写的代码没毛病呢。
def f2(a):
time.sleep(0.2) # 可修改这个数字测试多线程数量调节功能。
1/0
for i in range(30):
time.sleep(0.01) # 这里的间隔时间模拟,当任务来临不密集,只需要少量线程就能搞定f1了,因为f1的消耗时间短,
# 不需要开那么多线程,CustomThreadPoolExecutor比ThreadPoolExecutor 优势之一。
pool.submit(f1, str(i))
#
time.sleep(10)
for i in range(30):
pool.submit(f2, str(i))
time.sleep(100000)
@lypro09539
Copy link
Author

lypro09539 commented Dec 28, 2020

所以不是纯粹的调用方式复现的问题,而是修改了作者的源码(即139行的sleep)来放大问题。仅做研究之用吧。
shrinkable_threadpool_test_1_result.png

@ydf0509
Copy link

ydf0509 commented Dec 29, 2020

加q 909686719

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment