Skip to content

Instantly share code, notes, and snippets.

@MiSawa
Created March 27, 2022 15:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MiSawa/4e4086bd649f4639f390b93ca214f7f0 to your computer and use it in GitHub Desktop.
Save MiSawa/4e4086bd649f4639f390b93ca214f7f0 to your computer and use it in GitHub Desktop.
Trying something for `--stream`
use std::sync::mpsc::{sync_channel, SyncSender};
use anyhow::{anyhow, Result};
/// One step of a path into a document: either a position in a sequence or a key in a map.
#[derive(Clone, Debug)]
enum Index {
    /// Zero-based position of an element inside a sequence.
    Array(usize),
    /// Key of an entry inside a map.
    Map(String),
}
/// Location of a node in a document: the indices/keys to follow, outermost first.
type Path = Vec<Index>;
/// A scalar (non-container) value that can be attached to a stream event.
#[derive(Debug)]
enum PrimitiveValue {
    /// The null value.
    Null,
    /// A boolean.
    Boolean(bool),
    /// Any number, stored as a double-precision float.
    Number(f64),
    /// A string.
    String(String),
}
/// One stream event: a path, optionally paired with the scalar found at that path.
struct Value {
/// Where in the document this event refers to.
path: Path,
/// `Some(..)` for a leaf event carrying a scalar; `None` for a closing event
/// (the form sent when a sequence or map has been fully visited).
value: Option<PrimitiveValue>,
}
impl Value {
    /// Writes this event to stdout as a single line of JSON.
    ///
    /// The line is `[[path...],value]` when `value` is `Some`, and `[[path...]]` when it is
    /// `None`. Map keys and string values are escaped as JSON strings; numbers that are not
    /// finite (NaN, ±infinity) are written as `null` because JSON has no literal for them.
    fn print(&self) {
        // Assemble the whole line first so it goes to stdout in one call instead of through
        // a dozen separately locked `print!` invocations.
        let mut line = String::from("[[");
        for (pos, step) in self.path.iter().enumerate() {
            if pos != 0 {
                line.push(',');
            }
            match step {
                Index::Array(i) => line.push_str(&i.to_string()),
                Index::Map(key) => Self::push_json_string(&mut line, key),
            }
        }
        line.push(']');
        if let Some(value) = &self.value {
            line.push(',');
            match value {
                PrimitiveValue::Null => line.push_str("null"),
                PrimitiveValue::Boolean(b) => line.push_str(if *b { "true" } else { "false" }),
                // `NaN` / `inf` (what `f64`'s `Display` would print) are not valid JSON.
                PrimitiveValue::Number(n) if !n.is_finite() => line.push_str("null"),
                PrimitiveValue::Number(n) => line.push_str(&n.to_string()),
                PrimitiveValue::String(s) => Self::push_json_string(&mut line, s),
            }
        }
        line.push(']');
        println!("{line}");
    }

    /// Appends `text` to `out` as a double-quoted JSON string literal.
    ///
    /// Rust's `{:?}` formatting is not used here because it produces escapes such as
    /// `\u{1f}` and `\0`, which JSON parsers reject.
    fn push_json_string(out: &mut String, text: &str) {
        out.push('"');
        for ch in text.chars() {
            match ch {
                '"' => out.push_str("\\\""),
                '\\' => out.push_str("\\\\"),
                '\n' => out.push_str("\\n"),
                '\r' => out.push_str("\\r"),
                '\t' => out.push_str("\\t"),
                '\u{08}' => out.push_str("\\b"),
                '\u{0c}' => out.push_str("\\f"),
                // Remaining control characters (below U+0020) must use the \uXXXX form.
                c if c < ' ' => out.push_str(&format!("\\u{:04x}", c as u32)),
                c => out.push(c),
            }
        }
        out.push('"');
    }
}
/// State threaded through deserialization: where events are sent and where in the
/// document the traversal currently is.
struct StreamState<'a> {
/// Sending half of the channel that events are pushed into.
sender: SyncSender<Value>,
/// Path of the node currently being visited; borrowed from the caller and
/// pushed/popped as the traversal enters and leaves containers.
path: &'a mut Path,
}
impl<'a> StreamState<'a> {
    /// Sends a leaf event: a snapshot of the current path together with `value`.
    fn emit_value(&mut self, value: PrimitiveValue) {
        self.send_event(Some(value));
    }

    /// Sends a closing event: a snapshot of the current path with no value attached.
    fn emit_close(&self) {
        self.send_event(None);
    }

    /// Builds an event for the current path and pushes it into the channel.
    fn send_event(&self, value: Option<PrimitiveValue>) {
        let event = Value {
            path: self.path.clone(),
            value,
        };
        // A failed send means the receiver has already been dropped, so nobody is
        // listening any more; the error is deliberately ignored.
        let _ = self.sender.send(event);
    }
}
/// Visitor that turns whatever the deserializer reports into stream events.
///
/// Scalars are sent as leaf events at the current path. Sequences and maps recurse with the
/// path extended by the element index / entry key, and afterwards send one closing event
/// that carries the path of their last child.
///
/// NOTE: an empty sequence or map sends no event at all, because there is no last child to
/// name in a closing event. `jq --stream` reports such containers as a leaf whose value is
/// `[]` / `{}`; every value sent from this impl is a scalar, so matching that would need a
/// new kind of event — TODO.
impl<'de, 'a> serde::de::Visitor<'de> for &mut StreamState<'a> {
    type Value = ();

    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            formatter,
            "null, boolean, number, string, array, or map keyed with string"
        )
    }

    fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        self.emit_value(PrimitiveValue::Boolean(v));
        Ok(())
    }

    /// Signed integers are stored as `f64`; magnitudes above 2^53 lose precision.
    fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        self.emit_value(PrimitiveValue::Number(v as f64));
        Ok(())
    }

    /// Unsigned integers are stored as `f64`; values above 2^53 lose precision.
    fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        self.emit_value(PrimitiveValue::Number(v as f64));
        Ok(())
    }

    fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        self.emit_value(PrimitiveValue::Number(v));
        Ok(())
    }

    fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        self.visit_string(v.to_owned())
    }

    fn visit_string<E>(self, v: String) -> Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        self.emit_value(PrimitiveValue::String(v));
        Ok(())
    }

    fn visit_none<E>(self) -> Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        self.emit_value(PrimitiveValue::Null);
        Ok(())
    }

    fn visit_unit<E>(self) -> Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        self.visit_none()
    }

    /// Streams every element of a sequence, then sends a closing event whose path ends in
    /// the index of the last element. An empty sequence sends nothing.
    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
    where
        A: serde::de::SeqAccess<'de>,
    {
        // The index has to be on the path *before* asking for the element, because the
        // element is visited (and its events sent) inside `next_element_seed`.
        let mut next_index = 0usize;
        self.path.push(Index::Array(next_index));
        while seq.next_element_seed(&mut *self)?.is_some() {
            next_index += 1;
            self.path.pop();
            self.path.push(Index::Array(next_index));
        }
        // Remove the index that was pushed speculatively for an element that never came.
        self.path.pop();

        // `checked_sub` keeps an empty sequence from underflowing the `usize` counter.
        if let Some(last_index) = next_index.checked_sub(1) {
            self.path.push(Index::Array(last_index));
            self.emit_close();
            self.path.pop();
        }
        Ok(())
    }

    /// Streams every entry of a map, then sends a closing event whose path ends in the key
    /// of the last entry. An empty map sends nothing.
    fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
    where
        A: serde::de::MapAccess<'de>,
    {
        /// Seed that accepts only string keys and returns them as owned `String`s.
        struct Str;

        impl<'de> serde::de::DeserializeSeed<'de> for Str {
            type Value = String;

            fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
            where
                D: serde::Deserializer<'de>,
            {
                struct V;

                impl<'de> serde::de::Visitor<'de> for V {
                    type Value = String;

                    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                        write!(formatter, "string as the key of a map")
                    }

                    fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
                    where
                        E: serde::de::Error,
                    {
                        Ok(v.to_owned())
                    }

                    fn visit_string<E>(self, v: String) -> Result<Self::Value, E>
                    where
                        E: serde::de::Error,
                    {
                        Ok(v)
                    }
                }

                deserializer.deserialize_any(V)
            }
        }

        // Tracks whether a key is currently on the path, so no placeholder key is needed
        // and an empty map cannot leak one into a closing event.
        let mut saw_entry = false;
        while let Some(key) = map.next_key_seed(Str)? {
            if saw_entry {
                self.path.pop();
            }
            self.path.push(Index::Map(key));
            saw_entry = true;
            map.next_value_seed(&mut *self)?;
        }
        if saw_entry {
            // The last key is still on the path: exactly what the closing event reports.
            self.emit_close();
            self.path.pop();
        }
        Ok(())
    }
}
/// Allows a `&mut StreamState` to act as a seed, so that nested elements and map values
/// are deserialized through the same state (and therefore the same path and channel).
impl<'de, 'a> serde::de::DeserializeSeed<'de> for &mut StreamState<'a> {
    type Value = ();

    /// Drives `deserializer` with this state as the visitor.
    fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // The visitor's output is already `()`, so its result can be returned unchanged.
        deserializer.deserialize_any(self)
    }
}
/// Marker type whose `Deserialize` impl walks one document with a [`StreamState`] and
/// throws the resulting events away. Deserializing it succeeds exactly when the walk does.
struct Stream;

impl<'de> serde::Deserialize<'de> for Stream {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let mut path = vec![];
        // TODO: the channel should really be created once by the caller, with the sender
        // cloned into here and the receiver handed to whoever prints the events.
        let (sender, receiver) = sync_channel(8);
        // Nothing reads from this channel. If the receiver stayed alive, the ninth `send`
        // would block forever once the 8-slot buffer filled up. Dropping it makes every
        // `send` fail immediately instead; the emit helpers ignore that failure.
        drop(receiver);
        let mut visitor = StreamState {
            sender,
            path: &mut path,
        };
        deserializer.deserialize_any(&mut visitor)?;
        Ok(Self)
    }
}
/// Reads YAML documents from stdin on a worker thread and prints each stream event on the
/// main thread as it arrives.
///
/// # Errors
///
/// Returns an error if a document fails to deserialize, or if the worker thread panicked.
fn main() -> Result<()> {
    let (sender, recv) = sync_channel(1);
    let handler = std::thread::spawn(move || -> Result<()> {
        use serde::de::DeserializeSeed;
        for de in serde_yaml::Deserializer::from_reader(std::io::stdin()) {
            // Every document starts from an empty path.
            let mut path = vec![];
            let mut state = StreamState {
                sender: sender.clone(),
                path: &mut path,
            };
            // Malformed input is an ordinary error, not a bug: report it rather than panic.
            state
                .deserialize(de)
                .map_err(|e| anyhow!("Deserialization failed: {e}"))?;
        }
        Ok(())
    });
    // The loop ends once the worker has finished (or failed) and every sender is dropped.
    for event in recv {
        event.print();
    }
    let worker_result = handler
        .join()
        .map_err(|e| anyhow!("Failed to join {e:?}"))?;
    worker_result
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment