Created
May 3, 2023 21:39
-
-
Save helderjnpinto/bd9015913be98b51bcabd779fd991d0f to your computer and use it in GitHub Desktop.
MQTT rust wip
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::collections::HashMap;
use std::io::{self, Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread;
#[derive(Clone)] | |
struct Broker { | |
subscribers: HashMap<String, Vec<Arc<Mutex<TcpStream>>>>, | |
} | |
impl Broker { | |
fn new() -> Self { | |
Self { | |
subscribers: HashMap::new(), | |
} | |
} | |
fn handle_connection(&mut self, mut stream: TcpStream) -> io::Result<()> { | |
loop { | |
let mut buf = [0; 1024]; | |
let bytes_read = stream.read(&mut buf)?; | |
if bytes_read == 0 { | |
return Ok(()); | |
} | |
let packet_type = buf[0] >> 4; | |
println!("Packet type {}", packet_type); | |
return match packet_type { | |
1 => self.handle_connect(&mut stream, &buf[1..bytes_read]), | |
2 => self.handle_connack(&mut stream, &buf[1..bytes_read]), | |
3 => self.handle_publish(&buf[1..bytes_read]), | |
4 => self.handle_puback(&buf[1..bytes_read]), | |
5 => self.handle_pubrec(&buf[1..bytes_read]), | |
6 => self.handle_pubrel(&buf[1..bytes_read]), | |
7 => self.handle_pubcomp(&buf[1..bytes_read]), | |
8 | 14 => self.handle_subscribe(&mut stream, &buf[1..bytes_read]), | |
9 => self.handle_suback(&buf[1..bytes_read]), | |
10 => self.handle_unsubscribe(&mut stream, &buf[1..bytes_read]), | |
11 => self.handle_unsuback(&buf[1..bytes_read]), | |
12 => self.handle_pingreq(&mut stream), | |
13 => self.handle_pingresp(&mut stream), | |
15 => self.handle_disconnect(&mut stream), | |
_ => { | |
println!("Unknown packet type {}", packet_type); | |
// Unknown packet type | |
return Err(io::Error::new(io::ErrorKind::InvalidData, "Unknown packet type")); | |
} | |
}; | |
} | |
} | |
fn handle_pingreq(&self, stream: &mut TcpStream) -> io::Result<()> { | |
println!("PINGREQ received"); | |
let response = [0xd0, 0x00]; | |
stream.write_all(&response)?; | |
Ok(()) | |
} | |
fn handle_pingresp(&self, _stream: &mut TcpStream) -> io::Result<()> { | |
println!("PINGRESP received"); | |
Ok(()) | |
} | |
fn handle_disconnect(&self, _stream: &mut TcpStream) -> io::Result<()> { | |
println!("DISCONNECT received"); | |
Ok(()) | |
} | |
fn handle_connack(&self, _stream: &mut TcpStream, packet: &[u8]) -> io::Result<()> { | |
println!("handle_connack: packet={:?}", packet); | |
// We don't need to do anything with the CONNACK packet, since we're not | |
// checking for any authentication or keepalive functionality. | |
Ok(()) | |
} | |
fn handle_puback(&self, packet: &[u8]) -> io::Result<()> { | |
let packet_id = ((packet[0] as u16) << 8) | (packet[1] as u16); | |
println!("PUBACK received for packet ID {}", packet_id); | |
Ok(()) | |
} | |
fn handle_pubrec(&self, packet: &[u8]) -> io::Result<()> { | |
let packet_id = ((packet[0] as u16) << 8) | (packet[1] as u16); | |
println!("PUBREC received for packet ID {}", packet_id); | |
Ok(()) | |
} | |
fn handle_pubrel(&self, packet: &[u8]) -> io::Result<()> { | |
let packet_id = ((packet[0] as u16) << 8) | (packet[1] as u16); | |
println!("PUBREL received for packet ID {}", packet_id); | |
Ok(()) | |
} | |
fn handle_pubcomp(&self, packet: &[u8]) -> io::Result<()> { | |
let packet_id = ((packet[0] as u16) << 8) | (packet[1] as u16); | |
println!("PUBCOMP received for packet ID {}", packet_id); | |
Ok(()) | |
} | |
fn handle_suback(&self, packet: &[u8]) -> io::Result<()> { | |
let packet_id = ((packet[0] as u16) << 8) | (packet[1] as u16); | |
println!("SUBACK received for packet ID {}", packet_id); | |
Ok(()) | |
} | |
fn handle_unsubscribe(&mut self, _stream: &mut TcpStream, packet: &[u8]) -> io::Result<()> { | |
let packet_id = ((packet[0] as u16) << 8) | (packet[1] as u16); | |
let topic_len = ((packet[2] as u16) << 8) | (packet[3] as u16); | |
let topic = String::from_utf8_lossy(&packet[4..4 + (topic_len as usize)]).to_string(); | |
println!("UNSUBSCRIBE: topic={}", topic); | |
if let Some(subscribers) = self.subscribers.get_mut(&topic) { | |
subscribers.retain(|subscriber| { | |
let mut s = subscriber.lock().unwrap(); | |
if s.peer_addr().is_ok() { | |
s.write_all(&create_unsuback_packet(packet_id).unwrap()).unwrap(); | |
true | |
} else { | |
false | |
} | |
}); | |
self.subscribers.remove(&topic); | |
} | |
Ok(()) | |
} | |
fn handle_unsuback(&self, packet: &[u8]) -> io::Result<()> { | |
let packet_id = ((packet[0] as u16) << 8) | (packet[1] as u16); | |
println!("UNSUBACK received for packet ID {}", packet_id); | |
Ok(()) | |
} | |
fn handle_connect(&self, stream: &mut TcpStream, packet: &[u8]) -> io::Result<()> { | |
println!("handle_connect: packet={:?}", packet); | |
// We dont need to do anything with the CONNECT packet, since we dont | |
// support any authentication or keepalive functionality. | |
// Instead, we can just send a CONNACK packet to acknowledge the connection. | |
let response = [0x20, 0x02, 0x00, 0x00]; | |
stream.write_all(&response)?; | |
Ok(()) | |
} | |
fn handle_subscribe(&mut self, stream: &mut TcpStream, packet: &[u8]) -> io::Result<()> { | |
// Parse the SUBSCRIBE packet | |
let packet_id = ((packet[0] as u16) << 8) | (packet[1] as u16); | |
let topic_len = ((packet[2] as u16) << 8) | (packet[3] as u16); | |
let topic = String::from_utf8_lossy(&packet[4..4 + (topic_len as usize)]).to_string(); | |
let qos = packet[4 + (topic_len as usize)]; | |
println!("SUBSCRIBE: topic={}, QoS={}", topic, qos); | |
// Add the subscriber to the list of subscribers for this topic | |
let subscriber = Arc::new(Mutex::new(stream.try_clone()?)); | |
let subscribers = self.subscribers.entry(topic).or_insert_with(Vec::new); | |
subscribers.push(subscriber.clone()); | |
// Send a SUBACK packet to acknowledge the subscription | |
let response = create_suback_packet(packet_id)?; | |
stream.write_all(&response)?; | |
Ok(()) | |
} | |
fn handle_publish(&self, packet: &[u8]) -> io::Result<()> { | |
// Parse the PUBLISH packet | |
let topic_len = ((packet[0] as u16) << 8) | (packet[1] as u16); | |
let topic = String::from_utf8_lossy(&packet[2..2 + (topic_len as usize)]).to_string(); | |
let message = String::from_utf8_lossy(&packet[2 + (topic_len as usize)..]).to_string(); | |
println!("PUBLISH: topic={}, message={}", topic, message); | |
// Publish the message to all subscribers | |
if let Some(subscribers) = self.subscribers.get(&topic) { | |
let publish_packet = create_publish_packet(&topic, &message)?; | |
for subscriber in subscribers { | |
let mut stream = subscriber.lock().unwrap(); | |
stream.write_all(&publish_packet)?; | |
} | |
} | |
Ok(()) | |
} | |
fn run(&mut self, addr: &str) -> io::Result<()> { | |
let listener = TcpListener::bind(addr)?; | |
for stream in listener.incoming() { | |
match stream { | |
Ok(stream) => { | |
let mut broker = self.clone(); | |
thread::spawn(move || { | |
let handle_result = broker.handle_connection(stream); | |
match handle_result { | |
Ok(()) => {} | |
Err(err) => { println!("Handle error {:?}", err) } | |
} | |
}); | |
} | |
Err(e) => { | |
eprintln!("Error accepting connection: {}", e); | |
} | |
} | |
} | |
Ok(()) | |
} | |
} | |
/// Builds a QoS 0 PUBLISH packet: fixed header byte `0x30`, the variable-length
/// "remaining length", the length-prefixed topic, then the payload.
///
/// # Errors
/// Returns `InvalidInput` if the topic does not fit its 16-bit length prefix or the
/// packet would exceed what the remaining-length field can express.
fn create_publish_packet(topic: &str, message: &str) -> io::Result<Vec<u8>> {
    // Largest value a four-byte remaining-length field can carry.
    const MAX_REMAINING_LENGTH: usize = 268_435_455;

    if topic.len() > usize::from(u16::MAX) {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "Topic too long"));
    }
    let remaining = 2 + topic.len() + message.len();
    if remaining > MAX_REMAINING_LENGTH {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "Packet too large"));
    }
    println!("Create pub message len {}", message.len());

    let mut packet = Vec::with_capacity(1 + 4 + remaining);
    packet.push(0x30); // Fixed header: PUBLISH, QoS 0, no DUP/RETAIN

    // Remaining length: 7 bits per byte, low group first, high bit = "more follows".
    let mut left = remaining;
    loop {
        let mut byte = (left % 128) as u8;
        left /= 128;
        if left > 0 {
            byte |= 0x80;
        }
        packet.push(byte);
        if left == 0 {
            break;
        }
    }

    // Checked above, so the cast cannot truncate.
    packet.extend_from_slice(&(topic.len() as u16).to_be_bytes());
    packet.extend_from_slice(topic.as_bytes());
    packet.extend_from_slice(message.as_bytes());
    Ok(packet)
}
/// Builds a SUBACK for `packet_id` granting QoS 0 to a single topic filter.
///
/// The remaining length is 3: two bytes of packet id plus one return-code byte.
/// Currently infallible; the `io::Result` return type is kept for callers.
fn create_suback_packet(packet_id: u16) -> io::Result<Vec<u8>> {
    let [id_hi, id_lo] = packet_id.to_be_bytes();
    Ok(vec![
        0x90, // Fixed header: SUBACK
        0x03, // Remaining length: packet id (2) + return code (1)
        id_hi, id_lo, 0x00, // Return code: success, maximum QoS 0
    ])
}
/// Builds an UNSUBACK for `packet_id`: fixed header `0xb0`, remaining length 2,
/// then the packet id in big-endian order.
///
/// Currently infallible; the `io::Result` return type is kept for callers.
fn create_unsuback_packet(packet_id: u16) -> io::Result<Vec<u8>> {
    let [id_hi, id_lo] = packet_id.to_be_bytes();
    Ok(vec![
        0xb0, // Fixed header: UNSUBACK
        0x02, // Remaining length: packet id only
        id_hi, id_lo,
    ])
}
fn main() -> io::Result<()> { | |
let mut broker = Broker::new(); | |
broker.run("127.0.0.1:1883")?; | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment