Skip to content

Instantly share code, notes, and snippets.

@fuzzy-focus
Created July 1, 2020 23:47
Show Gist options
  • Save fuzzy-focus/2e92919eca5d9f07c1bc902b381ce08d to your computer and use it in GitHub Desktop.
Save fuzzy-focus/2e92919eca5d9f07c1bc902b381ce08d to your computer and use it in GitHub Desktop.
Test Asyncio in conjunction with threading
import asyncio
import threading
import queue
import itertools
import time
class Bus(threading.Thread):
def __init__(self, eq, bq):
super().__init__(daemon=True)
self.q = bq
self.eq = eq
self.val1 = 0
self._val2 = itertools.cycle(range(10))
@property
def val2(self):
return next(self._val2)
def run(self):
while True:
ind, msg, data = self.q.get()
print(f"BUS {ind: <3} msg = {msg}, data = {data}")
time.sleep(.1)
if msg == "read":
self.eq.put((ind, "read_resp", getattr(self, data)))
elif msg == "write":
self.val1 = data
time.sleep(.01)
class Entity(threading.Thread):
def __init__(self, eq, h):
super().__init__(daemon=True)
self.q = eq
self.h = h
def run(self):
while True:
ind, msg, data = self.q.get()
print(f"ENT {ind: <3} msg = {msg}, data = {data}")
time.sleep(.1)
if msg == "read":
print("?!?")
elif msg == "read_resp":
self.h.handle_read_resp(ind, msg, data)
elif msg == "write":
print("?!?")
class Data:
def __init__(self, h):
self.h = h
@property
def val1(self):
return self.h.handle_read("val1")
@val1.setter
def val1(self, value):
self.h.handle_write(value)
@property
def val2(self):
return self.h.handle_read("val2")
@val2.setter
def val2(self, value):
raise AttributeError()
class Handler:
def __init__(self, q):
self.futures = {}
self.q = q
self._i = 0
@property
def i(self):
i = self._i
self._i +=1
return i
def handle_read(self, data):
i = self.i
f = asyncio.Future()
self.futures[i] = f
self.q.put((i, "read", data))
return f
def handle_read_resp(self, ind, msg, data):
print(f"HDL: {ind: <3} msg = {msg}, data = {data}, fut={self.futures}")
f = self.futures.pop(ind)
print(f)
f.set_result(data)
print(f)
def handle_write(self, data):
self.q.put((self.i, "write", data))
async def sim(data):
while True:
print("writing val1")
data.val1 = 5
print("trying to read val1")
v1 = data.val1
print(f"reading v1: {v1}")
v1 = await asyncio.wait_for(v1, 1)
print(f"successfully read v1 = {v1}")
await asyncio.sleep(1)
print("read multiple")
futures = [data.val2 for _ in range(3)]
print("waiting for them to complete")
await asyncio.sleep(1)
vals = await asyncio.gather(*futures)
print(f"returned {vals}")
await asyncio.sleep(1)
def main():
eq = queue.Queue()
bq = queue.Queue()
h = Handler(bq)
ent = Entity(eq,h)
bus = Bus(eq, bq)
data = Data(h)
bus.start()
ent.start()
ev = threading.Event()
try:
print("running all the threads")
asyncio.run(sim(data))
ev.wait()
except (KeyboardInterrupt, SystemExit):
print("\nbye")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment