-
-
Save anarelion/d62c01e723403f0d2e64dda5063a3dda to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use super::buffer::PacketBuffer; | |
use std::collections::VecDeque; | |
use std::error::Error; | |
use std::net::SocketAddr; | |
use std::sync::{Arc, Weak}; | |
use tokio::net::udp::{RecvHalf, SendHalf}; | |
use tokio::net::UdpSocket; | |
use tokio::sync::RwLock; | |
use tokio::task::yield_now; | |
use tokio::time::{timeout, Duration}; | |
#[derive(Debug)] | |
pub struct UDPConnection { | |
max_packet_size: usize, | |
packet_queue: VecDeque<PacketBuffer>, | |
remote_addr: SocketAddr, | |
send_socket: SendHalf, | |
recv_socket: RecvHalf, | |
} | |
impl UDPConnection { | |
pub async fn new(remote_addr: SocketAddr, max_packet_size: usize) -> Arc<RwLock<Self>> { | |
let local_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); | |
let socket = UdpSocket::bind(local_addr).await.unwrap(); | |
socket.connect(&remote_addr).await.unwrap(); | |
let (recv_socket, send_socket) = socket.split(); | |
let connection = Arc::new(RwLock::new(UDPConnection { | |
max_packet_size, | |
packet_queue: VecDeque::new(), | |
remote_addr, | |
send_socket, | |
recv_socket, | |
})); | |
let weak_connection = Arc::downgrade(&connection); | |
tokio::spawn(async move { | |
Self::receive_loop(weak_connection).await; | |
}); | |
yield_now().await; | |
connection | |
} | |
async fn receive_loop(weak_self: Weak<RwLock<UDPConnection>>) { | |
info!("UDPConnection init receive loop"); | |
loop { | |
let connection_maybe = weak_self.upgrade(); | |
match connection_maybe { | |
None => break, | |
Some(connection) => { | |
let result = timeout( | |
Duration::from_millis(10), | |
Self::receive(connection.clone()), | |
) | |
.await; | |
if let Ok(buffer) = result { | |
if !buffer.is_empty() { | |
let packet_buffer = PacketBuffer::with_payload(buffer); | |
debug!(" IN : {:?}", packet_buffer); | |
let mut c = connection.write().await; | |
c.packet_queue.push_back(packet_buffer); | |
} | |
} | |
} | |
} | |
} | |
info!("UDPConnection exit receive loop"); | |
} | |
async fn receive(connection: Arc<RwLock<UDPConnection>>) -> Vec<u8> { | |
let mut c = connection.write().await; | |
let mut buffer = vec![0u8; c.max_packet_size]; | |
let size = c.recv_socket.recv(buffer.as_mut_slice()).await.unwrap(); | |
buffer.truncate(size); | |
buffer | |
} | |
pub async fn send( | |
connection: Arc<RwLock<UDPConnection>>, | |
packet: PacketBuffer, | |
) -> Result<(), Box<dyn Error>> { | |
let mut c = connection.write().await; | |
debug!("OUT : {:?}", packet); | |
c.send_socket.send(packet.get_payload().as_slice()).await?; | |
Ok(()) | |
} | |
pub async fn pop_packet(connection: Arc<RwLock<UDPConnection>>) -> Option<PacketBuffer> { | |
let mut c = connection.write().await; | |
c.packet_queue.pop_front() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment