Skip to content

Instantly share code, notes, and snippets.

@helderjnpinto
Created May 3, 2023 21:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save helderjnpinto/bd9015913be98b51bcabd779fd991d0f to your computer and use it in GitHub Desktop.
Save helderjnpinto/bd9015913be98b51bcabd779fd991d0f to your computer and use it in GitHub Desktop.
MQTT broker in Rust (work in progress)
use std::collections::HashMap;
use std::io::{ self, Read, Write };
use std::net::{ TcpListener, TcpStream };
use std::sync::{ Arc, Mutex };
use std::thread;
/// One subscription entry: which connection subscribed and how to write to it.
#[derive(Clone)]
struct Subscriber {
    /// Remote address of the subscribing connection; used to identify the
    /// client on UNSUBSCRIBE and when its connection ends.
    peer: std::net::SocketAddr,
    /// Write handle to the client's socket (a `try_clone` of its connection).
    stream: Arc<Mutex<TcpStream>>,
}

/// A minimal MQTT broker that routes QoS 0 PUBLISH packets to subscribers.
///
/// The subscriber table lives behind an `Arc<Mutex<..>>`, so cloning a
/// `Broker` produces another handle to the *same* table. `run` hands one such
/// clone to every connection thread; that sharing is what lets a PUBLISH
/// arriving on one connection reach a subscriber registered on another.
#[derive(Clone)]
struct Broker {
    /// Topic name -> subscriptions for that exact topic (no wildcard matching).
    subscribers: Arc<Mutex<HashMap<String, Vec<Subscriber>>>>,
}

impl Broker {
    /// Largest "remaining length" accepted from a client. The protocol allows
    /// up to 256 MiB; capping it keeps a hostile or broken client from forcing
    /// a huge allocation.
    const MAX_PACKET_LEN: usize = 64 * 1024;

    /// Creates a broker with an empty subscriber table.
    fn new() -> Self {
        Self {
            subscribers: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Locks the subscriber table. A poisoned lock is recovered instead of
    /// propagated so one panicking connection thread cannot take down the rest.
    fn lock_subscribers(&self) -> std::sync::MutexGuard<'_, HashMap<String, Vec<Subscriber>>> {
        self.subscribers
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Error returned whenever a packet is shorter than its own fields claim.
    fn truncated() -> io::Error {
        io::Error::new(io::ErrorKind::InvalidData, "Truncated packet")
    }

    /// Reads a big-endian `u16` at `offset`, failing (not panicking) when the
    /// packet is too short.
    fn read_u16(packet: &[u8], offset: usize) -> io::Result<u16> {
        match packet.get(offset..offset + 2) {
            Some(bytes) => Ok(u16::from_be_bytes([bytes[0], bytes[1]])),
            None => Err(Self::truncated()),
        }
    }

    /// Reads a length-prefixed string starting at `offset`.
    ///
    /// Returns the string (invalid UTF-8 is replaced lossily) and the offset
    /// of the first byte after it.
    fn read_string(packet: &[u8], offset: usize) -> io::Result<(String, usize)> {
        let len = usize::from(Self::read_u16(packet, offset)?);
        let start = offset + 2;
        let end = start + len;
        let bytes = packet.get(start..end).ok_or_else(Self::truncated)?;
        Ok((String::from_utf8_lossy(bytes).to_string(), end))
    }

    /// Reads exactly one MQTT control packet from `stream`.
    ///
    /// Returns `Ok(None)` when the peer closed the connection before the start
    /// of a packet, otherwise the first fixed-header byte (type + flags) and
    /// the packet body (variable header + payload, *without* the remaining
    /// length field).
    ///
    /// # Errors
    /// Propagates socket errors, and returns `InvalidData` for a malformed or
    /// oversized remaining length.
    fn read_packet(stream: &mut TcpStream) -> io::Result<Option<(u8, Vec<u8>)>> {
        let mut byte = [0u8; 1];
        match stream.read_exact(&mut byte) {
            Ok(()) => {}
            // EOF on a packet boundary is a normal disconnect, not an error.
            Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
            Err(err) => return Err(err),
        }
        let first_byte = byte[0];

        // Remaining length: 1-4 bytes, 7 data bits each, least significant
        // group first, high bit set on every byte except the last.
        let mut remaining: usize = 0;
        let mut shift: u32 = 0;
        loop {
            stream.read_exact(&mut byte)?;
            remaining |= usize::from(byte[0] & 0x7f) << shift;
            if byte[0] & 0x80 == 0 {
                break;
            }
            shift += 7;
            if shift > 21 {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Malformed remaining length",
                ));
            }
        }
        if remaining > Self::MAX_PACKET_LEN {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "Packet too large"));
        }

        let mut body = vec![0u8; remaining];
        stream.read_exact(&mut body)?;
        Ok(Some((first_byte, body)))
    }

    /// Serves one client connection until it disconnects or a protocol/socket
    /// error occurs, then removes every subscription that client still holds.
    fn handle_connection(&mut self, mut stream: TcpStream) -> io::Result<()> {
        let peer = stream.peer_addr().ok();
        let outcome = self.serve(&mut stream);
        // Always clean up: the table holds cloned socket handles that would
        // otherwise keep the socket open and receive writes after the client left.
        if let Some(peer) = peer {
            self.drop_subscriptions(peer);
        }
        outcome
    }

    /// Packet loop for a single connection: read a packet, dispatch on its
    /// type, repeat. Returns `Ok(())` on EOF or DISCONNECT.
    fn serve(&mut self, stream: &mut TcpStream) -> io::Result<()> {
        while let Some((first_byte, body)) = Self::read_packet(stream)? {
            let packet_type = first_byte >> 4;
            println!("Packet type {}", packet_type);
            match packet_type {
                1 => self.handle_connect(stream, &body)?,
                2 => self.handle_connack(stream, &body)?,
                3 => self.handle_publish(&body)?,
                4 => self.handle_puback(&body)?,
                5 => self.handle_pubrec(&body)?,
                6 => self.handle_pubrel(&body)?,
                7 => self.handle_pubcomp(&body)?,
                8 => self.handle_subscribe(stream, &body)?,
                9 => self.handle_suback(&body)?,
                10 => self.handle_unsubscribe(stream, &body)?,
                11 => self.handle_unsuback(&body)?,
                12 => self.handle_pingreq(stream)?,
                13 => self.handle_pingresp(stream)?,
                // DISCONNECT is packet type 14; it ends the session cleanly.
                14 => return self.handle_disconnect(stream),
                // 0 and 15 are not valid packet types for this broker.
                _ => {
                    println!("Unknown packet type {}", packet_type);
                    return Err(io::Error::new(io::ErrorKind::InvalidData, "Unknown packet type"));
                }
            }
        }
        Ok(())
    }

    /// Removes every subscription owned by `peer` and prunes topics that end
    /// up with no subscribers.
    fn drop_subscriptions(&self, peer: std::net::SocketAddr) {
        let mut topics = self.lock_subscribers();
        topics.retain(|_, subs| {
            subs.retain(|sub| sub.peer != peer);
            !subs.is_empty()
        });
    }

    /// Answers PINGREQ with a PINGRESP.
    fn handle_pingreq(&self, stream: &mut TcpStream) -> io::Result<()> {
        println!("PINGREQ received");
        let response = [0xd0, 0x00];
        stream.write_all(&response)?;
        Ok(())
    }

    /// Logs a PINGRESP; no reply is sent.
    fn handle_pingresp(&self, _stream: &mut TcpStream) -> io::Result<()> {
        println!("PINGRESP received");
        Ok(())
    }

    /// Logs a DISCONNECT; the caller ends the session afterwards.
    fn handle_disconnect(&self, _stream: &mut TcpStream) -> io::Result<()> {
        println!("DISCONNECT received");
        Ok(())
    }

    /// Logs a CONNACK; no reply is sent.
    fn handle_connack(&self, _stream: &mut TcpStream, packet: &[u8]) -> io::Result<()> {
        println!("handle_connack: packet={:?}", packet);
        Ok(())
    }

    /// Logs the packet id of a PUBACK. Errors if the packet is truncated.
    fn handle_puback(&self, packet: &[u8]) -> io::Result<()> {
        let packet_id = Self::read_u16(packet, 0)?;
        println!("PUBACK received for packet ID {}", packet_id);
        Ok(())
    }

    /// Logs the packet id of a PUBREC. Errors if the packet is truncated.
    fn handle_pubrec(&self, packet: &[u8]) -> io::Result<()> {
        let packet_id = Self::read_u16(packet, 0)?;
        println!("PUBREC received for packet ID {}", packet_id);
        Ok(())
    }

    /// Logs the packet id of a PUBREL. Errors if the packet is truncated.
    fn handle_pubrel(&self, packet: &[u8]) -> io::Result<()> {
        let packet_id = Self::read_u16(packet, 0)?;
        println!("PUBREL received for packet ID {}", packet_id);
        Ok(())
    }

    /// Logs the packet id of a PUBCOMP. Errors if the packet is truncated.
    fn handle_pubcomp(&self, packet: &[u8]) -> io::Result<()> {
        let packet_id = Self::read_u16(packet, 0)?;
        println!("PUBCOMP received for packet ID {}", packet_id);
        Ok(())
    }

    /// Logs the packet id of a SUBACK. Errors if the packet is truncated.
    fn handle_suback(&self, packet: &[u8]) -> io::Result<()> {
        let packet_id = Self::read_u16(packet, 0)?;
        println!("SUBACK received for packet ID {}", packet_id);
        Ok(())
    }

    /// Removes the requesting client's subscription to the named topic and
    /// acknowledges with an UNSUBACK sent to that client only.
    ///
    /// Other clients subscribed to the same topic are left untouched. The
    /// UNSUBACK is sent even when no matching subscription existed.
    /// Only the first topic filter in the packet is processed.
    fn handle_unsubscribe(&mut self, stream: &mut TcpStream, packet: &[u8]) -> io::Result<()> {
        let packet_id = Self::read_u16(packet, 0)?;
        let (topic, _) = Self::read_string(packet, 2)?;
        println!("UNSUBSCRIBE: topic={}", topic);

        let peer = stream.peer_addr()?;
        {
            let mut topics = self.lock_subscribers();
            let now_empty = match topics.get_mut(&topic) {
                Some(subs) => {
                    subs.retain(|sub| sub.peer != peer);
                    subs.is_empty()
                }
                None => false,
            };
            if now_empty {
                topics.remove(&topic);
            }
        }

        let response = create_unsuback_packet(packet_id)?;
        stream.write_all(&response)?;
        Ok(())
    }

    /// Logs the packet id of an UNSUBACK. Errors if the packet is truncated.
    fn handle_unsuback(&self, packet: &[u8]) -> io::Result<()> {
        let packet_id = Self::read_u16(packet, 0)?;
        println!("UNSUBACK received for packet ID {}", packet_id);
        Ok(())
    }

    /// Accepts every CONNECT unconditionally by replying with a CONNACK whose
    /// two payload bytes are zero. The CONNECT contents are only logged.
    fn handle_connect(&self, stream: &mut TcpStream, packet: &[u8]) -> io::Result<()> {
        println!("handle_connect: packet={:?}", packet);
        let response = [0x20, 0x02, 0x00, 0x00];
        stream.write_all(&response)?;
        Ok(())
    }

    /// Registers the client as a subscriber of the requested topic and replies
    /// with a SUBACK.
    ///
    /// Re-subscribing to a topic replaces the client's previous entry, so a
    /// client never receives the same message twice. Only the first topic
    /// filter in the packet is processed.
    fn handle_subscribe(&mut self, stream: &mut TcpStream, packet: &[u8]) -> io::Result<()> {
        // Body layout: packet id (2) | topic length (2) | topic | requested QoS (1)
        let packet_id = Self::read_u16(packet, 0)?;
        let (topic, qos_offset) = Self::read_string(packet, 2)?;
        let qos = *packet.get(qos_offset).ok_or_else(Self::truncated)?;
        println!("SUBSCRIBE: topic={}, QoS={}", topic, qos);

        let peer = stream.peer_addr()?;
        let handle = Arc::new(Mutex::new(stream.try_clone()?));
        {
            let mut topics = self.lock_subscribers();
            let subs = topics.entry(topic).or_default();
            subs.retain(|sub| sub.peer != peer);
            subs.push(Subscriber { peer, stream: handle });
        }

        let response = create_suback_packet(packet_id)?;
        stream.write_all(&response)?;
        Ok(())
    }

    /// Forwards a PUBLISH to every subscriber of its topic.
    ///
    /// A subscriber whose socket write fails is logged and dropped from the
    /// topic; delivery to the remaining subscribers continues and the
    /// publisher's connection is not affected.
    ///
    /// NOTE(review): the body is parsed with QoS 0 framing (topic followed
    /// directly by the payload). A QoS 1/2 PUBLISH carries a packet id after
    /// the topic, which would currently be forwarded as part of the message.
    /// The payload is also converted to a string lossily, so non-UTF-8
    /// payloads are altered.
    fn handle_publish(&self, packet: &[u8]) -> io::Result<()> {
        let (topic, payload_offset) = Self::read_string(packet, 0)?;
        // `payload_offset <= packet.len()` is guaranteed by `read_string`.
        let message = String::from_utf8_lossy(&packet[payload_offset..]).to_string();
        println!("PUBLISH: topic={}, message={}", topic, message);

        // Snapshot the targets so the table lock is not held during socket writes.
        let targets: Vec<Subscriber> = self
            .lock_subscribers()
            .get(&topic)
            .cloned()
            .unwrap_or_default();
        if targets.is_empty() {
            return Ok(());
        }

        let publish_packet = create_publish_packet(&topic, &message)?;
        let mut dead = Vec::new();
        for sub in &targets {
            let mut out = sub
                .stream
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            if let Err(err) = out.write_all(&publish_packet) {
                println!("Dropping subscriber {}: {}", sub.peer, err);
                dead.push(sub.peer);
            }
        }

        if !dead.is_empty() {
            let mut topics = self.lock_subscribers();
            if let Some(subs) = topics.get_mut(&topic) {
                subs.retain(|sub| !dead.contains(&sub.peer));
            }
        }
        Ok(())
    }

    /// Binds `addr` and serves each incoming connection on its own thread.
    ///
    /// Every thread gets a clone of the broker; clones share one subscriber
    /// table, so all connections see each other's subscriptions.
    ///
    /// # Errors
    /// Returns an error if the listener cannot be bound. Per-connection and
    /// accept errors are logged and do not stop the accept loop.
    fn run(&mut self, addr: &str) -> io::Result<()> {
        let listener = TcpListener::bind(addr)?;
        for stream in listener.incoming() {
            match stream {
                Ok(stream) => {
                    let mut broker = self.clone();
                    thread::spawn(move || {
                        if let Err(err) = broker.handle_connection(stream) {
                            println!("Handle error {:?}", err);
                        }
                    });
                }
                Err(e) => {
                    eprintln!("Error accepting connection: {}", e);
                }
            }
        }
        Ok(())
    }
}
/// Builds a QoS 0 PUBLISH packet carrying `message` on `topic`.
///
/// Layout: fixed header byte `0x30`, the variable-length "remaining length"
/// field, the topic as a 2-byte big-endian length followed by its bytes, then
/// the raw message bytes.
///
/// # Errors
/// Returns `InvalidInput` if the topic is longer than 65535 bytes (its length
/// prefix is a `u16`) or if the packet would exceed the largest remaining
/// length the encoding can represent (268,435,455 bytes).
fn create_publish_packet(topic: &str, message: &str) -> io::Result<Vec<u8>> {
    // Largest value expressible in the 4-byte remaining-length encoding.
    const MAX_REMAINING_LENGTH: usize = 268_435_455;

    if topic.len() > usize::from(u16::MAX) {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "Topic name longer than 65535 bytes",
        ));
    }
    // Lossless: the bound was checked just above.
    let topic_len = topic.len() as u16;
    let message_len = message.len();
    println!("Create pub message len {}", message_len);

    let mut remaining = 2 + topic.len() + message_len;
    if remaining > MAX_REMAINING_LENGTH {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "PUBLISH packet too large",
        ));
    }

    let mut packet = Vec::with_capacity(5 + remaining);
    packet.push(0x30); // Fixed header: PUBLISH, QoS 0, no DUP/RETAIN

    // Remaining length: 7 bits per byte, least significant group first, with
    // the high bit set on every byte except the last.
    loop {
        let mut encoded = (remaining % 128) as u8;
        remaining /= 128;
        if remaining > 0 {
            encoded |= 0x80;
        }
        packet.push(encoded);
        if remaining == 0 {
            break;
        }
    }

    packet.extend_from_slice(&topic_len.to_be_bytes());
    packet.extend_from_slice(topic.as_bytes());
    packet.extend_from_slice(message.as_bytes());
    Ok(packet)
}
/// Builds a SUBACK packet acknowledging `packet_id` with a single return code
/// granting QoS 0.
///
/// The remaining length is 3: two bytes of packet identifier plus one return
/// code byte.
fn create_suback_packet(packet_id: u16) -> io::Result<Vec<u8>> {
    let [id_msb, id_lsb] = packet_id.to_be_bytes();
    Ok(vec![
        0x90, // Fixed header: SUBACK
        0x03, // Remaining length: packet id (2) + return code (1)
        id_msb,
        id_lsb,
        0x00, // Return code: success, maximum QoS 0
    ])
}
/// Builds an UNSUBACK packet for `packet_id`: fixed header `0xb0`, a
/// remaining length of 2, then the packet identifier in big-endian order.
fn create_unsuback_packet(packet_id: u16) -> io::Result<Vec<u8>> {
    let [id_msb, id_lsb] = packet_id.to_be_bytes();
    Ok(vec![0xb0, 0x02, id_msb, id_lsb])
}
/// Entry point: starts a broker listening on the local MQTT port
/// (`127.0.0.1:1883`) and returns whatever error `run` reports.
fn main() -> io::Result<()> {
    Broker::new().run("127.0.0.1:1883")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment