Created
April 17, 2017 02:16
-
-
Save reportingsjr/66ee30b75efcf1afb16e74ada554a8f4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::io::prelude::*; | |
use std::net::{TcpStream, Shutdown}; | |
extern crate byteorder; | |
use byteorder::{BigEndian, WriteBytesExt, ReadBytesExt}; | |
#[macro_use] | |
extern crate bitflags; | |
// MQTT control packet types.
//
// Each value is the 4-bit type code that `fixed_header_packet` shifts into the
// high nibble of the first byte of a packet's fixed header.

/// Client request to connect to the broker.
const CONNECT: u8 = 1;
/// Broker's acknowledgment of a CONNECT.
const CONNACK: u8 = 2;
/// Publish an application message.
const PUBLISH: u8 = 3;
/// Publish acknowledgment (QoS 1).
const PUBACK: u8 = 4;
/// Publish received (QoS 2, step 1).
const PUBREC: u8 = 5;
/// Publish release (QoS 2, step 2).
const PUBREL: u8 = 6;
/// Publish complete (QoS 2, step 3).
const PUBCOMP: u8 = 7;
/// Client request to subscribe to topic filters.
const SUBSCRIBE: u8 = 8;
/// Broker's acknowledgment of a SUBSCRIBE.
const SUBACK: u8 = 9;
/// Client request to unsubscribe from topic filters.
const UNSUBSCRIBE: u8 = 10;
/// Broker's acknowledgment of an UNSUBSCRIBE.
const UNSUBACK: u8 = 11;
/// Keep-alive ping request.
const PINGREQ: u8 = 12;
/// Keep-alive ping response.
const PINGRESP: u8 = 13;
/// Client is disconnecting.
const DISCONNECT: u8 = 14;
/// The two leading fields of an MQTT packet's fixed header.
///
/// NOTE(review): nothing in this file constructs this type yet.
#[derive(Debug, Clone, Copy, PartialEq)]
struct FixedHeader {
    /// Control packet type code (one of the packet type constants).
    packet_type: u8,
    /// Number of bytes that follow the fixed header.
    ///
    /// NOTE(review): MQTT encodes this as a 1-4 byte variable-length integer,
    /// so a `u8` can only represent packets whose remainder is under 256
    /// bytes; widen it before using this struct for arbitrary packets.
    remaining_length: u8,
}
/// A minimal MQTT client that owns the TCP connection to the broker.
// The lower-camel-case name is kept because other code refers to the type by
// this name; renaming it would break those references.
#[allow(non_camel_case_types)]
struct mqttClient {
    /// Connection to the broker; all packets are written to and read from it.
    stream: TcpStream,
}
fn main() { | |
let mut client = mqttClient { stream: TcpStream::connect("127.0.0.1:1883").unwrap() }; | |
let client_id : &'static str = "test_client"; | |
client.connect(client_id); | |
let mut connack_buffer : [u8; 4] = [0; 4]; | |
client.stream.read(&mut connack_buffer[..]).unwrap(); | |
match connack_buffer[0] { | |
0x20 => (), | |
_ => panic!("Packet directly after connection attempt was not a CONNACK. It was {:x}.", connack_buffer[0]), | |
} | |
match connack_buffer[3] { | |
0x00 => println!("Recieved a CONNACK"), //connection accepted | |
0x01 => panic!("Unacceptable protocol version"), | |
0x02 => panic!("Identifier rejected"), | |
0x03 => panic!("Server unavailable"), | |
0x04 => panic!("bad username or password"), | |
0x05 => panic!("not authorized"), | |
_ => panic!("Invalid connect return code."), | |
} | |
// SUBSCRIBE packet | |
subscribe(&mut client.stream, "#"); | |
let mut suback_header : [u8; 1] = [0; 1]; | |
client.stream.read(&mut suback_header).unwrap(); | |
match suback_header[0] { | |
0b10010000 => println!("Received a SUBACK."), | |
_ => panic!("Packet after SUBSCRIBE was not a SUBACK. It was {:x}", suback_header[0]), | |
} | |
let mut remaining_length : [u8; 1] = [0]; | |
client.stream.read(&mut remaining_length).unwrap(); | |
println!("Remaining length (bytes) of SUBACK packet (2 + number of topics subscribed to): {}.", remaining_length[0]); | |
match client.stream.read_u16::<BigEndian>().unwrap() { | |
10u16 => println!("Correct packet ID from SUBACK."), | |
id => panic!("Wrong packet ID from SUBACK: {:b}.", id), | |
} | |
// should loop for remaining_length - 2 | |
let mut suback_return : [u8; 1] = [0]; | |
client.stream.read(&mut suback_return).unwrap(); | |
match suback_return[0] { | |
0 => println!("Subbed to topic with QoS 0."), | |
1 => println!("Subbed to topic with QoS 1."), | |
2 => println!("Subbed to topic with QoS 2."), | |
0x80 => println!("Failed to sub to topic."), | |
_ => panic!("Invalid suback return code: {:x}", suback_return[0]), | |
} | |
loop { | |
let mut nextPacket : [u8; 1] = [0]; | |
match client.stream.read(&mut nextPacket) { | |
Ok(n) if n > 0 => println!("Received byte: {:}", nextPacket[0] as char), | |
Ok(..) => {}, | |
Err(e) => println!("Couldn't read from TcpStream due to error: {}.", e), | |
} | |
} | |
disconnect(&mut client.stream); | |
client.stream.shutdown(Shutdown::Both); | |
} | |
fn fixed_header_packet(packet_type : u8) -> u8 { | |
let packet_flags = match packet_type { | |
PUBLISH => 0b0000, // we're not ready for this yet so just return 0 | |
PUBREL | SUBSCRIBE | UNSUBSCRIBE => 0b0010, | |
_ => 0b000, | |
}; | |
let merged_type : u8 = (packet_type << 4) | packet_flags; | |
merged_type | |
} | |
impl mqttClient { | |
fn connect(&self, client_id : &'static str) { | |
let mqtt : &'static str = "MQTT"; | |
// Write the CONNECT header packet | |
self.stream.write(&[fixed_header_packet(CONNECT)]).unwrap(); | |
let mut length = 2 + 4 + 1 + 1 + 2; | |
length += 2 + client_id.len(); | |
// write the variable length | |
self.stream.write(&[length as u8]).unwrap(); | |
// write the protocol name | |
self.stream.write_u16::<BigEndian>(mqtt.len() as u16).unwrap(); | |
self.stream.write(mqtt.as_bytes()).unwrap(); | |
// write the protocol level | |
self.stream.write(&[4 as u8]).unwrap(); | |
// write the connect flags | |
self.stream.write(&[0b00000010 as u8]).unwrap(); | |
// write the keep_alive length | |
self.stream.write_u16::<BigEndian>(0 as u16).unwrap(); | |
// write the client id | |
self.stream.write_u16::<BigEndian>(client_id.len() as u16).unwrap(); | |
self.stream.write(client_id.as_bytes()).unwrap(); | |
} | |
} | |
fn subscribe(stream: &mut TcpStream, topic : &'static str) { | |
stream.write(&[fixed_header_packet(SUBSCRIBE)]).unwrap(); | |
// length of the variable header (2) + topic length + topic length bytes (2) + QoS | |
let length = 2 + topic.len() + 2 + 1; | |
stream.write(&[length as u8]).unwrap(); | |
// variable header with a packet identifier of 10 (which shouldn't really be hardcoded) | |
let variable_header = 10u16; | |
stream.write_u16::<BigEndian>(variable_header).unwrap(); | |
// Repeat these three writes for the number of topics being subscribed to | |
stream.write_u16::<BigEndian>(topic.len() as u16).unwrap(); | |
stream.write(topic.as_bytes()).unwrap(); | |
// QoS | |
stream.write(&[0b00000000 as u8]).unwrap(); | |
} | |
fn disconnect(stream : &mut TcpStream) { | |
stream.write(&[0b11100000 as u8]).unwrap(); | |
stream.write(&[0b00000000 as u8]).unwrap(); | |
} | |
fn ping(stream: &mut TcpStream) { | |
// | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment