Created
July 28, 2018 14:13
-
-
Save jkilpatr/902955c0890df97b61ee604266c43b47 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use actix::prelude::*; | |
use actix::{Actor, Context, StreamHandler}; | |
use byteorder::{BigEndian, ReadBytesExt}; | |
use bytes::BufMut; | |
use bytes::BytesMut; | |
use failure::Error; | |
use futures::future::ok; | |
use futures::future::FutureResult; | |
use futures::stream::SplitSink; | |
use futures::Future; | |
use futures::{Sink, Stream}; | |
use rita_common::tunnel_manager::{GetListen, TunnelManager}; | |
use settings::RitaCommonSettings; | |
use std::io; | |
use std::io::Cursor; | |
use std::net::{IpAddr, Ipv6Addr, SocketAddr, SocketAddrV6, UdpSocket}; | |
use tokio::net::UdpFramed; | |
use tokio::net::UdpSocket as TokioUdpSocket; | |
use tokio::reactor::Reactor; | |
use tokio_codec::{Decoder, Encoder}; | |
use rita_common::rita_loop::Tick; | |
use KI; | |
use SETTING; | |
#[derive(Debug, Fail)] | |
pub enum PeerListenerError { | |
#[fail(display = "Could not connect to Tunnel Manager")] | |
PeerListenerInterfaceError, | |
} | |
pub struct HelloCodec; | |
// Random header to verify | |
pub const MSG_IM_HERE: u32 = 0x5b6d4158; | |
impl Decoder for HelloCodec { | |
type Item = Ipv6Addr; | |
type Error = io::Error; | |
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Ipv6Addr>, io::Error> { | |
if buf.is_empty() { | |
trace!("Recieved an empty ImHere packet!"); | |
return Ok(None); | |
} | |
let mut pointer = Cursor::new(&buf); | |
let packet_magic = pointer.read_u32::<BigEndian>()?; | |
if packet_magic != MSG_IM_HERE { | |
trace!( | |
"Recieved an ImHere packet with an invalid magic: {:?}", | |
packet_magic | |
); | |
return Ok(None); | |
} | |
let packet_size = pointer.read_u16::<BigEndian>()?; | |
if packet_size != buf.len() as u16 { | |
trace!( | |
"Recieved an ImHere packet with an invalid size: {:?}", | |
packet_size | |
); | |
return Ok(None); | |
} | |
let peer_address_bytes = pointer.read_u128::<BigEndian>()?; | |
let mut peer_address_arr: [u16; 8] = [0xFFFF; 8]; | |
for i in 7..0 { | |
peer_address_arr[i] = (peer_address_bytes >> (i * 16) | 0xFFFF) as u16; | |
} | |
let peer_address = Ipv6Addr::new( | |
peer_address_arr[7], | |
peer_address_arr[6], | |
peer_address_arr[5], | |
peer_address_arr[4], | |
peer_address_arr[3], | |
peer_address_arr[2], | |
peer_address_arr[1], | |
peer_address_arr[0], | |
); | |
if peer_address.is_unspecified() | |
|| peer_address.is_loopback() | |
|| peer_address.is_multicast() | |
{ | |
trace!( | |
"Recieved a valid ImHere with an invalid ip address: {:?}", | |
peer_address, | |
); | |
return Ok(None); | |
} | |
Ok(Some(peer_address)) | |
} | |
} | |
impl Encoder for HelloCodec { | |
type Item = Ipv6Addr; | |
type Error = io::Error; | |
fn encode(&mut self, addr: Ipv6Addr, buf: &mut BytesMut) -> Result<(), io::Error> { | |
buf.reserve(22); | |
buf.put_u32_be(MSG_IM_HERE); | |
buf.put_u16_be(22); | |
let ipaddr_bytes: [u8; 16] = addr.octets(); | |
for i in 0..15 { | |
buf.put_u8(ipaddr_bytes[i]); | |
} | |
Ok(()) | |
} | |
} | |
fn encode_im_here(addr: Ipv6Addr) -> Vec<u8> { | |
let mut buf = Vec::new(); | |
buf.put_u32_be(MSG_IM_HERE); | |
buf.put_u16_be(22); | |
let ipaddr_bytes: [u8; 16] = addr.octets(); | |
for i in 0..15 { | |
buf.put_u8(ipaddr_bytes[i]); | |
} | |
buf | |
} | |
pub struct PeerListener { | |
sinks: Vec<(SplitSink<UdpFramed<HelloCodec>>, u32, String)>, | |
} | |
impl Default for PeerListener { | |
fn default() -> PeerListener { | |
PeerListener::new().unwrap() | |
} | |
} | |
impl Supervised for PeerListener {} | |
impl SystemService for PeerListener { | |
/// Creates a new instance of the peer listener actor by requesting a list of listen interfaces | |
/// from tunnel manager and then binding to the broadcast port on all listen interfaces. It is | |
/// then setup to be woken when any of these listen interfaces gets a ImHere packet to the | |
/// discovery port and broadcast address | |
fn service_started(&mut self, ctx: &mut Context<Self>) { | |
info!("PeerListener starting"); | |
let disc_ip = Ipv6Addr::new(0xff02, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x8); | |
let interfaces = SETTING.get_network().peer_interfaces.clone(); | |
// The streams will be attached to the actor and the sinks retained to send | |
// messages | |
let mut streams = Vec::new(); | |
let mut sinks = Vec::new(); | |
let port = SETTING.get_network().rita_hello_port; | |
//TODO piggy back this off of the Actix system Arbiter when that firms up some | |
let event_loop = Reactor::new().unwrap(); | |
let iface_list = interfaces; | |
for iface in iface_list.iter() { | |
trace!("Binding to {:?} for PeerListener", iface); | |
// Lookup interface index | |
let iface_index = if KI.get_iface_index(&iface).is_ok() { | |
KI.get_iface_index(&iface).unwrap() | |
} else { | |
0 | |
}; | |
// Bond to multicast discovery address on each listen port | |
let multicast_socket = SocketAddrV6::new(disc_ip, port.into(), 0, iface_index); | |
let link_local_multicast_socket = UdpSocket::bind(multicast_socket) | |
.expect("Failed to bind to peer discovery address!"); | |
let res = link_local_multicast_socket.set_multicast_loop_v6(true); | |
trace!("Set link local multicast loop with {:?}", res); | |
let res = link_local_multicast_socket.join_multicast_v6(&disc_ip, iface_index); | |
trace!("Set link local multicast v6 with {:?}", res); | |
let tokio_socket = | |
TokioUdpSocket::from_std(link_local_multicast_socket, &event_loop.handle()) | |
.unwrap(); | |
let (sink, stream) = UdpFramed::new(tokio_socket, HelloCodec).split(); | |
streams.push(stream); | |
sinks.push((sink, iface_index, iface.clone())); | |
} | |
// Setup streams from each interface to notify the peer listener actor | |
for item in streams { | |
ctx.add_stream(item.map(|(sender, data)| ImHerePacket::new(sender, data))); | |
} | |
for item in sinks { | |
self.sinks.push(item); | |
} | |
} | |
} | |
impl PeerListener { | |
pub fn new() -> Result<PeerListener, Error> { | |
Ok(PeerListener { sinks: Vec::new() }) | |
} | |
} | |
impl Handler<Tick> for PeerListener { | |
type Result = Result<(), Error>; | |
fn handle(&mut self, _: Tick, _ctx: &mut Context<Self>) -> Self::Result { | |
let port = SETTING.get_network().rita_hello_port; | |
trace!("About to send ImHere"); | |
let disc_ip = Ipv6Addr::new(0xff02, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x8); | |
for sink in self.sinks.iter_mut() { | |
trace!("Sending ImHere to {:?}", &sink.2); | |
let link_ip = KI.get_link_local_device_ip(&sink.2); | |
if link_ip.is_err() { | |
continue; | |
} | |
let link_ip = link_ip.unwrap(); | |
let source_socketaddr = SocketAddrV6::new(link_ip, port.into(), 0, sink.1); | |
let source_socket = UdpSocket::bind(source_socketaddr).expect(&format!( | |
"Failed to bind to link local address {:?} on {:?}", | |
link_ip, sink.1 | |
)); | |
let multicast_socketaddr = SocketAddrV6::new( | |
"ff02:0:0:0:0:0:1:8".parse().unwrap(), | |
port.into(), | |
0, | |
sink.1, | |
); | |
let res = source_socket.set_multicast_loop_v6(true); | |
trace!("Set link local multicast loop with {:?}", res); | |
let res = source_socket.join_multicast_v6(&disc_ip, sink.1); | |
trace!("Set link local multicast v6 with {:?}", res); | |
let result = | |
source_socket.send_to(&encode_im_here(link_ip.clone()), multicast_socketaddr); | |
trace!("Sending ImHere to broadcast gets {:?}", result); | |
let mut buf: [u8; 1500] = [0; 1500]; | |
let res = source_socket.recv_from(&mut buf); | |
trace!("Trying to read from the sending socket {:?}", res); | |
trace!("Data read from socket {:?}", buf.to_vec()); | |
} | |
Ok(()) | |
} | |
} | |
impl Actor for PeerListener { | |
type Context = Context<Self>; | |
} | |
#[derive(Message, Clone)] | |
pub struct ImHerePacket { | |
addr: Ipv6Addr, | |
socket: SocketAddr, | |
} | |
impl ImHerePacket { | |
pub fn new(addr: Ipv6Addr, con: SocketAddr) -> ImHerePacket { | |
ImHerePacket { | |
addr: addr, | |
socket: con, | |
} | |
} | |
} | |
impl StreamHandler<ImHerePacket, io::Error> for PeerListener { | |
fn handle(&mut self, msg: ImHerePacket, _: &mut Context<Self>) { | |
println!("Getting ImHere Packet: ({:?}, {:?})", msg.addr, msg.socket); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment