Skip to content

Instantly share code, notes, and snippets.

@rust-play
Created May 13, 2023 14:02
Show Gist options
  • Save rust-play/ade87888d3c546c981005e64e0b77ecf to your computer and use it in GitHub Desktop.
Save rust-play/ade87888d3c546c981005e64e0b77ecf to your computer and use it in GitHub Desktop.
Code shared from the Rust Playground
use futures::stream::StreamExt;
use std::{sync::Arc, time::Duration};
use tokio::sync::{mpsc, oneshot};
/// Demo entry point: starts the background tasks, issues two `double`
/// commands a quarter of a second apart, then asks the event task to stop.
#[tokio::main]
async fn main() {
    // Everything needed to build the sensor happens inside `spawn_thing`;
    // the returned handle is the only way to talk to the event task.
    let sensor_handle = spawn_thing().await;

    let first = sensor_handle.double(4).await;
    println!("4 doubled is {double}", double = first);

    // Leave a gap so the background tasks get time to exchange data
    // between the two commands.
    tokio::time::sleep(Duration::from_millis(250)).await;

    let second = sensor_handle.double(325).await;
    println!("325 doubled is {double}", double = second);

    // Final message: consumes the handle and tells the event task to exit.
    sensor_handle.stop().await;
}
/// Spawns the two background tasks and returns the [`Handle`] used to talk to
/// them.
///
/// * The *bluetooth task* pulls items from the sensor's notification stream and
///   forwards each one over an mpsc channel.
/// * The *event task* `select!`s over that data channel and the command channel
///   fed by the returned [`Handle`].
///
/// Selecting on channel receivers (rather than on `notifications.next()`
/// directly) is deliberate: the intent is that no notification is lost if a
/// pending future gets cancelled by `select!`.
async fn spawn_thing() -> Handle {
    // Channel carrying bluetooth data from the collector to the event task.
    let (bt_tx, mut bt_rx) = mpsc::channel(128);
    // Channel carrying commands from the `Handle` to the event task.
    let (event_tx, mut event_rx) = mpsc::channel(4);

    // The sensor type that wraps btleplug, shared behind an `Arc`.
    let bt_sensor = Arc::new(Sensor::default());
    // A second reference the event task could use to reach the sensor.
    let _ev_sensor = Arc::clone(&bt_sensor);

    // Collect bluetooth data in the background.
    println!("spawning bluetooth task");
    tokio::task::spawn(async move {
        let mut notifications = bt_sensor.notifications();
        loop {
            let Some(data) = notifications.next().await else {
                break;
            };
            println!("sending data");
            // A failed send means the receiving side is gone; stop forwarding.
            if bt_tx.send(data).await.is_err() {
                break;
            }
        }
    });

    // Listen for both bluetooth data and commands.
    println!("spawning event task");
    tokio::task::spawn(async move {
        loop {
            tokio::select! {
                // Data forwarded by the bluetooth task.
                Some(data) = bt_rx.recv() => {
                    println!("received bluetooth data: {data}");
                }
                // Commands sent through the `Handle`.
                Some(event) = event_rx.recv() => {
                    match event {
                        Event::DoubleValue { initial, ret } => {
                            // The requester may have given up waiting; that is fine.
                            ret.send(initial * 2).ok();
                        }
                        Event::Stop => break,
                    }
                }
                // Neither pattern can match any more: both channels have closed.
                else => break,
            }
        }
    });

    Handle { event_tx }
}
/// Commands sent from a `Handle` to the event task over the event channel.
///
/// This is an implementation detail hidden behind the `Handle` type and is
/// *not user facing*.
enum Event {
/// Makes the event task break out of its loop.
Stop,
/// Asks the event task to compute `initial * 2` and send the result back
/// through `ret`.
DoubleValue {
// value to be doubled
initial: usize,
// one-shot reply channel on which the doubled value is returned
ret: oneshot::Sender<usize>,
},
}
// Your sensor type that wraps btleplug
#[derive(Default)]
struct Sensor {}
impl Sensor {
// made up NotificationStream presumably from btleplug
fn notifications(&self) -> btlemock::NotificationStream {
btlemock::NotificationStream::default()
}
}
/// User-facing handle for communicating with the sensor's event task.
///
/// Returned by `spawn_thing`; each method turns a call into an `Event`
/// and sends it over `event_tx`.
struct Handle {
// sending half of the command channel read by the event task
event_tx: mpsc::Sender<Event>,
}
impl Handle {
    /// Stops the sensor by sending `Event::Stop` to the event task.
    ///
    /// Consumes the handle. A send failure is ignored on purpose: it only
    /// means the event task has already gone away, which is the state the
    /// caller asked for.
    async fn stop(self) {
        println!("========= stopping sensor ==========");
        let _ = self.event_tx.send(Event::Stop).await;
    }

    /// Sends the `DoubleValue` command and waits for the event task's reply.
    ///
    /// Returns the value sent back by the event task (`initial * 2`).
    ///
    /// # Panics
    ///
    /// Panics if the event task is no longer running, i.e. the command cannot
    /// be delivered or the reply channel is dropped before an answer arrives.
    async fn double(&self, initial: usize) -> usize {
        println!("========= doubling command to sensor ==========");
        let (ret, reply) = oneshot::channel();

        // Surface a dead event task right here with a clear message instead of
        // discarding the send error and failing later on an opaque `RecvError`.
        if self
            .event_tx
            .send(Event::DoubleValue { initial, ret })
            .await
            .is_err()
        {
            panic!("event task has shut down; DoubleValue command could not be sent");
        }

        reply
            .await
            .expect("event task dropped the reply channel before answering DoubleValue")
    }
}
/// Mock of btleplug: provides a stream that yields an increasing counter on a
/// fixed timer.
mod btlemock {
    use futures::stream::Stream;
    use std::future::Future;
    use std::pin::Pin;
    use std::task::Context;
    use std::task::Poll;
    use std::time::Duration;
    use tokio::time;
    use tokio::time::Instant;
    use tokio::time::Sleep;

    /// Delay between two consecutive notifications.
    const PERIOD: Duration = Duration::from_millis(25);

    /// Endless stream of `usize` values: 0, 1, 2, ... — one per elapsed timer.
    pub struct NotificationStream {
        /// Timer that gates the next item; re-armed after every emission.
        sleep: Pin<Box<Sleep>>,
        /// Value the next emission will carry.
        count: usize,
    }

    impl Default for NotificationStream {
        /// Creates a stream whose first item becomes available once the
        /// initial timer elapses.
        fn default() -> Self {
            let sleep = Box::pin(time::sleep(PERIOD));
            Self { sleep, count: 0 }
        }
    }

    impl Stream for NotificationStream {
        type Item = usize;

        /// Yields the current counter value when the timer has fired, then
        /// re-arms the timer; otherwise reports `Pending`. Never returns
        /// `None`.
        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            // Not yet time for the next notification.
            if self.sleep.as_mut().poll(cx).is_pending() {
                return Poll::Pending;
            }

            // Re-arm the timer relative to "now" for the following item.
            let next_deadline = Instant::now() + PERIOD;
            self.sleep.as_mut().reset(next_deadline);

            let emitted = self.count;
            self.count += 1;
            Poll::Ready(Some(emitted))
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment