-
-
Save rust-play/ade87888d3c546c981005e64e0b77ecf to your computer and use it in GitHub Desktop.
Code shared from the Rust Playground
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use futures::stream::StreamExt; | |
use std::{sync::Arc, time::Duration}; | |
use tokio::sync::{mpsc, oneshot}; | |
#[tokio::main] | |
async fn main() { | |
// construct your sensor up to this point | |
let handle = spawn_thing().await; | |
// use the handle to speak with the event task | |
let double = handle.double(4).await; | |
println!("4 doubled is {double}"); | |
tokio::time::sleep(Duration::from_millis(250)).await; | |
let double = handle.double(325).await; | |
println!("325 doubled is {double}"); | |
// another communication | |
handle.stop().await; | |
} | |
async fn spawn_thing() -> Handle { | |
// create your sensor type that wraps btleplug | |
let sensor = Sensor::default(); | |
let bt_sensor = Arc::new(sensor); | |
// this could be used in the event task to access your sensor type | |
let _ev_sensor = Arc::clone(&bt_sensor); | |
// channel to send bluetooth data | |
let (bt_tx, mut bt_rx) = mpsc::channel(128); | |
// collect bluetooth data in the background | |
println!("spawning bluetooth task"); | |
tokio::task::spawn(async move { | |
let mut notifications = bt_sensor.notifications(); | |
while let Some(data) = notifications.next().await { | |
println!("sending data"); | |
// send data over a channel | |
let Ok(_) = bt_tx.send(data).await else { break }; | |
} | |
}); | |
// listen for events and bluetooth data. select! on the channels | |
// so no data is lost from cancelled futures which could(?) happen | |
// if the notifications.next() from btleplug is cancelled. | |
println!("spawning event task"); | |
let (event_tx, mut event_rx) = mpsc::channel(4); | |
tokio::task::spawn(async move { | |
loop { | |
tokio::select! { | |
// handle bluetooth data | |
Some(data) = bt_rx.recv() => { | |
println!("received bluetooth data: {data}"); | |
} | |
// handle events | |
Some(event) = event_rx.recv() => { | |
match event { | |
Event::Stop => break, | |
Event::DoubleValue { initial, ret } => { | |
let _ = ret.send(initial * 2); | |
} | |
} | |
} | |
// both channels have closed | |
else => { | |
break; | |
} | |
} | |
} | |
}); | |
Handle { event_tx } | |
} | |
// types of events to send to the event task. Abstract away behind | |
// the Handle type. *not user facing* | |
enum Event { | |
Stop, | |
DoubleValue { | |
initial: usize, | |
ret: oneshot::Sender<usize>, | |
}, | |
} | |
// Placeholder for the application's own sensor type, i.e. the wrapper
// around btleplug. It has no fields yet, so `Default` is simply derived.
#[derive(Default)]
struct Sensor {}
impl Sensor { | |
// made up NotificationStream presumably from btleplug | |
fn notifications(&self) -> btlemock::NotificationStream { | |
btlemock::NotificationStream::default() | |
} | |
} | |
// user facing struct to communicate with the sensor | |
struct Handle { | |
event_tx: mpsc::Sender<Event>, | |
} | |
impl Handle { | |
// stop the sensor | |
async fn stop(self) { | |
println!("========= stopping sensor =========="); | |
let _ = self.event_tx.send(Event::Stop).await; | |
} | |
// send the DoubleValue command | |
async fn double(&self, initial: usize) -> usize { | |
println!("========= doubling command to sensor =========="); | |
let (tx, rx) = oneshot::channel(); | |
let event = Event::DoubleValue { initial, ret: tx }; | |
let _ = self.event_tx.send(event).await; | |
let last = rx.await.unwrap(); | |
last | |
} | |
} | |
// mocking of btleplug | |
mod btlemock { | |
use futures::stream::Stream; | |
use std::future::Future; | |
use std::pin::Pin; | |
use std::task::Context; | |
use std::task::Poll; | |
use std::time::Duration; | |
use tokio::time; | |
use tokio::time::Instant; | |
use tokio::time::Sleep; | |
pub struct NotificationStream { | |
sleep: Pin<Box<Sleep>>, | |
count: usize, | |
} | |
impl Default for NotificationStream { | |
fn default() -> Self { | |
Self { | |
sleep: Box::pin(time::sleep(Duration::from_millis(25))), | |
count: 0, | |
} | |
} | |
} | |
impl Stream for NotificationStream { | |
type Item = usize; | |
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
match self.sleep.as_mut().poll(cx) { | |
Poll::Ready(_) => { | |
self.sleep | |
.as_mut() | |
.reset(Instant::now() + Duration::from_millis(25)); | |
self.count += 1; | |
Poll::Ready(Some(self.count - 1)) | |
} | |
Poll::Pending => Poll::Pending, | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment