Skip to content

Instantly share code, notes, and snippets.

@MathieuDuponchelle
Created March 16, 2022 14:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MathieuDuponchelle/046ac18340bfb0d23fe548d543951e8b to your computer and use it in GitHub Desktop.
Save MathieuDuponchelle/046ac18340bfb0d23fe548d543951e8b to your computer and use it in GitHub Desktop.
use async_std::task;
use futures::channel::mpsc;
use futures::prelude::*;
use futures::ready;
use pin_project_lite::pin_project;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::collections::HashMap;
pin_project! {
#[must_use = "streams do nothing unless polled"]
pub struct Controller {
#[pin]
stream: Option<Box<dyn Stream<Item=(String, IncomingMessage)> + Unpin + Send>>,
items: VecDeque<(String, OutgoingMessage)>,
producers: HashMap<String, String>,
}
}
#[derive(serde_derive::Deserialize, serde_derive::Serialize)]
pub enum IncomingMessage {
Foo,
}
#[derive(Debug, serde_derive::Serialize, serde_derive::Deserialize)]
pub enum OutgoingMessage {
Bar,
Baz,
}
impl Controller {
fn new(
stream: Option<Box<dyn Stream<Item = (String, IncomingMessage)> + Unpin + Send>>,
) -> Self {
Self {
stream,
items: VecDeque::new(),
producers: HashMap::new(),
}
}
pub fn handle(
mut self: Pin<&mut Self>,
peer_id: &str,
msg: IncomingMessage,
) -> VecDeque<(String, OutgoingMessage)> {
let mut ret = VecDeque::new();
let this = self.as_mut().project();
this.producers.insert("peer".to_string(), "peer".to_string());
match msg {
IncomingMessage::Foo => {
ret.push_back((peer_id.to_string(), OutgoingMessage::Bar));
ret.push_back((peer_id.to_string(), OutgoingMessage::Baz));
}
}
ret
}
}
impl Stream for Controller {
type Item = (String, OutgoingMessage);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
let this = self.as_mut().project();
if let Some(item) = this.items.pop_front() {
break Poll::Ready(Some(item));
}
if let Some(stream) = this.stream.as_pin_mut() {
match ready!(stream.poll_next(cx)) {
Some((peer_id, msg)) => {
let items = self.as_mut().handle(&peer_id, msg);
let this = self.as_mut().project();
let _ = std::mem::replace(this.items, items);
}
None => {
break Poll::Ready(None);
}
}
} else {
break Poll::Ready(None);
}
}
}
}
struct Server {
tx: Option<mpsc::Sender<(String, String)>>,
receive_task_handle: Option<task::JoinHandle<()>>,
}
impl Server {
fn spawn<
I: for<'a> Deserialize<'a>,
Factory: FnOnce(Pin<Box<dyn Stream<Item = (String, I)> + Send>>) -> St,
St: Stream,
>(
factory: Factory,
) -> Self
where
St::Item: Serialize + std::fmt::Debug,
St: Send + Unpin + 'static,
{
let (tx, rx) = mpsc::channel::<(String, String)>(1000);
let mut handler =
factory(Box::pin(rx.map(|(peer_id, msg)| {
(peer_id, serde_json::from_str::<I>(&msg).unwrap())
})));
let receive_task_handle = task::spawn(async move {
while let Some(msg) = handler.next().await {
eprintln!("Got message: {:?}", msg);
}
});
Self {
tx: Some(tx),
receive_task_handle: Some(receive_task_handle),
}
}
async fn accept(&mut self) {
if let Some(mut tx) = self.tx.clone() {
tx.send((
"peer".to_string(),
serde_json::to_string(&MyIncomingMessage::Base(IncomingMessage::Foo))
.unwrap(),
))
.await
.unwrap();
}
}
}
impl Drop for Server {
fn drop(&mut self) {
let receive_task_handle = self.receive_task_handle.take();
if let Some(mut tx) = self.tx.take() {
task::block_on(async move {
tx.close_channel();
if let Some(handle) = receive_task_handle {
handle.await
}
});
}
}
}
fn main() {
let mut server = Server::spawn(|stream| MyController::new(Some(Box::new(stream))));
task::block_on(server.accept());
}
pin_project! {
#[must_use = "streams do nothing unless polled"]
pub struct MyController {
#[pin]
stream: Option<Box<dyn Stream<Item=(String, MyIncomingMessage)> + Unpin + Send>>,
#[pin]
base: Controller,
items: VecDeque<(String, MyOutgoingMessage)>,
}
}
#[derive(serde_derive::Deserialize, serde_derive::Serialize)]
pub enum MyIncomingMessageInner {
Babar,
}
#[derive(serde_derive::Deserialize, serde_derive::Serialize)]
pub enum MyIncomingMessage {
Base(IncomingMessage),
Custom(MyIncomingMessageInner),
}
#[derive(serde_derive::Deserialize, serde_derive::Serialize, Debug)]
pub enum MyOutgoingMessageInner {
FooFoo,
}
#[derive(Debug, serde_derive::Serialize, serde_derive::Deserialize)]
pub enum MyOutgoingMessage {
Base(OutgoingMessage),
Custom(MyOutgoingMessageInner),
}
impl MyController {
fn new(stream: Option<Box<dyn Stream<Item = (String, MyIncomingMessage)> + Unpin + Send>>) -> Self {
let base = Controller::new(None);
Self {
stream,
base,
items: VecDeque::new(),
}
}
pub fn handle(
self: Pin<&mut Self>,
peer_id: &str,
msg: MyIncomingMessage,
) -> VecDeque<(String, MyOutgoingMessage)> {
let this = self.project();
let mut ret = VecDeque::new();
match msg {
MyIncomingMessage::Base(msg) => {
ret = this
.base
.handle(peer_id, msg)
.drain(..)
.map(|(peer_id, msg)| (peer_id, MyOutgoingMessage::Base(msg)))
.collect();
}
MyIncomingMessage::Custom(msg) => match msg {
MyIncomingMessageInner::Babar => {
ret.push_back((
peer_id.to_string(),
MyOutgoingMessage::Custom(MyOutgoingMessageInner::FooFoo),
));
}
},
}
ret
}
}
impl Stream for MyController {
type Item = (String, MyOutgoingMessage);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
let this = self.as_mut().project();
if let Some(item) = this.items.pop_front() {
break Poll::Ready(Some(item));
}
if let Some(stream) = this.stream.as_pin_mut() {
match ready!(stream.poll_next(cx)) {
Some((peer_id, msg)) => {
let items = self.as_mut().handle(&peer_id, msg);
let this = self.as_mut().project();
let _ = std::mem::replace(this.items, items);
}
None => {
break Poll::Ready(None);
}
}
} else {
break Poll::Ready(None);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment