Last active
April 17, 2020 23:41
-
-
Save leshow/8c5ffebba1368f6e465c3758d29837ee to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// ```bash
/// cargo run --release
/// # in another term
/// flame -p 9953
/// # in another term
/// flame -p 9953
/// ```
use std::sync::atomic::{AtomicUsize, Ordering}; | |
use tokio::net::udp; | |
use std::{io, net::SocketAddr, sync::Arc}; | |
use tokio::{net::UdpSocket, time}; | |
struct Service { | |
state: Arc<State>, | |
} | |
impl Service { | |
async fn listen_udp(self: Arc<Self>, addr: SocketAddr) -> io::Result<()> { | |
let udp = UdpSocket::bind(addr).await?; | |
let (udp_recv, _udp_send) = udp.split(); | |
let mut ctx_stream = UdpStream::new(udp_recv, self.state.clone()); | |
while let Ok(ctx) = ctx_stream.context().await { | |
if self.state.live_msgs() <= 100_000 { | |
tokio::spawn(async move { | |
tokio::time::delay_for(time::Duration::from_secs(3)).await; | |
drop(ctx); | |
}); | |
} | |
} | |
Ok(()) | |
} | |
fn log_stats(self: Arc<Self>) -> io::Result<()> { | |
use std::time::Duration; | |
let srv = self.clone(); | |
tokio::spawn(async move { | |
let mut interval = time::interval(Duration::from_secs(1)); | |
interval.tick().await; | |
loop { | |
println!("live msgs: {}", srv.state.live_msgs.load(Ordering::Acquire)); | |
interval.tick().await; | |
} | |
}); | |
Ok(()) | |
} | |
} | |
#[tokio::main] | |
async fn main() { | |
let srv = Arc::new(Service { | |
state: Arc::new(State { | |
live_msgs: AtomicUsize::new(0), | |
}), | |
}); | |
srv.clone().log_stats().unwrap(); | |
srv.listen_udp("0.0.0.0:9953".parse().unwrap()) | |
.await | |
.unwrap(); | |
} | |
/// Shared bookkeeping for the service: a single atomic counter of live
/// messages.
#[derive(Debug, Default)]
pub struct State {
    /// Number of messages currently counted as live.
    live_msgs: AtomicUsize,
}

impl State {
    /// Memory ordering used for every access to the counter.
    const ORDER: Ordering = Ordering::Acquire;

    /// Adds one to the live-message counter.
    ///
    /// Returns the value the counter held *before* the increment.
    pub fn inc_live_msgs(&self) -> usize {
        self.live_msgs.fetch_add(1, Self::ORDER)
    }

    /// Subtracts one from the live-message counter.
    ///
    /// Returns the value the counter held *before* the decrement. The
    /// subtraction wraps around if the counter is already zero.
    pub fn dec_live_msgs(&self) -> usize {
        self.live_msgs.fetch_sub(1, Self::ORDER)
    }

    /// Returns the current value of the live-message counter.
    pub fn live_msgs(&self) -> usize {
        self.live_msgs.load(Self::ORDER)
    }
}
#[derive(Debug)] | |
pub struct MsgContext { | |
msg: Vec<u8>, | |
state: Arc<State>, | |
} | |
impl Drop for MsgContext { | |
fn drop(&mut self) { | |
self.state.dec_live_msgs(); | |
} | |
} | |
impl MsgContext { | |
pub fn with_state(msg: Vec<u8>, state: Arc<State>) -> Self { | |
Self { msg, state } | |
} | |
} | |
#[derive(Debug)] | |
pub struct UdpStream { | |
stream: udp::RecvHalf, | |
state: Arc<State>, | |
} | |
impl UdpStream { | |
pub fn new(stream: udp::RecvHalf, state: Arc<State>) -> Self { | |
UdpStream { stream, state } | |
} | |
pub async fn context(&mut self) -> io::Result<MsgContext> { | |
self.state.inc_live_msgs(); | |
let mut buf = [0u8; 4096]; | |
let (len, _src) = self.stream.recv_from(&mut buf).await?; | |
let msg = buf[..len].to_vec(); | |
Ok(MsgContext::with_state(msg, self.state.clone())) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment