Skip to content

Instantly share code, notes, and snippets.

@hniksic

hniksic/main.rs Secret

Created August 4, 2021 20:33
Show Gist options
  • Save hniksic/0e310274b93810e14b9550aeee546456 to your computer and use it in GitHub Desktop.
Save hniksic/0e310274b93810e14b9550aeee546456 to your computer and use it in GitHub Desktop.
Workaround with channels
use serde::de::{Deserializer, SeqAccess, Visitor};
use serde::Deserialize;
use serde_json;
use std::io::Read;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::{fmt, thread};
/// What travels over the channel and what the iterator yields: either one
/// decoded `MyJson` element or a deserialization error rendered as a `String`.
type DeserializeResult = Result<MyJson, String>;
/// One element of the JSON array being streamed.
///
/// Deserialized from an object with a string field `val1` and an integer
/// array field `val2`.
#[derive(Debug, Deserialize)]
struct MyJson {
    /// String payload of the element.
    val1: String,
    /// Integer payload of the element.
    val2: Vec<i32>,
}
/// Consumer side of the streaming deserializer: an iterator that pulls
/// results from a channel.
struct MyJsonIterator {
    /// Receiving end of the channel carrying deserialization results.
    receiver: Receiver<DeserializeResult>,
}
/// Serde visitor that forwards every array element it decodes into a channel
/// instead of collecting them.
struct MyJsonVisitor {
    /// Sending end of the channel on which decoded elements are published.
    sender: SyncSender<DeserializeResult>,
}
impl Iterator for MyJsonIterator {
    type Item = DeserializeResult;

    /// Blocks until the next result arrives on the channel.
    ///
    /// Returns `None` once receiving fails, i.e. when every sender has been
    /// dropped and no further results can arrive.
    fn next(&mut self) -> Option<Self::Item> {
        match self.receiver.recv() {
            Ok(item) => Some(item),
            // A receive error means the sending side hung up: the stream is over.
            Err(_) => None,
        }
    }
}
impl MyJsonIterator {
pub fn new(reader: impl Read + Send + 'static) -> Self {
let (sender, receiver) = sync_channel::<DeserializeResult>(0);
thread::spawn(move || {
let mut deserializer = serde_json::Deserializer::from_reader(reader);
if let Err(e) = deserializer.deserialize_seq(MyJsonVisitor {
sender: sender.clone(),
}) {
let _ = sender.send(Err(e.to_string())); //let _ = because error from calling send just means receiver has disconnected
}
});
Self { receiver }
}
}
impl<'de> Visitor<'de> for MyJsonVisitor {
    /// Nothing is accumulated; elements leave through the channel.
    type Value = ();

    /// Describes the expected input for serde's error messages.
    fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "array of MyJson")
    }

    /// Decodes the sequence one element at a time, sending each element
    /// through the channel as soon as it is available.
    ///
    /// Stops early (still returning `Ok`) when the receiver has disconnected.
    /// A failure to decode an element is propagated to the caller.
    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
    where
        A: SeqAccess<'de>,
    {
        loop {
            match seq.next_element::<MyJson>()? {
                Some(element) => {
                    if self.sender.send(Ok(element)).is_err() {
                        // Receiver has disconnected; no point in decoding further.
                        return Ok(());
                    }
                }
                // End of the sequence.
                None => return Ok(()),
            }
        }
    }
}
/// Builds the in-memory test document and drains the streaming iterator over
/// it, panicking on the first deserialization error.
fn main() {
    let reader = std::io::Cursor::new(setup_test_data());
    MyJsonIterator::new(reader).for_each(|item| {
        // The decoded value itself is not needed; only success is checked.
        drop(item.unwrap());
    });
}
/// Builds a large JSON array as raw bytes for exercising the streaming
/// deserializer.
///
/// The document is `[`, followed by one million repetitions of a group of
/// three comma-terminated objects, followed by a final object and `]`.
fn setup_test_data() -> Vec<u8> {
    /// How many times the three-object group is repeated.
    const REPEATS: usize = 1_000_000;

    /// Three objects, each followed by a comma so another element can follow.
    const GROUP: &[u8] = br#"
{
"val1": "one",
"val2": [
0
]
},
{
"val1": "two",
"val2": [
1
]
},
{
"val1": "three",
"val2": [
2
]
},
"#;

    /// Last element plus the closing bracket of the array.
    const TAIL: &[u8] = br#"{
"val1": "final",
"val2": [
3
]
}
]"#;

    let mut buf = Vec::with_capacity(1 + REPEATS * GROUP.len() + TAIL.len());
    buf.push(b'[');
    for _ in 0..REPEATS {
        buf.extend_from_slice(GROUP);
    }
    buf.extend_from_slice(TAIL);
    buf
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment