Skip to content

Instantly share code, notes, and snippets.

@valsteen
Last active September 12, 2022 19:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save valsteen/103aac191afa881d88829bb9e3699784 to your computer and use it in GitHub Desktop.
Save valsteen/103aac191afa881d88829bb9e3699784 to your computer and use it in GitHub Desktop.
Self-feeding processing stream proof of concept
[package]
name = "processing_stream"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
futures = "0.3.24"
tokio = { version = "1.21.0", features = ["full"] }
derive_more = { version = "0.99.17", features = ["deref", "deref_mut"] }
serde_json = { version = "1.0.85" }
[[bin]]
name = "processing_stream"
path = "processing_stream.rs"
//! This demonstrates a processing queue whose workers can send elements back to the queue,
//! using streams: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
//!
//! It uses an unbounded channel that is seeded with the starting tasks. A clone of the sender
//! travels along with each element inside the channel, and the channel remains open as long
//! as any sender clone is still alive.
//!
//! Once the queue is empty and no in-flight task sends anything back, the last sender clone
//! has been dropped and the channel closes. The receiver stream then ends as soon as it has
//! been drained, which ends the processing loop.
//!
//! The concurrency limit is controlled by the `limit` argument of `for_each_concurrent`.
use futures::channel::mpsc::{unbounded, UnboundedSender};
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use std::time::{Duration, SystemTime};
use tokio::time::sleep;
use derive_more::{Deref, DerefMut};
use serde_json::{json, Value};
#[tokio::main]
async fn main() {
let max_concurrency = 10;
let mut tree = json!([
1,
2,
[3, [4, 5, [6, 7]], 8],
[9, 10],
11,
12,
13,
[14, 15, [16, [17, 18, 19, [20, 21, 22]]]]
]);
// Simple wrapper to allow channels that send their own sender type
// https://doc.rust-lang.org/reference/types.html#recursive-types
// "Recursive types must include a nominal type in the recursion"
#[derive(Clone, Deref, DerefMut)]
struct SenderWrapper<T>(UnboundedSender<(T, SenderWrapper<T>)>);
let (sender, receiver) = unbounded();
// Seed the channel with the root-level elements.
// The sender is dropped from the main loop, and references to it only exists as
// items waiting in the channel.
// Once all items are consumed, this will close the channel.
{
let mut sender = SenderWrapper(sender);
for item in tree.as_array_mut().unwrap().drain(..) {
let sender_clone = sender.clone();
sender.send((item, sender_clone)).await.unwrap();
}
}
let start_time = SystemTime::now();
// processing loop. receiver can receive back tasks sent from the processing function
receiver
.for_each_concurrent(max_concurrency, move |(value, mut sender)| async move {
if let Some(next) = process(value).await {
for item in next {
let sender_clone = sender.clone();
sender.send((item, sender_clone)).await.unwrap()
}
}
})
.await;
let duration = start_time.elapsed().unwrap().as_secs_f32();
println!("Finished in {duration:.2} seconds");
}
/// Simulates the work for one task taken from the queue.
///
/// * A number `n` is a leaf task. It sleeps for `25 - n` seconds (so larger numbers finish
///   sooner, and numbers of 25 or more do not sleep at all) and returns `None`.
/// * An array is an expandable task. It sleeps one second per element and returns the
///   elements so the caller can push them back onto the queue.
///
/// # Panics
/// Panics if a number is not a non-negative integer, or if `value` is neither a number
/// nor an array (the demo tree only contains those two kinds of values).
async fn process(value: Value) -> Option<Vec<Value>> {
    match value {
        Value::Number(n) => {
            println!(">> Processing final value {n}");
            let leaf = n
                .as_u64()
                .expect("leaf values must be non-negative integers");
            // Saturate instead of subtracting directly: `25 - leaf` would overflow for
            // leaf > 25 (a panic in debug builds, a huge wrapped delay in release builds).
            sleep(Duration::from_secs(25u64.saturating_sub(leaf))).await;
            println!(">> Processed final value {n}");
            None
        }
        Value::Array(a) => {
            let len = a.len();
            println!("@@ got list of {len}, rescheduling them");
            sleep(Duration::from_secs(len as u64)).await;
            Some(a)
        }
        other => unreachable!("tree must only contain numbers and arrays, got {other}"),
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment