Skip to content

Instantly share code, notes, and snippets.

@peterschwarz
Forked from rust-play/playground.rs
Last active February 20, 2022 20:17
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save peterschwarz/ed27da74894118c043f183df52217244 to your computer and use it in GitHub Desktop.
Save peterschwarz/ed27da74894118c043f183df52217244 to your computer and use it in GitHub Desktop.
A priority channel based on the standard MPSC channels
use std::cell::RefCell;
use std::cmp::Ord;
use std::collections::BinaryHeap;
use std::sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError};
/// Receiving half of a priority channel.
///
/// Pairs a standard [`Receiver`] with a max-heap of messages that have
/// already been taken off it, so buffered messages can be handed out
/// greatest-first according to `T`'s [`Ord`] implementation.
pub struct PriorityReceiver<T: Ord> {
    /// Underlying MPSC receiver on which messages arrive.
    internal: Receiver<T>,
    /// Messages pulled from `internal` that have not been returned yet.
    /// Kept in a `RefCell` so it can be mutated through `&self`.
    received: RefCell<BinaryHeap<T>>,
}
impl<T> PriorityReceiver<T>
where
    T: Ord,
{
    /// Wraps `internal` together with an initially empty priority buffer.
    fn new(internal: Receiver<T>) -> Self {
        Self {
            internal,
            received: RefCell::new(BinaryHeap::new()),
        }
    }

    /// Moves every message currently queued on the underlying receiver into
    /// the heap, then pops the greatest buffered message, if there is one.
    ///
    /// The `RefCell` borrow ends when this returns, so callers never hold it
    /// while waiting on the channel.
    fn pop_buffered(&self) -> Option<T> {
        let mut heap = self.received.borrow_mut();
        heap.extend(self.internal.try_iter());
        heap.pop()
    }

    /// Returns the greatest message that is currently available.
    ///
    /// If nothing is buffered or queued, this falls back to a blocking
    /// `recv` on the underlying receiver and returns whatever it yields.
    ///
    /// # Errors
    ///
    /// Returns [`RecvError`] when no message is available and the underlying
    /// receiver's `recv` fails (the channel is disconnected).
    pub fn recv(&self) -> Result<T, RecvError> {
        match self.pop_buffered() {
            Some(message) => Ok(message),
            None => self.internal.recv(),
        }
    }

    /// Returns the greatest message that is currently available without
    /// blocking.
    ///
    /// # Errors
    ///
    /// When nothing is buffered or queued, returns the [`TryRecvError`]
    /// reported by the underlying receiver's `try_recv` (`Empty` or
    /// `Disconnected`).
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        match self.pop_buffered() {
            Some(message) => Ok(message),
            None => self.internal.try_recv(),
        }
    }

    /// Returns an iterator that yields messages by calling [`recv`](Self::recv)
    /// and ends the first time it returns an error.
    pub fn iter(&self) -> Iter<'_, T> {
        Iter { rx: self }
    }

    /// Returns an iterator that yields messages by calling
    /// [`try_recv`](Self::try_recv) and ends the first time it returns an
    /// error.
    pub fn try_iter(&self) -> TryIter<'_, T> {
        TryIter { rx: self }
    }
}
/// Iterator over the messages of a borrowed [`PriorityReceiver`].
///
/// Created by `PriorityReceiver::iter`.
pub struct Iter<'a, T: Ord + 'a> {
    /// Receiver the messages are pulled from.
    rx: &'a PriorityReceiver<T>,
}
impl<'a, T: Ord + 'a> Iterator for Iter<'a, T> {
    type Item = T;

    /// Yields the next message from the receiver's `recv`; any error from
    /// `recv` ends the iteration.
    fn next(&mut self) -> Option<Self::Item> {
        match self.rx.recv() {
            Ok(message) => Some(message),
            Err(_) => None,
        }
    }
}
/// Non-blocking iterator over the messages of a borrowed
/// [`PriorityReceiver`].
///
/// Created by `PriorityReceiver::try_iter`.
pub struct TryIter<'a, T: Ord + 'a> {
    /// Receiver the messages are pulled from.
    rx: &'a PriorityReceiver<T>,
}
impl<'a, T: Ord + 'a> Iterator for TryIter<'a, T> {
    type Item = T;

    /// Yields the next message from the receiver's `try_recv`; any error
    /// from `try_recv` ends the iteration.
    fn next(&mut self) -> Option<Self::Item> {
        match self.rx.try_recv() {
            Ok(message) => Some(message),
            Err(_) => None,
        }
    }
}
/// Creates a priority channel.
///
/// The sending half is a plain [`Sender`] from a standard MPSC channel; the
/// receiving half is that channel's receiver wrapped in a
/// [`PriorityReceiver`].
pub fn priority_channel<T: Ord>() -> (Sender<T>, PriorityReceiver<T>) {
    let (tx, rx) = channel::<T>();
    let prioritized = PriorityReceiver::new(rx);
    (tx, prioritized)
}
#[cfg(test)]
mod test {
    use super::*;
    use std::cmp::{Ord, Ordering, PartialOrd};

    /// Messages already queued are received greatest-first, and a message
    /// sent later can overtake ones that are still buffered.
    #[test]
    fn priority_recv() {
        let (tx, rx) = priority_channel();
        tx.send(1).unwrap();
        tx.send(5).unwrap();
        tx.send(2).unwrap();
        assert_eq!(Ok(5), rx.recv());
        assert_eq!(Ok(2), rx.recv());
        tx.send(6).unwrap();
        assert_eq!(Ok(6), rx.recv());
        assert_eq!(Ok(1), rx.recv());
    }

    /// Once the sender is dropped, `iter` drains the remaining messages in
    /// descending order and then stops.
    #[test]
    fn priority_iter() {
        let rx = {
            let (tx, rx) = priority_channel();
            tx.send(1).unwrap();
            tx.send(10).unwrap();
            tx.send(100).unwrap();
            rx
        };
        assert_eq!(vec![100, 10, 1], rx.iter().collect::<Vec<_>>());
    }

    /// `Event::Stop` overtakes every other event that is still buffered.
    ///
    /// NOTE(review): the expected order among the non-`Stop` events depends
    /// on how `BinaryHeap` happens to arrange elements that compare equal,
    /// which its documentation does not appear to specify.
    #[test]
    fn priority_enum() {
        let (tx, rx) = priority_channel();
        tx.send(Event::Byte(1)).unwrap();
        tx.send(Event::Short(5)).unwrap();
        tx.send(Event::Int(2)).unwrap();
        assert_eq!(Ok(Event::Byte(1)), rx.recv());
        assert_eq!(Ok(Event::Short(5)), rx.recv());
        tx.send(Event::Stop).unwrap();
        assert_eq!(Ok(Event::Stop), rx.recv());
        assert_eq!(Ok(Event::Int(2)), rx.recv());
    }

    /// `Stop` ranks above every other event and ties with itself.
    #[test]
    fn stop_ordering_is_consistent() {
        assert_eq!(Ordering::Equal, Event::Stop.cmp(&Event::Stop));
        assert_eq!(Ordering::Greater, Event::Stop.cmp(&Event::Byte(0)));
        assert_eq!(Ordering::Less, Event::Int(0).cmp(&Event::Stop));
        assert_eq!(Ordering::Equal, Event::Byte(1).cmp(&Event::Short(5)));
    }

    /// Sample message type: `Stop` is urgent, everything else is ordinary.
    #[derive(Debug, PartialEq, Eq)]
    enum Event {
        Byte(i8),
        Short(i16),
        Int(i32),
        Stop,
    }

    impl PartialOrd for Event {
        fn partial_cmp(&self, other: &Event) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl Ord for Event {
        /// Orders events by urgency only: `Stop` is greater than any other
        /// event, and two events of the same urgency compare as equal.
        ///
        /// Comparing on "is this `Stop`?" keeps the ordering antisymmetric;
        /// in particular `Stop.cmp(&Stop)` is `Equal`, matching `==`.
        ///
        /// NOTE(review): distinct non-`Stop` events still compare `Equal`
        /// while being `!=` under the derived `PartialEq`; acceptable for
        /// this fixture, but not a model for production types.
        fn cmp(&self, other: &Event) -> Ordering {
            let is_stop = |event: &Event| matches!(event, Event::Stop);
            is_stop(self).cmp(&is_stop(other))
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment