Skip to content

Instantly share code, notes, and snippets.

@commonsensesoftware
Last active October 30, 2023 22:38
Show Gist options
  • Save commonsensesoftware/9c8605a0610e9ef91a9d3a0d4ead9764 to your computer and use it in GitHub Desktop.
Save commonsensesoftware/9c8605a0610e9ef91a9d3a0d4ead9764 to your computer and use it in GitHub Desktop.
JSON Streaming in Rust

JSON Streaming in Rust

This provides a Gist of what's necessary to stream JSON content in Rust. This particular example uses the Flyweight pattern so that only a single data structure is enumerated and in memory at a time.

The consequence of this design is that you cannot implement Iterator (due to required lifetime annotations), which would enable using an idiomatic for loop. The enumeration, however, can just as easily be performed with an idiomatic while let loop. This example could be modified to yield a new struct per iteration and that would allow implementing the Iterator trait. This would still be efficient since the struct would be created and dropped during each enumerated item.

This example is heavily influenced by an alternate approach in Serde JSON #404. The key difference is using Deserialize::deserialize_in_place instead of StreamDeserializer and demonstrating how to deal with a JSON document where the sequence you want to enumerate is not the root element.

To run this example, clone the Gist and then use cargo run. You will see the following output:

nth id meta key   first name      last name
--- -- ---- ----- --------------- ---------------
0   42    1 12345 Bob             Smith
1   42    1 67890 John            Doe
2   42    1 13579 Bill            Mei
3   42    1 24680 Jane            Doe
4   42    1 97531 Jessica         Rabbit

Position 683 of 703

The reported position at the end demonstrates that deserialization completed before reaching the end of the file. Be aware that could mean the JSON is invalid. You would be required to continue driving or validating the JSON from this point forward to ensure its correctness, if you even want to.

[package]
name = "json-streaming"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "streamjson"
path = "main.rs"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
use std::collections::HashMap;
#[derive(Default)]
pub struct ColumnMap {
columns: HashMap<String, u8>,
}
impl ColumnMap {
pub fn with_capacity(capacity: usize) -> Self {
Self {
columns: HashMap::with_capacity(capacity),
}
}
pub fn index<S: AsRef<str>>(&self, key: S) -> Option<&u8> {
self.columns.get(key.as_ref())
}
pub fn len(&self) -> usize {
self.columns.len()
}
pub fn is_empty(&self) -> bool {
self.columns.len() == 0
}
pub(crate) fn insert(&mut self, key: String, value: u8) {
self.columns.insert(key, value);
}
pub fn iter(&self) -> impl Iterator<Item = (&String, &u8)> + '_ {
self.columns.iter()
}
}
{
"id": "42",
"metadata": [
"meta-1"
],
"columns": {
"key": "0",
"fname": "1",
"lname": "2"
},
"rows": [
{
"0": "12345",
"1": "Bob",
"2": "Smith"
},
{
"0": "67890",
"1": "John",
"2": "Doe"
},
{
"0": "13579",
"1": "Bill",
"2": "Mei"
},
{
"0": "24680",
"1": "Jane",
"2": "Doe"
},
{
"0": "97531",
"1": "Jessica",
"2": "Rabbit"
}
],
"count": 5
}
use crate::columns::ColumnMap;
use crate::row::Row;
use serde::de::{self, Deserialize, Deserializer, MapAccess, Unexpected, Visitor};
use std::borrow::Cow;
use std::fmt;
use std::marker::PhantomData;
use std::str;
struct ColumnVisitor;
/// Implements a visitor that converts the 'columns' JSON object into a ColumnMap
impl<'de> Visitor<'de> for ColumnVisitor {
type Value = ColumnMap;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("Columns")
}
fn visit_map<M>(self, mut access: M) -> Result<Self::Value, M::Error>
where
M: MapAccess<'de>,
{
let mut columns = match access.size_hint() {
Some(capacity) => ColumnMap::with_capacity(capacity),
None => ColumnMap::default(),
};
while let Some((key, value)) = access.next_entry::<String, String>()? {
match value.parse::<u8>() {
Ok(index) => columns.insert(key, index),
_ => return Err(de::Error::invalid_type(Unexpected::Str(&value), &self)),
};
}
Ok(columns)
}
}
impl<'de> Deserialize<'de> for ColumnMap {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_map(ColumnVisitor)
}
}
enum Field {
Id,
Metadata,
Columns,
Rows,
}
const FIELDS: &'static [&'static str] = &["id", "metadata", "columns", "rows"];
struct FieldVisitor;
/// Implements a visitor to track well-known fields without allocating
impl<'de> Visitor<'de> for FieldVisitor {
type Value = Field;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_fmt(format_args!(
"Expected one of the following: {}",
FIELDS.join(", ")
))
}
fn visit_str<E>(self, value: &str) -> Result<Field, E>
where
E: de::Error,
{
match value {
"id" => Ok(Field::Id),
"metadata" => Ok(Field::Metadata),
"columns" => Ok(Field::Columns),
"rows" => Ok(Field::Rows),
_ => Err(de::Error::unknown_field(value, FIELDS)),
}
}
}
impl<'de> Deserialize<'de> for Field {
fn deserialize<D>(deserializer: D) -> Result<Field, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_identifier(FieldVisitor)
}
}
pub(crate) struct PartialJson {
pub id: String,
pub metadata: Vec<String>,
pub columns: ColumnMap,
}
impl Default for PartialJson {
fn default() -> Self {
Self {
id: String::with_capacity(0),
metadata: Vec::with_capacity(0),
columns: ColumnMap::with_capacity(0),
}
}
}
struct PartialJsonVisitor<'de, 'a>(&'a mut PartialJson, PhantomData<&'de ()>);
/// Implements a visitor that populates everything to, but not including, rows
impl<'de, 'a> Visitor<'de> for PartialJsonVisitor<'de, 'a> {
type Value = ();
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("JSON document")
}
fn visit_map<V>(self, mut map: V) -> Result<Self::Value, V::Error>
where
V: MapAccess<'de>,
{
let doc = self.0;
while let Some(key) = map.next_key()? {
match key {
Field::Id => doc.id = map.next_value()?,
Field::Metadata => doc.metadata = map.next_value()?,
Field::Columns => doc.columns = map.next_value()?,
Field::Rows => {
// TODO: before moving on, validation can be done here
//
// if doc.id.is_empty() {
// return Err(de::Error::missing_field("Missing 'id'"));
// }
// this is where 'magic' happens with the visitor. we perform validation
// on the previously expected data members before we exit deserialization.
// at this point the underlying stream has read up to:
//
// {
// ...,
// "rows": [
// ↑ cursor is right before the array value
// ]
// }
//
break;
}
}
}
Ok(())
}
}
impl<'de> Deserialize<'de> for PartialJson {
fn deserialize<D>(_deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
Err(de::Error::custom("Use deserialize_in_place instead"))
}
fn deserialize_in_place<D>(deserializer: D, place: &mut Self) -> Result<(), D::Error>
where
D: Deserializer<'de>,
{
// normally, 'deserialize' would be used to reify a struct. we can reify a partial
// struct, but the default behavior of the JSON deserialize will read to the end
// to verify format correctness. we side step that process by deserializing in-place
// and they bail out of the deserialization process early.
deserializer.deserialize_struct(
"PartialJson",
FIELDS,
PartialJsonVisitor(place, PhantomData),
)
}
}
struct ValuesVisitor<'a> {
row: &'a mut Row
}
impl<'a> ValuesVisitor<'a> {
fn new(row: &'a mut Row) -> Self {
Self { row }
}
}
/// Implements a visitor that process the values of row as a singleton array of values
impl<'de, 'a> Visitor<'de> for ValuesVisitor<'a> {
type Value = ();
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("Row")
}
fn visit_map<V>(self, mut map: V) -> Result<Self::Value, V::Error>
where
V: MapAccess<'de>,
{
let mut count = 0;
let expected = self.row.values.len();
// the JSON is an object, but the key stringified integer, so we
// can map it to a simple array instead
while let Some(key) = map.next_key::<usize>()? {
if key < expected {
let buffer = &mut self.row.values[key];
buffer.clear();
// all values should be JSON strings; however, we don't know
// if the text contains escape sequences. if the value is just
// normal string, then we'll get a borrowed string slice without
// allocating. if the string has an escape sequence, it has to
// be unescaped, which will allocate a new string to hold the
// unescaped value.
//
// {
// "normal": "normal",
// "escaped": "line 1\\nline 2",
// }
//
// we check the value length as well. a zero-length string
// (e.g. "") will never allocate.
//
// there isn't an immediately obvious way to capture the
// Cow values directly. if that is possible, there is an
// opportunity to improve things further, by skipping
// the copy operation
let value = map.next_value::<Cow<'a, str>>()?;
if !value.is_empty() {
match value {
Cow::Borrowed(normal_string) => buffer.push_str(normal_string),
Cow::Owned(unescaped_string) => buffer.push_str(unescaped_string.as_str()),
};
}
}
count += 1;
}
if count != expected {
return Err(de::Error::custom(format!(
"Row {} has a column count of {}, but there are {} column defined.",
self.row.index, count, expected,
)));
}
Ok(())
}
}
impl<'de> Deserialize<'de> for Row {
fn deserialize<D>(_deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
// if we really wanted to support it, creating a new Row
// per item would work
Err(de::Error::custom("Use deserialize_in_place instead"))
}
fn deserialize_in_place<D>(deserializer: D, place: &mut Self) -> Result<(), D::Error>
where
D: Deserializer<'de>,
{
// big gains come from passing in a single Row struct as a monad and updating just its
// values. the first pass is expected to allocate a String buffer for each value.
// while we can't be sure, most columns use similar values so subsequent rows
// likely just copy bytes from the input stream to the existing string buffers.
// this should be a very small amount of memory (ex: < 1K)
deserializer.deserialize_map(ValuesVisitor::new(place))
}
}
use std::fmt::{self, Display};
#[derive(Debug, Clone)]
pub enum Error {
Syntax,
ExpectedString,
Custom(String),
}
impl Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::Syntax => write!(f, "Invalid JSON"),
Error::ExpectedString => write!(f, "Expected a valid JSON string"),
Error::Custom(msg) => write!(f, "{}", msg),
}
}
}
impl std::error::Error for Error {
fn description(&self) -> &str {
"Deserialization error"
}
}
impl serde::ser::Error for Error {
fn custom<T: Display>(msg: T) -> Self {
Error::Custom(msg.to_string())
}
}
impl serde::de::Error for Error {
fn custom<T: Display>(msg: T) -> Self {
Error::Custom(msg.to_string())
}
}
use crate::row::Row;
use serde::de::Deserialize;
use serde_json::Deserializer;
use std::io::{Cursor, Error, ErrorKind, Read, Result};
use std::slice;
/// The `Iterator` trait does not support a lifetime contract between the iterator
/// and the items it returns, which means it cannot be implemented. As a result
/// this iterator cannot be used with a `for` loop; however it can be used with an
/// idiomatic `while let` loop.
pub trait StreamingIterator<'a, T> {
fn next(&'a mut self) -> Option<Result<&'a T>>;
}
pub trait ReadOwner<T: Read> {
fn take_reader(self) -> T;
}
pub struct Iter<T: Read> {
row: Row,
reader: T,
first: bool,
}
impl<T: Read> Iter<T> {
pub(crate) fn new(row: Row, reader: T) -> Self {
Self { row, reader, first: true }
}
}
impl<T: Read> Iter<T> {
// big optimizations come here. we are streaming the results by
// deserializing one Row at time from the JSON array into a monad
// that we've already mostly reified using a forward-only stream.
//
// serde and serde_json have limited streaming abilities. it is
// imperative that we yield each Row as it is visited. most
// JSON deserializers, serde_json included, perform validation
// to ensure the JSON is well-formed. we yield items before we
// can perform this validation. as a consequence, it is possible
// that the JSON is malformed, but this is incredibly unlikely.
// if we encounter unexpected data or JSON in the process, those
// errors are surfaced. if we wanted to retain checking JSON
// the syntax and schema, we 'could' continue to drive deserialization
// to the end of the stream.
//
// this implementation is largely based on a proposed workaround
// for the streaming limitations in serde/serde_json:
//
// REF: https://github.com/serde-rs/json/issues/404
fn try_read<'a>(&'a mut self) -> Result<Option<&'a Row>> {
if self.first {
self.first = false;
if Self::next_char(&mut self.reader)? == b'[' {
let peek = Self::next_char(&mut self.reader)?;
if peek == b']' {
Ok(None)
} else {
match self.deserialize(Some(peek)) {
Some(error) => Err(error),
None => Ok(Some(&self.row)),
}
}
} else {
Err(Self::invalid_json("'[' expected, but not found"))
}
} else {
match Self::next_char(&mut self.reader)? {
b',' => match self.deserialize(None) {
Some(error) => Err(error),
None => Ok(Some(&self.row)),
},
b']' => Ok(None),
_ => Err(Self::invalid_json("',' or ']' expected, but not found")),
}
}
}
fn deserialize(&mut self, peek: Option<u8>) -> Option<Error> {
if let Some(ch) = peek {
// the stream has to be advanced to the first non-whitespace character within the JSON array;
// however doing this consumes the character and advances the reader. we have no way to go back,
// so we create temporary cursor that stitches the character we've consumed together with the
// rest of the stream
let mut reader = Cursor::new([ch]).chain(&mut self.reader);
Self::deserialize_in_place(&mut reader, &mut self.row)
} else {
self.row.index += 1;
Self::deserialize_in_place(&mut self.reader, &mut self.row)
}
}
fn deserialize_in_place<R: Read>(reader: &mut R, row: &mut Row) -> Option<Error> {
let mut json = Deserializer::from_reader(reader);
match Row::deserialize_in_place(&mut json, row) {
Err(error) => Some(Self::invalid_json(&error.to_string())),
_ => None,
}
}
fn next_char(mut reader: impl Read) -> Result<u8> {
let mut byte = 0u8;
loop {
reader.read_exact(slice::from_mut(&mut byte))?;
if !byte.is_ascii_whitespace() {
return Ok(byte);
}
}
}
fn invalid_json(msg: &str) -> Error {
Error::new(ErrorKind::InvalidData, msg)
}
}
impl<'a, T: Read> StreamingIterator<'a, Row> for Iter<T> {
fn next(&'a mut self) -> Option<Result<&'a Row>> {
self.try_read().transpose()
}
}
impl<T: Read> ReadOwner<T> for Iter<T> {
fn take_reader(self) -> T {
self.reader
}
}
pub mod columns;
mod de;
pub mod error;
pub mod iter;
pub mod row;
use de::PartialJson;
use iter::{Iter, ReadOwner, StreamingIterator};
use row::Row;
use serde::Deserialize;
use serde_json::Deserializer;
use std::{
fs::File,
io::{BufReader, Seek},
iter::repeat,
path::Path,
};
fn main() {
let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("data.json");
let file = File::open(path).unwrap();
let size = file.metadata().unwrap().len();
let mut reader = BufReader::new(file);
let mut partial = PartialJson::default();
let mut deserializer = Deserializer::from_reader(&mut reader);
// a JSON syntax error is expected here because we don't consume the entire stream
// to verify the JSON is well-formed. there is no specific way to handle this type
// of error so we ignore all syntax errors. any other other could be bubbled up.
if let Err(error) = PartialJson::deserialize_in_place(&mut deserializer, &mut partial) {
if !error.is_syntax() {
println!("{:?}", error);
return;
}
}
let count = partial.columns.len();
let values = repeat(String::with_capacity(0)).take(count).collect();
let monad = Row {
index: 0,
id: partial.id,
metadata: partial.metadata,
columns: partial.columns,
values,
};
let mut rows = Iter::new(monad, reader);
println!(
"\n{:<3} {:>2} {} {:<5} {:<15} {:<5}",
"nth", "id", "meta", "key", "first name", "last name"
);
println!("--- -- ---- ----- --------------- ---------------");
while let Some(result) = rows.next() {
if let Ok(row) = result {
println!(
"{:<3} {:>2} {:>4} {:>5} {:<15} {:<15}",
row.index,
row.id,
row.metadata.len(),
row.values[0],
row.values[1],
row.values[2]
);
}
}
// prove we didn't drive through all of the JSON (on purpose)
reader = rows.take_reader();
println!(
"\nPosition {} of {}",
reader.stream_position().unwrap(),
size
);
pause();
}
fn pause() {
println!("\nPress any key to continue...");
let mut line = String::new();
std::io::stdin().read_line(&mut line).ok();
}
use crate::columns::ColumnMap;
pub struct Row {
pub index: usize,
pub id: String,
pub metadata: Vec<String>,
pub columns: ColumnMap,
pub values: Vec<String>,
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment