Skip to content

Instantly share code, notes, and snippets.

Created March 10, 2018 16:40
Show Gist options
  • Save AljoschaMeyer/203b9300973c0332e3188c2504327e39 to your computer and use it in GitHub Desktop.
Save AljoschaMeyer/203b9300973c0332e3188c2504327e39 to your computer and use it in GitHub Desktop.
What ps could look like if LocalExecutor::spawn_local didn't require a 'static bound.
//! Implements the packet-stream protocol in Rust.
extern crate atm_async_utils;
extern crate futures_core;
extern crate futures_executor;
extern crate futures_sink;
extern crate futures_io;
extern crate futures_util;
extern crate multi_producer_sink;
extern crate multi_consumer_stream;
extern crate packet_stream_codec;
extern crate async_ringbuffer;
use std::cell::RefCell;
use std::i32::MAX;
use std::rc::Rc;
use std::error::Error;
use std::fmt::{self, Display, Formatter};
use atm_async_utils::SendClose;
use futures_core::{Future, Stream, Poll};
use futures_core::Async::{Ready, Pending};
use futures_core::task::Context;
use futures_io::{AsyncRead, AsyncWrite, Error as IoError};
use futures_sink::Sink;
use futures_executor::LocalExecutor;
use futures_util::{FutureExt, SinkExt};
use futures_util::sink::{Send, close};
use multi_producer_sink::{mps, MPS, Done as MPSDone};
use multi_consumer_stream::*;
use packet_stream_codec::{PacketId, CodecSink, CodecStream, TYPE_BINARY, TYPE_STRING, TYPE_JSON,
END, STREAM, Metadata as CodecMetadata};
/// An enumeration representing the different types a packet can have.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PacketType {
/// Raw binary data.
/// A UTF-8 encoded string.
/// A valid piece of JSON.
// NOTE(review): no variant declarations are visible after the doc comments above; `flags`
// below matches on `Binary`, `String` and `Json` — confirm the enum body is intact.
impl PacketType {
/// Map this packet type to its codec flag constant (`TYPE_BINARY`, `TYPE_STRING` or
/// `TYPE_JSON`).
fn flags(&self) -> u8 {
match *self {
PacketType::Binary => TYPE_BINARY,
PacketType::String => TYPE_STRING,
PacketType::Json => TYPE_JSON,
/// The metadata associated with a packet.
///
/// The packet id is an implementation detail of packet-stream and is thus not
/// exposed.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct Metadata {
/// The type of the packet.
pub packet_type: PacketType,
/// Whether the packet has the end/error flag set.
pub is_end: bool,
impl Metadata {
/// Create new `Metadata` from a packet type and an end/error flag.
pub fn new(packet_type: PacketType, is_end: bool) -> Metadata {
Metadata {
/// Compute the codec flag byte for this metadata: the packet type's flags, with `END`
/// OR-ed in when `is_end` is set.
fn flags(&self) -> u8 {
if self.is_end {
self.packet_type.flags() | END
} else {
/// Build a `Metadata` from codec-level metadata.
///
/// The packet type is chosen by probing the codec metadata's buffer/string/json predicates
/// in that order; `is_end` is taken from `is_end_packet()`.
// NOTE(review): the value produced by each branch (including the final `else`) is not
// visible here — confirm what happens for a packet that is none of the three types.
fn from_decoded_metadata(metadata: CodecMetadata) -> Metadata {
let packet_type = if metadata.is_buffer_packet() {
} else if metadata.is_string_packet() {
} else if metadata.is_json_packet() {
} else {
Metadata {
is_end: metadata.is_end_packet(),
/// Convert into codec-level metadata, using `flags()` for the flag byte.
// NOTE(review): presumably `id` is stored in the resulting `CodecMetadata` as well; that
// field assignment is not visible here — confirm.
fn to_codec_metadata(self, id: PacketId) -> CodecMetadata {
CodecMetadata {
flags: self.flags(),
/// A future that emits the wrapped writer of a packet-stream once the outgoing half of the stream
/// has been fully closed, it has errored, or once all references to it have been dropped.
pub struct Done<W, B>(MPSDone<CodecSink<W, B>>);
impl<W, B> Future for Done<W, B> {
/// The writer, handed back on success.
type Item = W;
/// The writer, handed back on failure as well.
type Error = W;
/// Poll the inner `MPSDone`; in both the ready and the error case the resolved codec sink is
/// unwrapped into the underlying writer via `into_inner()`.
fn poll(&mut self, cx: &mut Context) -> Poll<Self::Item, Self::Error> {
match self.0.poll(cx) {
Ok(Ready(sink)) => Ok(Ready(sink.into_inner())),
Ok(Pending) => Ok(Pending),
Err(sink) => Err(sink.into_inner()),
/// Take ownership of an AsyncRead and an AsyncWrite to create the two halves of a packet-stream,
/// as well as a future for notification when the write-half of the packet-stream has been closed,
/// errored or is not referenced anymore.
///
/// `R` is the `AsyncRead` for reading bytes from the peer, `W` is the
/// `AsyncWrite` for writing bytes to the peer, and `B` is the type that is used
/// as input for sending data.
///
/// `executor` is handed to the shared state created by `Shared::new`.
///
/// Returns `(PsIn, PsOut, Done)`: the incoming half, the outgoing half, and the `Done` future
/// that yields the writer `W`.
pub fn packet_stream<R: AsyncRead, W: AsyncWrite, B: AsRef<[u8]>>
(r: R,
w: W,
executor: LocalExecutor)
-> (PsIn<R, W, B>, PsOut<R, W, B>, Done<W, B>) {
let (shared, closed) = Shared::new(r, w, executor);
// Both halves point at the same `Shared` state through a reference-counted `RefCell`.
let shared = Rc::new(RefCell::new(shared));
(PsIn(Rc::clone(&shared)), PsOut(shared), closed)
/// Shared state between a PsIn, PsOut and requests, duplexes, etc.
struct Shared<R, W, B>
where R: AsyncRead
// Cloneable multi-producer handle onto the single codec sink that writes to the peer.
sink: MPS<CodecSink<W, B>>,
// Multi-consumer wrapper around the codec stream; the two function pointers compute the
// `PacketId` key for decoded items and for errors respectively (see `get_id`/`const_zero`).
stream: MCS<CodecStream<R>,
fn(&(Box<[u8]>, CodecMetadata)) -> PacketId,
fn(&IoError) -> PacketId>,
// The id used for the next actively sent packet.
id_counter: PacketId,
// The next id to be accepted as an active packet from the peer.
accepted_id: PacketId,
// Executor supplied by the caller of `packet_stream`.
executor: LocalExecutor,
// Key function passed to `MCS::new` for successfully decoded items.
// NOTE(review): body not visible; presumably returns the packet id from the codec metadata.
fn get_id(item: &(Box<[u8]>, CodecMetadata)) -> PacketId {
// Key function passed to `MCS::new` for errors.
// NOTE(review): body not visible; by its name presumably always returns 0 — confirm.
fn const_zero(err: &IoError) -> PacketId {
impl<R, W, B> Shared<R, W, B>
where R: AsyncRead,
W: AsyncWrite,
B: AsRef<[u8]>
/// Wrap the writer in a `CodecSink` behind a multi-producer sink and the reader in a
/// `CodecStream` behind a multi-consumer stream. Both id counters start at 1.
///
/// Returns the shared state together with the `Done` future for the write side.
fn new(r: R, w: W, executor: LocalExecutor) -> (Shared<R, W, B>, Done<W, B>) {
let (sink, done) = mps(CodecSink::new(w));
(Shared {
stream: MCS::new(CodecStream::new(r), get_id, const_zero),
id_counter: 1,
accepted_id: 1,
/// Return the id to use for the next actively sent packet.
// NOTE(review): as shown, `id_counter` is read but never advanced in this function, so
// every caller would get the same id; `increment_id` looks like it should be called
// before returning — confirm.
fn next_id(&mut self) -> PacketId {
let ret = self.id_counter;
return ret;
/// Advance `id_counter` by one, wrapping from `i32::MAX` back to 1.
fn increment_id(&mut self) {
if self.id_counter == MAX {
self.id_counter = 1;
} else {
self.id_counter += 1
/// Advance `accepted_id` by one, wrapping from `i32::MAX` back to 1.
fn increment_accepted(&mut self) {
if self.accepted_id == MAX {
self.accepted_id = 1;
} else {
self.accepted_id += 1
/// Poll for the next packet initiated by the peer.
///
/// A ready item carries the payload, the public `Metadata`, and an `IncomingPacket` handle.
/// Packets flagged as stream packets yield `IncomingPacket::Duplex`; `None` from the
/// underlying stream is passed through as `Ok(Ready(None))`.
// NOTE(review): several expressions in this body are incomplete as shown (the operand of
// `try_ready!`, the left-hand side of the `accepted_id` comparison, and the initialisers of
// `sink_id` and `stream_id`), and the non-stream and ignore branches are empty — confirm
// against the original source before relying on this description.
fn poll_next(&mut self,
cx: &mut Context)
-> Poll<Option<(Box<[u8]>, Metadata, IncomingPacket<R, W, B>)>, IoError> {
match try_ready!( {
Some((data, metadata)) => {
if == self.accepted_id {
if metadata.is_stream_packet() {
let sink_id = * -1;
let stream_id =;
Ok(Ready(Some((data, Metadata::from_decoded_metadata(metadata), IncomingPacket::Duplex(PsSink::new(self.sink.clone(), sink_id),
} else {
} else {
// Packet is neither an incoming request nor does it open
// a new stream, so ignore it
None => Ok(Ready(None)),
/// A stream of incoming requests from the peer.
///
/// Holds a reference-counted handle to the state shared with the matching `PsOut`.
pub struct PsIn<R: AsyncRead, W, B>(Rc<RefCell<Shared<R, W, B>>>);
impl<R: AsyncRead, W: AsyncWrite, B: AsRef<[u8]>> Stream for PsIn<R, W, B> {
/// The payload of the packet that triggered this emission, the metadata of
/// the packet, and a way of interacting with the peer.
type Item = (Box<[u8]>, Metadata, IncomingPacket<R, W, B>);
/// An I/O error from the underlying reader.
type Error = IoError;
// NOTE(review): body not visible; the item and error types match `Shared::poll_next`, so this
// presumably delegates to it through the `RefCell` — confirm.
fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Self::Item>, Self::Error> {
/// Allows sending packets to the peer.
///
/// Holds a reference-counted handle to the state shared with the matching `PsIn`.
pub struct PsOut<R: AsyncRead, W, B>(Rc<RefCell<Shared<R, W, B>>>);
impl<R, W, B> PsOut<R, W, B>
where R: AsyncRead,
W: AsyncWrite,
B: AsRef<[u8]>
/// Obtain a future for sending a request to the peer.
///
/// `data` is the payload and `t` its packet type. The request id is taken from the shared
/// state's `next_id()`.
// NOTE(review): the `borrow_mut` guard `ps` is still alive when the same `Rc<RefCell<_>>` is
// passed to `Request::new`; if `Request::new` borrows it, that would panic at runtime — confirm.
pub fn request(&self, data: B, t: PacketType) -> Request<R, W, B> {
let shared_clone = Rc::clone(&self.0);
let mut ps = self.0.borrow_mut();
let id = ps.next_id();
Request::new(shared_clone, data, t, id)
// (Request::new(ps.sink.clone(), data, t, id),
// Response::new( * -1)))
/// Create a bidirectional channel multiplexed over the underlying
/// `AsyncRead`/`AsyncWrite` pair.
///
/// The sink half is built from a clone of the shared sink and a fresh id from `next_id()`.
// NOTE(review): the argument to `PsStream::new` is incomplete as shown — confirm.
pub fn duplex(&self) -> (PsSink<W, B>, PsStream<R>) {
let mut ps = self.0.borrow_mut();
let id = ps.next_id();
(PsSink::new(ps.sink.clone(), id), PsStream::new( * -1)))
/// Close the packet-stream, indicating that no more packets will be sent.
///
/// This does not immediately close if there are still unfinished
/// `Request`s, `OutResponse`s or `PsSink`s. In that case, the closing
/// happens when the last of them finishes.
///
/// The error contains a `None` if a `Request`, `OutResponse` or
/// `PsSink` errored previously.
pub fn close(self) -> ClosePs<R, W, B> {
/// Future for closing the packet-stream, indicating that no more packets will be sent.
///
/// This does not immediately close if there are still unfinished
/// `Request`s, `OutResponse`s or `PsSink`s. In that case, the closing
/// happens when the last of them finishes.
///
/// The error contains a `None` if a `Request`, `OutResponse` or
/// `PsSink` errored previously.
pub struct ClosePs<R: AsyncRead, W, B>(Rc<RefCell<Shared<R, W, B>>>);
impl<R, W, B> Future for ClosePs<R, W, B>
where R: AsyncRead,
W: AsyncWrite,
B: AsRef<[u8]>
/// Resolves to unit once closing has completed.
type Item = ();
/// `Some(err)` for an I/O error, `None` if a sibling handle errored previously.
type Error = Option<IoError>;
// NOTE(review): body not visible — presumably drives `poll_close` on the shared sink; confirm.
fn poll(&mut self, cx: &mut Context) -> Poll<Self::Item, Self::Error> {
/// An incoming packet, initiated by the peer.
///
/// The enum variants carry values that allow interacting with the peer.
pub enum IncomingPacket<R: AsyncRead, W: AsyncWrite, B: AsRef<[u8]>> {
/// An incoming request. You get an `InRequest`, the peer got a
/// `Request` and a `Response`.
Request(InRequest<W, B>),
/// A duplex connection initiated by the peer. Both peers get a `PsSink` and
/// a `PsStream`.
Duplex(PsSink<W, B>, PsStream<R>),
/// The sink half of a duplex multiplexed over the packet-stream.
pub struct PsSink<W: AsyncWrite, B: AsRef<[u8]>> {
// Handle onto the shared codec sink.
sink: MPS<CodecSink<W, B>>,
// Packet id associated with this duplex.
id: PacketId,
impl<W, B> PsSink<W, B>
where W: AsyncWrite,
B: AsRef<[u8]>
/// Create a sink half that writes through `sink` using the packet id `id`.
fn new(sink: MPS<CodecSink<W, B>>, id: PacketId) -> PsSink<W, B> {
PsSink { sink, id }
impl<W, B> Sink for PsSink<W, B>
where W: AsyncWrite,
B: AsRef<[u8]>
/// The payload to send together with its metadata.
type SinkItem = (B, Metadata);
/// The error contains a `None` if a `Request`, `OutResponse` or
/// `PsSink` errored previously.
type SinkError = Option<IoError>;
// NOTE(review): the bodies of `poll_ready`, `poll_flush` and `poll_close` are not visible;
// presumably they forward to the inner sink — confirm.
fn poll_ready(&mut self, cx: &mut Context) -> Poll<(), Self::SinkError> {
/// Start sending `item`; the `STREAM` flag is OR-ed into the metadata's flags.
// NOTE(review): the receiver of `.start_send` and the value of the `id` field are incomplete
// as shown — confirm.
fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> {
let flags = item.1.flags() | STREAM;
.start_send((item.0, CodecMetadata { flags, id: }))
fn poll_flush(&mut self, cx: &mut Context) -> Poll<(), Self::SinkError> {
fn poll_close(&mut self, cx: &mut Context) -> Poll<(), Self::SinkError> {
/// An error indicating what happened on a connection.
pub enum ConnectionError {
/// The underlying connection has been closed.
/// An error occurred on reading.
// NOTE(review): no variant declarations are visible after the doc comments above; the impls
// below use `ClosedEarly` and `Errored(..)` — confirm the enum body is intact.
impl Display for ConnectionError {
/// Write a short, variant-tagged description of the error.
// NOTE(review): the `ClosedEarly` arm prints "ConnectionError::Closed", which does not match
// the variant name — confirm whether that is intended.
// NOTE(review): matching on `*self` and binding `err` by value would move out of a borrow
// unless the payload type is `Copy`; confirm whether `ref err` is needed here and in
// `description` below.
fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
match *self {
ConnectionError::ClosedEarly => write!(f, "ConnectionError::Closed"),
ConnectionError::Errored(err) => write!(f, "ConnectionError::Errored: {}", err),
impl Error for ConnectionError {
/// A static description for `ClosedEarly`; for `Errored`, the wrapped error's own description.
fn description(&self) -> &str {
match *self {
ConnectionError::ClosedEarly => "Peer closed its write-half of the packet-stream",
ConnectionError::Errored(err) => err.description(),
impl From<futures_io::Error> for ConnectionError {
// NOTE(review): body not visible; presumably wraps `err` in `ConnectionError::Errored`.
fn from(err: IoError) -> ConnectionError {
// Handle onto the multi-consumer codec stream, with the same two key-function pointer types
// as the `stream` field of `Shared`.
type StreamHandle<R> = MCSHandle<CodecStream<R>,
fn(&(Box<[u8]>, CodecMetadata)) -> PacketId,
fn(&IoError) -> PacketId>;
/// The stream half of a duplex multiplexed over the packet-stream.
pub struct PsStream<R: AsyncRead> {
stream: StreamHandle<R>,
impl<R: AsyncRead> PsStream<R> {
/// Wrap a stream handle.
fn new(stream: StreamHandle<R>) -> PsStream<R> {
PsStream { stream }
/// Note that the stream never emits `Ok(None)`.
impl<R: AsyncRead> Stream for PsStream<R> {
/// The payload of a received packet and its metadata.
type Item = (Box<[u8]>, Metadata);
/// If the peer closes the packet-stream, this emits a
/// `ConnectionError::ClosedEarly`. If an error happens/happened on the underlying
/// `AsyncRead`, this emits a `ConnectionError::Errored`.
type Error = ConnectionError;
/// Note that the stream never emits `Ok(None)`: end of the underlying stream is reported
/// as `Err(ConnectionError::ClosedEarly)` instead.
// NOTE(review): the scrutinee of the `match` is missing as shown; presumably it polls
// `self.stream` — confirm.
fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Self::Item>, Self::Error> {
match {
Ok(Ready(Some((data, codec_metadata)))) => {
// Translate codec-level metadata into the public `Metadata` type.
Ok(Ready((Some((data, Metadata::from_decoded_metadata(codec_metadata))))))
Ok(Ready(None)) => Err(ConnectionError::ClosedEarly),
Ok(Pending) => Ok(Pending),
Err(err) => Err(err.into()),
/// A request initiated by the peer. Drop to ignore it, or use `respond` to send
/// a response.
pub struct InRequest<W, B> {
// Handle onto the shared codec sink, used to send the response.
sink: MPS<CodecSink<W, B>>,
// Id of the incoming request packet.
id: PacketId,
impl<W, B> InRequest<W, B> {
/// Create a handle for answering the request with packet id `id` through `sink`.
fn new(sink: MPS<CodecSink<W, B>>, id: PacketId) -> InRequest<W, B> {
InRequest { sink, id }
impl<W: AsyncWrite, B: AsRef<[u8]>> InRequest<W, B> {
/// Returns a Future which completes once the given packet has been sent to
/// the peer.
// NOTE(review): the id argument is incomplete as shown (`* -1`); `OutResponse::new` asserts
// `id < 0`, so presumably this passes the negated request id — confirm.
pub fn respond(self, data: B, metadata: Metadata) -> OutResponse<W, B> {
OutResponse::new(self.sink, * -1, data, metadata)
/// An outgoing request, initiated by this packet-stream.
///
/// Poll it to actually start sending the request.
pub struct Request<R: AsyncRead, W: AsyncWrite, B: AsRef<[u8]>> {
// Shared packet-stream state.
shared: Rc<RefCell<Shared<R, W, B>>>,
// In-flight send of the request packet on the shared sink.
send: Send<MPS<CodecSink<W, B>>>,
// Id of the request packet.
id: PacketId,
impl<R: AsyncRead, W: AsyncWrite, B: AsRef<[u8]>> Request<R, W, B> {
/// Build a request for `data` of packet type `t` with packet id `id`.
// NOTE(review): most of the struct literal is not visible here; the `send` future is
// presumably created from the shared sink with `CodecMetadata { flags: t.flags(), .. }`.
fn new(shared: Rc<RefCell<Shared<R, W, B>>>,
data: B,
t: PacketType,
id: PacketId)
-> Request<R, W, B> {
Request {
send: shared
CodecMetadata {
flags: t.flags(),
impl<R: AsyncRead, W: AsyncWrite, B: AsRef<[u8]>> Future for Request<R, W, B> {
/// A future for the peer's response.
type Item = Response<R>;
/// The error contains a `None` if a `Request`, `OutResponse` or
/// `PsSink` errored previously.
type Error = Option<IoError>;
/// Wait for the send to complete, then spawn a task that closes the returned sink handle
/// (its result is discarded) and resolve to a `Response`.
///
/// # Panics
///
/// Panics with "got SpawnError" if spawning the close task fails.
// NOTE(review): the receiver of `.spawn_local` and the argument of `Response::new` are
// incomplete as shown; presumably the shared state's `executor` is the receiver — confirm.
fn poll(&mut self, cx: &mut Context) -> Poll<Self::Item, Self::Error> {
let sink = try_ready!(self.send.poll(cx));
let ps = self.shared.borrow_mut();
.spawn_local(close(sink).then(|res| match res {
Ok(sink) => Ok(()),
Err(_) => Ok(()),
.expect("got SpawnError");
Ok(Ready(Response::new( * -1))))
/// A response that will be received from the peer.
pub struct Response<R: AsyncRead>(StreamHandle<R>);
impl<R: AsyncRead> Response<R> {
/// Wrap a stream handle.
fn new(stream: StreamHandle<R>) -> Response<R> {
impl<R: AsyncRead> Future for Response<R> {
/// The payload of the response packet and its metadata.
type Item = (Box<[u8]>, Metadata);
/// If the peer closes the packet-stream, this emits a
/// `ConnectionError::ClosedEarly`. If an error happens/happened on the underlying
/// `AsyncRead`, this emits a `ConnectionError::Errored`.
type Error = ConnectionError;
/// Resolve with the first item produced by the wrapped stream handle; if the handle ends
/// without producing an item, fail with `ConnectionError::ClosedEarly`.
fn poll(&mut self, cx: &mut Context) -> Poll<Self::Item, Self::Error> {
match self.0.poll_next(cx) {
Ok(Ready(Some((data, codec_metadata)))) => {
// Translate codec-level metadata into the public `Metadata` type.
Ok(Ready(((data, Metadata::from_decoded_metadata(codec_metadata)))))
Ok(Ready(None)) => Err(ConnectionError::ClosedEarly),
Ok(Pending) => Ok(Pending),
Err(err) => Err(err.into()),
/// Future that completes when the response has been sent to the peer.
pub struct OutResponse<W: AsyncWrite, B: AsRef<[u8]>>(SendClose<MPS<CodecSink<W, B>>>);
impl<W: AsyncWrite, B: AsRef<[u8]>> OutResponse<W, B> {
/// Build the response future: `data` is sent through `sink` with `metadata` converted to
/// codec metadata for `id`, wrapped in a `SendClose`.
///
/// In debug builds this asserts that `id` is negative.
fn new(sink: MPS<CodecSink<W, B>>,
id: PacketId,
data: B,
metadata: Metadata)
-> OutResponse<W, B> {
debug_assert!(id < 0);
OutResponse(SendClose::new(sink, (data, metadata.to_codec_metadata(id))))
impl<W: AsyncWrite, B: AsRef<[u8]>> Future for OutResponse<W, B> {
/// Resolves to unit once the response has been sent.
type Item = ();
/// The error contains a `None` if a `Request`, `OutResponse` or
/// `PsSink` errored previously.
type Error = Option<IoError>;
/// Drive the inner `SendClose`; its ready value is discarded.
fn poll(&mut self, cx: &mut Context) -> Poll<Self::Item, Self::Error> {
let _ = try_ready!(self.0.poll(cx));
mod tests {
use super::*;
use partial_io::{PartialAsyncRead, PartialAsyncWrite, PartialWithErrors};
use partial_io::quickcheck_types::GenInterruptedWouldBlock;
use quickcheck::{QuickCheck, StdGen};
use async_ringbuffer::*;
use rand;
use futures::stream::iter_ok;
use futures::future::ok;
// Property-test driver: runs `test_requests` 1000 times using a `StdGen` built with size 20.
// NOTE(review): the function-pointer type in the `as` cast is incomplete as shown — confirm.
fn requests() {
let rng = StdGen::new(rand::thread_rng(), 20);
let mut quickcheck = QuickCheck::new().gen(rng).tests(1000);
quickcheck.quickcheck(test_requests as
-> bool);
// Property: peer A sends three binary requests, peer B echoes each one back as a response, and
// A must receive the payloads `[0]`, `[1]`, `[2]` with packet type `Binary`.
//
// The two peers are connected through a pair of ring buffers whose readers and writers are
// wrapped in `PartialAsyncRead`/`PartialAsyncWrite` driven by the generated operation sequences.
//
// NOTE(review): this test does not match the API declared above: `packet_stream` takes three
// parameters (`r`, `w`, `executor`) but is called here with two, and `PsOut::request` returns a
// single `Request` while the result is destructured as a `(req, res)` tuple — confirm whether
// the test predates those changes.
fn test_requests(buf_size_a: usize,
buf_size_b: usize,
write_ops_a: PartialWithErrors<GenInterruptedWouldBlock>,
read_ops_a: PartialWithErrors<GenInterruptedWouldBlock>,
write_ops_b: PartialWithErrors<GenInterruptedWouldBlock>,
read_ops_b: PartialWithErrors<GenInterruptedWouldBlock>)
-> bool {
// `+ 1` keeps the requested ring buffer capacity non-zero for a generated size of 0.
let (writer_a, reader_a) = ring_buffer(buf_size_a + 1);
let writer_a = PartialAsyncWrite::new(writer_a, write_ops_a);
let reader_a = PartialAsyncRead::new(reader_a, read_ops_a);
let (writer_b, reader_b) = ring_buffer(buf_size_b + 1);
let writer_b = PartialAsyncWrite::new(writer_b, write_ops_b);
let reader_b = PartialAsyncRead::new(reader_b, read_ops_b);
// Cross-wire the peers: A reads from buffer a and writes to buffer b, B the other way round.
let (a_in, a_out, _) = packet_stream(reader_a, writer_b);
let (b_in, b_out, _) = packet_stream(reader_b, writer_a);
// Peer B: answer every incoming request with the same data and metadata, then close.
let echo = b_in.for_each(|(data, metadata, in_packet)| match in_packet {
IncomingPacket::Request(in_request) => {
.respond(data, metadata)
.map_err(|_| unreachable!())
IncomingPacket::Duplex(_, _) => unreachable!(),
.and_then(|_| b_out.close().map_err(|_| unreachable!()));
// Peer A: drain anything the peer initiates.
let consume_a = a_in.for_each(|_| ok(()));
let (req0, res0) = a_out.request([0], PacketType::Binary);
let (req1, res1) = a_out.request([1], PacketType::Binary);
let (req2, res2) = a_out.request([2], PacketType::Binary);
let send_all = req0.join3(req1, req2).and_then(|_| a_out.close());
let receive_all = res0.join3(res1, res2)
.map(|((r0_data, r0_meta), (r1_data, r1_meta), (r2_data, r2_meta))| {
return r0_data == vec![0u8].into_boxed_slice() &&
r0_meta.packet_type == PacketType::Binary &&
r1_data == vec![1u8].into_boxed_slice() &&
r1_meta.packet_type == PacketType::Binary &&
r2_data == vec![2u8].into_boxed_slice() &&
r2_meta.packet_type == PacketType::Binary;
// Drive all four futures together; the test result is the outcome of `receive_all`.
return echo.join4(consume_a.map_err(|_| unreachable!()),
send_all.map_err(|_| unreachable!()),
receive_all.map_err(|_| unreachable!()))
.map(|(_, _, _, worked)| worked)
// Property-test driver: runs `test_duplexes` 100 times using a `StdGen` built with size 20.
// NOTE(review): the function-pointer type in the `as` cast is incomplete as shown — confirm.
fn duplexes() {
let rng = StdGen::new(rand::thread_rng(), 20);
let mut quickcheck = QuickCheck::new().gen(rng).tests(100);
quickcheck.quickcheck(test_duplexes as
-> bool);
// Property: peer A opens three duplexes and sends a run of identical single-byte packets on each,
// terminated by an end packet; peer B echoes non-end packets back, and A checks the echoed
// payloads per duplex.
//
// The two peers are connected through a pair of ring buffers whose readers and writers are
// wrapped in `PartialAsyncRead`/`PartialAsyncWrite` driven by the generated operation sequences.
//
// NOTE(review): `packet_stream` is declared above with three parameters (`r`, `w`, `executor`)
// but is called here with two — confirm whether the test predates that change.
fn test_duplexes(buf_size_a: usize,
buf_size_b: usize,
write_ops_a: PartialWithErrors<GenInterruptedWouldBlock>,
read_ops_a: PartialWithErrors<GenInterruptedWouldBlock>,
write_ops_b: PartialWithErrors<GenInterruptedWouldBlock>,
read_ops_b: PartialWithErrors<GenInterruptedWouldBlock>)
-> bool {
// `+ 1` keeps the requested ring buffer capacity non-zero for a generated size of 0.
let (writer_a, reader_a) = ring_buffer(buf_size_a + 1);
let writer_a = PartialAsyncWrite::new(writer_a, write_ops_a);
let reader_a = PartialAsyncRead::new(reader_a, read_ops_a);
let (writer_b, reader_b) = ring_buffer(buf_size_b + 1);
let writer_b = PartialAsyncWrite::new(writer_b, write_ops_b);
let reader_b = PartialAsyncRead::new(reader_b, read_ops_b);
// Cross-wire the peers: A reads from buffer a and writes to buffer b, B the other way round.
let (a_in, a_out, _) = packet_stream(reader_a, writer_b);
let (b_in, b_out, _) = packet_stream(reader_b, writer_a);
let non_end = Metadata::new(PacketType::Binary, false);
let end = Metadata::new(PacketType::Binary, true);
// Peer B: for each incoming duplex, echo packets until one with the end flag arrives.
// NOTE(review): the receivers of the combinator chain inside the `Duplex` arm are not visible
// as shown — confirm how `sink` and `stream` are connected.
let echo = b_in.map_err(|_| unreachable!())
.for_each(|(_, _, in_packet)| match in_packet {
IncomingPacket::Request(_) => unreachable!(),
IncomingPacket::Duplex(sink, stream) => {
.map_err::<Option<IoError>, _>(|_| None)
.take_while(|&(_, metadata)| ok(!metadata.is_end))
.map(|(data, _)| (data, non_end))
.map(|_| ())
.and_then(|_| b_out.close().map_err(|_| unreachable!()));
// Peer A: drain anything the peer initiates.
let consume_a = a_in.for_each(|_| ok(()));
let (sink0_a, stream0_a) = a_out.duplex();
let (sink1_a, stream1_a) = a_out.duplex();
let (sink2_a, stream2_a) = a_out.duplex();
// Each duplex sends a different number of data packets followed by one end packet.
let send_0 = sink0_a.send_all(iter_ok::<_, IoError>(vec![(vec![0], non_end),
(vec![0], non_end),
(vec![42], end)]));
let send_1 = sink1_a.send_all(iter_ok::<_, IoError>(vec![(vec![1], non_end),
(vec![1], non_end),
(vec![1], non_end),
(vec![43], end)]));
let send_2 = sink2_a.send_all(iter_ok::<_, IoError>(vec![(vec![2], non_end),
(vec![2], non_end),
(vec![2], non_end),
(vec![2], non_end),
(vec![44], end)]));
let send_all = send_0.join3(send_1, send_2).and_then(|_| a_out.close());
// NOTE(review): the fold for `receive_0` ignores its accumulator, so only the last received
// packet decides the result, whereas the other two folds AND into `acc` — confirm intent.
let receive_0 = stream0_a
.fold(false, |_, (data, _)| ok(data == vec![0].into_boxed_slice()));
let receive_1 = stream1_a
|acc, (data, _)| ok(acc && data == vec![1].into_boxed_slice()));
let receive_2 = stream2_a
|acc, (data, _)| ok(acc && data == vec![2].into_boxed_slice()));
let receive_all = receive_0
.join3(receive_1, receive_2)
.map(|(a, b, c)| a && b && c);
// Drive all four futures together; the test result is the outcome of `receive_all`.
return echo.join4(consume_a.map_err(|_| unreachable!()),
send_all.map_err(|_| unreachable!()),
receive_all.map_err(|_| unreachable!()))
.map(|(_, _, _, worked)| worked)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment