Skip to content

Instantly share code, notes, and snippets.

@74togo
Last active August 29, 2015 14:11
Show Gist options
  • Save 74togo/52297f99d8c175d2b32d to your computer and use it in GitHub Desktop.
Save 74togo/52297f99d8c175d2b32d to your computer and use it in GitHub Desktop.
Testing a thread-safe listener model
from threading import Thread, Lock, Condition, get_ident, current_thread
from collections import deque, namedtuple
import time
# Held while `subscriptions` (and the per-thread queues inside it) is touched.
lock = Lock()

# Notified by send_message() to wake threads that are blocked in listen().
cond = Condition()

# One subscriber record per thread: pending messages plus the callbacks that
# should receive them.
ThreadSub = namedtuple("ThreadSub", ("queue", "callbacks"))

# message type -> {thread ident -> ThreadSub}
subscriptions = {
    "chat": {},
    "file": {},
}
def add_listener(type, callback):
    """Register *callback* for messages of *type* on the calling thread.

    The subscription is keyed by the caller's thread ident, so the callback
    is only ever dispatched by a listen() loop running on this same thread.

    Args:
        type: Key into ``subscriptions`` (e.g. ``"chat"``).
        callback: Callable taking the message as its only argument.

    Raises:
        KeyError: If *type* is not a key of ``subscriptions``.
    """
    with lock:
        me = get_ident()
        per_thread = subscriptions[type]
        try:
            entry = per_thread[me]
        except KeyError:
            # First listener this thread registers for this message type.
            entry = per_thread[me] = ThreadSub(deque(), [])
        entry.callbacks.append(callback)
def listen():
    """Dispatch queued messages to the calling thread's callbacks, forever.

    Each pass blocks on ``cond`` until notified, then drains every queue that
    belongs to this thread and feeds each message to the registered callbacks.
    A callback that returns ``False`` is removed and receives no further
    messages; any other return value keeps it subscribed.

    This function never returns.
    """
    # A thread's ident does not change while it is running, so look it up once.
    me = get_ident()
    while True:
        # NOTE: wait() is called without a predicate, so a notification that
        # fires while this thread is busy dispatching below is not observed;
        # such messages stay queued until the next notify_all().
        with cond:
            cond.wait()
        # Use the context manager rather than a bare acquire()/release() pair:
        # if a callback raises, the lock is still released instead of being
        # left held, which would block every other thread using it.
        with lock:
            for per_thread in subscriptions.values():
                sub = per_thread.get(me)
                if sub is None:
                    continue
                queue, callbacks = sub
                while queue:
                    data = queue.popleft()
                    # Slice-assign so the list stored in the ThreadSub is
                    # updated in place; drop callbacks that returned False.
                    callbacks[:] = [cb for cb in callbacks if cb(data) is not False]
def send_message(message="kek", channel="chat"):
    """Queue *message* for every subscriber of *channel* and wake listeners.

    Args:
        message: Payload appended to each subscribed thread's queue.
            Defaults to the original hard-coded demo payload.
        channel: Key into ``subscriptions`` selecting which subscribers
            receive the message. Defaults to ``"chat"``.

    Raises:
        KeyError: If *channel* is not a key of ``subscriptions``.
    """
    with lock:
        for queue, _callbacks in subscriptions[channel].values():
            queue.append(message)
    # Wake every thread blocked in listen() so it can drain its queue.
    with cond:
        cond.notify_all()
def keep_sending():
    """Call send_message() once every three seconds, forever."""
    pause_seconds = 3
    while True:
        time.sleep(pause_seconds)
        send_message()
def plsadd():
    """Subscribe a demo callback to 'chat' on this thread, then listen forever.

    The callback prints each message together with a flag telling whether it
    is running on the same thread that registered it.
    """
    registering_thread = get_ident()

    def on_message(msg):
        print(msg, get_ident() == registering_thread)
        # listen() drops a callback once it returns False, so the thread named
        # "listen2" unsubscribes after its first message; others stay subscribed.
        return False if current_thread().name == "listen2" else None

    add_listener('chat', on_message)
    listen()
# Start the two listener threads; each registers its own callback and then
# blocks inside listen().
for _listener_name in ("listen1", "listen2"):
    Thread(target=plsadd, name=_listener_name).start()

# The sender publishes periodically; joining it keeps the main thread parked.
last = Thread(target=keep_sending, name="sender")
last.start()
last.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment