-
-
Save lypro09539/fd10fa057964d42ff25ee9cea9cdc47f to your computer and use it in GitHub Desktop.
139行加了个sleep 10,配合229的sleep10 和 64行的KEEP_ALIVE_TIME=6 ,可以“模拟”特殊调度顺序。预期结果:第二次加的任务f2,一直不会执行
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
史上最强的python线程池。 | |
最智能的可自动实时调节线程数量的线程池。此线程池和官方concurrent.futures的线程池 是鸭子类关系,所以可以一键替换类名 或者 import as来替换类名。 | |
对比官方线程池,有4个创新功能或改进。 | |
1、主要是不仅能扩大,还可自动缩小(官方内置的ThreadpoolExecutor不具备此功能,此概念是什么意思和目的,可以百度java ThreadpoolExecutor的KeepAliveTime参数的介绍), | |
2、非常节制的开启多线程,例如实例化一个最大100线程数目的pool,每隔2秒submit一个函数任务,而函数每次只需要1秒就能完成,实际上只需要调节增加到1个线程就可以,不需要慢慢增加到100个线程 | |
官方的线程池不够智能,会一直增加到最大线程数目,此线程池则不会。 | |
3、线程池任务的queue队列,修改为有界队列 | |
4、此线程池运行函数出错时候,直接显示线程错误,官方的线程池则不会显示错误,例如函数中写1/0,任然不现实错误。 | |
""" | |
import os | |
import atexit | |
import queue | |
import sys | |
import threading | |
import time | |
import weakref | |
from nb_log import LoggerMixin, nb_print, LoggerLevelSetterMixin, LogManager | |
_shutdown = False | |
_threads_queues = weakref.WeakKeyDictionary() | |
def _python_exit(): | |
global _shutdown | |
_shutdown = True | |
items = list(_threads_queues.items()) | |
for t, q in items: | |
q.put(None) | |
for t, q in items: | |
t.join() | |
atexit.register(_python_exit) | |
class _WorkItem(LoggerMixin): | |
def __init__(self, fn, args, kwargs): | |
self.fn = fn | |
self.args = args | |
self.kwargs = kwargs | |
def run(self): | |
# noinspection PyBroadException | |
try: | |
self.fn(*self.args, **self.kwargs) | |
except BaseException as exc: | |
self.logger.exception(f'函数 {self.fn.__name__} 中发生错误,错误原因是 {type(exc)} {exc} ') | |
def __str__(self): | |
return f'{(self.fn.__name__, self.args, self.kwargs)}' | |
class ThreadPoolExecutorShrinkAble(LoggerMixin, LoggerLevelSetterMixin): | |
# 为了和官方自带的THredpoolexecutor保持完全一致的鸭子类,参数设置成死的,不然用户传参了。 | |
MIN_WORKERS = 5 | |
KEEP_ALIVE_TIME = 6 | |
def __init__(self, max_workers=None, thread_name_prefix=''): | |
""" | |
最好需要兼容官方concurren.futures.ThreadPoolExecutor 和改版的BoundedThreadPoolExecutor,入参名字和个数保持了一致。 | |
:param max_workers: | |
:param thread_name_prefix: | |
""" | |
self._max_workers = max_workers or 4 | |
self._thread_name_prefix = thread_name_prefix | |
self.work_queue = queue.Queue(max_workers) | |
# self._threads = set() | |
self._threads = weakref.WeakSet() | |
self._lock_compute_threads_free_count = threading.Lock() | |
self.threads_free_count = 0 | |
self._shutdown = False | |
self._shutdown_lock = threading.Lock() | |
def _change_threads_free_count(self, change_num): | |
with self._lock_compute_threads_free_count: | |
self.threads_free_count += change_num | |
def submit(self, func, *args, **kwargs): | |
with self._shutdown_lock: | |
if self._shutdown: | |
raise RuntimeError('不能添加新的任务到线程池') | |
self._adjust_thread_count() | |
self.work_queue.put(_WorkItem(func, args, kwargs)) | |
def _adjust_thread_count(self): | |
# if len(self._threads) < self._threads_num: | |
# self.logger.debug( | |
# (self.threads_free_count, len(self._threads), len(_threads_queues), get_current_threads_num())) | |
if self.threads_free_count < self.MIN_WORKERS and len(self._threads) < self._max_workers: | |
# t = threading.Thread(target=_work, | |
# args=(self._work_queue,self)) | |
t = _CustomThread(self).set_log_level(self.logger.level) | |
t.setDaemon(True) | |
t.start() | |
self._threads.add(t) | |
_threads_queues[t] = self.work_queue | |
def shutdown(self, wait=True): | |
with self._shutdown_lock: | |
self._shutdown = True | |
self.work_queue.put(None) | |
if wait: | |
for t in self._threads: | |
t.join() | |
def __enter__(self): | |
return self | |
def __exit__(self, exc_type, exc_val, exc_tb): | |
self.shutdown(wait=True) | |
return False | |
# 两个名字都可以,兼容以前的老名字(中文意思是 自定义线程池),但新名字更能表达意思(可缩小线程池)。 | |
CustomThreadpoolExecutor = ThreadPoolExecutorShrinkAble | |
# noinspection PyProtectedMember | |
class _CustomThread(threading.Thread, LoggerMixin, LoggerLevelSetterMixin): | |
def __init__(self, executorx: ThreadPoolExecutorShrinkAble): | |
super().__init__() | |
self._executorx = executorx | |
def _remove_thread(self, stop_resson=''): | |
# noinspection PyUnresolvedReferences | |
self.logger.info(f'停止线程 {self._ident}, 触发条件是 {stop_resson} ') | |
self.logger.info(f'sleep模拟调度') | |
time.sleep(10) | |
self._executorx._change_threads_free_count(-1) | |
self._executorx._threads.remove(self) | |
_threads_queues.pop(self) | |
# noinspection PyProtectedMember | |
def run(self): | |
# noinspection PyUnresolvedReferences | |
self.logger.debug(f'新启动线程 {self._ident} ') | |
self._executorx._change_threads_free_count(1) | |
while True: | |
try: | |
work_item = self._executorx.work_queue.get(block=True, timeout=self._executorx.KEEP_ALIVE_TIME) | |
except queue.Empty: | |
# continue | |
# self._remove_thread() | |
# break | |
if self._executorx.threads_free_count > self._executorx.MIN_WORKERS: | |
self._remove_thread( | |
f'当前线程超过 {self._executorx.KEEP_ALIVE_TIME} 秒没有任务,线程池中不在工作状态中的线程数量是 ' | |
f'{self._executorx.threads_free_count},超过了指定的数量 {self._executorx.MIN_WORKERS}') | |
break # 退出while 1,即是结束。这里才是决定线程结束销毁,_remove_thread只是个名字而已,不是由那个来销毁线程。 | |
else: | |
continue | |
# nb_print(work_item) | |
if work_item is not None: | |
self._executorx._change_threads_free_count(-1) | |
work_item.run() | |
del work_item | |
self._executorx._change_threads_free_count(1) | |
continue | |
if _shutdown or self._executorx._shutdown: | |
self._executorx.work_queue.put(None) | |
break | |
process_name_set = set() | |
logger_show_current_threads_num = LogManager('show_current_threads_num').get_logger_and_add_handlers( | |
formatter_template=5, log_filename='show_current_threads_num.log', do_not_use_color_handler=False) | |
def show_current_threads_num(sleep_time=600, process_name='', block=False, daemon=True, work_queue=None): | |
process_name = sys.argv[0] if process_name == '' else process_name | |
def _show_current_threads_num(): | |
while True: | |
# logger_show_current_threads_num.info(f'{process_name} 进程 的 并发数量是 --> {threading.active_count()}') | |
# nb_print(f' {process_name} {os.getpid()} 进程 的 线程数量是 --> {threading.active_count()}') | |
logger_show_current_threads_num.info( | |
f' {process_name} {os.getpid()} 进程 的 线程数量是 --> {threading.active_count()}, work_queue剩余任务数->{work_queue.qsize()}') | |
time.sleep(sleep_time) | |
if process_name not in process_name_set: | |
if block: | |
_show_current_threads_num() | |
else: | |
t = threading.Thread(target=_show_current_threads_num, daemon=daemon) | |
t.start() | |
process_name_set.add(process_name) | |
def get_current_threads_num(): | |
return threading.active_count() | |
if __name__ == '__main__': | |
from concurrent.futures import ThreadPoolExecutor | |
pool = ThreadPoolExecutorShrinkAble(10) | |
# pool = ThreadPoolExecutor(200) # 测试对比官方自带 | |
show_current_threads_num(sleep_time=5, work_queue=pool.work_queue) | |
def f1(a): | |
time.sleep(0.2) # 可修改这个数字测试多线程数量调节功能。 | |
nb_print(f'{a} 。。。。。。。') | |
# raise Exception('抛个错误测试') # 官方的不会显示函数出错你,你还以为你写的代码没毛病呢。 | |
def f2(a): | |
time.sleep(0.2) # 可修改这个数字测试多线程数量调节功能。 | |
1/0 | |
for i in range(30): | |
time.sleep(0.01) # 这里的间隔时间模拟,当任务来临不密集,只需要少量线程就能搞定f1了,因为f1的消耗时间短, | |
# 不需要开那么多线程,CustomThreadPoolExecutor比ThreadPoolExecutor 优势之一。 | |
pool.submit(f1, str(i)) | |
# | |
time.sleep(10) | |
for i in range(30): | |
pool.submit(f2, str(i)) | |
time.sleep(100000) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
所以不是纯粹的调用方式复现的问题,而是修改了作者的源码(即139行的sleep)来放大问题。仅做研究之用吧。