Skip to content

Instantly share code, notes, and snippets.

@abersheeran
Created October 31, 2023 03:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abersheeran/de4dfb0626d9deb87fad67858e268741 to your computer and use it in GitHub Desktop.
Save abersheeran/de4dfb0626d9deb87fad67858e268741 to your computer and use it in GitHub Desktop.
Python:在多个进程中分别使用线程池处理任务
from concurrent.futures import ThreadPoolExecutor
import multiprocessing
import queue
import signal
import time
from typing import Callable, ParamSpec
from loguru import logger
P = ParamSpec("P")
class MultiWorker:
    """
    Process tasks with a thread pool inside each of several child processes.

    Every child process polls one shared ``multiprocessing.Queue`` for
    ``(fn, args, kwargs)`` tuples and submits them to its own
    ``ThreadPoolExecutor``.

    ```python
    with MultiWorker(
        processes=settings.receive_processes, threads=settings.receive_workers
    ) as workers:
        ...
        workers.submit(fn, *args, **kwargs)
    ```
    """

    def __init__(
        self, processes: int = 4, threads: int = 10, *, maxsize: int = 0
    ) -> None:
        """
        Create (but do not start) the worker processes.

        :param processes: requested number of child processes; capped at
            ``multiprocessing.cpu_count()``.
        :param threads: ``max_workers`` of the thread pool in each child.
        :param maxsize: ``maxsize`` passed to the shared task queue.
        """
        self._threads = threads
        self.queue = multiprocessing.Queue(maxsize)
        self.processes: list[multiprocessing.Process] = [
            self._new_process()
            for _ in range(min(processes, multiprocessing.cpu_count()))
        ]

    def _new_process(self) -> multiprocessing.Process:
        """Build one not-yet-started daemon worker bound to the shared queue."""
        return multiprocessing.Process(
            target=self._run, args=(self._threads, self.queue), daemon=True
        )

    @staticmethod
    def _run(threads: int, q: multiprocessing.Queue) -> None:
        """
        Child-process main loop: move tasks from ``q`` into a thread pool.

        SIGINT and SIGTERM only set a flag, so the loop leaves on its next
        iteration and the executor's ``with`` block is exited normally.

        :param threads: ``max_workers`` for the thread pool.
        :param q: queue yielding ``(fn, args, kwargs)`` tuples.
        """
        shutdown = False

        def request_shutdown(*args, **kwargs) -> None:
            nonlocal shutdown
            shutdown = True

        signal.signal(signal.SIGINT, request_shutdown)
        signal.signal(signal.SIGTERM, request_shutdown)

        with ThreadPoolExecutor(max_workers=threads) as workers:
            while not shutdown:
                # Non-blocking poll so the shutdown flag is re-checked
                # roughly every 0.1 seconds while the queue is empty.
                try:
                    fn, args, kwargs = q.get_nowait()
                except queue.Empty:
                    time.sleep(0.1)
                    continue
                workers.submit(fn, *args, **kwargs)

    def submit(self, fn: Callable[P, None], *args: P.args, **kwargs: P.kwargs) -> None:
        """
        Queue ``fn(*args, **kwargs)`` for execution by one of the workers.

        Before queueing, every worker that is no longer alive is logged,
        dropped from ``self.processes`` and replaced by a freshly started one.

        :param fn: callable to run in a worker thread.
        :param args: positional arguments for ``fn``.
        :param kwargs: keyword arguments for ``fn``.
        """
        # Iterate over a snapshot and remove by identity: deleting by the
        # snapshot's index would hit the wrong entry once the list has shifted.
        for process in tuple(self.processes):
            if process.is_alive():
                continue
            logger.warning("Child process [{}] died unexpectedly".format(process.pid))
            self.processes.remove(process)
            replacement = self._new_process()
            self.processes.append(replacement)
            replacement.start()
        self.queue.put((fn, args, kwargs))

    def start(self) -> None:
        """Start every worker process."""
        for process in self.processes:
            process.start()

    def close(self, block: bool = True) -> None:
        """
        Terminate the worker processes.

        :param block: when true, also wait for each terminated worker to exit.
        """
        # A process that was never started has no pid and cannot be
        # terminated or joined, so it is skipped.
        started = [process for process in self.processes if process.pid is not None]
        for process in started:
            process.terminate()
        if block:
            for process in started:
                process.join()

    def __enter__(self) -> "MultiWorker":
        """Start the workers and return this instance."""
        self.start()
        return self

    def __exit__(self, *args) -> None:
        """Terminate the workers and wait for them to exit."""
        self.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment