Last active
April 30, 2022 07:19
-
-
Save Horusiath/acbbb53c332d54caf8cba192ab07dfd8 to your computer and use it in GitHub Desktop.
Various approaches to optimize serialized data size
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use crate::session::{SessionDeserialize, SessionSerialize};
use serde::de::{Error, IgnoredAny, MapAccess, SeqAccess, Visitor};
use serde::ser::SerializeStruct;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt::Formatter;
/// A sample record of time-series data. | |
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)] | |
pub struct Record { | |
/// A number of milliseconds since UNIX_EPOCH. | |
/// | |
/// I'm using u64 for ease of comparison during tests, as `SystemTime::now()` | |
/// has < 1ms precision, which will be cut off by this approach. | |
timestamp: u64, | |
data: Vec<u8>, | |
} | |
impl Record { | |
pub fn new(timestamp: u64, data: Vec<u8>) -> Self { | |
Record { timestamp, data } | |
} | |
} | |
/// State carried across consecutive records during serialization or
/// deserialization so that timestamps can be written as deltas.
///
/// A fresh (`Default`) session starts from `0`, so the first delta equals the
/// first timestamp. The encoding and decoding sides must each start from a
/// fresh session and process records in the same order.
#[derive(Default)]
pub struct RecordSession {
    /// The most recently processed timestamp, reinterpreted as `i64`.
    latest: i64,
}

impl RecordSession {
    /// Returns the signed difference between `timestamp` and the previously
    /// seen timestamp, then remembers `timestamp` as the new baseline.
    ///
    /// The subtraction wraps instead of overflowing, so any `u64` timestamp
    /// (including values at or above 2^63) round-trips through
    /// [`RecordSession::timestamp_from_delta`] without panicking.
    fn timestamp_to_delta(&mut self, timestamp: u64) -> i64 {
        // Bit-for-bit reinterpretation: values above `i64::MAX` become
        // negative here and are restored by the inverse cast when decoding.
        let ms = timestamp as i64;
        let delta = ms.wrapping_sub(self.latest);
        self.latest = ms;
        delta
    }

    /// Applies `timestamp_delta` to the previously seen timestamp and returns
    /// the reconstructed absolute timestamp, which becomes the new baseline.
    ///
    /// Uses wrapping addition, mirroring
    /// [`RecordSession::timestamp_to_delta`].
    fn timestamp_from_delta(&mut self, timestamp_delta: i64) -> u64 {
        self.latest = self.latest.wrapping_add(timestamp_delta);
        self.latest as u64
    }
}
impl SessionSerialize for Record { | |
type Session = RecordSession; | |
fn session_serialize<S>( | |
&self, | |
session: &mut Self::Session, | |
serializer: S, | |
) -> Result<S::Ok, S::Error> | |
where | |
S: Serializer, | |
{ | |
let mut s = serializer.serialize_struct("Record", 2)?; | |
let timestamp_delta = session.timestamp_to_delta(self.timestamp); | |
s.serialize_field("timestamp", ×tamp_delta)?; | |
s.serialize_field("data", &self.data)?; | |
s.end() | |
} | |
} | |
impl<'de> SessionDeserialize<'de> for Record { | |
type Session = RecordSession; | |
fn session_deserialize<D>( | |
session: &mut Self::Session, | |
deserializer: D, | |
) -> Result<Self, D::Error> | |
where | |
D: Deserializer<'de>, | |
{ | |
const FIELDS: &'static [&'static str] = &["timestamp", "data"]; | |
struct RecordVisitor<'s, 'de> { | |
session: &'s mut <Record as SessionDeserialize<'de>>::Session, | |
} | |
impl<'s, 'de> Visitor<'de> for RecordVisitor<'s, 'de> { | |
type Value = Record; | |
fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { | |
write!(formatter, "struct Record") | |
} | |
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> | |
where | |
A: SeqAccess<'de>, | |
{ | |
let timestamp_delta = seq | |
.next_element()? | |
.ok_or_else(|| A::Error::invalid_length(0, &self))?; | |
let timestamp = self.session.timestamp_from_delta(timestamp_delta); | |
let data = seq | |
.next_element()? | |
.ok_or_else(|| A::Error::invalid_length(1, &self))?; | |
Ok(Record::new(timestamp, data)) | |
} | |
} | |
let visitor = RecordVisitor { session }; | |
deserializer.deserialize_struct("Record", FIELDS, visitor) | |
} | |
} | |
#[cfg(test)]
mod test {
    use crate::delta_encoding::{Record, RecordSession};
    use crate::session::{SessionDeserialize, SessionSerialize};
    use serde::{Deserialize, Serialize};
    use std::io::Cursor;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Current wall-clock time as milliseconds since `UNIX_EPOCH`.
    fn now() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock is set before UNIX_EPOCH")
            .as_millis() as u64
    }

    /// Builds `len` records stamped with the current time, each carrying a
    /// single payload byte derived from its index (truncated to `u8`).
    fn generate_sample(len: usize) -> Vec<Record> {
        (0..len)
            .map(|i| Record::new(now(), vec![i as u8]))
            .collect()
    }

    /// Baseline: round-trips records through the derived serde impls.
    #[test]
    fn standard_serialize() {
        let input = generate_sample(100);
        let mut buf = Vec::new();

        // serialize 100 records using length-prefix encoding
        let mut serializer = rmp_serde::Serializer::new(&mut buf);
        input.len().serialize(&mut serializer).unwrap();
        for record in input.iter() {
            record.serialize(&mut serializer).unwrap();
        }
        println!("standard payload size: {} bytes", buf.len()); // 1201 bytes

        // deserialize previously serialized input
        let mut deserializer = rmp_serde::Deserializer::new(Cursor::new(buf));
        let len = usize::deserialize(&mut deserializer).unwrap();
        // Guard against a vacuous pass if the length prefix decodes wrongly.
        assert_eq!(len, input.len());
        for expected in &input {
            let record = Record::deserialize(&mut deserializer).unwrap();
            assert_eq!(&record, expected);
        }
    }

    /// Round-trips records through the delta-encoding session impls.
    #[test]
    fn session_serialize() {
        let input = generate_sample(100);
        let mut buf = Vec::new();

        // serialize 100 records using length-prefix encoding with delta session
        let mut session = RecordSession::default();
        let mut serializer = rmp_serde::Serializer::new(&mut buf);
        input.len().serialize(&mut serializer).unwrap();
        for record in input.iter() {
            record
                .session_serialize(&mut session, &mut serializer)
                .unwrap();
        }
        println!("delta payload size: {} bytes", buf.len()); // 409 bytes

        // deserialize previously serialized input using new session (as we emulate remote end)
        let mut session = RecordSession::default();
        let mut deserializer = rmp_serde::Deserializer::new(Cursor::new(buf));
        let len = usize::deserialize(&mut deserializer).unwrap();
        // Guard against a vacuous pass if the length prefix decodes wrongly.
        assert_eq!(len, input.len());
        for expected in &input {
            let record = Record::session_deserialize(&mut session, &mut deserializer).unwrap();
            assert_eq!(&record, expected);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment