Skip to content

Instantly share code, notes, and snippets.

@rrichardson
Created January 2, 2018 07:57
Show Gist options
  • Save rrichardson/c74bf33c7c0d69634f1f2f7ec3940965 to your computer and use it in GitHub Desktop.
Save rrichardson/c74bf33c7c0d69634f1f2f7ec3940965 to your computer and use it in GitHub Desktop.
sender stream termination
extern crate futures;
extern crate time;
extern crate tokio_core;
extern crate tokio_timer;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use time::Duration as TDuration;
use futures::Stream;
use futures::sink::Sink;
use futures::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio_core::reactor::Core;
use tokio_timer::Timer;
/// Demo driver for sender-side stream termination.
///
/// Starts the receiver thread via `run`, sends 20 messages 200 ms apart over an
/// unbounded channel, then drops the sender so the receiving stream reaches
/// end-of-stream, and finally joins the receiver thread.
fn main() {
    let running = Arc::new(AtomicBool::new(true));
    {
        let (mut tx, rx) = mpsc::unbounded::<String>();
        let hdl = run(rx, running.clone());
        for i in 0..20 {
            tx.unbounded_send(format!("msg {}", i)).unwrap();
            thread::sleep(Duration::from_millis(200));
        }
        println!("done sending messages.. dropping tx");
        // NOTE(review): for the futures 0.1 channel senders `Sink::close` appears to
        // be a no-op; the channel only closes once every sender has been dropped.
        // Confirm against the crate version in use.
        tx.close().unwrap();
        // Really drop the sender. The previous `let _ = tx;` did NOT do this: the
        // wildcard pattern binds nothing, so `tx` was never moved and stayed alive
        // until the end of this block, i.e. after `hdl.join()`. The receiver never
        // saw the channel close and the join below blocked forever.
        drop(tx);
        // Alternative shutdown signal via the shared flag. The receiver only checks
        // it when a message arrives, and it then ends the stream with an error
        // (which the receiver thread turns into a panic via `expect`).
        //running.store(false, Ordering::Relaxed);
        hdl.join().unwrap();
    }
    println!("after join");
}
/// Spawns a thread that consumes `rx` on its own `Core` reactor and returns the
/// thread's join handle.
///
/// Every received message is printed. After each message the `running` flag is
/// read: while it is `true` the stream keeps going; once it is `false` the
/// stream is ended with `Err(())`, which makes the `expect` on `core.run` panic
/// inside the spawned thread. When the stream finishes without error, a
/// shutdown message is printed and the thread exits.
pub fn run(rx: UnboundedReceiver<String>, running: Arc<AtomicBool>) -> JoinHandle<()> {
    thread::spawn(move || {
        let mut reactor = Core::new().unwrap();

        // Disabled experiment, kept for reference: select the message stream
        // against a 1 s interval timer that yields `None` (presumably so the
        // `running` flag gets checked even when no messages arrive).
        //
        // let timer = Timer::default();
        // let timeout = timer
        //     .interval(
        //         TDuration::milliseconds(1000)
        //             .to_std()
        //             .expect("Failed to convert to std Duration"),
        //     )
        //     .map(|_| None)
        //     .map_err(|e| e.into());
        // ... rx.map(Some).select(timeout) ...

        // Messages are wrapped in `Some` so they share an item type with the
        // (currently disabled) timer ticks above.
        let consumer = rx.map(Some).for_each(move |msg: Option<String>| {
            println!("received :: {:?}", msg);
            if running.load(Ordering::Relaxed) {
                Ok(())
            } else {
                Err(())
            }
        });

        reactor.run(consumer).expect("core.run errored out");
        println!("mq_client stream shut down (presumably due to a closed tx channel)");
    })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment