Skip to content

Instantly share code, notes, and snippets.

@vasilakisfil
Last active August 17, 2022 16:28
Show Gist options
  • Save vasilakisfil/c8f7530fd3a0674376b7bf4abc49cce6 to your computer and use it in GitHub Desktop.
Save vasilakisfil/c8f7530fd3a0674376b7bf4abc49cce6 to your computer and use it in GitHub Desktop.
Duplex channel using tokio constructs
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::{PollSendError, PollSender};
use std::pin::Pin;
use std::task::{Poll, Context};
pub struct Duplex<T: Send + 'static> {
channel1: Channel<T>,
channel2: Channel<T>,
}
impl<T: Send + 'static> Duplex<T> {
pub fn new() -> Self {
let (tx1, recv1) = mpsc::channel::<T>(100);
let (tx2, recv2) = mpsc::channel::<T>(100);
Self {
channel1: (tx1, recv2).into(),
channel2: (tx2, recv1).into(),
}
}
pub fn split(self) -> (Channel<T>, Channel<T>) {
(self.channel1, self.channel2)
}
}
pub struct Channel<T: Send + 'static> {
sink: PollSender<T>,
stream: ReceiverStream<T>,
}
impl<T: Send + 'static> futures::Stream for Channel<T> {
type Item = T;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.stream).poll_next(cx)
}
}
impl<T: Send + 'static> futures::Sink<T> for Channel<T> {
type Error = PollSendError<T>;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.sink).poll_ready(cx)
}
fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
Pin::new(&mut self.sink).start_send(item)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.sink).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.sink).poll_close(cx)
}
}
impl<T: Send + 'static> From<(Sender<T>, Receiver<T>)> for Channel<T> {
fn from((sink, stream): (Sender<T>, Receiver<T>)) -> Self {
Self {
sink: PollSender::new(sink),
stream: ReceiverStream::new(stream),
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment