Skip to content

Instantly share code, notes, and snippets.

@ernestas-poskus
Forked from Darksonn/killswitch.rs
Created February 7, 2019 14:39
Show Gist options
  • Save ernestas-poskus/232b08693e3d8629d250c3517028b86f to your computer and use it in GitHub Desktop.
Save ernestas-poskus/232b08693e3d8629d250c3517028b86f to your computer and use it in GitHub Desktop.
Kill switch for hyper sockets
/// A cloneable flag used to ask a group of connections to shut down.
///
/// Every clone shares the same underlying atomic flag, so killing one handle
/// is observed by all clones that were made from it *before* the kill.
#[derive(Clone, Debug)]
pub struct KillSwitch {
    /// Shared flag; `true` once this generation of the switch has been killed.
    kill: Arc<AtomicBool>,
}

impl KillSwitch {
    /// Creates a new switch in the "not killed" state.
    pub fn new() -> Self {
        KillSwitch {
            kill: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Kill all clones of this KillSwitch. Note that this replaces self
    /// with a new KillSwitch, meaning that further calls to `clone` won't
    /// contain a killed connection.
    ///
    /// As a consequence, `is_killed` on the handle that `kill` was called on
    /// returns `false` again afterwards; only the previously made clones
    /// report the kill.
    pub fn kill(&mut self) {
        // Signal everyone holding the current flag...
        self.kill.store(true, Ordering::Relaxed);
        // ...then start a fresh generation so later clones begin un-killed.
        self.kill = Arc::new(AtomicBool::new(false));
    }

    /// Returns `true` if the flag this handle currently points at has been
    /// set by a call to `kill`.
    pub fn is_killed(&self) -> bool {
        self.kill.load(Ordering::Relaxed)
    }
}

impl Default for KillSwitch {
    /// Equivalent to [`KillSwitch::new`].
    fn default() -> Self {
        Self::new()
    }
}
pub struct ConnWrapper {
conn: Option<Connection<TcpStream, Service>>,
kill: KillSwitch,
timeout: Interval,
kill_on_timeout: bool,
timeouts: usize,
}
impl ConnWrapper {
pub fn wrap(conn: Connection<TcpStream, Service>, kill: &KillSwitch) -> Self {
ConnWrapper {
conn: Some(conn),
kill: kill.clone(),
kill_on_timeout: false,
timeout: Interval::new_interval(Duration::from_millis(1000)),
timeouts: 0,
}
}
}
impl Future for ConnWrapper {
type Item = ();
type Error = ();
fn poll(&mut self) -> Result<Async<()>, ()> {
match self.timeout.poll() {
Ok(Async::NotReady) => {},
Ok(Async::Ready(_)) => {
if self.kill_on_timeout {
self.conn = None;
return Ok(Async::Ready(()));
} else {
task::current().notify();
self.timeouts += 1;
}
},
Err(err) => {
error!("timer error: {}", err);
self.conn = None;
return Err(());
},
}
if !self.kill_on_timeout {
if self.timeouts > 1000 || self.kill.is_killed() {
/// We should kill this connection. Start a 100 ms timeout and kill
/// it if it doesn't finish before the timeout.
self.timeout = Interval::new_interval(Duration::from_millis(100));
self.kill_on_timeout = true;
if let Some(conn) = &mut self.conn {
conn.graceful_shutdown();
}
task::current().notify();
return Ok(Async::NotReady);
}
}
match &mut self.conn {
Some(conn) => match conn.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(())) => Ok(Async::Ready(())),
Err(err) => {
if err.is_closed() || err.is_canceled() || err.is_connect() {
/// Just ignore it if the client closes the connection.
Ok(Async::Ready(()))
} else {
if let Some(inner) = err.cause2() {
if let Some(ioerr) = inner.downcast_ref::<std::io::Error>() {
let kind = ioerr.kind();
if kind == ErrorKind::ConnectionReset {
/// Just ignore it if the client closes the connection.
return Ok(Async::Ready(()));
} else if kind == ErrorKind::ConnectionAborted {
/// Just ignore it if the client closes the connection.
return Ok(Async::Ready(()));
} else {
error!("socket error: {} (kind: {:?})", err, kind);
return Err(());
}
}
}
error!("socket error: {}", err);
Err(())
}
},
},
None => {
Ok(Async::Ready(()))
},
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment