Skip to content

Instantly share code, notes, and snippets.

@kalexmills
Last active January 17, 2020 22:09
Show Gist options
  • Save kalexmills/97cb8bb4828a93971c0678ebf27d4cef to your computer and use it in GitHub Desktop.
Save kalexmills/97cb8bb4828a93971c0678ebf27d4cef to your computer and use it in GitHub Desktop.
Rust: asyncronous CBOR channels over raw TCP using tokio-serde and serde_cbor.
/**
* Asynchronous CBOR message traffic via tokio / serde. More at https://cbor.io
*
* Test via `cargo run` and, in a separate terminal test using these messages.
*
* $ echo '00000017A2616364526561646161826568656C6C6F65776F726C64' | xxd -r -p | nc 127.0.0.1 6142
* $ echo '0000000FA1616D6B68656C6C6F20776F726C64' | xxd -r -p | nc 127.0.0.1 6142
*
* First 8 bytes are the length of the data frame, rest is the CBOR message.
*/
use futures::*;
use serde::{Deserialize, Serialize};
use tokio::net::{tcp, TcpListener, TcpStream};
use tokio_serde::SymmetricallyFramed;
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "127.0.0.1:6142";
let mut listener: TcpListener = TcpListener::bind(addr).await.unwrap();
let server = async move {
let mut incoming = listener.incoming();
while let Some(socket_res) = incoming.next().await {
match socket_res {
Err(err) => println!("accept error = {:?}", err),
Ok(mut sock) => {
tokio::spawn(async move {
// split the socket and wrap the read / write halves in the protocol stack.
let (mut w_in, mut w_out) = framed_cbor_split::<Protocol>(&mut sock);
while let Some(msg) = w_in.try_next().await.unwrap() {
println!("Got: {:?}", msg);
w_out.send(msg).await.unwrap();
}
});
}
}
}
};
println!("Server running on localhost:6142");
// Start the server and block this async fn until `server` spins down.
server.await;
Ok(())
}
/**
* Protocol is a simple Rust enum describing possible messages crossing the channel.
*/
#[derive(Debug, Deserialize, Serialize)]
#[serde(untagged)] // GOTCHA: serde(untagged) avoids extra fields but implies each message must have a unique shape
enum Protocol {
Chat {
#[serde(rename = "m")] // single byte names for serialization; nice names for code.
msg: String,
},
Command {
#[serde(rename = "c")]
cmd: Command,
#[serde(rename = "a")]
args: Vec<String>,
},
}
#[derive(Debug, Deserialize, Serialize)]
enum Command {
// TODO: currently serialized as strings. Can be made even more compact using serde(rename) as above.
Read,
Write,
Panic,
}
/**
* FramedCborRead is a wrapped socket that knows how to read length-framed CBOR values of type T
* serialized using Serde.
*/
type FramedCborRead<'a, T> =
SymmetricallyFramed<FramedRead<tcp::ReadHalf<'a>, LengthDelimitedCodec>, T, Cbor<T, T>>;
type FramedCborWrite<'a, T> =
SymmetricallyFramed<FramedWrite<tcp::WriteHalf<'a>, LengthDelimitedCodec>, T, Cbor<T, T>>;
/**
* framed_cbor_split wraps a TcpStream, producing a pair of wrapped protocols for reading and writing CBOR
* values of type T
*/
fn framed_cbor_split<'de, T: Deserialize<'de> + Serialize>(
sock: &'de mut TcpStream,
) -> (FramedCborRead<T>, FramedCborWrite<T>) {
let (read, write): (tcp::ReadHalf, tcp::WriteHalf) = sock.split();
(framed_cbor_read::<T>(read), framed_cbor_write::<T>(write))
}
/**
* framed_cbor_read wraps a TcpStream, producing a framed protocol suitable for reading CBOR values of type T.
*/
fn framed_cbor_read<'de, T: Deserialize<'de>>(sock: tcp::ReadHalf) -> FramedCborRead<T> {
let length_delimited = FramedRead::new(sock, LengthDelimitedCodec::new());
tokio_serde::SymmetricallyFramed::new(length_delimited, SymmetricalCbor::<T>::default())
}
/**
* framed_cbor_write wraps a TcpStream, producing a framed protocol suitable for writing CBOR values of type T.
*/
fn framed_cbor_write<T: Serialize>(sock: tcp::WriteHalf) -> FramedCborWrite<T> {
let length_delimited = FramedWrite::new(sock, LengthDelimitedCodec::new());
tokio_serde::SymmetricallyFramed::new(length_delimited, SymmetricalCbor::<T>::default())
}
// *** below is all tokio-serde boilerplate for setting up CBOR codecs
use bytes::{buf::BufExt, Bytes, BytesMut};
use derivative::Derivative;
use std::{io, marker::PhantomData, pin::Pin};
use tokio_serde::{Deserializer, Serializer};
type SymmetricalCbor<Item> = Cbor<Item, Item>;
#[derive(Derivative)]
#[derivative(Default(bound = ""))]
struct Cbor<Item, SinkItem> {
ghost: PhantomData<(Item, SinkItem)>,
}
impl<Item, SinkItem> Deserializer<Item> for Cbor<Item, SinkItem>
where
for<'a> Item: Deserialize<'a>,
{
type Error = io::Error;
fn deserialize(self: Pin<&mut Self>, src: &BytesMut) -> Result<Item, Self::Error> {
Ok(serde_cbor::from_reader(std::io::Cursor::new(src).reader())
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?)
}
}
impl<Item, SinkItem> Serializer<SinkItem> for Cbor<Item, SinkItem>
where
SinkItem: Serialize,
{
type Error = io::Error;
fn serialize(self: Pin<&mut Self>, src: &SinkItem) -> Result<Bytes, Self::Error> {
Ok(serde_cbor::to_vec(src)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
.into())
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment