Skip to content

Instantly share code, notes, and snippets.

@Horusiath
Last active April 30, 2022 07:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Horusiath/acbbb53c332d54caf8cba192ab07dfd8 to your computer and use it in GitHub Desktop.
Save Horusiath/acbbb53c332d54caf8cba192ab07dfd8 to your computer and use it in GitHub Desktop.
Various approaches to optimize serialized data size
use crate::session::{SessionDeserialize, SessionSerialize};
use serde::de::{Error, SeqAccess, Visitor};
use serde::ser::SerializeStruct;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt::Formatter;
/// A sample record of time-series data.
///
/// The derived `Serialize`/`Deserialize` impls encode `timestamp` as an
/// absolute `u64`; the `SessionSerialize` impl in this file writes a delta
/// against the previously serialized record instead.
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct Record {
/// A number of milliseconds since UNIX_EPOCH.
///
/// A plain `u64` is used for ease of comparison during tests: `SystemTime::now()`
/// has sub-millisecond precision, which would be cut off by a millisecond
/// representation and break equality checks after a round trip.
timestamp: u64,
/// Payload bytes carried by the record.
data: Vec<u8>,
}
impl Record {
pub fn new(timestamp: u64, data: Vec<u8>) -> Self {
Record { timestamp, data }
}
}
/// Session used during serialization/deserialization.
///
/// It remembers the most recent absolute timestamp so that consecutive
/// records can be written as small signed deltas instead of full 64-bit
/// values. The writer and the reader must each start from a fresh
/// (`Default`) session and process records in the same order.
#[derive(Default)]
pub struct RecordSession {
    /// Last absolute timestamp seen, stored as the `i64` bit-reinterpretation
    /// of the original `u64`. Starts at 0.
    latest: i64,
}

impl RecordSession {
    /// Converts an absolute `timestamp` into a delta relative to the
    /// previously seen timestamp and records `timestamp` as the new baseline.
    ///
    /// All arithmetic is performed modulo 2^64, so the conversion never
    /// panics and is exactly reversible by [`RecordSession::timestamp_from_delta`]
    /// for every `u64` input.
    fn timestamp_to_delta(&mut self, timestamp: u64) -> i64 {
        // Intentional bit-reinterpretation (not a checked conversion): values
        // above `i64::MAX` map to negative numbers and map back unchanged on
        // the decoding side.
        let ms = timestamp as i64;
        // `wrapping_sub` instead of `-`: a plain subtraction overflows (and
        // panics in debug builds) when the two timestamps are more than
        // `i64::MAX` apart.
        let delta = ms.wrapping_sub(self.latest);
        self.latest = ms;
        delta
    }

    /// Applies `timestamp_delta` to the previously seen timestamp and returns
    /// the reconstructed absolute timestamp, which becomes the new baseline.
    ///
    /// The delta typically comes from decoded (possibly untrusted) input, so
    /// the addition wraps instead of panicking on overflow.
    fn timestamp_from_delta(&mut self, timestamp_delta: i64) -> u64 {
        self.latest = self.latest.wrapping_add(timestamp_delta);
        // Inverse of the reinterpreting cast in `timestamp_to_delta`.
        self.latest as u64
    }
}
impl SessionSerialize for Record {
    type Session = RecordSession;

    /// Serializes this record as a two-field struct named `"Record"`.
    ///
    /// The `"timestamp"` field carries the signed delta against the previous
    /// record handled by `session` rather than the absolute value; `"data"`
    /// is written unchanged.
    fn session_serialize<S>(
        &self,
        session: &mut Self::Session,
        serializer: S,
    ) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // The struct is opened before the session is touched, so a failure
        // at this point leaves the session baseline unchanged.
        let mut state = serializer.serialize_struct("Record", 2)?;
        state.serialize_field("timestamp", &session.timestamp_to_delta(self.timestamp))?;
        state.serialize_field("data", &self.data)?;
        state.end()
    }
}
impl<'de> SessionDeserialize<'de> for Record {
type Session = RecordSession;
fn session_deserialize<D>(
session: &mut Self::Session,
deserializer: D,
) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
const FIELDS: &'static [&'static str] = &["timestamp", "data"];
struct RecordVisitor<'s, 'de> {
session: &'s mut <Record as SessionDeserialize<'de>>::Session,
}
impl<'s, 'de> Visitor<'de> for RecordVisitor<'s, 'de> {
type Value = Record;
fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
write!(formatter, "struct Record")
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: SeqAccess<'de>,
{
let timestamp_delta = seq
.next_element()?
.ok_or_else(|| A::Error::invalid_length(0, &self))?;
let timestamp = self.session.timestamp_from_delta(timestamp_delta);
let data = seq
.next_element()?
.ok_or_else(|| A::Error::invalid_length(1, &self))?;
Ok(Record::new(timestamp, data))
}
}
let visitor = RecordVisitor { session };
deserializer.deserialize_struct("Record", FIELDS, visitor)
}
}
#[cfg(test)]
mod test {
use crate::delta_encoding::{Record, RecordSession};
use crate::session::{SessionDeserialize, SessionSerialize};
use serde::{Deserialize, Serialize};
use std::io::Cursor;
use std::time::{SystemTime, UNIX_EPOCH};
/// Current wall-clock time as whole milliseconds since UNIX_EPOCH.
///
/// Panics if the system clock reports a time before UNIX_EPOCH.
fn now() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    // `as_millis` yields a `u128`; narrowed to `u64` as in the record format.
    since_epoch.as_millis() as u64
}
/// Produces `len` records stamped with the current time, where the payload of
/// the `i`-th record is the single byte `i as u8` (wrapping past 255).
fn generate_sample(len: usize) -> Vec<Record> {
    let mut records = Vec::with_capacity(len);
    for i in 0..len {
        records.push(Record::new(now(), vec![i as u8]));
    }
    records
}
/// Round-trips 100 records through the derived serde impls (absolute
/// timestamps) and reports the payload size as a baseline.
#[test]
fn standard_serialize() {
    let input = generate_sample(100);
    let mut buf = Vec::new();

    // serialize 100 records using length-prefix encoding
    let mut serializer = rmp_serde::Serializer::new(&mut buf);
    input.len().serialize(&mut serializer).unwrap();
    for record in input.iter() {
        record.serialize(&mut serializer).unwrap();
    }
    println!("standard payload size: {} bytes", buf.len()); // 1201 bytes

    // deserialize previously serialized input
    let mut deserializer = rmp_serde::Deserializer::new(Cursor::new(buf));
    let len = usize::deserialize(&mut deserializer).unwrap();
    // Without this check a wrongly decoded prefix (e.g. 0) would make the
    // loop below pass without comparing anything.
    assert_eq!(len, input.len());
    for expected in &input {
        let record = Record::deserialize(&mut deserializer).unwrap();
        assert_eq!(&record, expected);
    }
}
/// Round-trips 100 records through the session-aware (delta-encoded) path and
/// reports the payload size for comparison with the standard encoding.
#[test]
fn session_serialize() {
    let input = generate_sample(100);
    let mut buf = Vec::new();

    // serialize 100 records using length-prefix encoding with delta session
    let mut session = RecordSession::default();
    let mut serializer = rmp_serde::Serializer::new(&mut buf);
    input.len().serialize(&mut serializer).unwrap();
    for record in input.iter() {
        record
            .session_serialize(&mut session, &mut serializer)
            .unwrap();
    }
    println!("delta payload size: {} bytes", buf.len()); // 409 bytes

    // deserialize previously serialized input using new session (as we emulate remote end)
    let mut session = RecordSession::default();
    let mut deserializer = rmp_serde::Deserializer::new(Cursor::new(buf));
    let len = usize::deserialize(&mut deserializer).unwrap();
    // Without this check a wrongly decoded prefix (e.g. 0) would make the
    // loop below pass without comparing anything.
    assert_eq!(len, input.len());
    for expected in &input {
        let record = Record::session_deserialize(&mut session, &mut deserializer).unwrap();
        assert_eq!(&record, expected);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment