Skip to content

Instantly share code, notes, and snippets.

@sigevsky
Last active February 6, 2022 20:28
Show Gist options
  • Save sigevsky/459febd44d0c4b29243269fc0505596e to your computer and use it in GitHub Desktop.
Save sigevsky/459febd44d0c4b29243269fc0505596e to your computer and use it in GitHub Desktop.
use futures_delay_queue::{delay_queue, DelayHandle, ErrorAlreadyExpired};
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::sync::oneshot::Receiver;
use tokio::time::Duration;
struct Lease<A> {
val: A,
dur: Duration,
compl_handle: Receiver<()>,
delay_handle: DelayHandle,
}
impl<A> Lease<A> {
async fn create<F>(a: A, d: Duration, mut f: F) -> Lease<A>
where
F: FnMut() + Send + 'static,
{
let (q, rec) = delay_queue();
let (sx, rx) = oneshot::channel();
let h = q.insert((), d);
tokio::task::spawn(async move {
if let Some(_) = rec.receive().await {
let _ = sx.send(());
f();
};
()
});
Lease {
val: a,
dur: d,
delay_handle: h,
compl_handle: rx,
}
}
fn get(&mut self) -> Option<&A> {
match self.compl_handle.try_recv() {
Err(TryRecvError::Empty) => Some(&self.val),
_ => None,
}
}
async fn lease(self) -> Result<Lease<A>, ErrorAlreadyExpired> {
let nd = self.dur.clone();
self.lease_dur(nd).await
}
async fn lease_dur(self, nd: Duration) -> Result<Lease<A>, ErrorAlreadyExpired> {
let nh = self.delay_handle.reset(nd.clone()).await?;
Ok(Lease {
val: self.val,
dur: nd,
delay_handle: nh,
compl_handle: self.compl_handle,
})
}
}
#[cfg(test)]
mod tests {
use crate::lease::Lease;
use futures_delay_queue::ErrorAlreadyExpired;
use std::borrow::BorrowMut;
use std::sync::{Arc, Mutex};
#[tokio::test]
async fn should_fail_to_get_leased_value() {
let is_completed = Arc::new(Mutex::new(false));
let ac = Arc::clone(&is_completed);
let mut l = Lease::create(42, tokio::time::Duration::from_millis(100), move || {
*ac.lock().unwrap() = true
})
.await;
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
assert_eq!(None, l.get());
assert_eq!(true, *is_completed.lock().unwrap());
}
#[tokio::test]
async fn should_lease_and_get() {
let is_completed = Arc::new(Mutex::new(false));
let ac = Arc::clone(&is_completed);
let mut l0 = Lease::create(42, tokio::time::Duration::from_millis(100), move || {
*ac.lock().unwrap() = true
})
.await;
let r0 = l0.get().unwrap();
assert_eq!(42, *r0);
assert_eq!(false, *is_completed.lock().unwrap());
let mut l1 = l0
.lease_dur(tokio::time::Duration::from_millis(200))
.await
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let r1 = l1.get().unwrap();
assert_eq!(42, *r1);
assert_eq!(false, *is_completed.lock().unwrap());
}
#[tokio::test]
async fn should_fail_to_lease_expired() {
let is_completed = Arc::new(Mutex::new(false));
let ac = Arc::clone(&is_completed);
let mut l0 = Lease::create(42, tokio::time::Duration::from_millis(100), move || {
*ac.lock().unwrap() = true
})
.await;
let r0 = l0.get().unwrap();
assert_eq!(42, *r0);
assert_eq!(false, *is_completed.lock().unwrap());
tokio::time::sleep(tokio::time::Duration::from_millis(150)).await;
assert!(l0
.lease_dur(tokio::time::Duration::from_millis(200))
.await
.is_err())
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment