Skip to content

Instantly share code, notes, and snippets.

@leshow
Last active April 17, 2020 23:41
Show Gist options
  • Save leshow/8c5ffebba1368f6e465c3758d29837ee to your computer and use it in GitHub Desktop.
Save leshow/8c5ffebba1368f6e465c3758d29837ee to your computer and use it in GitHub Desktop.
/// ```bash
/// cargo run --release
/// # in another term
/// flame -p 9953
/// # in another term
/// flame -p 9953
/// ```
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::net::udp;
use std::{io, net::SocketAddr, sync::Arc};
use tokio::{net::UdpSocket, time};
struct Service {
state: Arc<State>,
}
impl Service {
async fn listen_udp(self: Arc<Self>, addr: SocketAddr) -> io::Result<()> {
let udp = UdpSocket::bind(addr).await?;
let (udp_recv, _udp_send) = udp.split();
let mut ctx_stream = UdpStream::new(udp_recv, self.state.clone());
while let Ok(ctx) = ctx_stream.context().await {
if self.state.live_msgs() <= 100_000 {
tokio::spawn(async move {
tokio::time::delay_for(time::Duration::from_secs(3)).await;
drop(ctx);
});
}
}
Ok(())
}
fn log_stats(self: Arc<Self>) -> io::Result<()> {
use std::time::Duration;
let srv = self.clone();
tokio::spawn(async move {
let mut interval = time::interval(Duration::from_secs(1));
interval.tick().await;
loop {
println!("live msgs: {}", srv.state.live_msgs.load(Ordering::Acquire));
interval.tick().await;
}
});
Ok(())
}
}
#[tokio::main]
async fn main() {
let srv = Arc::new(Service {
state: Arc::new(State {
live_msgs: AtomicUsize::new(0),
}),
});
srv.clone().log_stats().unwrap();
srv.listen_udp("0.0.0.0:9953".parse().unwrap())
.await
.unwrap();
}
#[derive(Debug, Default)]
pub struct State {
live_msgs: AtomicUsize,
}
impl State {
pub fn inc_live_msgs(&self) -> usize {
self.live_msgs.fetch_add(1, Ordering::Acquire)
}
pub fn dec_live_msgs(&self) -> usize {
self.live_msgs.fetch_sub(1, Ordering::Acquire)
}
pub fn live_msgs(&self) -> usize {
self.live_msgs.load(Ordering::Acquire)
}
}
#[derive(Debug)]
pub struct MsgContext {
msg: Vec<u8>,
state: Arc<State>,
}
impl Drop for MsgContext {
fn drop(&mut self) {
self.state.dec_live_msgs();
}
}
impl MsgContext {
pub fn with_state(msg: Vec<u8>, state: Arc<State>) -> Self {
Self { msg, state }
}
}
#[derive(Debug)]
pub struct UdpStream {
stream: udp::RecvHalf,
state: Arc<State>,
}
impl UdpStream {
pub fn new(stream: udp::RecvHalf, state: Arc<State>) -> Self {
UdpStream { stream, state }
}
pub async fn context(&mut self) -> io::Result<MsgContext> {
self.state.inc_live_msgs();
let mut buf = [0u8; 4096];
let (len, _src) = self.stream.recv_from(&mut buf).await?;
let msg = buf[..len].to_vec();
Ok(MsgContext::with_state(msg, self.state.clone()))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment