Skip to content

Instantly share code, notes, and snippets.

@anthonynsimon
Created April 6, 2018 12:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anthonynsimon/7dd032da66fa7511836aae54e00a6633 to your computer and use it in GitHub Desktop.
Save anthonynsimon/7dd032da66fa7511836aae54e00a6633 to your computer and use it in GitHub Desktop.
Signal pool: a thread pool example
import sys
import threading
import time
from collections import deque
from threading import Lock, Thread
class Q:
    """Minimal thread-safe FIFO queue.

    Items are returned by ``get`` in the order they were ``put``. Waiting
    consumers sleep on a condition variable rather than spinning on the
    lock, so an idle queue does not consume CPU.
    """

    def __init__(self):
        # deque gives O(1) append/popleft; a list would shift every element
        # on each insertion at the head.
        self._items = deque()
        self._mutex = Lock()
        # Both conditions share the single mutex so that every
        # check-then-act on ``_items`` is atomic.
        self._not_empty = threading.Condition(self._mutex)
        self._drained = threading.Condition(self._mutex)

    def put(self, item):
        """Add *item* to the tail of the queue and wake one waiting consumer."""
        with self._not_empty:
            self._items.append(item)
            self._not_empty.notify()

    def get(self, timeout=None):
        """Remove and return the oldest item in the queue.

        Args:
            timeout: Maximum number of seconds to wait for an item. ``None``
                waits indefinitely. Zero or a negative value checks the
                queue once without waiting.

        Returns:
            The item that has been in the queue the longest.

        Raises:
            TimeoutError: If no item became available within *timeout*.
        """
        # Monotonic clock: the deadline must not move if the wall clock is
        # adjusted while we are waiting.
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._not_empty:
            # Loop because a wake-up does not guarantee an item is still
            # there: another consumer may have taken it first.
            while not self._items:
                if deadline is None:
                    self._not_empty.wait()
                    continue
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError()
                self._not_empty.wait(remaining)
            item = self._items.popleft()
            if not self._items:
                # Release anyone blocked in join().
                self._drained.notify_all()
            return item

    def join(self):
        """Block until the queue holds no items.

        This only waits for the queue to be emptied; it does not wait for
        consumers to finish processing the items they removed.
        """
        with self._drained:
            while self._items:
                self._drained.wait()

    def task_done(self):
        """Do nothing; present only as a placeholder hook for consumers."""
        pass
class SignalPool:
    """Fixed-size pool of worker threads that print every item sent to it.

    Workers are started by the constructor and keep polling the internal
    queue until ``shutdown`` is called. The worker threads are not daemon
    threads, so the process will not exit until ``shutdown`` has been
    called and the workers have noticed it.
    """

    def __init__(self, num_workers=4):
        """Create the pool and immediately start *num_workers* threads."""
        self._shutdown = False
        self._mutex = Lock()  # guards ``_shutdown``
        self._queue = Q()
        self._num_workers = num_workers
        self._workers = []  # kept so ``shutdown(wait=True)`` can join them
        self._init_workers()

    def _init_workers(self):
        """Start the worker threads and remember them."""
        for _ in range(self._num_workers):
            worker = Thread(target=self._worker, args=(self, self._queue))
            worker.start()
            self._workers.append(worker)

    def join(self):
        """Block until the internal queue has been emptied.

        This waits for items to be taken off the queue, not for the
        workers to finish printing them.
        """
        self._queue.join()

    def send(self, item):
        """Enqueue *item* for a worker to pick up.

        The item is enqueued before this method returns, so a following
        ``join`` call is guaranteed to see it and items are queued in the
        order they were sent.
        """
        self._queue.put(item)

    @staticmethod
    def _worker(signal, queue):
        """Worker loop: print items from *queue* until *signal* is shut down.

        Args:
            signal: The owning pool; polled via ``_is_shutdown``.
            queue: Queue to consume from; its ``get`` must raise
                ``TimeoutError`` when nothing arrives in time.
        """
        name = threading.current_thread().name
        while not signal._is_shutdown():
            try:
                # Short timeout so the shutdown flag is re-checked often.
                item = queue.get(timeout=0.0001)
            except TimeoutError:
                continue
            print(name, item)
        print(name, "shutting down")

    def _is_shutdown(self):
        """Return True once ``shutdown`` has been requested."""
        with self._mutex:
            return self._shutdown

    def shutdown(self, wait=False):
        """Ask all workers to stop.

        Workers check the flag before each fetch, so items still queued at
        this point are left unprocessed.

        Args:
            wait: When true, block until every worker thread has exited.
                The default returns immediately after setting the flag.
        """
        with self._mutex:
            self._shutdown = True
        if wait:
            for worker in self._workers:
                worker.join()
# Demo: push two batches of integers through a 40-worker pool, pausing
# between the batches, then stop the workers.
pool = SignalPool(40)
try:
    for line in range(1000):
        pool.send(line)
    pool.join()
    time.sleep(2)
    for line in range(1000, 2000):
        pool.send(line)
    pool.join()
finally:
    # The workers are non-daemon threads: without this the process would
    # never exit if anything above raised (e.g. Ctrl-C during the sleep).
    pool.shutdown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment