Skip to content

Instantly share code, notes, and snippets.

@rust-play
Created March 5, 2025 10:53
Code shared from the Rust Playground
use bytes::Bytes;
use destream::{FromStream, SeqAccess};
use futures_util::{stream, StreamExt as FUStreamExt};
use tokio::sync::mpsc;
/// One record of the example data set.
///
/// Both fields hold free-form text; no validation is applied to either of them.
#[derive(Debug, Clone)]
struct Example {
    /// Text stored under the `"name"` key of the input map.
    name: String,
    /// Text stored under the `"size"` key of the input map.
    size: String,
}
/// Streaming decode entry point for [`Example`].
///
/// The context is the sending half of an `mpsc` channel. It is handed to
/// [`ExampleVisitor`], which performs the actual decoding.
impl FromStream for Example {
    type Context = mpsc::Sender<Example>;

    /// Builds an [`ExampleVisitor`] around `context` and drives it with `decoder`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the decoder or the visitor reports.
    async fn from_stream<D: destream::de::Decoder>(
        context: Self::Context,
        decoder: &mut D,
    ) -> Result<Self, D::Error> {
        // `decode_any` is used because the visitor accepts more than one input shape
        // (it implements both `visit_map` and `visit_seq`).
        let visitor = ExampleVisitor { context };
        decoder.decode_any(visitor).await
    }
}
/// Visitor state used while decoding [`Example`] values.
struct ExampleVisitor {
    /// Sending half of the channel that decoded examples are pushed into.
    context: mpsc::Sender<Example>,
}
impl destream::de::Visitor for ExampleVisitor {
type Value = Example;
fn expecting() -> &'static str {
"an Example"
}
async fn visit_map<A: destream::de::MapAccess>(self, mut access: A) -> Result<Self::Value, A::Error> {
let mut example = Example{ name: "".to_string(), size: "".to_string() };
while let Some(key) = access.next_key::<String>(()).await? {
match key.as_str() {
"name" => {
example.name = access.next_value::<String>(()).await?;
},
"size" => {
example.size = access.next_value::<String>(()).await?;
},
_ => {
println!("Unknown key: {}", key);
}
}
}
println!("Mapped example {:?}", example);
self.context.send(example).await.unwrap();
Ok(Example {
name: "Invalid: This was streamed to the context.".to_string(),
size: "Invalid: This was streamed to the context.".to_string(),
})
}
async fn visit_seq<A: SeqAccess>(self, mut seq: A) -> Result<Self::Value, A::Error> {
println!("visit_seq");
loop {
match seq.next_element::<Example>(self.context.clone()).await? {
Some(example) => {
println!("Got example {:?}", example);
}
None => {
break;
}
}
}
Ok(Example {
name: "Invalid: This was streamed to the context.".to_string(),
size: "Invalid: This was streamed to the context.".to_string(),
})
}
}
/// Streams a small JSON array through the decoder in 10-byte chunks and prints every
/// `Example` as it arrives on the channel.
///
/// # Panics
///
/// Panics if decoding fails or if the decoding task itself panics.
#[tokio::main]
async fn main() {
    let example = r#"
[
{ "name": "cartman", "size": "festively plump" },
{ "name": "rejected", "size": "fat and sassy" }
]
"#;

    // Feed the input ten bytes at a time so the decoder is given a chunked stream.
    let stream = FUStreamExt::map(stream::iter(example.bytes()).chunks(10), Bytes::from);
    let (sender, mut receiver) = mpsc::channel::<Example>(32);

    // Keep the handle: dropping it would detach the task, letting `main` return before
    // the task's final message is printed and hiding any panic raised inside it.
    let decoder = tokio::spawn(async move {
        let example: Example = destream_json::decode(sender, stream)
            .await
            .expect("failed to decode the example JSON");
        println!("Done with useless example because I'm bad at rust: {:?}", example)
    });

    // `recv` yields `None` once every sender has been dropped, which happens when the
    // decoding task is done with the channel.
    while let Some(example) = receiver.recv().await {
        println!("Received example from channel {:?}", example);
    }

    // Wait for the task to finish so its output is flushed and failures are reported.
    decoder.await.expect("decoder task panicked");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment