Skip to content

Instantly share code, notes, and snippets.

@songzhi
Last active May 16, 2022 08:40
Show Gist options
  • Save songzhi/70738652a353854824b16c05a07c9347 to your computer and use it in GitHub Desktop.
Save songzhi/70738652a353854824b16c05a07c9347 to your computer and use it in GitHub Desktop.
sensor reader
use axum::{
extract::ws::{Message, WebSocket, WebSocketUpgrade},
response::IntoResponse,
routing::get,
Router,
};
use once_cell::sync::Lazy;
use std::{future::Future, net::SocketAddr, time::Duration};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::watch,
};
use tokio_serial::SerialStream;
#[derive(Debug, Clone, Copy, Default)]
struct DataItem {
temperature: f32,
humidity: f32,
}
impl DataItem {
fn new(temperature: f32, humidity: f32) -> Self {
Self {
temperature,
humidity,
}
}
}
/// 负责定时读取传感器数据并广播给客户端
struct SensorReader {
tx: watch::Sender<DataItem>,
port: Option<SerialStream>,
}
impl SensorReader {
fn create() -> watch::Receiver<DataItem> {
let (tx, rx) = watch::channel(DataItem::default());
tokio::spawn(Self { tx, port: None }.main_loop());
rx
}
async fn main_loop(mut self) {
loop {
// 尝试读取传感器数据
if let Some(data) = self.read_data().await {
// 将数据广播给客户端
self.tx.send(data).unwrap();
} else {
// 读取失败,等待下一次重新打开串口
self.port.take();
}
// 等待一秒
tokio::time::sleep(Duration::from_millis(1000)).await;
}
}
/// 获取串口名
async fn port_name() -> Option<String> {
tokio::task::spawn_blocking(|| Some(tokio_serial::available_ports().ok()?.pop()?.port_name))
.await
.ok()
.flatten()
}
/// 拿到串口
async fn port(&mut self) -> &mut SerialStream {
// 如果串口已经存在,直接返回
if self.port.is_some() {
return self.port.as_mut().unwrap();
}
// 如果串口不存在,尝试打开,如果打开失败,等待一段时间后重试
self.port.replace(
Self::try_until_success(|| async {
let port_name = Self::port_name().await?;
tokio::task::spawn_blocking(|| {
SerialStream::open(&tokio_serial::new(port_name, 9600)).ok()
})
.await
.ok()
.flatten()
})
.await,
);
self.port.as_mut().unwrap()
}
async fn read_data(&mut self) -> Option<DataItem> {
// 每次要发送的数据
static DATA_FOR_SEND: &[u8; 8] = &[1, 3, 0, 0, 0, 2, 0xC4, 0x0B];
// 拿到串口,一直等到拿到为止
let port = self.port().await;
// 发送数据
port.write_all(DATA_FOR_SEND).await.ok()?;
// 读取数据
let mut buf = [0u8; 9];
port.read_exact(&mut buf).await.ok()?;
// 从数据中解析出温湿度
let humidity = ((buf[3] as u16) << 8) | (buf[4] as u16);
let temperature = ((buf[5] as u16) << 8) | (buf[6] as u16);
Some(DataItem::new(
temperature as f32 / 10.0,
humidity as f32 / 10.0,
))
}
async fn try_until_success<T, Fut, F>(f: F) -> T
where
Fut: Future<Output = Option<T>>,
F: Fn() -> Fut,
{
loop {
if let Some(result) = f().await {
return result;
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
/// 传感器数据接收端
static SENSOR_READER_RECEIVER: Lazy<watch::Receiver<DataItem>> = Lazy::new(SensorReader::create);
async fn ws_handler(ws: WebSocketUpgrade) -> impl IntoResponse {
println!("New connection");
ws.on_upgrade(handle_socket)
}
/// 处理 WebSocket 连接
async fn handle_socket(mut socket: WebSocket) {
let mut rx = SENSOR_READER_RECEIVER.clone();
// 开始循环读取数据
while rx.changed().await.is_ok() {
// 拿到最新数据
let DataItem {
temperature,
humidity,
} = *rx.borrow();
// 将数据发送给客户端
let res = socket
.send(Message::Text(format!("{} {}\n", temperature, humidity)))
.await;
// 如果发送失败,则关闭 WebSocket 连接
if res.is_err() {
return;
}
}
}
#[tokio::main]
async fn main() {
let app = Router::new().route("/ws", get(ws_handler));
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment