Skip to content

Instantly share code, notes, and snippets.

@matthewjberger
Last active March 18, 2024 03:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save matthewjberger/f6cd3e63c5da6a225d35811d9171abab to your computer and use it in GitHub Desktop.
Save matthewjberger/f6cd3e63c5da6a225d35811d9171abab to your computer and use it in GitHub Desktop.
Snappy and bincode data streaming to/from file on disk
// [dependencies]
// bincode = "1.3.3"
// serde = { version = "1.0.197", features = ["derive"] }
// snap = "1.1.1"
use bincode::{deserialize, serialize};
use serde::{de::DeserializeOwned, Serialize};
use snap::{read::FrameDecoder, write::FrameEncoder};
use std::{
fs::{File, OpenOptions},
io::{self, BufRead, BufReader, BufWriter, Write},
path::Path,
};
/// An append-only file of bincode-encoded records, compressed with the Snappy frame format.
///
/// The decompressed byte stream is a sequence of records, each being the
/// `bincode::serialize` encoding of a `T` followed by a single `\n` byte. Each instance that
/// writes appends its own Snappy stream to the end of the file; reading decodes the
/// concatenated streams back as one sequence of records.
///
/// The reader and writer share one OS file handle (see [`File::try_clone`]) and therefore one
/// file offset. Written records may stay buffered until the stream is dropped, and errors
/// raised while flushing on drop cannot be reported. Use separate instances for writing and
/// for reading back, as the tests below do.
pub struct SnappyBincodeStream<T> {
    /// Buffered, decompressing reader that starts at the beginning of the file after `open`.
    reader: Option<BufReader<FrameDecoder<File>>>,
    /// Compressing writer whose output is appended to the end of the file.
    writer: Option<FrameEncoder<BufWriter<File>>>,
    /// Associates the stream with its record type `T` without storing a `T`.
    _phantom: std::marker::PhantomData<T>,
}

impl<T: Serialize + DeserializeOwned> SnappyBincodeStream<T> {
    /// Opens the file at `path` for reading and appending, creating it if it does not exist.
    ///
    /// # Errors
    ///
    /// Returns any I/O error raised while opening the file or cloning its handle.
    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
        let file = OpenOptions::new()
            .read(true)
            .create(true)
            .append(true)
            .open(path)?;
        // The clone shares the same underlying handle (and offset) as `file`.
        let reader = BufReader::new(FrameDecoder::new(file.try_clone()?));
        let writer = FrameEncoder::new(BufWriter::new(file));
        Ok(Self {
            reader: Some(reader),
            writer: Some(writer),
            _phantom: std::marker::PhantomData,
        })
    }

    /// Appends `item` as its bincode encoding followed by a `\n` delimiter byte.
    ///
    /// Does nothing if the stream has no writer.
    ///
    /// # Errors
    ///
    /// Returns an error if `item` cannot be serialized or the buffered write fails.
    pub fn write(&mut self, item: &T) -> bincode::Result<()> {
        if let Some(writer) = &mut self.writer {
            let mut record = serialize(item)?;
            record.push(b'\n');
            writer.write_all(&record)?;
        }
        Ok(())
    }

    /// Appends every item yielded by `items`, stopping at the first error.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by [`Self::write`].
    pub fn write_all<I>(&mut self, items: I) -> bincode::Result<()>
    where
        I: IntoIterator<Item = T>,
    {
        items.into_iter().try_for_each(|item| self.write(&item))
    }

    /// Reads the next record, returning `Ok(None)` once the end of the file is reached.
    ///
    /// bincode encodings are self-delimiting, so the record is decoded directly from the
    /// stream and the `\n` after it is then verified and consumed. Splitting the stream on
    /// `\n` would be wrong: the encoded bytes can themselves contain `0x0A` (e.g. the integer
    /// `10`, or the length prefix of a 10-byte string).
    ///
    /// Returns `Ok(None)` if the stream has no reader.
    ///
    /// # Errors
    ///
    /// Returns an error on I/O failure (including errors surfaced by the Snappy decoder), if a
    /// record is truncated or cannot be decoded as `T`, or if a record is not followed by
    /// `\n`. After an error the read position is unspecified.
    pub fn read(&mut self) -> bincode::Result<Option<T>> {
        let reader = match &mut self.reader {
            Some(reader) => reader,
            None => return Ok(None),
        };
        // Nothing left to buffer means we are cleanly between records at end of file.
        if reader.fill_buf()?.is_empty() {
            return Ok(None);
        }
        let item: T = bincode::deserialize_from(&mut *reader)?;
        let delimiter = reader.fill_buf()?.first().copied();
        if delimiter != Some(b'\n') {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "bincode record is not followed by a newline delimiter",
            )
            .into());
        }
        reader.consume(1);
        Ok(Some(item))
    }

    /// Reads all remaining records, stopping at (and returning) the first error.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by [`Self::read`].
    pub fn read_all(&mut self) -> bincode::Result<Vec<T>> {
        std::iter::from_fn(|| self.read().transpose()).collect()
    }

    /// Decompresses and decodes all remaining records; equivalent to [`Self::read_all`].
    ///
    /// # Errors
    ///
    /// Returns the first error produced by [`Self::read`].
    pub fn decompress(&mut self) -> bincode::Result<Vec<T>> {
        self.read_all()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
    struct Data {
        field1: String,
        field2: i32,
    }

    /// Returns a per-test, per-process path in the system temp directory, so parallel tests
    /// and repeated runs never share or accumulate a file.
    fn fresh_path(name: &str) -> std::path::PathBuf {
        let path = std::env::temp_dir().join(format!("{}-{}.snappy", name, std::process::id()));
        // Best effort: a leftover file from an aborted run would otherwise be appended to.
        let _ = std::fs::remove_file(&path);
        path
    }

    #[test]
    fn test_snappy_bincode_stream() -> bincode::Result<()> {
        let path = fresh_path("snappy_bincode_stream");
        let data1 = Data {
            field1: "Hello".to_string(),
            field2: 42,
        };
        let data2 = Data {
            field1: "World".to_string(),
            field2: 123,
        };
        // Two separate instances, each appending its own Snappy stream to the same file.
        {
            let mut stream = SnappyBincodeStream::<Data>::open(&path)?;
            stream.write(&data1)?;
        }
        {
            let mut stream = SnappyBincodeStream::<Data>::open(&path)?;
            stream.write(&data2)?;
        }
        // The temporary reader stream is dropped at the end of this statement, before removal.
        let records = SnappyBincodeStream::<Data>::open(&path)?.decompress();
        std::fs::remove_file(&path)?;
        assert_eq!(records?, vec![data1, data2]);
        Ok(())
    }

    #[test]
    fn round_trips_records_containing_newline_bytes() -> bincode::Result<()> {
        let path = fresh_path("snappy_bincode_newline_bytes");
        // 10_i32 encodes as 0A 00 00 00, and the 10-byte string's u64 length prefix starts
        // with 0A; both collide with the `\n` record delimiter.
        let expected = vec![
            Data {
                field1: "line\nbreak".to_string(),
                field2: 10,
            },
            Data {
                field1: String::new(),
                field2: -1,
            },
        ];
        {
            let mut stream = SnappyBincodeStream::<Data>::open(&path)?;
            stream.write_all(expected.clone())?;
        }
        let records = SnappyBincodeStream::<Data>::open(&path)?.read_all();
        std::fs::remove_file(&path)?;
        assert_eq!(records?, expected);
        Ok(())
    }

    #[test]
    fn empty_file_yields_no_records() -> bincode::Result<()> {
        let path = fresh_path("snappy_bincode_empty");
        let records = SnappyBincodeStream::<Data>::open(&path)?.read_all();
        std::fs::remove_file(&path)?;
        assert!(records?.is_empty());
        Ok(())
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment