Created
April 6, 2018 12:30
-
-
Save anthonynsimon/7dd032da66fa7511836aae54e00a6633 to your computer and use it in GitHub Desktop.
Signal pool thread pool example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys
import threading
import time
from collections import deque
from threading import Thread, Lock
class Q:
    """Minimal thread-safe FIFO queue.

    Items are returned by ``get`` in the order they were ``put``.  Blocking
    operations wait on condition variables bound to one shared lock, so a
    waiting thread sleeps until it is notified instead of spinning.
    """

    def __init__(self):
        # deque gives O(1) append on the right and pop on the left.
        self._items = deque()
        self._mutex = Lock()
        # Both conditions share ``_mutex`` so every inspection of ``_items``
        # and the matching wait/notify happen under the same lock.
        self._not_empty = threading.Condition(self._mutex)
        self._drained = threading.Condition(self._mutex)

    def put(self, item):
        """Append *item* to the queue and wake one waiting consumer."""
        with self._not_empty:
            self._items.append(item)
            self._not_empty.notify()

    def get(self, timeout=None):
        """Remove and return the oldest queued item.

        Args:
            timeout: Maximum number of seconds to wait for an item, or
                ``None`` to wait indefinitely.  An item that is already
                queued is returned even when *timeout* is ``0``.

        Returns:
            The item that has been in the queue the longest.

        Raises:
            TimeoutError: If no item became available within *timeout*.
        """
        with self._not_empty:
            # wait_for checks the predicate before sleeping and again after
            # every wake-up, and returns its last value (False on timeout).
            if not self._not_empty.wait_for(lambda: bool(self._items), timeout):
                raise TimeoutError()
            item = self._items.popleft()
            if not self._items:
                # Release any thread blocked in join().
                self._drained.notify_all()
            return item

    def join(self):
        """Block until the queue holds no items.

        This waits only for the queue to become empty; it does not wait for
        consumers to finish working on the items they have removed.
        """
        with self._drained:
            self._drained.wait_for(lambda: not self._items)

    def task_done(self):
        """Do nothing; ``join`` does not track per-item completion."""
        pass
class SignalPool:
    """Pool of worker threads that print every item sent to the pool.

    Workers are started by the constructor.  Each one repeatedly takes an
    item from a shared queue and prints it together with the worker's thread
    name, until ``shutdown`` has been called.
    """

    # Seconds a worker waits for an item before re-checking the shutdown
    # flag; this bounds how long a worker takes to notice ``shutdown``.
    _POLL_TIMEOUT = 0.0001

    def __init__(self, num_workers=4):
        """Create the pool and immediately start *num_workers* threads."""
        self._shutdown = False
        self._mutex = Lock()  # guards ``_shutdown``
        self._queue = Q()
        self._num_workers = num_workers
        self._init_workers()

    def _init_workers(self):
        """Start one thread per configured worker."""
        for _ in range(self._num_workers):
            # ``_worker`` is a staticmethod, so the pool is passed explicitly.
            t = Thread(target=self._worker, args=(self, self._queue))
            t.start()

    def join(self):
        """Block until the queue's own ``join`` returns."""
        self._queue.join()

    def send(self, item):
        """Enqueue *item* for a worker to pick up.

        The item is enqueued on the calling thread, so it is already in the
        queue when this method returns.  A subsequent ``join`` therefore
        cannot miss it, and items are queued in the order they were sent.
        """
        self._queue.put(item)

    @staticmethod
    def _worker(signal, queue):
        """Worker loop: print items from *queue* until *signal* shuts down.

        Args:
            signal: The owning pool; polled through ``_is_shutdown``.
            queue: Queue to take items from; its ``get`` must raise
                ``TimeoutError`` when no item arrives in time.
        """
        name = threading.current_thread().name
        while not signal._is_shutdown():
            try:
                item = queue.get(timeout=signal._POLL_TIMEOUT)
            except TimeoutError:
                # Nothing to do yet; loop round to re-check the flag.
                continue
            print(name, item)
        print(name, "shutting down")

    def _is_shutdown(self):
        """Return whether ``shutdown`` has been called."""
        with self._mutex:
            return self._shutdown

    def shutdown(self):
        """Ask every worker to exit after its current iteration."""
        with self._mutex:
            self._shutdown = True
# Demo: push two batches of integers through a 40-worker pool.
pool = SignalPool(40)
try:
    for line in range(0, 1000):
        pool.send(line)
    pool.join()
    # Pause between the batches.
    time.sleep(2)
    for line in range(1000, 2000):
        pool.send(line)
    pool.join()
finally:
    # The workers loop until shutdown() is called, so always signal it, even
    # after an error; otherwise their threads keep the process alive.
    pool.shutdown()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment