Skip to content

Instantly share code, notes, and snippets.

@jkilpatr
Created July 28, 2018 14:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jkilpatr/902955c0890df97b61ee604266c43b47 to your computer and use it in GitHub Desktop.
Save jkilpatr/902955c0890df97b61ee604266c43b47 to your computer and use it in GitHub Desktop.
use actix::prelude::*;
use actix::{Actor, Context, StreamHandler};
use byteorder::{BigEndian, ReadBytesExt};
use bytes::BufMut;
use bytes::BytesMut;
use failure::Error;
use futures::future::ok;
use futures::future::FutureResult;
use futures::stream::SplitSink;
use futures::Future;
use futures::{Sink, Stream};
use rita_common::tunnel_manager::{GetListen, TunnelManager};
use settings::RitaCommonSettings;
use std::io;
use std::io::Cursor;
use std::net::{IpAddr, Ipv6Addr, SocketAddr, SocketAddrV6, UdpSocket};
use tokio::net::UdpFramed;
use tokio::net::UdpSocket as TokioUdpSocket;
use tokio::reactor::Reactor;
use tokio_codec::{Decoder, Encoder};
use rita_common::rita_loop::Tick;
use KI;
use SETTING;
/// Errors that the peer listener can report.
#[derive(Debug, Fail)]
pub enum PeerListenerError {
/// Displayed as "Could not connect to Tunnel Manager".
#[fail(display = "Could not connect to Tunnel Manager")]
PeerListenerInterfaceError,
}
/// Stateless codec that converts between ImHere datagrams and the `Ipv6Addr` they carry.
pub struct HelloCodec;
/// Magic number written as the first four bytes of every ImHere packet; the decoder
/// rejects any packet whose leading `u32` does not match it.
pub const MSG_IM_HERE: u32 = 0x5b6d4158;
/// Decoding half of the ImHere wire format.
///
/// Packet layout, all fields big-endian:
/// 4-byte magic (`MSG_IM_HERE`) | 2-byte total packet length | 16-byte IPv6 address.
impl Decoder for HelloCodec {
    type Item = Ipv6Addr;
    type Error = io::Error;

    /// Parses one ImHere packet out of `buf`.
    ///
    /// Returns `Ok(Some(addr))` for a well-formed packet carrying a usable address and
    /// `Ok(None)` when the buffer is empty, the magic or advertised length does not match,
    /// or the address is unspecified, loopback or multicast.
    ///
    /// # Errors
    ///
    /// Returns the underlying `io::Error` when the buffer ends before a header field or
    /// the address could be read in full.
    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Ipv6Addr>, io::Error> {
        if buf.is_empty() {
            trace!("Recieved an empty ImHere packet!");
            return Ok(None);
        }

        let mut pointer = Cursor::new(&buf);

        let packet_magic = pointer.read_u32::<BigEndian>()?;
        if packet_magic != MSG_IM_HERE {
            trace!(
                "Recieved an ImHere packet with an invalid magic: {:?}",
                packet_magic
            );
            return Ok(None);
        }

        let packet_size = pointer.read_u16::<BigEndian>()?;
        // Widen the advertised size instead of narrowing the buffer length so an
        // oversized buffer can never wrap around and appear to match.
        if usize::from(packet_size) != buf.len() {
            trace!(
                "Recieved an ImHere packet with an invalid size: {:?}",
                packet_size
            );
            return Ok(None);
        }

        // The address travels as 16 network-order bytes, so the big-endian u128 maps
        // directly onto an Ipv6Addr without any manual segment shuffling.
        let peer_address = Ipv6Addr::from(pointer.read_u128::<BigEndian>()?);

        if peer_address.is_unspecified()
            || peer_address.is_loopback()
            || peer_address.is_multicast()
        {
            trace!(
                "Recieved a valid ImHere with an invalid ip address: {:?}",
                peer_address,
            );
            return Ok(None);
        }

        Ok(Some(peer_address))
    }
}
/// Encoding half of the ImHere wire format.
///
/// Packet layout, all fields big-endian:
/// 4-byte magic (`MSG_IM_HERE`) | 2-byte total packet length | 16-byte IPv6 address.
impl Encoder for HelloCodec {
    type Item = Ipv6Addr;
    type Error = io::Error;

    /// Appends a complete 22-byte ImHere packet advertising `addr` to `buf`.
    ///
    /// Always returns `Ok(())`.
    fn encode(&mut self, addr: Ipv6Addr, buf: &mut BytesMut) -> Result<(), io::Error> {
        // 4 (magic) + 2 (length) + 16 (address) bytes.
        buf.reserve(22);
        buf.put_u32_be(MSG_IM_HERE);
        buf.put_u16_be(22);
        // Write all sixteen octets so the payload matches the advertised length.
        let octets = addr.octets();
        for octet in octets.iter() {
            buf.put_u8(*octet);
        }
        Ok(())
    }
}
/// Builds a complete 22-byte ImHere packet advertising `addr`.
///
/// Layout, all fields big-endian: 4-byte magic (`MSG_IM_HERE`), 2-byte total packet
/// length, then the 16 address octets.
fn encode_im_here(addr: Ipv6Addr) -> Vec<u8> {
    // 4 (magic) + 2 (length) + 16 (address) bytes.
    let mut buf = Vec::with_capacity(22);
    buf.put_u32_be(MSG_IM_HERE);
    buf.put_u16_be(22);
    // Write all sixteen octets so the payload matches the advertised length.
    let octets = addr.octets();
    for octet in octets.iter() {
        buf.put_u8(*octet);
    }
    buf
}
/// Actor that listens for ImHere packets on the configured peer interfaces and sends its
/// own ImHere packets when it receives a `Tick`.
pub struct PeerListener {
// One entry per bound interface: (sink half of the framed socket, interface index,
// interface name).
sinks: Vec<(SplitSink<UdpFramed<HelloCodec>>, u32, String)>,
}
impl Default for PeerListener {
fn default() -> PeerListener {
PeerListener::new().unwrap()
}
}
// Marks the actor as supervisable; no trait methods are overridden.
impl Supervised for PeerListener {}
impl SystemService for PeerListener {
    /// Binds a UDP socket to the link-local multicast discovery address on every
    /// interface listed in the `peer_interfaces` setting, joins the discovery group on
    /// that interface, and attaches the receiving half of each socket to this actor so
    /// decoded ImHere packets arrive as `ImHerePacket` stream items. The sending halves
    /// are kept in `self.sinks` together with the interface index and name.
    ///
    /// # Panics
    ///
    /// Panics if the reactor cannot be created, if binding to the discovery address
    /// fails, or if the socket cannot be registered with the reactor.
    fn service_started(&mut self, ctx: &mut Context<Self>) {
        info!("PeerListener starting");
        let disc_ip = Ipv6Addr::new(0xff02, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x8);
        let interfaces = SETTING.get_network().peer_interfaces.clone();
        let port = SETTING.get_network().rita_hello_port;

        //TODO piggy back this off of the Actix system Arbiter when that firms up some
        // NOTE(review): this reactor is never turned or moved to the background in this
        // function and is dropped on return; confirm the sockets registered on it are
        // still driven.
        let event_loop = Reactor::new().unwrap();

        // The streams will be attached to the actor and the sinks retained to send
        // messages. Both are collected first so nothing is attached unless every
        // interface was bound successfully.
        let mut streams = Vec::new();
        let mut sinks = Vec::new();

        for iface in interfaces.iter() {
            trace!("Binding to {:?} for PeerListener", iface);

            // Look the index up once; fall back to scope 0 when it cannot be resolved.
            let iface_index = KI.get_iface_index(&iface).unwrap_or(0);

            // Bind to the multicast discovery address on each listen interface
            let multicast_socket = SocketAddrV6::new(disc_ip, port.into(), 0, iface_index);
            let link_local_multicast_socket = UdpSocket::bind(multicast_socket)
                .expect("Failed to bind to peer discovery address!");

            let res = link_local_multicast_socket.set_multicast_loop_v6(true);
            trace!("Set link local multicast loop with {:?}", res);
            let res = link_local_multicast_socket.join_multicast_v6(&disc_ip, iface_index);
            trace!("Set link local multicast v6 with {:?}", res);

            let tokio_socket =
                TokioUdpSocket::from_std(link_local_multicast_socket, &event_loop.handle())
                    .unwrap();

            let (sink, stream) = UdpFramed::new(tokio_socket, HelloCodec).split();
            streams.push(stream);
            sinks.push((sink, iface_index, iface.clone()));
        }

        // Setup streams from each interface to notify the peer listener actor. Each
        // stream item pairs the decoded address with the datagram's source socket.
        for stream in streams {
            ctx.add_stream(stream.map(|(addr, socket)| ImHerePacket::new(addr, socket)));
        }
        self.sinks.extend(sinks);
    }
}
impl PeerListener {
pub fn new() -> Result<PeerListener, Error> {
Ok(PeerListener { sinks: Vec::new() })
}
}
impl Handler<Tick> for PeerListener {
    type Result = Result<(), Error>;

    /// On each `Tick`, sends one ImHere packet advertising this node's link-local
    /// address to the multicast discovery group on every interface in `self.sinks`.
    ///
    /// Interfaces whose link-local address cannot be determined are skipped. Send
    /// failures are only traced; the handler always returns `Ok(())`.
    ///
    /// # Panics
    ///
    /// Panics if a socket cannot be bound to an interface's link-local address.
    fn handle(&mut self, _: Tick, _ctx: &mut Context<Self>) -> Self::Result {
        let port = SETTING.get_network().rita_hello_port;
        trace!("About to send ImHere");
        let disc_ip = Ipv6Addr::new(0xff02, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x8);

        for sink in self.sinks.iter() {
            let iface_index = sink.1;
            let iface_name = &sink.2;
            trace!("Sending ImHere to {:?}", iface_name);

            let link_ip = match KI.get_link_local_device_ip(iface_name) {
                Ok(ip) => ip,
                Err(_) => continue,
            };

            let source_socketaddr = SocketAddrV6::new(link_ip, port.into(), 0, iface_index);
            // Build the panic message only on failure rather than on every tick.
            let source_socket = UdpSocket::bind(source_socketaddr).unwrap_or_else(|e| {
                panic!(
                    "Failed to bind to link local address {:?} on {:?}: {:?}",
                    link_ip, iface_index, e
                )
            });

            let multicast_socketaddr = SocketAddrV6::new(disc_ip, port.into(), 0, iface_index);

            let res = source_socket.set_multicast_loop_v6(true);
            trace!("Set link local multicast loop with {:?}", res);
            let res = source_socket.join_multicast_v6(&disc_ip, iface_index);
            trace!("Set link local multicast v6 with {:?}", res);

            let result = source_socket.send_to(&encode_im_here(link_ip), multicast_socketaddr);
            trace!("Sending ImHere to broadcast gets {:?}", result);

            // Debug aid: look at whatever is already queued on the sending socket. The
            // socket is made non-blocking first so an idle link cannot stall the actor.
            if let Err(e) = source_socket.set_nonblocking(true) {
                trace!("Could not make the sending socket non-blocking: {:?}", e);
                continue;
            }
            let mut buf: [u8; 1500] = [0; 1500];
            let res = source_socket.recv_from(&mut buf);
            trace!("Trying to read from the sending socket {:?}", res);
            if let Ok((len, _)) = res {
                trace!("Data read from socket {:?}", &buf[..len]);
            }
        }

        Ok(())
    }
}
/// Registers `PeerListener` as an actix actor.
impl Actor for PeerListener {
// Runs in the plain actix `Context`.
type Context = Context<Self>;
}
/// Message delivered to the `PeerListener` for every successfully decoded ImHere packet.
#[derive(Message, Clone)]
pub struct ImHerePacket {
// Address decoded from the packet payload.
addr: Ipv6Addr,
// Socket address paired with the decoded item by the framed UDP stream
// (presumably the datagram's sender; confirm against UdpFramed).
socket: SocketAddr,
}
impl ImHerePacket {
pub fn new(addr: Ipv6Addr, con: SocketAddr) -> ImHerePacket {
ImHerePacket {
addr: addr,
socket: con,
}
}
}
impl StreamHandler<ImHerePacket, io::Error> for PeerListener {
fn handle(&mut self, msg: ImHerePacket, _: &mut Context<Self>) {
println!("Getting ImHere Packet: ({:?}, {:?})", msg.addr, msg.socket);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment