Skip to content

Instantly share code, notes, and snippets.

@McSquanchy
Created November 5, 2023 12:54
Show Gist options
  • Save McSquanchy/2c0e24db31777819a5748b7fe2f5620c to your computer and use it in GitHub Desktop.
Save McSquanchy/2c0e24db31777819a5748b7fe2f5620c to your computer and use it in GitHub Desktop.
MessageGroup
use std::collections::HashMap;
use std::fmt::Debug;
use std::{io, thread};
use std::net::SocketAddr;
use std::thread::JoinHandle;
use crossbeam_channel::{Receiver, Sender, unbounded};
use mio::net::UdpSocket;
use mio::{Events, Interest, Poll, Token};
/// A value that can be turned into raw bytes and rebuilt from raw bytes.
pub trait Message {
    /// Returns this value serialized as an owned byte vector.
    fn encode(&self) -> Vec<u8>;

    /// Builds a value from the bytes in `buf`.
    ///
    /// The signature is infallible, so each implementation decides for itself
    /// how to deal with malformed or truncated input.
    fn decode(buf: &[u8]) -> Self;
}
/// A sink for raw byte buffers that can be shared with and moved to other
/// threads (`Send + Sync`), and is therefore usable as a boxed trait object.
pub trait MessageProducer: Send + Sync {
    /// Hands the bytes in `buf` to the producer for processing.
    ///
    /// The slice is only borrowed for the duration of the call; an
    /// implementation that needs the data afterwards must copy it.
    fn consume_buffer(&mut self, buf: &[u8]);
}
pub struct MessageThread<T> where T: Send + Sync + Message + Sized + Debug + 'static {
tx: Sender<T>
}
impl<T> MessageThread<T> where T: Send + Sync + Message + Sized + Debug + 'static {
fn new(tx: Sender<T>) -> Self {
Self {
tx
}
}
}
impl<T> MessageProducer for MessageThread<T>
where
    T: Send + Sync + Message + Sized + Debug + 'static,
{
    /// Decodes `buf` into a `T` and sends it on the channel.
    ///
    /// # Panics
    ///
    /// Panics if the channel rejects the message. NOTE(review): for
    /// crossbeam channels that should only happen once every receiver has
    /// been dropped; confirm against the crossbeam-channel documentation.
    fn consume_buffer(&mut self, buf: &[u8]) {
        // `buf` is already a slice reference, so it is passed on as-is.
        self.tx
            .send(T::decode(buf))
            .expect("MessageThread: channel disconnected, decoded message could not be delivered");
    }
}
/// A set of UDP sockets, each paired with the producer that receives the
/// datagrams read from it.
pub struct MessageGroup {
    /// Producer for each socket, keyed by the socket's poll `Token`
    /// (despite the field name, the keys are tokens, not UUIDs).
    uuid_to_producer: HashMap<Token, Box<dyn MessageProducer>>,
    /// Bound UDP socket for each poll `Token`.
    token_to_socket: HashMap<Token, UdpSocket>,
    /// Value wrapped in the `Token` handed to the next added socket.
    next_token: usize,
    /// Slot for the join handle of the background receive thread; starts as `None`.
    thread_handle: Option<JoinHandle<()>>,
}
impl MessageGroup {
    /// Creates an empty group: no sockets, no producers, no receive thread.
    fn new() -> Self {
        Self {
            uuid_to_producer: HashMap::new(),
            token_to_socket: HashMap::new(),
            next_token: 0,
            thread_handle: None,
        }
    }

    /// Binds a UDP socket to `socket_addr` and returns the receiving half of an
    /// unbounded channel on which the messages decoded from that socket's
    /// datagrams are delivered.
    ///
    /// The socket is only polled once [`run`](Self::run) has been called
    /// afterwards.
    ///
    /// # Panics
    ///
    /// Panics if the socket cannot be bound to `socket_addr`.
    fn add<T>(&mut self, socket_addr: SocketAddr) -> Receiver<T>
    where
        T: Send + Sync + Message + Sized + Debug + 'static,
    {
        // Bind first so a failure leaves the group's bookkeeping untouched.
        let socket = UdpSocket::bind(socket_addr).unwrap_or_else(|err| {
            panic!("failed to bind UDP socket to {}: {}", socket_addr, err)
        });

        let (tx, rx) = unbounded();
        let token = Token(self.next_token);
        self.next_token += 1;

        self.uuid_to_producer
            .insert(token, Box::new(MessageThread::new(tx)));
        self.token_to_socket.insert(token, socket);
        rx
    }

    /// Moves every socket added so far into a new background thread that polls
    /// them for readability and feeds each received datagram to the socket's
    /// producer. The thread's join handle is stored in `thread_handle`.
    ///
    /// Does nothing if no sockets are registered. Sockets added after this
    /// call are not seen by the thread that is already running; calling `run`
    /// again starts another thread for them and replaces the stored handle.
    ///
    /// # Panics
    ///
    /// Panics if the poller cannot be created or a socket cannot be registered
    /// with it.
    fn run(&mut self) {
        // The spawned thread must own everything it touches (`'static`), so the
        // sockets and producers are moved out of `self` instead of borrowed.
        let sockets = std::mem::take(&mut self.token_to_socket);
        let mut producers = std::mem::take(&mut self.uuid_to_producer);
        if sockets.is_empty() {
            return;
        }

        let mut poll = Poll::new().expect("failed to create the mio poller");

        // Pair each socket with its producer so the hot loop needs one lookup.
        let mut endpoints = HashMap::with_capacity(sockets.len());
        for (token, mut socket) in sockets {
            if let Some(producer) = producers.remove(&token) {
                poll.registry()
                    .register(&mut socket, token, Interest::READABLE)
                    .expect("failed to register UDP socket with the poller");
                endpoints.insert(token, (socket, producer));
            }
        }

        let handle = thread::spawn(move || {
            let mut events = Events::with_capacity(endpoints.len());
            // Large enough for any single UDP datagram.
            let mut buf = [0_u8; 1 << 16];

            loop {
                match poll.poll(&mut events, None) {
                    Ok(()) => {}
                    Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
                    Err(err) => {
                        // Retrying a failing poller would spin forever, and the
                        // thread's `()` result cannot carry the error.
                        eprintln!(
                            "MessageGroup: polling failed, receive thread exits: {}",
                            err
                        );
                        return;
                    }
                }

                for event in events.iter() {
                    let (socket, producer) = match endpoints.get_mut(&event.token()) {
                        Some(endpoint) => endpoint,
                        None => continue,
                    };

                    // Readiness is edge-triggered: keep reading until the
                    // socket reports `WouldBlock`, otherwise queued datagrams
                    // would sit unread until new traffic arrives.
                    loop {
                        match socket.recv_from(&mut buf) {
                            Ok((packet_size, _)) => {
                                producer.consume_buffer(&buf[..packet_size]);
                            }
                            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
                            // `WouldBlock` means the socket is drained. Any
                            // other receive error is dropped on purpose
                            // (best-effort). Either way only this socket's
                            // drain loop ends; other ready sockets are served.
                            Err(_) => break,
                        }
                    }
                }
            }
        });

        self.thread_handle = Some(handle);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment