Skip to content

Instantly share code, notes, and snippets.

@RadicalZephyr
Created August 28, 2019 20:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RadicalZephyr/f4ed10681cb55e61ac5aceef56adf767 to your computer and use it in GitHub Desktop.
Save RadicalZephyr/f4ed10681cb55e61ac5aceef56adf767 to your computer and use it in GitHub Desktop.
# Manifest for the `tokio-channel-test` crate.
[package]
name = "tokio-channel-test"
version = "0.1.0"
authors = ["Geoff Shannon"]
edition = "2018"
[dependencies]
# The leading `=` pins this exact pre-release version.
# NOTE(review): the "nightly" feature presumably requires a nightly toolchain — confirm.
futures-preview = { version = "=0.3.0-alpha.18", features = ["async-await", "compat", "io-compat", "nightly"] }
tokio = "0.2.0-alpha.2"
# NOTE(review): in the visible source this crate is only imported from the
# `#[cfg(test)]` module; it could likely live under [dev-dependencies] — confirm.
tokio-test = "0.2.0-alpha.2"
#![allow(dead_code)]
use std::{pin::Pin, task::Context, time::Duration};
use futures::{prelude::*, ready, Poll};
use tokio::timer::DelayQueue;
/// Delay, in milliseconds, that `Debounce` applies to an `Item::B` before
/// yielding it.
const DEBOUNCE_TIME_MS: u64 = 500;
/// The two kinds of message that flow through `Debounce`.
#[derive(Debug, PartialEq)]
enum Item {
/// Queued by `Debounce` with a zero delay.
A,
/// Queued by `Debounce` with a delay of `DEBOUNCE_TIME_MS` milliseconds.
B,
}
struct Debounce<S> {
stream: S,
queue: DelayQueue<Item>,
}
impl<S> Debounce<S>
where
S: Stream<Item = Item>,
{
fn new(stream: S) -> Debounce<S> {
Debounce {
stream,
queue: DelayQueue::new(),
}
}
}
macro_rules! pin {
( $e:expr ) => {
Pin::new(&mut $e)
};
}
impl<S> Stream for Debounce<S>
where
S: Unpin + Stream<Item = Item>,
{
type Item = Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if let Some(item) = ready!(pin!(self.stream).poll_next(cx)) {
match item {
Item::A => {
let duration = Duration::from_millis(0);
self.queue.insert(item, duration);
}
Item::B => {
let duration = Duration::from_millis(DEBOUNCE_TIME_MS);
self.queue.insert(item, duration);
}
}
}
match ready!(self.queue.poll_next(cx)) {
Some(Ok(report)) => Poll::Ready(Some(report.into_inner())),
Some(Err(e)) => panic!("at the disco!: {}", e),
None => Poll::Pending,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::mpsc;
    use tokio_test::{assert_pending, assert_ready, assert_ready_ok, clock, task::MockTask};

    /// An `Item::A` is queued with a zero delay, so the first poll of the
    /// debounced stream is expected to yield it.
    #[test]
    fn test_a() {
        clock::mock(|_handle| {
            let mut task = MockTask::new();
            let (mut tx, rx) = mpsc::channel(1);
            let mut debounce = Debounce::new(rx);

            let mut send_result = Box::pin(tx.send(Item::A));
            assert_ready_ok!(task.poll(pin!(send_result)));

            let mut read_next = debounce.next();
            let report = assert_ready!(task.poll(pin!(read_next))).unwrap();
            assert_eq!(Item::A, report);
        });
    }

    /// An `Item::B` is held back for `DEBOUNCE_TIME_MS`: the stream is pending
    /// at first and yields the item only after the mocked clock has advanced
    /// past the delay.
    #[test]
    fn test_b() {
        clock::mock(|handle| {
            let mut task = MockTask::new();
            let (mut tx, rx) = mpsc::channel(1);
            let mut debounce = Debounce::new(rx);

            let mut send_result = Box::pin(tx.send(Item::B));
            assert_ready_ok!(task.poll(pin!(send_result)));

            let mut read_next = debounce.next();
            assert_pending!(task.poll(pin!(read_next)));

            handle.advance(Duration::from_secs(10));

            let report = assert_ready!(task.poll(pin!(read_next))).unwrap();
            // The item that was sent is `Item::B`, so that is what must come out.
            assert_eq!(Item::B, report);
        });
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment