Skip to content

Instantly share code, notes, and snippets.

@rrichardson
Created January 15, 2018 17:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rrichardson/2ac16dcc65d3bfa57d440ea4dfb75bd0 to your computer and use it in GitHub Desktop.
Save rrichardson/2ac16dcc65d3bfa57d440ea4dfb75bd0 to your computer and use it in GitHub Desktop.
idea for streaming
fn kinesis_pipeline(client: DefaultKinesisClient, stream_name: String, num_puts: usize, puts_size: usize) {
use futures::sync::mpsc::{ unbounded, UnboundedSender, UnboundedReceiver };
use futures::{ Sink, Future, Stream };
use futures::stream::Sender;
use rusoto_core::reactor::DEFAULT_REACTOR;
use tokio_core::reactor::Core;
let client = Arc::new(KinesisClient::simple(Region::UsWest2));
let data = FauxData::new();
let (tx, rx) = unbounded();
std::thread::spawn(move ||{
let core = Core::new();
let prog = rx.buffer_unordered(50).map_err(Error::from).for_each(|x|{
info!("got an {:?}", x);
Ok(())
});
core.run(prog);
});
for rec in data {
tx = tx.send(rec).expect("failed to send to channel");
info!("zip");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment