-
-
Save hniksic/0e310274b93810e14b9550aeee546456 to your computer and use it in GitHub Desktop.
Workaround with channels
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use serde::de::{Deserializer, SeqAccess, Visitor}; | |
use serde::Deserialize; | |
use serde_json; | |
use std::io::Read; | |
use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; | |
use std::{fmt, thread}; | |
type DeserializeResult = Result<MyJson, String>; | |
#[derive(Deserialize, Debug)] | |
struct MyJson { | |
val1: String, | |
val2: Vec<i32>, | |
} | |
struct MyJsonIterator { | |
receiver: Receiver<DeserializeResult>, | |
} | |
struct MyJsonVisitor { | |
sender: SyncSender<DeserializeResult>, | |
} | |
impl Iterator for MyJsonIterator { | |
type Item = DeserializeResult; | |
fn next(&mut self) -> Option<Self::Item> { | |
self.receiver.recv().ok() //ok() because a RecvError implies we are done | |
} | |
} | |
impl MyJsonIterator { | |
pub fn new(reader: impl Read + Send + 'static) -> Self { | |
let (sender, receiver) = sync_channel::<DeserializeResult>(0); | |
thread::spawn(move || { | |
let mut deserializer = serde_json::Deserializer::from_reader(reader); | |
if let Err(e) = deserializer.deserialize_seq(MyJsonVisitor { | |
sender: sender.clone(), | |
}) { | |
let _ = sender.send(Err(e.to_string())); //let _ = because error from calling send just means receiver has disconnected | |
} | |
}); | |
Self { receiver } | |
} | |
} | |
impl<'de> Visitor<'de> for MyJsonVisitor { | |
type Value = (); | |
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { | |
formatter.write_str("array of MyJson") | |
} | |
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> | |
where | |
A: SeqAccess<'de>, | |
{ | |
while let Some(val) = seq.next_element::<MyJson>()? { | |
if self.sender.send(Ok(val)).is_err() { | |
break; //receiver has disconnected. | |
} | |
} | |
Ok(()) | |
} | |
} | |
fn main() { | |
let data = setup_test_data(); | |
let iter = MyJsonIterator::new(std::io::Cursor::new(data)); | |
for my_json in iter { | |
let _ = my_json.unwrap(); | |
} | |
} | |
fn setup_test_data() -> Vec<u8> { | |
let mut data = vec![]; | |
data.push(b'['); | |
for _ in 0..1_000_000 { | |
data.extend_from_slice( | |
br#" | |
{ | |
"val1": "one", | |
"val2": [ | |
0 | |
] | |
}, | |
{ | |
"val1": "two", | |
"val2": [ | |
1 | |
] | |
}, | |
{ | |
"val1": "three", | |
"val2": [ | |
2 | |
] | |
}, | |
"#, | |
); | |
} | |
data.extend_from_slice( | |
br#"{ | |
"val1": "final", | |
"val2": [ | |
3 | |
] | |
} | |
]"#, | |
); | |
data | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment