Skip to content

Instantly share code, notes, and snippets.

@Nick011
Created July 16, 2021 18:50
Show Gist options
  • Save Nick011/6db12a03468701b0733ce8014ccc8eda to your computer and use it in GitHub Desktop.
Save Nick011/6db12a03468701b0733ce8014ccc8eda to your computer and use it in GitHub Desktop.
import faust
import zmq
import zmq.asyncio
from faust.types import ChannelT, EventT
from typing import (
Mapping,
TypeVar,
no_type_check,
)
T = TypeVar('T')
class ZMQChannel(faust.Channel):
def __init__(self, host, *args, **kwargs):
self.host = host
super().__init__(*args, **kwargs)
self.ctx = zmq.asyncio.Context()
self.sock = None
if self.is_iterator and self.host:
self.bind_socket()
def __str__(self) -> str:
return f'<ZMQChannel: {self.host}>'
def __aiter__(self) -> ChannelT:
if self.is_iterator:
return self
else:
return self.clone(is_iterator=True)
@no_type_check # incompatible with base class, but OK
def _clone_args(self) -> Mapping:
return {
**super()._clone_args(),
**{
'host': self.host
}
}
async def __anext__(self) -> EventT[T]:
if not self.is_iterator:
raise RuntimeError('Need to call channel.__aiter__()')
# can recv_json, recv_string, recv_pyobj
return await self.sock.recv_string()
async def send(self, *args, **kwargs):
raise NotImplementedError('ZMQChannel only supports consumers at this time')
async def _send_now(self, *args, **kwargs):
raise NotImplementedError('ZMQChannel only supports consumers at this time')
async def publish_message(self, *args, **kwargs):
raise NotImplementedError('ZMQChannel only supports consumers at this time')
def bind_socket(self) -> None:
"""Declare/create this channel.
This is used to create this channel on a server,
if that is required to operate it.
"""
self.sock = self.ctx.socket(zmq.PULL)
self.sock.bind(self.host)
def clone(self, *args, **kwargs) -> ChannelT[T]:
zmq_channel = super().clone(*args, **kwargs)
#zmq_channel.bind_socket()
return zmq_channel
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment