Skip to content

Instantly share code, notes, and snippets.

@andreivasiliu
Created February 3, 2019 22:01
Show Gist options
  • Save andreivasiliu/aa3ec2e18aa378e068e6ee05ca419702 to your computer and use it in GitHub Desktop.
Save andreivasiliu/aa3ec2e18aa378e068e6ee05ca419702 to your computer and use it in GitHub Desktop.
use bytes::{Bytes, BytesMut, BufMut};
use futures::future::{self, Either};
use futures::sync::mpsc;
use futures::Poll;
use std::cell::RefCell;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::rc::Rc;
use tokio::codec::{FramedRead, LinesCodec};
use tokio::io::{write_all, AsyncRead, ReadHalf};
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::{Async, Future, Stream};
use tokio::runtime::current_thread::Runtime;
type Tx = mpsc::UnboundedSender<Bytes>;
struct Shared {
peers: HashMap<SocketAddr, Tx>,
}
impl Default for Shared {
fn default() -> Self {
Shared {
peers: HashMap::default(),
}
}
}
struct Peer {
name: String,
lines: FramedRead<ReadHalf<TcpStream>, LinesCodec>,
// output: WriteHalf<TcpStream>,
state: Rc<RefCell<Shared>>,
address: SocketAddr,
}
impl Peer {
fn new(
name: String,
state: Rc<RefCell<Shared>>,
address: SocketAddr,
lines: FramedRead<ReadHalf<TcpStream>, LinesCodec>,
) -> Self {
Peer {
name,
lines,
state,
address,
}
}
}
impl Drop for Peer {
fn drop(&mut self) {
self.state.borrow_mut().peers.remove(&self.address);
}
}
impl Future for Peer {
type Item = ();
type Error = tokio::io::Error;
fn poll(&mut self) -> Poll<(), tokio::io::Error> {
while let Async::Ready(line) = self.lines.poll()? {
eprintln!("Received line ({:?}) : {:?}", self.name, line);
if let Some(message) = line {
let mut line = BytesMut::new();
line.put(&self.name);
line.put(": ");
line.put(&message);
line.put("\r\n");
let line = line.freeze();
for peer_tx in self.state.borrow_mut().peers.values_mut() {
peer_tx.unbounded_send(line.clone()).unwrap();
}
} else {
return Ok(Async::Ready(()));
}
}
Ok(Async::NotReady)
}
}
// SecBug: 16MB might be too much if we allow >200 connections
fn process(socket: TcpStream, state: Rc<RefCell<Shared>>) {
let address = socket.peer_addr().unwrap();
let (read_stream, write_stream) = socket.split();
let line_codec = LinesCodec::new_with_max_length(16 * 1024 * 1024);
let lines = FramedRead::new(read_stream, line_codec);
let (tx, rx) = mpsc::unbounded();
state.borrow_mut().peers.insert(address, tx);
let sender = rx
.fold(write_stream, |output, bytes| {
write_all(output, bytes)
.map_err(|err| eprintln!("error = {:?}", err))
.and_then(|(output, _bytes)| future::ok(output))
})
.map(|_write_stream| ());
tokio::runtime::current_thread::spawn(sender);
let task = lines
.into_future()
.map_err(|(err, _stuff)| err)
.and_then(move |(name, lines)| {
let name = match name {
Some(name) => name,
None => return Either::A(future::ok(())),
};
eprintln!("{:?} is joining the chat", name);
let peer = Peer::new(name, state, address, lines);
Either::B(peer)
})
.map_err(|err| {
eprintln!("connection error = {:?}", err);
});
tokio::runtime::current_thread::spawn(task);
}
pub fn main_loop() {
let listening_address = "127.0.0.1:1523".parse().unwrap();
let listener = TcpListener::bind(&listening_address).unwrap();
let state = Rc::new(RefCell::new(Shared::default()));
let server = listener
.incoming()
.for_each(move |socket| {
process(socket, state.clone());
Ok(())
})
.map_err(|err| {
eprintln!("accept error = {:?}", err);
});
eprintln!("starting server on {:?}", listening_address);
let mut runtime = Runtime::new().unwrap();
runtime.block_on(server).unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment