Skip to content

Instantly share code, notes, and snippets.

@SpComb
Created September 29, 2014 12:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save SpComb/09132a0524126c59771c to your computer and use it in GitHub Desktop.
Save SpComb/09132a0524126c59771c to your computer and use it in GitHub Desktop.
threading -> asyncio queue
import asyncio
import queue
import socket
class Error(Exception):
    """Base exception raised by this module (e.g. on a queue desync)."""
# Alias of the standard library's queue.Full, exposed under this module's
# namespace; it is the exception type raised by Queue.put() below.
Full = queue.Full
class Queue:
    """
    A thread -> asyncio queue using an AF_UNIX socketpair for wake-ups.

    A thread can put() arbitrary objects.  Each object is placed on a
    thread-safe in-memory queue.Queue, and a single marker byte is written
    to the socketpair.

    An asyncio task can await get(), which waits for one marker byte to
    arrive and then pops exactly one object from the in-memory queue.
    """

    def __init__(self):
        # Unbounded, thread-safe store for the actual payload objects.
        self.queue = queue.Queue()

        # One marker byte travels over this pair for every queued object.
        self.read, self.write = socket.socketpair(
            socket.AF_UNIX, socket.SOCK_SEQPACKET
        )

        # The event loop's sock_recv() requires a non-blocking socket.
        self.read.setblocking(False)

        # Producers are ordinary threads, so they use a blocking send.
        self.write.setblocking(True)

    def put(self, msg):
        """
        Store msg and signal the consumer with one marker byte.

        Intended to be called from a (non-asyncio) thread.

        Raises Full if send() reports that no byte was written.

        NOTE: the write socket is in blocking mode, so when the socket
        buffer is full send() blocks the calling thread instead of
        returning zero; in practice Full is therefore rarely, if ever, seen.
        """
        # Enqueue first so the object is already available by the time the
        # consumer is woken up by the marker byte.
        self.queue.put(msg)

        # signal
        if not self.write.send(b'.'):
            raise Full

    async def get(self):
        """
        Wait until an object is available and return it.

        Must be awaited from a task running on an asyncio event loop.

        Raises Error("queue desync") if a marker byte was received but the
        in-memory queue turned out to be empty.
        """
        # Inside a running coroutine this returns the running loop.
        loop = asyncio.get_event_loop()

        # Consume one marker; its content is irrelevant, only its arrival.
        await loop.sock_recv(self.read, 1)

        try:
            msg = self.queue.get(block=False)
        except queue.Empty:
            raise Error("queue desync")

        return msg
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment