Skip to content

Instantly share code, notes, and snippets.

@pentlander
Created April 7, 2017 04:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pentlander/a2b90421d3f8b9502bca169a844696da to your computer and use it in GitHub Desktop.
Save pentlander/a2b90421d3f8b9502bca169a844696da to your computer and use it in GitHub Desktop.
Gossip Tokio
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
extern crate bytes;
extern crate futures;
extern crate tokio_io;
extern crate tokio_core;
extern crate tokio_timer;
use std::str;
use std::io;
use std::io::Write;
use std::net::{SocketAddr, IpAddr, ToSocketAddrs};
use std::time::Duration;
use std::sync::Arc;
use std::iter;
use futures::stream;
use futures::future;
use futures::{Future, BoxFuture};
use futures::sink::{Sink, BoxSink};
use futures::stream::{Stream, BoxStream};
use tokio_core::reactor::{Core, Handle};
use tokio_core::net::{UdpSocket, UdpCodec, UdpFramed};
use tokio_timer::Timer;
type MemberList = Vec<Member>;
#[derive(Clone)]
struct GossiperState {
member_list: MemberList,
dead_list: MemberList,
}
#[derive(Clone)]
struct Gossiper {
state: Arc<GossiperState>,
gossip_time: u64,
fail_time: u64,
}
impl Gossiper {
fn new(addr: SocketAddr) -> Gossiper {
let member = Member::new(addr);
Gossiper {
state: Arc::new(GossiperState {
member_list: vec![member],
dead_list: vec![],
}),
gossip_time: 1000,
fail_time: 10_000,
}
}
fn run(&mut self, core: &mut Core) {
let handle = core.handle();
let socket = UdpSocket::bind(&self.get_member().addr, &handle).unwrap();
let (sink, stream) = socket.framed(JsonCodec).split();
//let () = sink;
handle.spawn(sink.send_all(self.heartbeat2()).then(|_| Ok(())));
core.run(self.serve(Box::new(stream))).unwrap();
}
fn get_member(&self) -> &Member {
&self.state.member_list[0]
}
fn heartbeat2(&mut self) -> BoxStream<(SocketAddr, Message), io::Error> {
let timer = Timer::default();
let state_stream = stream::unfold(self.state.clone(), |state| {
//let () = state;
Some(Ok((state.clone(), state)))
});
let test = timer.interval(Duration::from_millis(self.gossip_time)).zip(state_stream)
.map(|(_, state)| {
//let () = state;
state.member_list.clone()
})
.then(|result| {
let msg = ("0.0.0.0:2115".parse().unwrap(), Message::new(result.unwrap()));
println!("Writing bytes");
Ok::<_, io::Error>(msg)
});
test.boxed()
}
fn serve(&self, stream: BoxStream<(SocketAddr, Message), io::Error>) -> BoxFuture<(), ()> {
stream.then(|result| match result {
Ok((addr, msg)) => { println!("{:?}", msg); Ok(()) }
_ => Ok(())
}).for_each(|_| Ok(())).boxed()
}
}
#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)]
struct Member {
addr: SocketAddr,
heartbeat: u32,
}
impl Member {
fn new(addr: SocketAddr) -> Member {
Member {
addr: addr,
heartbeat: 0,
}
}
}
#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)]
struct Message {
member_list: Vec<Member>,
}
impl Message {
fn new(member_list: Vec<Member>) -> Message {
Message {
member_list: member_list,
}
}
}
struct JsonCodec;
impl UdpCodec for JsonCodec {
type In = (SocketAddr, Message);
type Out = (SocketAddr, Message);
fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> {
serde_json::from_slice(buf)
.map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))
}
fn encode(&mut self, (addr, msg): Self::Out, buf: &mut Vec<u8>) -> SocketAddr {
let ser = serde_json::to_vec(&msg).expect("Unable to encode message");
buf.extend(ser);
addr
}
}
pub struct LineCodec;
impl UdpCodec for LineCodec {
type In = String;
type Out = String;
fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> Result<Self::In, io::Error> {
Ok(str::from_utf8(buf).unwrap().to_owned())
}
fn encode(&mut self, msg: Self::Out, into: &mut Vec<u8>) -> SocketAddr {
let _ = into.write_all(msg.as_bytes()).unwrap();
println!("Wrote bytes");
let addr = "0.0.0.0:2115".parse().unwrap();
addr
}
}
fn main() {
let mut event_loop = Core::new().unwrap();
let addr = "0.0.0.0:2116".parse().unwrap();
let mut g = Gossiper::new(addr);
g.run(&mut event_loop);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment