Created
April 15, 2017 16:23
-
-
Save reportingsjr/314d418e3870370f8ceaba7c6b751d7b to your computer and use it in GitHub Desktop.
Rust TcpStream issues
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::io::prelude::*; | |
use std::net::{TcpStream, Shutdown}; | |
extern crate byteorder; | |
use byteorder::{BigEndian, WriteBytesExt, ReadBytesExt}; | |
#[macro_use] | |
extern crate bitflags; | |
// Packet types | |
const CONNECT : u8 = 1; | |
const CONNACK : u8 = 2; | |
const PUBLISH : u8 = 3; | |
const PUBACK : u8 = 4; | |
const PUBREC : u8 = 5; | |
const PUBREL : u8 = 6; | |
const PUBCOMP : u8 = 7; | |
const SUBSCRIBE : u8 = 8; | |
const SUBACK : u8 = 9; | |
const UNSUBSCRIBE : u8 = 10; | |
const UNSUBACK : u8 = 11; | |
const PINGREQ : u8 = 12; | |
const PINGRESP : u8 = 13; | |
const DISCONNECT : u8 = 14; | |
struct FixedHeader { | |
packet_type : u8, | |
remaining_length : u8, | |
} | |
fn main() { | |
let mut stream = TcpStream::connect("127.0.0.1:1883").unwrap(); | |
let client_id : &'static str = "test_client"; | |
connect(&mut stream, client_id); | |
let mut connack_buffer : [u8; 4] = [0; 4]; | |
stream.read(&mut connack_buffer[..]).unwrap(); | |
match connack_buffer[0] { | |
0x20 => (), | |
_ => panic!("Packet directly after connection attempt was not a CONNACK. It was {:x}.", connack_buffer[0]), | |
} | |
match connack_buffer[3] { | |
0x00 => println!("Recieved a CONNACK"), //connection accepted | |
0x01 => panic!("Unacceptable protocol version"), | |
0x02 => panic!("Identifier rejected"), | |
0x03 => panic!("Server unavailable"), | |
0x04 => panic!("bad username or password"), | |
0x05 => panic!("not authorized"), | |
_ => panic!("Invalid connect return code."), | |
} | |
// SUBSCRIBE packet | |
subscribe(&mut stream, "#"); | |
let mut suback_header : [u8; 1] = [0; 1]; | |
stream.read(&mut suback_header).unwrap(); | |
match suback_header[0] { | |
0b10010000 => println!("Received a SUBACK."), | |
_ => panic!("Packet after SUBSCRIBE was not a SUBACK. It was {:x}", suback_header[0]), | |
} | |
let mut remaining_length : [u8; 1] = [0]; | |
stream.read(&mut remaining_length).unwrap(); | |
println!("Remaining length (bytes) of SUBACK packet (2 + number of topics subscribed to): {}.", remaining_length[0]); | |
match stream.read_u16::<BigEndian>().unwrap() { | |
10u16 => println!("Correct packet ID from SUBACK."), | |
id => panic!("Wrong packet ID from SUBACK: {:b}.", id), | |
} | |
// should loop for remaining_length - 2 | |
let mut suback_return : [u8; 1] = [0]; | |
stream.read(&mut suback_return).unwrap(); | |
match suback_return[0] { | |
0 => println!("Subbed to topic with QoS 0."), | |
1 => println!("Subbed to topic with QoS 1."), | |
2 => println!("Subbed to topic with QoS 2."), | |
0x80 => println!("Failed to sub to topic."), | |
_ => panic!("Invalid suback return code: {:x}", suback_return[0]), | |
} | |
loop { | |
let mut nextPacket : [u8; 1] = [0]; | |
match stream.read(&mut nextPacket) { | |
Ok(n) if n > 0 => println!("Received byte: {:}", nextPacket[0] as char), | |
Ok(..) => {}, | |
Err(e) => println!("Couldn't read from TcpStream due to error: {}.", e), | |
} | |
} | |
disconnect(&mut stream); | |
stream.shutdown(Shutdown::Both); | |
} | |
fn fixed_header_packet(packet_type : u8) -> u8 { | |
let packet_flags = match packet_type { | |
PUBLISH => 0b0000, // we're not ready for this yet so just return 0 | |
PUBREL | SUBSCRIBE | UNSUBSCRIBE => 0b0010, | |
_ => 0b000, | |
}; | |
let merged_type : u8 = (packet_type << 4) | packet_flags; | |
merged_type | |
} | |
fn connect(stream : &mut TcpStream, client_id : &'static str) { | |
let mqtt : &'static str = "MQTT"; | |
// Write the CONNECT header packet | |
stream.write(&[fixed_header_packet(CONNECT)]).unwrap(); | |
let mut length = 2 + 4 + 1 + 1 + 2; | |
length += 2 + client_id.len(); | |
// write the variable length | |
stream.write(&[length as u8]).unwrap(); | |
// write the protocol name | |
stream.write_u16::<BigEndian>(mqtt.len() as u16).unwrap(); | |
stream.write(mqtt.as_bytes()).unwrap(); | |
// write the protocol level | |
stream.write(&[4 as u8]).unwrap(); | |
// write the connect flags | |
stream.write(&[0b00000010 as u8]).unwrap(); | |
// write the keep_alive length | |
stream.write_u16::<BigEndian>(30 as u16).unwrap(); | |
// write the client id | |
stream.write_u16::<BigEndian>(client_id.len() as u16).unwrap(); | |
stream.write(client_id.as_bytes()).unwrap(); | |
} | |
fn subscribe(stream: &mut TcpStream, topic : &'static str) { | |
stream.write(&[fixed_header_packet(SUBSCRIBE)]).unwrap(); | |
// length of the variable header (2) + topic length + topic length bytes (2) + QoS | |
let length = 2 + topic.len() + 2 + 1; | |
stream.write(&[length as u8]).unwrap(); | |
// variable header with a packet identifier of 10 (which shouldn't really be hardcoded) | |
let variable_header = 10u16; | |
stream.write_u16::<BigEndian>(variable_header).unwrap(); | |
// Repeat these three writes for the number of topics being subscribed to | |
stream.write_u16::<BigEndian>(topic.len() as u16).unwrap(); | |
stream.write(topic.as_bytes()).unwrap(); | |
// QoS | |
stream.write(&[0b00000000 as u8]).unwrap(); | |
} | |
fn disconnect(stream : &mut TcpStream) { | |
stream.write(&[0b11100000 as u8]).unwrap(); | |
stream.write(&[0b00000000 as u8]).unwrap(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment