Skip to content

Instantly share code, notes, and snippets.

@matthewjberger
Last active January 1, 2024 06:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save matthewjberger/9a03ecefc6793f965484a114a99059a6 to your computer and use it in GitHub Desktop.
Save matthewjberger/9a03ecefc6793f965484a114a99059a6 to your computer and use it in GitHub Desktop.
Tiny pub/sub broker in Rust
// [dependencies]
// uuid = { version = "1.6.1", features = ["v4"] }
use std::collections::{HashMap, VecDeque};
/// Payload carried from a publisher to every subscriber of a topic.
///
/// The broker clones the message once per live subscriber, hence `Clone`.
#[derive(Debug, Clone)]
pub enum Message {
    /// A plain UTF-8 text payload.
    String(String),
}
/// A subscriber endpoint with its own bounded queue of pending messages.
///
/// Instances are created through `Client::new`, which wraps the value in a
/// shared `ClientHandle`.
pub struct Client {
/// Identifier used to find this client again when unsubscribing;
/// `Client::new` assigns a random v4 UUID.
pub id: uuid::Uuid,
/// Pending messages, oldest at the front. The broker pushes to the back and
/// `next_message` pops from the front. Wrapped in a `RefCell` so the queue
/// can be mutated through a shared reference to the client.
pub event_queue: std::cell::RefCell<VecDeque<Message>>,
/// Queue length at which the broker discards the oldest pending message
/// before appending a new one.
pub ring_buffer_size: usize,
}
/// Shared, reference-counted handle to a `Client`.
///
/// The broker stores only `Weak` references derived from this handle, so
/// dropping every `ClientHandle` lets the broker forget the client on its
/// next publish to that topic. `Rc` is not thread-safe: handles must stay on
/// one thread.
pub type ClientHandle = std::rc::Rc<std::cell::RefCell<Client>>;
impl Client {
pub fn new(ring_buffer_size: usize) -> ClientHandle {
std::rc::Rc::new(std::cell::RefCell::new(Self {
id: uuid::Uuid::new_v4(),
event_queue: std::cell::RefCell::new(VecDeque::new()),
ring_buffer_size,
}))
}
pub fn next_message(&self) -> Option<Message> {
self.event_queue.borrow_mut().pop_front()
}
}
/// Topic-based message broker.
///
/// `Broker::default()` yields a broker with no topics and no subscribers.
#[derive(Default)]
pub struct Broker {
/// Topic name -> subscribers of that topic. Subscribers are held as `Weak`
/// references, so the broker never keeps a client alive on its own.
subscribers: HashMap<String, Vec<std::rc::Weak<std::cell::RefCell<Client>>>>,
}
impl Broker {
    /// Registers `client` as a subscriber of `topic`.
    ///
    /// Only a `Weak` reference is stored, so the broker does not keep the
    /// client alive. Subscribing the same handle to the same topic more than
    /// once is a no-op; otherwise every published message would be delivered
    /// to it multiple times.
    pub fn subscribe(&mut self, topic: &str, client: &ClientHandle) {
        let candidate = std::rc::Rc::downgrade(client);
        let list = self.subscribers.entry(topic.to_string()).or_default();
        if !list.iter().any(|existing| existing.ptr_eq(&candidate)) {
            list.push(candidate);
        }
    }

    /// Removes the client identified by `client_id` from `topic`.
    ///
    /// Subscribers whose handles have all been dropped are pruned at the same
    /// time, and the topic entry itself is removed once nobody is left on it.
    /// Unknown topics are ignored.
    ///
    /// # Panics
    ///
    /// Panics if a subscriber of `topic` is mutably borrowed while this runs.
    pub fn unsubscribe(&mut self, topic: &str, client_id: uuid::Uuid) {
        let emptied = match self.subscribers.get_mut(topic) {
            Some(list) => {
                list.retain(|weak| match weak.upgrade() {
                    Some(handle) => handle.borrow().id != client_id,
                    None => false,
                });
                list.is_empty()
            }
            None => false,
        };
        // Drop the map entry so abandoned topics do not accumulate.
        if emptied {
            self.subscribers.remove(topic);
        }
    }

    /// Delivers a clone of `message` to every live subscriber of `topic`.
    ///
    /// Each client's queue is kept at no more than `ring_buffer_size` entries
    /// by discarding the oldest pending messages first. A client whose
    /// `ring_buffer_size` is 0 can hold nothing, so the message is dropped
    /// for it. Dead subscribers are pruned, and the topic entry is removed
    /// when no subscriber remains.
    ///
    /// # Panics
    ///
    /// Panics if a subscriber is mutably borrowed, or its `event_queue` is
    /// borrowed, while this runs.
    pub fn publish(&mut self, topic: &str, message: Message) {
        let emptied = match self.subscribers.get_mut(topic) {
            Some(list) => {
                list.retain(|weak| {
                    let handle = match weak.upgrade() {
                        Some(handle) => handle,
                        None => return false,
                    };
                    // A shared borrow suffices: the queue has its own RefCell.
                    // This avoids panicking when the caller holds a `borrow()`.
                    let client = handle.borrow();
                    let capacity = client.ring_buffer_size;
                    if capacity > 0 {
                        let mut queue = client.event_queue.borrow_mut();
                        // `>=` in a loop also recovers when the capacity was
                        // lowered after the queue had already filled up.
                        while queue.len() >= capacity {
                            queue.pop_front();
                        }
                        queue.push_back(message.clone());
                    }
                    true
                });
                list.is_empty()
            }
            None => false,
        };
        if emptied {
            self.subscribers.remove(topic);
        }
    }
}
/// Demo: two clients on separate topics each receive one message, then the
/// first client is unsubscribed and a further publish is shown not to reach it.
fn main() {
    let mut broker = Broker::default();

    // One client per topic, each with room for ten pending messages.
    let rust_client = Client::new(10);
    let programming_client = Client::new(10);
    broker.subscribe("rust", &rust_client);
    broker.subscribe("programming", &programming_client);

    // Publish one greeting on each topic.
    let rust_greeting = Message::String("Hello Rust!".to_string());
    broker.publish("rust", rust_greeting);
    let programming_greeting = Message::String("Hello Programming!".to_string());
    broker.publish("programming", programming_greeting);

    // Each client should find exactly its own topic's greeting.
    let first_delivery = rust_client.borrow().next_message();
    println!("Client 1 received: {:?}", first_delivery);
    let second_delivery = programming_client.borrow().next_message();
    println!("Client 2 received: {:?}", second_delivery);

    // Take the first client off the "rust" topic.
    let rust_client_id = rust_client.borrow().id;
    broker.unsubscribe("rust", rust_client_id);

    // Publish again; nobody is subscribed to "rust" any more.
    let late_message = Message::String("This should not be received".to_string());
    broker.publish("rust", late_message);

    // The first client's queue was already drained and nothing new was
    // queued for it, so this prints `None`.
    let after_unsubscribe = rust_client.borrow().next_message();
    println!(
        "Client 1 received after unsubscribing: {:?}",
        after_unsubscribe
    );
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment