Skip to content

Instantly share code, notes, and snippets.

@txus
Last active October 7, 2015 11:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save txus/db972a583cf37fc9a21a to your computer and use it in GitHub Desktop.
Save txus/db972a583cf37fc9a21a to your computer and use it in GitHub Desktop.
Pipe together standard Rust [sync] channels and cool comm::mpmc channels!
extern crate comm;
use std::sync::mpsc::{SyncSender, Receiver, SendError, RecvError};
use comm::{Error as CommError};
use comm::mpmc::bounded::Channel;
use std::thread;
use std::marker::Sized;
pub enum Xor<A, B> {
Left(A),
Right(B)
}
pub type StdChannelError<T> = Xor<RecvError, SendError<T>>;
pub enum PipeError<T> {
StdRecvError(RecvError),
StdSendError(SendError<T>),
Comm(CommError)
}
pub trait BlockingSender<T>: Sized
where T: Send {
fn blocking_send(&self, msg: T) -> Result<(), PipeError<T>>;
}
pub trait BlockingReceiver<T>: Sized
where T: Send {
fn blocking_receive(&self) -> Result<T, PipeError<T>>;
}
impl<'a, T> BlockingSender<T> for Channel<'a, T>
where T: Send {
fn blocking_send(&self, msg: T) -> Result<(), PipeError<T>> {
self.send_sync(msg).map_err(|(_, err)| PipeError::Comm(err))
}
}
impl<'a, T> BlockingReceiver<T> for Channel<'a, T>
where T: Send {
fn blocking_receive(&self) -> Result<T, PipeError<T>> {
self.recv_sync().map_err(PipeError::Comm)
}
}
impl<T: Send> BlockingSender<T> for SyncSender<T> {
fn blocking_send(&self, msg: T) -> Result<(), PipeError<T>> {
self.send(msg).map_err(PipeError::StdSendError)
}
}
impl<T: Send> BlockingReceiver<T> for Receiver<T> {
fn blocking_receive(&self) -> Result<T, PipeError<T>> {
self.recv().map_err(PipeError::StdRecvError)
}
}
pub fn pipe<A, B, T, U, F>(from: A, to: B, f: F) -> thread::JoinHandle<()>
where T: 'static + Send,
U: 'static + Send,
F: Fn(T) -> U + 'static + Send,
A: BlockingReceiver<T> + 'static + Send,
B: BlockingSender<U> + 'static + Send
{
thread::spawn(move || {
loop {
from.blocking_receive()
.map(&f).map_err(Xor::Left)
.and_then(|msg| to.blocking_send(msg).map_err(Xor::Right));
}
})
}
#[cfg(test)]
mod test {
use super::*;
use std::sync::mpsc::{SyncSender, Receiver};
use std::sync::mpsc::sync_channel;
use comm::mpmc::bounded::Channel;
use std::thread;
#[test]
fn it_pipes_together_two_std_channels() {
let (tx1, rx1): (SyncSender<i32>, Receiver<i32>) = sync_channel(1);
let (tx2, rx2): (SyncSender<String>, Receiver<String>) = sync_channel(1);
pipe(rx1, tx2, |n: i32| n.to_string());
thread::spawn(move || {
let _ = tx1.send(42).unwrap();
});
assert_eq!("42", rx2.recv().unwrap());
}
#[test]
fn it_pipes_together_two_comm_channels() {
let chan1: Channel<i32> = Channel::new(1);
let chan2: Channel<String> = Channel::new(1);
let tx = chan1.clone();
let rx = chan2.clone();
pipe(chan1, chan2, |n: i32| n.to_string());
thread::spawn(move || {
let _ = tx.send_sync(42).unwrap();
});
assert_eq!("42", rx.recv_sync().unwrap());
}
#[test]
fn it_pipes_together_an_std_and_a_comm_channel() {
let (tx1, rx1): (SyncSender<i32>, Receiver<i32>) = sync_channel(1);
let chan2: Channel<String> = Channel::new(1);
let rx = chan2.clone();
pipe(rx1, chan2, |n: i32| n.to_string());
thread::spawn(move || {
let _ = tx1.send(42).unwrap();
});
assert_eq!("42", rx.recv_sync().unwrap());
}
#[test]
fn it_pipes_together_a_comm_and_an_std_channel() {
let chan1: Channel<i32> = Channel::new(1);
let (tx2, rx2): (SyncSender<String>, Receiver<String>) = sync_channel(1);
let tx = chan1.clone();
pipe(chan1, tx2, |n: i32| n.to_string());
thread::spawn(move || {
let _ = tx.send_sync(42).unwrap();
});
assert_eq!("42", rx2.recv().unwrap());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment