Created
October 31, 2024 11:51
-
-
Save minrk/cd0db83f99106afa875d5b6c87abf156 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from socket import fromfd, AF_UNIX, SOCK_STREAM | |
from threading import Thread | |
import anyio | |
import zmq | |
async def async_recv(sock, flags=0):
    """Receive a multipart message from a zmq socket, awaiting readability.

    Args:
        sock: a zmq socket exposing ``EVENTS``, ``FD`` and ``recv_multipart``.
        flags: zmq receive flags. When ``zmq.NOBLOCK`` is set, the receive is
            attempted immediately and never waits.

    Returns:
        Whatever ``sock.recv_multipart`` returns.

    Raises:
        Whatever ``sock.recv_multipart`` raises; the caller in this file
        expects ``zmq.Again`` from a ``zmq.NOBLOCK`` receive with nothing
        pending.
    """
    if flags & zmq.NOBLOCK:
        print("receiving immediately")
        return sock.recv_multipart(flags)

    # MUST check EVENTS before waiting: a message may already be pending, in
    # which case waiting on the FD could block even though data is available.
    if sock.EVENTS & zmq.POLLIN:
        # ready to receive
        print("ready to receive")
        return sock.recv_multipart(flags | zmq.NOBLOCK)

    # Wait for the zmq FD to become readable.
    # In practice, we'll need an additional wake mechanism
    # to be notified of other event-consumers.
    # I am not sure what this fromfd should be on Windows,
    # but probably not exactly this.
    #
    # socket.fromfd() duplicates the descriptor, so the wrapper must be
    # closed when we are done or every waiting call leaks one fd. Closing
    # the duplicate does not close the zmq socket's own descriptor.
    with fromfd(sock.FD, AF_UNIX, SOCK_STREAM) as sock_fd:
        # Readability of the FD only signals "something changed"; EVENTS is
        # the source of truth, so re-check it after every wake-up.
        while not sock.EVENTS & zmq.POLLIN:
            # should actually handle rcvtimeo timeout
            print("receiver waiting")
            await anyio.wait_socket_readable(sock_fd)

    # should succeed based on EVENTS
    print("receiver waited, should be ready")
    return sock.recv_multipart(flags | zmq.NOBLOCK)
def send_later(socket):
    """Send one two-frame message after a short delay, then close the socket.

    Meant to run in a background thread so that a receiver has to wait on
    wait_socket_readable before the message arrives.

    Args:
        socket: the sending socket; it is closed (with a linger of 3000)
            once the message has been handed off.
    """
    delay_seconds = 2
    frames = [b'abc', b'def']

    # Give the receiver time to start waiting before anything is sent.
    time.sleep(delay_seconds)
    print("sender sending")
    socket.send_multipart(frames)
    socket.close(linger=3000)
async def receiver_task(socket):
    """Exercise async_recv: first a non-blocking receive, then a waiting one.

    The non-blocking receive is expected to raise ``zmq.Again``; if it does
    not, a ``RuntimeError`` is raised. Afterwards the task waits for a
    message and prints it.

    Args:
        socket: the receiving zmq socket passed through to ``async_recv``.

    Raises:
        RuntimeError: if the non-blocking receive did not raise ``zmq.Again``.
    """
    saw_again = False
    try:
        await async_recv(socket, zmq.NOBLOCK)
    except zmq.Again:
        # expected
        saw_again = True

    if not saw_again:
        raise RuntimeError("should have raised zmq.Again")
    print("Again raised (good)")

    # Second phase: a receive without NOBLOCK, which may have to wait.
    message = await async_recv(socket)
    print("received", message)
async def main():
    """Run the demo: a PUSH socket sends from a thread, a PULL socket receives.

    Binds a PUSH socket and connects a PULL socket over ``inproc://test``,
    starts ``send_later`` on the PUSH socket in a daemon thread, and runs
    ``receiver_task`` on the PULL socket inside an anyio task group.
    """
    ctx = zmq.Context()
    a = ctx.socket(zmq.PUSH)
    b = ctx.socket(zmq.PULL)
    try:
        url = 'inproc://test'
        a.bind(url)
        b.connect(url)

        # The PUSH socket is handed over to the sender thread, which is
        # responsible for closing it (see send_later).
        send_thread = Thread(target=send_later, args=(a,), daemon=True)
        send_thread.start()

        # Actually spawn the receiver in the task group; leaving the
        # ``async with`` block waits for it to finish.
        async with anyio.create_task_group() as tg:
            tg.start_soon(receiver_task, b)
        print("task group done")
    finally:
        # Release the PULL socket even if the receiver failed. The context is
        # deliberately not terminated here: the PUSH socket belongs to the
        # sender thread and may still be open.
        b.close()
if __name__ == "__main__":
    # Script entry point: hand the main() coroutine function to anyio.
    anyio.run(main)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment