Created
May 9, 2020 09:55
-
-
Save Dansyuqri/cb548fbf0eb489eaff27af3f172fd1ee to your computer and use it in GitHub Desktop.
This is a reimplementation of the https://gist.github.com/Dansyuqri/93b8df592946cf5ec2158aa10c13bbcc gist
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# multi_pub_coro.py | |
# This is a reimplementation of the https://gist.github.com/Dansyuqri/93b8df592946cf5ec2158aa10c13bbcc gist | |
# which allows for all functions to run asynnchronously using coroutines. | |
import asyncio | |
import time | |
import zmq | |
from zmq.asyncio import Context, Poller | |
host = "127.0.0.1" | |
port = "5001" | |
ctx = Context.instance() | |
async def update_smart_mirror(appliance): | |
socket = ctx.socket(zmq.PUB) | |
socket.connect(f"tcp://{host}:{port}") | |
await asyncio.sleep(1) | |
status = {"status": True} | |
# Sends multipart message to subscriber | |
await socket.send_string(appliance, flags=zmq.SNDMORE) | |
await socket.send_json(status) | |
async def get_messages(): | |
""" | |
Runs the subscriber in a coroutine without a Poller. | |
""" | |
# Creates a socket instance | |
context = Context.instance() | |
socket = context.socket(zmq.SUB) | |
# Binds the socket to a predefined port on localhost | |
socket.bind(f"tcp://{host}:{port}") | |
# Subscribes to the coffee maker and toaster topic | |
socket.subscribe("COFFEE MAKER") | |
socket.subscribe("TOASTER") | |
while True: | |
topic = await socket.recv_string() | |
status = await socket.recv_json() | |
print(f"Topic: {topic} => {status}") | |
async def get_messages_poll(): | |
""" | |
Runs the subscriber in a coroutine with a Poller. | |
""" | |
# Creates a socket instance | |
context = Context.instance() | |
socket = context.socket(zmq.SUB) | |
# Binds the socket to a predefined port on localhost | |
socket.bind(f"tcp://{host}:{port}") | |
# Subscribes to the coffee maker and toaster topic | |
socket.subscribe("COFFEE MAKER") | |
socket.subscribe("TOASTER") | |
poller = Poller() | |
poller.register(socket, zmq.POLLIN) | |
while True: | |
evts = await poller.poll(timeout=100) | |
if socket in dict(evts): | |
topic = await socket.recv_string() | |
status = await socket.recv_json() | |
print(f"Topic: {topic} => {status}") | |
loop = asyncio.get_event_loop() | |
# Run get_messages_poll, update_smart_mirror concurrently | |
loop.run_until_complete( | |
asyncio.wait([ | |
get_messages_poll(), | |
update_smart_mirror("COFFEE MAKER"), | |
update_smart_mirror("TOASTER") | |
])) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment