Skip to content

Instantly share code, notes, and snippets.

@Dansyuqri
Created May 9, 2020 09:55
Show Gist options
  • Save Dansyuqri/cb548fbf0eb489eaff27af3f172fd1ee to your computer and use it in GitHub Desktop.
Save Dansyuqri/cb548fbf0eb489eaff27af3f172fd1ee to your computer and use it in GitHub Desktop.
# multi_pub_coro.py
# This is a reimplementation of the https://gist.github.com/Dansyuqri/93b8df592946cf5ec2158aa10c13bbcc gist
# which allows all functions to run asynchronously using coroutines.
import asyncio
import time
import zmq
from zmq.asyncio import Context, Poller
host = "127.0.0.1"
port = "5001"
ctx = Context.instance()
async def update_smart_mirror(appliance):
    """Publish a two-frame status message for *appliance*.

    Connects a PUB socket to ``tcp://{host}:{port}``, waits one second, then
    sends the appliance name as the first frame (the topic) followed by a
    JSON-encoded ``{"status": True}`` payload as the second frame.

    Args:
        appliance: Topic string sent as the first frame of the message.
    """
    # The context manager closes the socket on exit, including when a send
    # raises or the coroutine is cancelled, so the socket is not leaked.
    with ctx.socket(zmq.PUB) as socket:
        socket.connect(f"tcp://{host}:{port}")
        # Pause before publishing; presumably this gives the connection time
        # to be established so the message is not dropped -- TODO confirm.
        await asyncio.sleep(1)
        status = {"status": True}
        # Sends multipart message to subscriber: topic frame, then payload.
        await socket.send_string(appliance, flags=zmq.SNDMORE)
        await socket.send_json(status)
async def get_messages():
    """
    Runs the subscriber in a coroutine without a Poller.

    Binds a SUB socket to ``tcp://{host}:{port}``, subscribes to the
    "COFFEE MAKER" and "TOASTER" topics, and prints every two-frame message
    (topic string, JSON status) it receives. The loop never returns on its
    own; it ends only when the coroutine is cancelled or a receive raises.
    """
    # Creates a socket instance
    context = Context.instance()
    # The context manager guarantees the bound socket is closed (releasing
    # the port) when the endless loop is interrupted.
    with context.socket(zmq.SUB) as socket:
        # Binds the socket to a predefined port on localhost
        socket.bind(f"tcp://{host}:{port}")
        # Subscribes to the coffee maker and toaster topic
        socket.subscribe("COFFEE MAKER")
        socket.subscribe("TOASTER")
        while True:
            # First frame carries the topic, second frame the JSON payload.
            topic = await socket.recv_string()
            status = await socket.recv_json()
            print(f"Topic: {topic} => {status}")
async def get_messages_poll():
    """
    Runs the subscriber in a coroutine with a Poller.

    Binds a SUB socket to ``tcp://{host}:{port}``, subscribes to the
    "COFFEE MAKER" and "TOASTER" topics, and polls the socket for readability.
    Each time the socket is reported ready, one two-frame message (topic
    string, JSON status) is received and printed. The loop never returns on
    its own; it ends only when the coroutine is cancelled or a call raises.
    """
    # Creates a socket instance
    context = Context.instance()
    # The context manager guarantees the bound socket is closed (releasing
    # the port) when the endless loop is interrupted.
    with context.socket(zmq.SUB) as socket:
        # Binds the socket to a predefined port on localhost
        socket.bind(f"tcp://{host}:{port}")
        # Subscribes to the coffee maker and toaster topic
        socket.subscribe("COFFEE MAKER")
        socket.subscribe("TOASTER")
        poller = Poller()
        poller.register(socket, zmq.POLLIN)
        while True:
            # Wait up to 100 ms for the socket to become readable.
            evts = await poller.poll(timeout=100)
            # poll() yields (socket, event) pairs; only read when ours is ready.
            if socket in dict(evts):
                topic = await socket.recv_string()
                status = await socket.recv_json()
                print(f"Topic: {topic} => {status}")
async def main():
    """Run get_messages_poll and both update_smart_mirror publishers concurrently.

    Uses ``asyncio.gather`` rather than ``asyncio.wait``: passing bare
    coroutine objects to ``asyncio.wait`` is rejected on Python 3.11+.
    ``gather`` also propagates the first exception raised by any of the
    coroutines instead of waiting forever on the endless subscriber.
    """
    await asyncio.gather(
        get_messages_poll(),
        update_smart_mirror("COFFEE MAKER"),
        update_smart_mirror("TOASTER"),
    )


# Only start the event loop when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment