Created
April 7, 2017 04:27
-
-
Save pentlander/a2b90421d3f8b9502bca169a844696da to your computer and use it in GitHub Desktop.
Gossip Tokio
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#[macro_use] | |
extern crate serde_derive; | |
extern crate serde_json; | |
extern crate bytes; | |
extern crate futures; | |
extern crate tokio_io; | |
extern crate tokio_core; | |
extern crate tokio_timer; | |
use std::str; | |
use std::io; | |
use std::io::Write; | |
use std::net::{SocketAddr, IpAddr, ToSocketAddrs}; | |
use std::time::Duration; | |
use std::sync::Arc; | |
use std::iter; | |
use futures::stream; | |
use futures::future; | |
use futures::{Future, BoxFuture}; | |
use futures::sink::{Sink, BoxSink}; | |
use futures::stream::{Stream, BoxStream}; | |
use tokio_core::reactor::{Core, Handle}; | |
use tokio_core::net::{UdpSocket, UdpCodec, UdpFramed}; | |
use tokio_timer::Timer; | |
type MemberList = Vec<Member>; | |
#[derive(Clone)] | |
struct GossiperState { | |
member_list: MemberList, | |
dead_list: MemberList, | |
} | |
#[derive(Clone)] | |
struct Gossiper { | |
state: Arc<GossiperState>, | |
gossip_time: u64, | |
fail_time: u64, | |
} | |
impl Gossiper { | |
fn new(addr: SocketAddr) -> Gossiper { | |
let member = Member::new(addr); | |
Gossiper { | |
state: Arc::new(GossiperState { | |
member_list: vec![member], | |
dead_list: vec![], | |
}), | |
gossip_time: 1000, | |
fail_time: 10_000, | |
} | |
} | |
fn run(&mut self, core: &mut Core) { | |
let handle = core.handle(); | |
let socket = UdpSocket::bind(&self.get_member().addr, &handle).unwrap(); | |
let (sink, stream) = socket.framed(JsonCodec).split(); | |
//let () = sink; | |
handle.spawn(sink.send_all(self.heartbeat2()).then(|_| Ok(()))); | |
core.run(self.serve(Box::new(stream))).unwrap(); | |
} | |
fn get_member(&self) -> &Member { | |
&self.state.member_list[0] | |
} | |
fn heartbeat2(&mut self) -> BoxStream<(SocketAddr, Message), io::Error> { | |
let timer = Timer::default(); | |
let state_stream = stream::unfold(self.state.clone(), |state| { | |
//let () = state; | |
Some(Ok((state.clone(), state))) | |
}); | |
let test = timer.interval(Duration::from_millis(self.gossip_time)).zip(state_stream) | |
.map(|(_, state)| { | |
//let () = state; | |
state.member_list.clone() | |
}) | |
.then(|result| { | |
let msg = ("0.0.0.0:2115".parse().unwrap(), Message::new(result.unwrap())); | |
println!("Writing bytes"); | |
Ok::<_, io::Error>(msg) | |
}); | |
test.boxed() | |
} | |
fn serve(&self, stream: BoxStream<(SocketAddr, Message), io::Error>) -> BoxFuture<(), ()> { | |
stream.then(|result| match result { | |
Ok((addr, msg)) => { println!("{:?}", msg); Ok(()) } | |
_ => Ok(()) | |
}).for_each(|_| Ok(())).boxed() | |
} | |
} | |
#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)] | |
struct Member { | |
addr: SocketAddr, | |
heartbeat: u32, | |
} | |
impl Member { | |
fn new(addr: SocketAddr) -> Member { | |
Member { | |
addr: addr, | |
heartbeat: 0, | |
} | |
} | |
} | |
#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)] | |
struct Message { | |
member_list: Vec<Member>, | |
} | |
impl Message { | |
fn new(member_list: Vec<Member>) -> Message { | |
Message { | |
member_list: member_list, | |
} | |
} | |
} | |
struct JsonCodec; | |
impl UdpCodec for JsonCodec { | |
type In = (SocketAddr, Message); | |
type Out = (SocketAddr, Message); | |
fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> { | |
serde_json::from_slice(buf) | |
.map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err)) | |
} | |
fn encode(&mut self, (addr, msg): Self::Out, buf: &mut Vec<u8>) -> SocketAddr { | |
let ser = serde_json::to_vec(&msg).expect("Unable to encode message"); | |
buf.extend(ser); | |
addr | |
} | |
} | |
pub struct LineCodec; | |
impl UdpCodec for LineCodec { | |
type In = String; | |
type Out = String; | |
fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> Result<Self::In, io::Error> { | |
Ok(str::from_utf8(buf).unwrap().to_owned()) | |
} | |
fn encode(&mut self, msg: Self::Out, into: &mut Vec<u8>) -> SocketAddr { | |
let _ = into.write_all(msg.as_bytes()).unwrap(); | |
println!("Wrote bytes"); | |
let addr = "0.0.0.0:2115".parse().unwrap(); | |
addr | |
} | |
} | |
fn main() { | |
let mut event_loop = Core::new().unwrap(); | |
let addr = "0.0.0.0:2116".parse().unwrap(); | |
let mut g = Gossiper::new(addr); | |
g.run(&mut event_loop); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment