Skip to content

Instantly share code, notes, and snippets.

Created January 11, 2018 00:02
Show Gist options
  • Save anonymous/1d591a67ae5cddcc118aa42eef04af39 to your computer and use it in GitHub Desktop.
Save anonymous/1d591a67ae5cddcc118aa42eef04af39 to your computer and use it in GitHub Desktop.
Rust code shared from the playground
extern crate futures;
extern crate tokio_core;
extern crate serde_json;
use std::cell::Cell;
use std::io;
use std::net::SocketAddr;
use std::collections::HashMap;
use std::rc::Rc;
use futures::{Future, Stream, Sink};
use futures::sink::BoxSink;
use futures::stream;
use tokio_core::reactor::*;
use tokio_core::net::{UdpSocket, UdpCodec};
/*
This is intended to be a simplified example of what I am trying to do with UDP packets using
tokio_core. The problem is that my program basically requires for the send buffer to be available
to many objects so that they can all send packets. However, in order to send any packets, a call to wait() must be issued
in case the previous call had not finished flushing yet, and this would interrupt the event loop.
See more comments below.
*/
pub struct LineCodec;
impl UdpCodec for LineCodec {
type In = (SocketAddr, Vec<u8>);
type Out = (SocketAddr, Vec<u8>);
fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> {
Ok((*addr, buf.to_vec()))
}
fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> SocketAddr {
into.extend(buf);
addr
}
}
struct Context {
/// A shared sink which can be used to send a packet
/// This needs to be an option for the small synchronous amount of time it takes to
/// swap in the returned sink
sink: Cell<Option<BoxSink<(SocketAddr, Vec<u8>), io::Error>>>
}
struct PeerHandler {
context: Rc<Context>,
addr: SocketAddr,
count: u64
}
impl PeerHandler {
pub fn new(context: Rc<Context>, addr: SocketAddr) -> PeerHandler {
PeerHandler {
context,
addr,
count: 0
}
}
pub fn handle_frame(&mut self, _msg: Vec<u8>) {
self.count += 1;
// this is where my problem is: how do you use this future without calling wait()?
// I call wait here so that the program is functional, but normally I think that would be bad in an event loop.
let newf = self.context.sink.replace(None).unwrap().send((self.addr, self.count.to_string().into_bytes())).wait().unwrap();
self.context.sink.set(Some(newf));
}
}
fn main() {
println!("Start");
let mut core = Core::new().unwrap();
let handle = core.handle();
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
// Bind both our sockets and then figure out what ports we got.
let a = UdpSocket::bind(&addr, &handle).unwrap();
let b = UdpSocket::bind(&addr, &handle).unwrap();
let c = UdpSocket::bind(&addr, &handle).unwrap();
let a_addr = a.local_addr().unwrap();
// We're parsing each socket with the `LineCodec` defined above, and then we
// `split` each codec into the sink/stream halves.
let (a_sink, a_stream) = a.framed(LineCodec).split();
let (b_sink, b_stream) = b.framed(LineCodec).split();
let (c_sink, c_stream) = c.framed(LineCodec).split();
// Set up the "server"
let context = Rc::new(Context {
sink: Cell::new(Some(Box::new(a_sink)))
});
let mut peers: HashMap<SocketAddr, PeerHandler> = HashMap::new();
let a = a_stream.for_each(move |(addr, msg)| {
println!("[a] recv: {}", String::from_utf8_lossy(&msg));
if !peers.contains_key(&addr) {
// new client
let p = PeerHandler::new(Rc::clone(&context), addr.clone());
peers.insert(addr.clone(), p);
}
peers.get_mut(&addr).unwrap().handle_frame(msg);
Ok(())
});
// Start off by sending a ping from b to a, which will increment a counter
// associated with the socket
let array = [
(a_addr, b"PING".to_vec()),
(a_addr, b"PING".to_vec()),
(a_addr, b"PING".to_vec()),
(a_addr, b"PING".to_vec()),
];
let st = stream::iter_ok::<_, io::Error>(array.iter().cloned());
let b = st.forward(b_sink).and_then(|_| {
b_stream.take(4).for_each(move |(addr, msg)| {
println!("[b] recv: {}", String::from_utf8_lossy(&msg));
Ok(())
}).then(|_| Ok(()))
});
// also start the c client, who will send less packets
let st = stream::iter_ok::<_, io::Error>(array.iter().take(2).cloned());
let c = st.forward(c_sink).and_then(|_| {
c_stream.take(2).for_each(move |(addr, msg)| {
println!("[c] recv: {}", String::from_utf8_lossy(&msg));
Ok(())
}).then(|_| Ok(()))
});
// Spawn the sender of pongs and then wait for our pinger to finish.
handle.spawn(a.then(|_| Ok(())));
core.run(b.join(c)).unwrap();
//core.run(c).unwrap();
println!("Successful Exit")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment