Skip to content

Instantly share code, notes, and snippets.

@flyaruu
Last active November 26, 2023 22:48
Show Gist options
  • Save flyaruu/11c7979a4a9d6649e2cbe853bf0cf52c to your computer and use it in GitHub Desktop.
Save flyaruu/11c7979a4a9d6649e2cbe853bf0cf52c to your computer and use it in GitHub Desktop.
Expanded example
use std::{
net::{Ipv4Addr, SocketAddr},
time::Duration, str::from_utf8,
};
use embedded_io_adapters::tokio_1::FromTokio;
use rust_mqtt::{
client::{client::MqttClient, client_config::ClientConfig},
packet::v5::reason_codes::ReasonCode,
utils::rng_generator::CountingRng,
};
use tokio::{net::TcpStream, join};
async fn send_10() {
let addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 1883);
let connection = TcpStream::connect(addr)
.await
.map_err(|_| ReasonCode::NetworkError)
.unwrap();
let connection = FromTokio::<TcpStream>::new(connection);
let mut config = ClientConfig::new(
rust_mqtt::client::client_config::MqttVersion::MQTTv5,
CountingRng(20000),
);
config.add_max_subscribe_qos(rust_mqtt::packet::v5::publish_packet::QualityOfService::QoS1);
config.add_client_id("client");
// config.add_username(USERNAME);
// config.add_password(PASSWORD);
config.max_packet_size = 200;
let mut recv_buffer = [0; 220];
let mut write_buffer = [0; 220];
let mut client = MqttClient::<_, 5, _>::new(
connection,
&mut write_buffer,
80,
&mut recv_buffer,
80,
config,
);
client.connect_to_broker().await.unwrap();
for i in 0..10 {
println!("Message: {}",i);
let result = client
.send_message(
"hello",
b"hello2",
rust_mqtt::packet::v5::publish_packet::QualityOfService::QoS0,
true,
)
.await;
match result {
Ok(_) => println!("done"),
Err(e) => println!("error: {}",e),
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
async fn receive_10() {
let addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 1883);
let connection = TcpStream::connect(addr)
.await
.map_err(|_| ReasonCode::NetworkError)
.unwrap();
let connection = FromTokio::<TcpStream>::new(connection);
let mut config = ClientConfig::new(
rust_mqtt::client::client_config::MqttVersion::MQTTv5,
CountingRng(20000),
);
config.add_max_subscribe_qos(rust_mqtt::packet::v5::publish_packet::QualityOfService::QoS1);
config.add_client_id("client");
// config.add_username(USERNAME);
// config.add_password(PASSWORD);
config.max_packet_size = 200;
let mut recv_buffer = [0; 220];
let mut write_buffer = [0; 220];
let mut client = MqttClient::<_, 5, _>::new(
connection,
&mut write_buffer,
80,
&mut recv_buffer,
80,
config,
);
client.connect_to_broker().await.unwrap();
for _ in 0..10 {
let result = client
.receive_message()
.await;
match result {
Ok((topic,msg)) => {
let string = from_utf8(msg).unwrap();
println!("RECEIVED: {} from topic: {}",string,topic);
},
Err(e) => println!("error: {}",e),
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
#[tokio::main]
async fn main() {
env_logger::init();
let t1 = tokio::spawn(async {
send_10().await;
});
let t2 = tokio::spawn(async {
receive_10().await;
});
let (r1, r2) = join!(t1,t2);
r1.unwrap();
r2.unwrap();
println!("done!");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment