Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@anarelion
Created June 23, 2020 18:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anarelion/d62c01e723403f0d2e64dda5063a3dda to your computer and use it in GitHub Desktop.
Save anarelion/d62c01e723403f0d2e64dda5063a3dda to your computer and use it in GitHub Desktop.
use super::buffer::PacketBuffer;
use std::collections::VecDeque;
use std::error::Error;
use std::net::SocketAddr;
use std::sync::{Arc, Weak};
use tokio::net::udp::{RecvHalf, SendHalf};
use tokio::net::UdpSocket;
use tokio::sync::RwLock;
use tokio::task::yield_now;
use tokio::time::{timeout, Duration};
/// State for one UDP "connection": a socket connected to a single remote
/// address, plus a queue of packets that have been received but not yet
/// consumed.
///
/// Instances are created by `UDPConnection::new`, which hands them out wrapped
/// in `Arc<RwLock<..>>`.
#[derive(Debug)]
pub struct UDPConnection {
/// Size in bytes of the buffer allocated for each receive call.
max_packet_size: usize,
/// Received packets in arrival order; filled by the receive loop and drained
/// by `pop_packet`.
packet_queue: VecDeque<PacketBuffer>,
/// Address the socket was connected to at construction.
remote_addr: SocketAddr,
/// Sending half of the split socket.
send_socket: SendHalf,
/// Receiving half of the split socket.
recv_socket: RecvHalf,
}
impl UDPConnection {
pub async fn new(remote_addr: SocketAddr, max_packet_size: usize) -> Arc<RwLock<Self>> {
let local_addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
let socket = UdpSocket::bind(local_addr).await.unwrap();
socket.connect(&remote_addr).await.unwrap();
let (recv_socket, send_socket) = socket.split();
let connection = Arc::new(RwLock::new(UDPConnection {
max_packet_size,
packet_queue: VecDeque::new(),
remote_addr,
send_socket,
recv_socket,
}));
let weak_connection = Arc::downgrade(&connection);
tokio::spawn(async move {
Self::receive_loop(weak_connection).await;
});
yield_now().await;
connection
}
async fn receive_loop(weak_self: Weak<RwLock<UDPConnection>>) {
info!("UDPConnection init receive loop");
loop {
let connection_maybe = weak_self.upgrade();
match connection_maybe {
None => break,
Some(connection) => {
let result = timeout(
Duration::from_millis(10),
Self::receive(connection.clone()),
)
.await;
if let Ok(buffer) = result {
if !buffer.is_empty() {
let packet_buffer = PacketBuffer::with_payload(buffer);
debug!(" IN : {:?}", packet_buffer);
let mut c = connection.write().await;
c.packet_queue.push_back(packet_buffer);
}
}
}
}
}
info!("UDPConnection exit receive loop");
}
async fn receive(connection: Arc<RwLock<UDPConnection>>) -> Vec<u8> {
let mut c = connection.write().await;
let mut buffer = vec![0u8; c.max_packet_size];
let size = c.recv_socket.recv(buffer.as_mut_slice()).await.unwrap();
buffer.truncate(size);
buffer
}
pub async fn send(
connection: Arc<RwLock<UDPConnection>>,
packet: PacketBuffer,
) -> Result<(), Box<dyn Error>> {
let mut c = connection.write().await;
debug!("OUT : {:?}", packet);
c.send_socket.send(packet.get_payload().as_slice()).await?;
Ok(())
}
pub async fn pop_packet(connection: Arc<RwLock<UDPConnection>>) -> Option<PacketBuffer> {
let mut c = connection.write().await;
c.packet_queue.pop_front()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment