Skip to content

Instantly share code, notes, and snippets.

@aep
Created November 30, 2018 18:12
Show Gist options
  • Save aep/84a2732a11e844833974242225913242 to your computer and use it in GitHub Desktop.
Save aep/84a2732a11e844833974242225913242 to your computer and use it in GitHub Desktop.
use dns::DnsRecord;
use error::Error;
use clock;
use osaka::mio::net::UdpSocket;
use osaka::osaka;
use noise;
use proto;
use identity;
use keystore;
use std::time::{Duration};
use channel::{Channel, ChannelProgress, MAX_PACKET_SIZE};
use packet::{EncryptedPacket, RoutingKey};
use std::collections::HashMap;
use std::net::SocketAddr;
/// Hook type that an `Endpoint` is parameterised over.
///
/// The only requirement visible here is that an implementor can be built
/// without arguments; `Endpoint::new` constructs one this way.
pub trait Driver {
    /// Builds a fresh driver instance.
    fn new() -> Self;
}
/// Where the outgoing packets of a channel are sent.
enum AddressMode {
    /// No single peer address is fixed; `Endpoint::run` sends each outgoing
    /// packet to every address that is a key of this map.
    // NOTE(review): the `(Category, usize)` values are not read in the code
    // visible here; presumably a path classification plus a counter. Confirm.
    Discovering(HashMap<SocketAddr, (proto::path::Category, usize)>),
    /// A peer address is fixed; `Endpoint::run` sends only to that address
    /// and does not consult the accompanying map.
    Established(SocketAddr, HashMap<SocketAddr, (proto::path::Category, usize)>),
}
/// A protocol channel together with the UDP address state used to reach its peer.
struct UdpChannel {
    /// The channel itself; it is handed decoded packets via `recv` and polled
    /// for work via `progress`.
    chan: Channel,
    /// Destination address(es) for packets the channel asks to send.
    addrs: AddressMode,
}
/// A UDP endpoint that multiplexes channels, keyed by routing key, over one socket.
pub struct Endpoint<D: Driver> {
    /// Live channels, looked up by the route of each incoming packet.
    channels: HashMap<RoutingKey, UdpChannel>,
    /// Socket shared by every channel for sending and receiving.
    socket: UdpSocket,
    /// Driver instance, created through `Driver::new` when the endpoint is built.
    // Typed as the generic parameter `D`: the bare trait name would be an
    // unsized trait-object type and would leave `D` unused.
    driver: D,
}
impl<D: Driver> Endpoint<D> {
pub fn new(
noise: noise::Transport,
identity: identity::Identity,
socket: UdpSocket,
addr: SocketAddr,
) -> Self {
let mut channels = HashMap::new();
let debug_id = format!("{}::{}", noise.route(), identity);
channels.insert(noise.route(), UdpChannel{
chan: Channel::new(noise, debug_id),
addrs: AddressMode::Established(addr, HashMap::new()),
});
Self {
channels,
socket,
driver: Driver::new(),
}
}
#[osaka]
pub fn run(mut self) -> Result<(), Error> {
loop {
// receive one packet
let mut buf = vec![0; MAX_PACKET_SIZE];
match self.socket.recv_from(&mut buf) {
Err(e) => {
if e.kind() != std::io::ErrorKind::WouldBlock {
return Err(Error::Io(e));
}
}
Ok((len,from)) => {
match EncryptedPacket::decode(&buf[..len]) {
Err(e) => warn!("{}: {}", from, e),
Ok(pkt) => {
if let Some(chan) = self.channels.get_mut(&pkt.route) {
if let Err(e) = chan.chan.recv(pkt) {
warn!("{}: {}", from, e);
}
}
}
}
}
};
// work on all channels
let mut later = Duration::from_secs(600);
loop {
let mut again = false;
let mut killme = Vec::new();
for (route, chan) in &mut self.channels {
match chan.chan.progress()? {
ChannelProgress::Later(dur) => {
if dur < later {
later = dur;
}
}
ChannelProgress::SendPacket(pkt) => {
again = true;
match &chan.addrs {
AddressMode::Discovering(addrs) => {
for (addr, _) in addrs.iter() {
match self.socket.send_to(&pkt, addr) {
Ok(len) if len == pkt.len() => (),
e => trace!("send to {} didnt work {:?}", addr, e),
}
}
}
AddressMode::Established(addr, _) => match self.socket.send_to(&pkt, &addr) {
Ok(len) if len == pkt.len() => (),
e => error!("send didnt work {:?}", e),
},
}
}
ChannelProgress::ReceiveHeader(stream, frame) => {
again = true;
}
ChannelProgress::ReceiveStream(stream, frame) => {
again = true;
}
ChannelProgress::Close(stream) => {
again = true;
warn!("close");
}
ChannelProgress::Disconnect => {
warn!("disconnect");
killme.push(route.clone());
}
};
}
for killme in killme {
self.channels.remove(&killme);
}
if !again {
break;
}
}
yield osaka::Again::new(None, Some(later));
}
}
}
// -- builder
/// Builder that holds the identity secret used to open a connection.
pub struct EndpointBuilder {
    /// Secret passed to `noise::initiate` for every handshake attempt.
    secret: identity::Secret,
}
impl EndpointBuilder {
pub fn new() -> Result<Self, Error> {
let s = keystore::Secrets::load()?;
Ok(Self {
secret: s.identity,
})
}
#[osaka]
pub fn connect<D: Driver> (self, poll: osaka::Poll, mut records: Vec<DnsRecord>)
-> Result<Endpoint<D>, Error>
{
loop {
let record = match records.pop() {
Some(v) => v,
None => return Err(Error::OutOfOptions),
};
info!("attempting connection with {}", &record.addr);
let timestamp = clock::dns_time(&record);
let (mut noise, pkt) = noise::initiate(Some(&record.x), &self.secret, timestamp)?;
let pkt = pkt.encode();
let sock = UdpSocket::bind(&"0.0.0.0:0".parse().unwrap()).map_err(|e| Error::Io(e))?;
let token = poll
.register(&sock, mio::Ready::readable(), mio::PollOpt::level())
.unwrap();
let mut attempts = 0;
let r = loop {
attempts += 1;
if attempts > 4 {
break None;
}
let mut buf = vec![0; MAX_PACKET_SIZE];
if let Ok((len, _from)) = sock.recv_from(&mut buf) {
match EncryptedPacket::decode(&buf[..len]).and_then(|pkt| noise.recv_response(pkt)) {
Ok(identity) => {
let noise = noise.into_transport()?;
break Some((identity, noise));
}
Err(e) => {
warn!("EndpointFuture::WaitingForResponse: {}", e);
continue;
}
}
};
sock.send_to(&pkt, &record.addr)?;
yield osaka::Again::new(Some(token), Some(Duration::from_millis(2u64.pow(attempts) * 200)));
};
let (identity, noise) = match r {
Some(v) => v,
None => continue,
};
info!("established connection with {} :: {}", identity, noise.route());
return Ok(Endpoint::new(noise, identity, sock, record.addr));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment