Skip to content

Instantly share code, notes, and snippets.

@silvestrpredko
Created July 5, 2023 14:41
Show Gist options
  • Save silvestrpredko/2792c8d0d641bfb2fa2953e444e68923 to your computer and use it in GitHub Desktop.
Save silvestrpredko/2792c8d0d641bfb2fa2953e444e68923 to your computer and use it in GitHub Desktop.
ChannelTransport
pub struct ChannelTransport {
rx: UnboundedReceiver<Vec<u8>>,
tx: UnboundedSender<Vec<u8>>,
}
type TransportImpl = Transport<ChannelTransport, Vec<u8>, Vec<u8>, Bincode<Vec<u8>, Vec<u8>>>;
impl ChannelTransport {
pub fn new() -> (
TransportImpl,
UnboundedSender<Vec<u8>>,
UnboundedReceiver<Vec<u8>>,
) {
let (tx_in, rx_in) = unbounded_channel::<Vec<u8>>();
let (tx_out, rx_out) = unbounded_channel::<Vec<u8>>();
let transport = serde_transport::new(
Framed::new(
Self {
rx: rx_in,
tx: tx_out,
},
LengthDelimitedCodec::new(),
),
Bincode::default(),
);
(transport, tx_in, rx_out)
}
}
impl AsyncWrite for ChannelTransport {
fn poll_write(
self: std::pin::Pin<&mut Self>,
_: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
std::task::Poll::Ready(
self.tx
.send(buf.to_vec())
.map_err(|err| std::io::Error::new(std::io::ErrorKind::OutOfMemory, err))
.map(|_| buf.len()),
)
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
_: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
std::task::Poll::Ready(Ok(()))
}
fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
_: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
std::task::Poll::Ready(Ok(()))
}
}
impl AsyncRead for ChannelTransport {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let poll_res = self.get_mut().rx.poll_recv(cx);
match poll_res {
std::task::Poll::Ready(Some(data)) => {
buf.put_slice(&data);
std::task::Poll::Ready(Ok(()))
}
std::task::Poll::Ready(None) => std::task::Poll::Ready(Ok(())),
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment