Skip to content

Instantly share code, notes, and snippets.

@minrk
Created October 31, 2024 11:51
Show Gist options
  • Save minrk/cd0db83f99106afa875d5b6c87abf156 to your computer and use it in GitHub Desktop.
Save minrk/cd0db83f99106afa875d5b6c87abf156 to your computer and use it in GitHub Desktop.
import time
from socket import fromfd, AF_UNIX, SOCK_STREAM
from threading import Thread
import anyio
import zmq
async def async_recv(sock, flags=0):
if flags & zmq.NOBLOCK:
print("receiving immediately")
return sock.recv_multipart(flags)
# MUST check EVENTS before waiting
if sock.EVENTS & zmq.POLLIN:
# ready to send
print("ready to send")
return sock.recv_multipart(flags | zmq.NOBLOCK)
# wait on wait_readable
# in practice, we'll need an additional wake mechanism
# to be notified of other event-consumers
# I am not sure what this fromfd should be on Windows,
# but probably not exactly this
sock_fd = fromfd(sock.FD, AF_UNIX, SOCK_STREAM)
while not sock.EVENTS & zmq.POLLIN:
# should actually handle rcvtimeo timeout
print("receiver waiting")
await anyio.wait_socket_readable(sock_fd)
# should succeed based on EVENTS
print("receiver waited, should be ready")
return sock.recv_multipart(flags | zmq.NOBLOCK)
def send_later(socket):
"""Send later from a thread
To test waiting on wait_socket_readable
"""
time.sleep(2)
print("sender sending")
socket.send_multipart([b'abc', b'def'])
socket.close(linger=3000)
async def receiver_task(socket):
try:
await async_recv(socket, zmq.NOBLOCK)
except zmq.Again:
# expected
print("Again raised (good)")
else:
raise RuntimeError("should have raised zmq.Again")
recvd = await async_recv(socket)
print("received", recvd)
async def main():
ctx = zmq.Context()
a = ctx.socket(zmq.PUSH)
b = ctx.socket(zmq.PULL)
url = 'inproc://test'
a.bind(url)
b.connect(url)
send_thread = Thread(target=send_later, args=(a,), daemon=True)
send_thread.start()
async with anyio.create_task_group():
await receiver_task(b)
print("task group done")
if __name__ == "__main__":
anyio.run(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment