Last active
January 1, 2024 06:16
-
-
Save matthewjberger/9a03ecefc6793f965484a114a99059a6 to your computer and use it in GitHub Desktop.
Tiny pubsub broker in rust
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// [dependencies] | |
// uuid = { version = "1.6.1", features = ["v4"] } | |
use std::collections::{HashMap, VecDeque}; | |
#[derive(Clone, Debug)] | |
pub enum Message { | |
String(String), | |
} | |
pub struct Client { | |
pub id: uuid::Uuid, | |
pub event_queue: std::cell::RefCell<VecDeque<Message>>, | |
pub ring_buffer_size: usize, | |
} | |
pub type ClientHandle = std::rc::Rc<std::cell::RefCell<Client>>; | |
impl Client { | |
pub fn new(ring_buffer_size: usize) -> ClientHandle { | |
std::rc::Rc::new(std::cell::RefCell::new(Self { | |
id: uuid::Uuid::new_v4(), | |
event_queue: std::cell::RefCell::new(VecDeque::new()), | |
ring_buffer_size, | |
})) | |
} | |
pub fn next_message(&self) -> Option<Message> { | |
self.event_queue.borrow_mut().pop_front() | |
} | |
} | |
#[derive(Default)] | |
pub struct Broker { | |
subscribers: HashMap<String, Vec<std::rc::Weak<std::cell::RefCell<Client>>>>, | |
} | |
impl Broker { | |
pub fn subscribe(&mut self, topic: &str, client: &ClientHandle) { | |
self.subscribers | |
.entry(topic.to_string()) | |
.or_insert_with(Vec::new) | |
.push(std::rc::Rc::downgrade(client)); | |
} | |
pub fn unsubscribe(&mut self, topic: &str, client_id: uuid::Uuid) { | |
if let Some(subscribers) = self.subscribers.get_mut(topic) { | |
subscribers.retain(|subscriber| { | |
subscriber | |
.upgrade() | |
.map_or(false, |client| client.borrow().id != client_id) | |
}); | |
} | |
} | |
pub fn publish(&mut self, topic: &str, message: Message) { | |
if let Some(subscribers) = self.subscribers.get_mut(topic) { | |
subscribers.retain(|subscriber_weak| { | |
if let Some(subscriber) = subscriber_weak.upgrade() { | |
let subscriber = subscriber.borrow_mut(); | |
if subscriber.event_queue.borrow().len() == subscriber.ring_buffer_size { | |
subscriber.event_queue.borrow_mut().pop_front(); | |
} | |
subscriber | |
.event_queue | |
.borrow_mut() | |
.push_back(message.clone()); | |
true | |
} else { | |
false | |
} | |
}); | |
} | |
} | |
} | |
fn main() { | |
// Create a broker | |
let mut broker = Broker::default(); | |
// Create clients | |
let client1 = Client::new(10); | |
let client2 = Client::new(10); | |
// Subscribe clients to topics | |
broker.subscribe("rust", &client1); | |
broker.subscribe("programming", &client2); | |
// Publish messages | |
broker.publish("rust", Message::String("Hello Rust!".to_string())); | |
broker.publish( | |
"programming", | |
Message::String("Hello Programming!".to_string()), | |
); | |
// Display messages received by clients | |
println!("Client 1 received: {:?}", client1.borrow().next_message()); | |
println!("Client 2 received: {:?}", client2.borrow().next_message()); | |
// Unsubscribe client1 and cleanup topics | |
broker.unsubscribe("rust", client1.borrow().id); | |
// Try publishing to the cleaned up topic | |
broker.publish( | |
"rust", | |
Message::String("This should not be received".to_string()), | |
); | |
// Since client1 is unsubscribed and the topic 'rust' is cleaned up, there should be no message | |
println!( | |
"Client 1 received after unsubscribing: {:?}", | |
client1.borrow().next_message() | |
); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment