Skip to content

Instantly share code, notes, and snippets.

@nwisemanII
Last active November 15, 2023 14:31
Show Gist options
  • Save nwisemanII/0eedb3d90686513b36c2c4d04ee3a769 to your computer and use it in GitHub Desktop.
Save nwisemanII/0eedb3d90686513b36c2c4d04ee3a769 to your computer and use it in GitHub Desktop.
Rust Message bus
#![allow(unused)]
use std::collections::HashMap;
use std::collections::VecDeque;
use std::thread::JoinHandle;
use tokio::sync::Mutex; use std::sync::Arc;
/// FIFO queue of [`Message`] values.
type MessageQueue = VecDeque<Message>;
/// Mailboxes keyed by name; each mailbox is its own [`MessageQueue`].
type MailboxMap = HashMap<String, MessageQueue>;
/// Shared, reference-counted handle to a [`MessageQueue`] guarded by a tokio `Mutex`.
type MessageQueuePointer = Arc<Mutex<MessageQueue>>;
/// Shared, reference-counted handle to a [`MailboxMap`] guarded by a tokio `Mutex`.
type MailboxMapPointer = Arc<Mutex<MailboxMap>>;
/// Shared, reference-counted handle to a [`Distributor`] guarded by a tokio `Mutex`.
type DistribPointer = Arc<Mutex<Distributor>>;
/// Top-level bus object: owns the shared queues, the mailbox maps and the two
/// distributors that move messages from a queue into a mailbox.
struct MessageBus {
    /// Queue that `send` appends to; drained by `distributor`.
    incoming_message_queue: MessageQueuePointer,
    /// Queue handed to `response_distributor`.
    response_queue: MessageQueuePointer,
    /// One mailbox per registered subscriber name.
    subscriber_mailboxes: MailboxMapPointer,
    /// One mailbox per registered publisher name.
    publisher_mailboxes: MailboxMapPointer,
    /// Routes `incoming_message_queue` into `subscriber_mailboxes`.
    distributor: DistribPointer,
    /// Routes `response_queue` into `publisher_mailboxes`.
    response_distributor: DistribPointer,
}
impl MessageBus {
    /// Creates a bus with empty queues, empty mailbox maps and two distributors.
    ///
    /// The "distributor" is wired to the incoming queue and the subscriber
    /// mailboxes; the "response_distributor" is wired to the response queue and
    /// the publisher mailboxes. Nothing is spawned here.
    fn new() -> Self {
        let incoming_message_queue: MessageQueuePointer = Arc::new(Mutex::new(VecDeque::new()));
        let subscriber_mailboxes: MailboxMapPointer = Arc::new(Mutex::new(HashMap::new()));
        let response_queue: MessageQueuePointer = Arc::new(Mutex::new(VecDeque::new()));
        let publisher_mailboxes: MailboxMapPointer = Arc::new(Mutex::new(HashMap::new()));

        let forward = Distributor::new(
            "distributor",
            Arc::clone(&incoming_message_queue),
            Arc::clone(&subscriber_mailboxes),
        );
        let backward = Distributor::new(
            "response_distributor",
            Arc::clone(&response_queue),
            Arc::clone(&publisher_mailboxes),
        );

        Self {
            incoming_message_queue,
            response_queue,
            subscriber_mailboxes,
            publisher_mailboxes,
            distributor: Arc::new(Mutex::new(forward)),
            response_distributor: Arc::new(Mutex::new(backward)),
        }
    }

    /// Registers `publisher` by inserting an empty mailbox under its name.
    ///
    /// An existing mailbox with the same name is replaced by the empty one.
    async fn publish(&mut self, publisher: Publisher) {
        self.publisher_mailboxes
            .lock()
            .await
            .insert(publisher.name, VecDeque::new());
    }

    /// Registers `subscriber` by inserting an empty mailbox under its name.
    ///
    /// An existing mailbox with the same name is replaced by the empty one.
    async fn subscribe(&mut self, subscriber: Subscriber) {
        self.subscriber_mailboxes
            .lock()
            .await
            .insert(subscriber.name, VecDeque::new());
    }

    /// Appends `message` to the back of the incoming message queue.
    async fn send(&mut self, message: Message) {
        self.incoming_message_queue.lock().await.push_back(message);
    }
}
/// Moves messages from one shared queue into the mailbox named by each
/// message's `to` field.
struct Distributor {
    /// Label given at construction time; not read anywhere in this file.
    _name: String,
    /// Queue this distributor pops messages from.
    inc_msg_queue: MessageQueuePointer,
    /// Mailboxes this distributor delivers into, looked up by `Message::to`.
    endpoint_map: MailboxMapPointer,
}
impl Distributor {
fn new(name: &str, inc_msg_queue: MessageQueuePointer, endpoint_map: MailboxMapPointer) -> Self {
let cls = Self {
_name: name.to_string(),
inc_msg_queue: inc_msg_queue,
endpoint_map: endpoint_map,
};
// tokio::spawn(async {
// cls.distribute();
// });
cls
}
async fn distribute(&mut self) {
loop {
// println!("Distributing...");
let mut msg_q = self.inc_msg_queue.lock().await;
if let Some(message) = msg_q.pop_front() {
println!("Message (Popping off Inbox Q) : {:?}", message);
let mut endpoint_map = self.endpoint_map.lock().await;
if let Some(endpoint) = endpoint_map.get(&message.to) {
let mut endpoint = endpoint.to_owned();
println!("Message (in distrib) : {:?}", message);
endpoint.push_back(message.clone());
}
}
}
// tokio::spawn (async move {
// loop {
// if let Some(message) = &mut self.inc_msg_queue.pop_front() {
// if let Some(endpoint) = self.endpoint_map.get(&message.to) {
// endpoint.push_back(message);
// }
// }
// }
// });
}
}
/// A single message on the bus. The payload is a plain string for now.
#[derive(Debug, Clone)]
struct Message {
    /// Payload text.
    message: String,
    /// Name of the mailbox the message is addressed to.
    to: String,
    /// Name of the sender.
    from: String,
}
/// A party that RECEIVES messages; identified by its mailbox name.
struct Subscriber {
    /// Name the subscriber's mailbox is registered under.
    name: String,
}
/// A party that SENDS messages; identified by its mailbox name.
struct Publisher {
    /// Name the publisher's mailbox is registered under.
    name: String,
}
/// Spawns a tokio task that locks `distrib` and runs its `distribute` loop.
///
/// Returns the task's `JoinHandle`. The mutex guard is held for as long as
/// `distribute` runs, and `distribute` loops without returning, so the
/// distributor stays locked for the lifetime of the spawned task.
async fn run_distrib(distrib: DistribPointer) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        distrib.lock().await.distribute().await;
    })
}
#[tokio::main]
async fn main() {
let mut bus = MessageBus::new();
let mut subscriber = Subscriber {
name: "subscriber".to_string()
};
let mut publisher = Publisher {
name: "publisher".to_string()
};
bus.subscribe(subscriber);
bus.publish(publisher);;
let mut message = Message {
message: "Hello, world!".to_string(),
to: "subscriber".to_string(),
from: "publisher".to_string(),
};
bus.send(message.clone()).await;
println!("Message: {:?}", message);
run_distrib(bus.distributor.clone()).await;
run_distrib(bus.response_distributor.clone()).await;
println!("Message: {:?}", message);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment