Last active
May 16, 2022 08:40
-
-
Save songzhi/70738652a353854824b16c05a07c9347 to your computer and use it in GitHub Desktop.
sensor reader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use axum::{ | |
extract::ws::{Message, WebSocket, WebSocketUpgrade}, | |
response::IntoResponse, | |
routing::get, | |
Router, | |
}; | |
use once_cell::sync::Lazy; | |
use std::{future::Future, net::SocketAddr, time::Duration}; | |
use tokio::{ | |
io::{AsyncReadExt, AsyncWriteExt}, | |
sync::watch, | |
}; | |
use tokio_serial::SerialStream; | |
/// A single sensor reading.
///
/// `Default` yields zero for both fields; that value is what the watch
/// channel is seeded with before the first real reading arrives.
#[derive(Debug, Clone, Copy, Default)]
struct DataItem {
    /// Temperature value (the reader divides the raw register by 10).
    temperature: f32,
    /// Humidity value (the reader divides the raw register by 10).
    humidity: f32,
}
impl DataItem { | |
fn new(temperature: f32, humidity: f32) -> Self { | |
Self { | |
temperature, | |
humidity, | |
} | |
} | |
} | |
/// 负责定时读取传感器数据并广播给客户端 | |
struct SensorReader { | |
tx: watch::Sender<DataItem>, | |
port: Option<SerialStream>, | |
} | |
impl SensorReader { | |
fn create() -> watch::Receiver<DataItem> { | |
let (tx, rx) = watch::channel(DataItem::default()); | |
tokio::spawn(Self { tx, port: None }.main_loop()); | |
rx | |
} | |
async fn main_loop(mut self) { | |
loop { | |
// 尝试读取传感器数据 | |
if let Some(data) = self.read_data().await { | |
// 将数据广播给客户端 | |
self.tx.send(data).unwrap(); | |
} else { | |
// 读取失败,等待下一次重新打开串口 | |
self.port.take(); | |
} | |
// 等待一秒 | |
tokio::time::sleep(Duration::from_millis(1000)).await; | |
} | |
} | |
/// 获取串口名 | |
async fn port_name() -> Option<String> { | |
tokio::task::spawn_blocking(|| Some(tokio_serial::available_ports().ok()?.pop()?.port_name)) | |
.await | |
.ok() | |
.flatten() | |
} | |
/// 拿到串口 | |
async fn port(&mut self) -> &mut SerialStream { | |
// 如果串口已经存在,直接返回 | |
if self.port.is_some() { | |
return self.port.as_mut().unwrap(); | |
} | |
// 如果串口不存在,尝试打开,如果打开失败,等待一段时间后重试 | |
self.port.replace( | |
Self::try_until_success(|| async { | |
let port_name = Self::port_name().await?; | |
tokio::task::spawn_blocking(|| { | |
SerialStream::open(&tokio_serial::new(port_name, 9600)).ok() | |
}) | |
.await | |
.ok() | |
.flatten() | |
}) | |
.await, | |
); | |
self.port.as_mut().unwrap() | |
} | |
async fn read_data(&mut self) -> Option<DataItem> { | |
// 每次要发送的数据 | |
static DATA_FOR_SEND: &[u8; 8] = &[1, 3, 0, 0, 0, 2, 0xC4, 0x0B]; | |
// 拿到串口,一直等到拿到为止 | |
let port = self.port().await; | |
// 发送数据 | |
port.write_all(DATA_FOR_SEND).await.ok()?; | |
// 读取数据 | |
let mut buf = [0u8; 9]; | |
port.read_exact(&mut buf).await.ok()?; | |
// 从数据中解析出温湿度 | |
let humidity = ((buf[3] as u16) << 8) | (buf[4] as u16); | |
let temperature = ((buf[5] as u16) << 8) | (buf[6] as u16); | |
Some(DataItem::new( | |
temperature as f32 / 10.0, | |
humidity as f32 / 10.0, | |
)) | |
} | |
async fn try_until_success<T, Fut, F>(f: F) -> T | |
where | |
Fut: Future<Output = Option<T>>, | |
F: Fn() -> Fut, | |
{ | |
loop { | |
if let Some(result) = f().await { | |
return result; | |
} | |
tokio::time::sleep(Duration::from_secs(1)).await; | |
} | |
} | |
} | |
/// 传感器数据接收端 | |
static SENSOR_READER_RECEIVER: Lazy<watch::Receiver<DataItem>> = Lazy::new(SensorReader::create); | |
async fn ws_handler(ws: WebSocketUpgrade) -> impl IntoResponse { | |
println!("New connection"); | |
ws.on_upgrade(handle_socket) | |
} | |
/// 处理 WebSocket 连接 | |
async fn handle_socket(mut socket: WebSocket) { | |
let mut rx = SENSOR_READER_RECEIVER.clone(); | |
// 开始循环读取数据 | |
while rx.changed().await.is_ok() { | |
// 拿到最新数据 | |
let DataItem { | |
temperature, | |
humidity, | |
} = *rx.borrow(); | |
// 将数据发送给客户端 | |
let res = socket | |
.send(Message::Text(format!("{} {}\n", temperature, humidity))) | |
.await; | |
// 如果发送失败,则关闭 WebSocket 连接 | |
if res.is_err() { | |
return; | |
} | |
} | |
} | |
#[tokio::main] | |
async fn main() { | |
let app = Router::new().route("/ws", get(ws_handler)); | |
let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); | |
axum::Server::bind(&addr) | |
.serve(app.into_make_service()) | |
.await | |
.unwrap(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment