Skip to content

Instantly share code, notes, and snippets.

@reportingsjr
Created April 15, 2017 16:23
Show Gist options
  • Save reportingsjr/314d418e3870370f8ceaba7c6b751d7b to your computer and use it in GitHub Desktop.
Save reportingsjr/314d418e3870370f8ceaba7c6b751d7b to your computer and use it in GitHub Desktop.
Rust TcpStream issues
use std::io::prelude::*;
use std::net::{TcpStream, Shutdown};
extern crate byteorder;
use byteorder::{BigEndian, WriteBytesExt, ReadBytesExt};
#[macro_use]
extern crate bitflags;
// Packet types
const CONNECT : u8 = 1;
const CONNACK : u8 = 2;
const PUBLISH : u8 = 3;
const PUBACK : u8 = 4;
const PUBREC : u8 = 5;
const PUBREL : u8 = 6;
const PUBCOMP : u8 = 7;
const SUBSCRIBE : u8 = 8;
const SUBACK : u8 = 9;
const UNSUBSCRIBE : u8 = 10;
const UNSUBACK : u8 = 11;
const PINGREQ : u8 = 12;
const PINGRESP : u8 = 13;
const DISCONNECT : u8 = 14;
struct FixedHeader {
packet_type : u8,
remaining_length : u8,
}
fn main() {
let mut stream = TcpStream::connect("127.0.0.1:1883").unwrap();
let client_id : &'static str = "test_client";
connect(&mut stream, client_id);
let mut connack_buffer : [u8; 4] = [0; 4];
stream.read(&mut connack_buffer[..]).unwrap();
match connack_buffer[0] {
0x20 => (),
_ => panic!("Packet directly after connection attempt was not a CONNACK. It was {:x}.", connack_buffer[0]),
}
match connack_buffer[3] {
0x00 => println!("Recieved a CONNACK"), //connection accepted
0x01 => panic!("Unacceptable protocol version"),
0x02 => panic!("Identifier rejected"),
0x03 => panic!("Server unavailable"),
0x04 => panic!("bad username or password"),
0x05 => panic!("not authorized"),
_ => panic!("Invalid connect return code."),
}
// SUBSCRIBE packet
subscribe(&mut stream, "#");
let mut suback_header : [u8; 1] = [0; 1];
stream.read(&mut suback_header).unwrap();
match suback_header[0] {
0b10010000 => println!("Received a SUBACK."),
_ => panic!("Packet after SUBSCRIBE was not a SUBACK. It was {:x}", suback_header[0]),
}
let mut remaining_length : [u8; 1] = [0];
stream.read(&mut remaining_length).unwrap();
println!("Remaining length (bytes) of SUBACK packet (2 + number of topics subscribed to): {}.", remaining_length[0]);
match stream.read_u16::<BigEndian>().unwrap() {
10u16 => println!("Correct packet ID from SUBACK."),
id => panic!("Wrong packet ID from SUBACK: {:b}.", id),
}
// should loop for remaining_length - 2
let mut suback_return : [u8; 1] = [0];
stream.read(&mut suback_return).unwrap();
match suback_return[0] {
0 => println!("Subbed to topic with QoS 0."),
1 => println!("Subbed to topic with QoS 1."),
2 => println!("Subbed to topic with QoS 2."),
0x80 => println!("Failed to sub to topic."),
_ => panic!("Invalid suback return code: {:x}", suback_return[0]),
}
loop {
let mut nextPacket : [u8; 1] = [0];
match stream.read(&mut nextPacket) {
Ok(n) if n > 0 => println!("Received byte: {:}", nextPacket[0] as char),
Ok(..) => {},
Err(e) => println!("Couldn't read from TcpStream due to error: {}.", e),
}
}
disconnect(&mut stream);
stream.shutdown(Shutdown::Both);
}
fn fixed_header_packet(packet_type : u8) -> u8 {
let packet_flags = match packet_type {
PUBLISH => 0b0000, // we're not ready for this yet so just return 0
PUBREL | SUBSCRIBE | UNSUBSCRIBE => 0b0010,
_ => 0b000,
};
let merged_type : u8 = (packet_type << 4) | packet_flags;
merged_type
}
fn connect(stream : &mut TcpStream, client_id : &'static str) {
let mqtt : &'static str = "MQTT";
// Write the CONNECT header packet
stream.write(&[fixed_header_packet(CONNECT)]).unwrap();
let mut length = 2 + 4 + 1 + 1 + 2;
length += 2 + client_id.len();
// write the variable length
stream.write(&[length as u8]).unwrap();
// write the protocol name
stream.write_u16::<BigEndian>(mqtt.len() as u16).unwrap();
stream.write(mqtt.as_bytes()).unwrap();
// write the protocol level
stream.write(&[4 as u8]).unwrap();
// write the connect flags
stream.write(&[0b00000010 as u8]).unwrap();
// write the keep_alive length
stream.write_u16::<BigEndian>(30 as u16).unwrap();
// write the client id
stream.write_u16::<BigEndian>(client_id.len() as u16).unwrap();
stream.write(client_id.as_bytes()).unwrap();
}
fn subscribe(stream: &mut TcpStream, topic : &'static str) {
stream.write(&[fixed_header_packet(SUBSCRIBE)]).unwrap();
// length of the variable header (2) + topic length + topic length bytes (2) + QoS
let length = 2 + topic.len() + 2 + 1;
stream.write(&[length as u8]).unwrap();
// variable header with a packet identifier of 10 (which shouldn't really be hardcoded)
let variable_header = 10u16;
stream.write_u16::<BigEndian>(variable_header).unwrap();
// Repeat these three writes for the number of topics being subscribed to
stream.write_u16::<BigEndian>(topic.len() as u16).unwrap();
stream.write(topic.as_bytes()).unwrap();
// QoS
stream.write(&[0b00000000 as u8]).unwrap();
}
fn disconnect(stream : &mut TcpStream) {
stream.write(&[0b11100000 as u8]).unwrap();
stream.write(&[0b00000000 as u8]).unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment