/playground.rs Secret
Created
March 5, 2025 10:53
Code shared from the Rust Playground
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use bytes::Bytes; | |
use destream::{FromStream, SeqAccess}; | |
use futures_util::{stream, StreamExt as FUStreamExt}; | |
use tokio::sync::mpsc; | |
#[derive(Debug, Clone)] | |
struct Example { | |
name: String, | |
size: String, | |
} | |
impl FromStream for Example { | |
type Context = mpsc::Sender<Example>; | |
async fn from_stream<D: destream::de::Decoder>(context: Self::Context, decoder: &mut D) -> Result<Self, D::Error> { | |
decoder.decode_any(ExampleVisitor { context }).await | |
} | |
} | |
struct ExampleVisitor { | |
context: mpsc::Sender<Example> | |
} | |
impl destream::de::Visitor for ExampleVisitor { | |
type Value = Example; | |
fn expecting() -> &'static str { | |
"an Example" | |
} | |
async fn visit_map<A: destream::de::MapAccess>(self, mut access: A) -> Result<Self::Value, A::Error> { | |
let mut example = Example{ name: "".to_string(), size: "".to_string() }; | |
while let Some(key) = access.next_key::<String>(()).await? { | |
match key.as_str() { | |
"name" => { | |
example.name = access.next_value::<String>(()).await?; | |
}, | |
"size" => { | |
example.size = access.next_value::<String>(()).await?; | |
}, | |
_ => { | |
println!("Unknown key: {}", key); | |
} | |
} | |
} | |
println!("Mapped example {:?}", example); | |
self.context.send(example).await.unwrap(); | |
Ok(Example { | |
name: "Invalid: This was streamed to the context.".to_string(), | |
size: "Invalid: This was streamed to the context.".to_string(), | |
}) | |
} | |
async fn visit_seq<A: SeqAccess>(self, mut seq: A) -> Result<Self::Value, A::Error> { | |
println!("visit_seq"); | |
loop { | |
match seq.next_element::<Example>(self.context.clone()).await? { | |
Some(example) => { | |
println!("Got example {:?}", example); | |
} | |
None => { | |
break; | |
} | |
} | |
} | |
Ok(Example { | |
name: "Invalid: This was streamed to the context.".to_string(), | |
size: "Invalid: This was streamed to the context.".to_string(), | |
}) | |
} | |
} | |
#[tokio::main] | |
async fn main() { | |
let example = r#" | |
[ | |
{ "name": "cartman", "size": "festively plump" }, | |
{ "name": "rejected", "size": "fat and sassy" } | |
] | |
"#; | |
let stream = FUStreamExt::map(stream::iter(example.bytes().into_iter().clone()).chunks(10), Bytes::from); | |
let (sender, mut receiver) = mpsc::channel::<Example>(32); | |
tokio::spawn(async move { | |
let example: Example = destream_json::decode(sender, stream).await.unwrap(); | |
println!("Done with useless example because I'm bad at rust: {:?}", example) | |
}); | |
while let Some(example) = receiver.recv().await { | |
println!("Received example from channel {:?}", example); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment