Skip to content

Instantly share code, notes, and snippets.

@Heasummn
Created March 24, 2020 22:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Heasummn/1576f1a7efba2b6cad0dd0dbf5b8c57a to your computer and use it in GitHub Desktop.
Save Heasummn/1576f1a7efba2b6cad0dd0dbf5b8c57a to your computer and use it in GitHub Desktop.
impl Broker {
pub fn new() -> Broker {
Broker {
state: Arc::new(Mutex::new(BrokerState::new()))
}
}
pub async fn start_server(self, addr: IpAddr, port: u16) -> Result<(), Box<dyn Error>> {
let address = SocketAddr::new(addr, port);
let mut listener = TcpListener::bind(address).await?;
let state = Arc::clone(&self.state);
loop {
let (stream, _) = listener.accept().await?;
println!("New connection: {}", stream.peer_addr().unwrap());
let state_inner = Arc::clone(&state);
tokio::spawn(async move {
// connection succeeded
Broker::handle_client(state_inner, stream).await;
});
}
}
async fn handle_client(state: Arc<Mutex<BrokerState>>, stream: TcpStream) -> Result<(), Box<dyn Error>> {
let mut packets = Framed::new(stream, Codec::new());
let connect = match packets.next().await {
Some(Ok(Connect(packet))) => {
packets.send(ConnectAck{session_present: false, return_code: ConnectionAccepted}).await;
packet
},
_ => {
println!("Did not receive connect packet");
return Ok(());
}
};
println!("{:#?}", connect);
while let Some(Ok(packet)) = packets.next().await {
match packet {
Disconnect => return Ok(()),
PingRequest => {
println!("Ping");
packets.send(PingResponse).await;
},
Subscribe {packet_id: id, ..} => {
println!("{:#?}", packet);
packets.send(SubscribeAck {packet_id: id, status: vec!(mqtt_codec::SubscribeReturnCode::Success(mqtt_codec::QoS::ExactlyOnce))}).await;
}
_ => {
}
}
}
Ok(())
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment