use std::borrow::Cow;
use std::ops::Deref;
use std::collections::HashSet;
use std::fs::File;
use std::mem::{size_of, transmute};
use std::os::unix::prelude::FileExt;
use bytemuck::{Zeroable, Pod, pod_read_unaligned, bytes_of, try_from_bytes};
use bytes::{BytesMut, Bytes};
type FrameNo = u64;

#[derive(Debug, Clone, Copy, Zeroable, Pod)]
#[repr(C)]
pub struct LogFileHeader {
    /// Magic number: b"SQLDWAL\0" as u64
    pub magic: u64,
    /// Initial checksum value for the rolling CRC checksum,
    /// computed with the 64-bit CRC_64_GO_ISO algorithm.
    pub start_checksum: u64,
    /// UUID of the database associated with this log.
    pub db_id: u128,
    /// Frame number of the first frame in the log.
    pub start_frame_no: u64,
    /// Number of committed frames in the log file.
    pub frame_count: u64,
    /// WAL file version number; currently 2.
    pub version: u32,
    /// Page size, in bytes: 4096.
    pub page_size: i32,
    /// sqld version that created this log.
    pub sqld_version: [u16; 4],
}
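
// Not part of the original gist: a small helper sketching how the on-disk size of a log
// follows from its header. The file is a `LogFileHeader` followed by `frame_count` frames
// of `LogFile::FRAME_SIZE` bytes each, which is the same arithmetic `absolute_byte_offset`
// uses below.
#[allow(dead_code)]
fn expected_log_size(header: &LogFileHeader) -> u64 {
    size_of::<LogFileHeader>() as u64 + header.frame_count * LogFile::FRAME_SIZE as u64
}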

#[derive(Debug)]
pub struct LogFile {
    file: File,
    pub header: LogFileHeader,
    /// The maximum number of frames this log is allowed to contain before it should be
    /// compacted.
    max_log_frame_count: u64,
    /// Number of frames in the log that have not been committed yet. On commit, the header's
    /// frame count is incremented by this amount. New frames are written after
    /// `header.frame_count + uncommitted_frame_count`.
    /// On rollback, this is reset to 0, so that everything written after the previous
    /// `header.frame_count` is ignored and can be overwritten.
    uncommitted_frame_count: u64,
    uncommitted_checksum: u64,
    /// Checksum of the last committed frame.
    committed_checksum: u64,
}

#[repr(C)]
#[derive(Debug, Clone, Copy, Zeroable, Pod)]
pub struct FrameHeader {
    /// Incremental frame number.
    pub frame_no: u64,
    /// Rolling checksum of all the previous frames, including this one.
    pub checksum: u64,
    /// Page number of the page contained in this frame.
    pub page_no: u32,
    /// Size of the database (in pages) after committing the transaction. This is passed from
    /// SQLite and serves as the commit transaction boundary.
    pub size_after: u32,
}
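
// Not part of the original gist: following the SQLite WAL convention that `size_after`
// mirrors, the field is non-zero only on the last frame of a committed transaction, so it
// can be used to detect commit boundaries when scanning the log.
#[allow(dead_code)]
fn is_commit_frame(header: &FrameHeader) -> bool {
    header.size_after != 0
}
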
const WAL_PAGE_SIZE: u64 = 4096;

impl LogFile {
    /// Size of a single frame, in bytes: header followed by page data.
    pub const FRAME_SIZE: usize = size_of::<FrameHeader>() + WAL_PAGE_SIZE as usize;

    pub fn new(file: File, max_log_frame_count: u64) -> anyhow::Result<Self> {
        // FIXME: we should probably take a lock on this file, to prevent anybody else from
        // writing to it.
        let _file_end = file.metadata()?.len();
        let header = Self::read_header(&file)?;
        let mut this = Self {
            file,
            header,
            max_log_frame_count,
            uncommitted_frame_count: 0,
            uncommitted_checksum: 0,
            committed_checksum: 0,
        };

        this.committed_checksum = this.header.start_checksum;
        this.uncommitted_checksum = this.header.start_checksum;

        Ok(this)
    }

    pub fn read_header(file: &File) -> anyhow::Result<LogFileHeader> {
        let mut buf = [0; size_of::<LogFileHeader>()];
        file.read_exact_at(&mut buf, 0)?;
        let header: LogFileHeader = pod_read_unaligned(&buf);

        Ok(header)
    }

    pub fn header(&self) -> &LogFileHeader {
        &self.header
    }

    /// Returns an iterator over the frames in the log, from oldest to newest.
    #[allow(dead_code)]
    fn frames_iter(&self) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Frame>> + '_> {
        let mut current_frame_offset = 0;
        Ok(std::iter::from_fn(move || {
            if current_frame_offset >= self.header.frame_count {
                return None;
            }
            let read_byte_offset = Self::absolute_byte_offset(current_frame_offset);
            current_frame_offset += 1;
            Some(self.read_frame_byte_offset(read_byte_offset))
        }))
    }

    /// Returns an iterator over the frames in the log, from newest to oldest.
    pub fn rev_frames_iter(
        &self,
    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Frame>> + '_> {
        let mut current_frame_offset = self.header.frame_count;
        Ok(std::iter::from_fn(move || {
            if current_frame_offset == 0 {
                return None;
            }
            current_frame_offset -= 1;
            let read_byte_offset = Self::absolute_byte_offset(current_frame_offset);
            let frame = self.read_frame_byte_offset(read_byte_offset);
            Some(frame)
        }))
    }

    /// Offset in bytes at which to write the next frame.
    fn next_byte_offset(&self) -> u64 {
        Self::absolute_byte_offset(self.header().frame_count + self.uncommitted_frame_count)
    }

    fn next_frame_no(&self) -> FrameNo {
        self.header().start_frame_no + self.header().frame_count + self.uncommitted_frame_count
    }

    /// Returns the byte position of the `nth` entry in the log.
    fn absolute_byte_offset(nth: u64) -> u64 {
        std::mem::size_of::<LogFileHeader>() as u64 + nth * Self::FRAME_SIZE as u64
    }

    /// Returns the byte offset of frame `id` in the log, or `None` if the requested frame is
    /// before the first frame in the log, or after the last frame.
    fn byte_offset(&self, id: FrameNo) -> anyhow::Result<Option<u64>> {
        if id < self.header.start_frame_no
            || id > self.header.start_frame_no + self.header.frame_count
        {
            return Ok(None);
        }

        Ok(Self::absolute_byte_offset(id - self.header.start_frame_no).into())
    }

    /// Reads the frame stored at byte `offset` in the log file.
    fn read_frame_byte_offset(&self, offset: u64) -> anyhow::Result<Frame> {
        let mut buffer = BytesMut::zeroed(LogFile::FRAME_SIZE);
        self.file.read_exact_at(&mut buffer, offset)?;
        let buffer = buffer.freeze();

        Frame::try_from_bytes(buffer)
    }
}

/// A borrowed view over the raw bytes of a frame: a `FrameHeader` followed by page data.
#[repr(transparent)]
pub struct FrameBorrowed {
    data: [u8],
}

impl FrameBorrowed {
    pub fn header(&self) -> Cow<FrameHeader> {
        let data = &self.data[..size_of::<FrameHeader>()];
        // Borrow the header in place when the bytes are suitably aligned; otherwise fall back
        // to copying it out with an unaligned read.
        try_from_bytes(data)
            .map(Cow::Borrowed)
            .unwrap_or_else(|_| Cow::Owned(pod_read_unaligned(data)))
    }

    /// Returns the bytes for this frame, including the header bytes.
    pub fn as_slice(&self) -> &[u8] {
        &self.data
    }

    pub fn from_bytes(data: &[u8]) -> &Self {
        assert_eq!(data.len(), Frame::SIZE);
        // SAFETY: `FrameBorrowed` is `repr(transparent)` over `[u8]`, so `&FrameBorrowed` has
        // the same layout as `&[u8]`.
        unsafe { transmute(data) }
    }

    /// Returns this frame's page data.
    pub fn page(&self) -> &[u8] {
        &self.data[size_of::<FrameHeader>()..]
    }
}

impl Deref for Frame {
    type Target = FrameBorrowed;

    fn deref(&self) -> &Self::Target {
        FrameBorrowed::from_bytes(&self.data)
    }
}

impl Frame {
    /// Size of a single frame, in bytes: header followed by page data.
    pub const SIZE: usize = size_of::<FrameHeader>() + WAL_PAGE_SIZE as usize;

    pub fn from_parts(header: &FrameHeader, data: &[u8]) -> Self {
        assert_eq!(data.len(), WAL_PAGE_SIZE as usize);
        let mut buf = BytesMut::with_capacity(Self::SIZE);
        buf.extend_from_slice(bytes_of(header));
        buf.extend_from_slice(data);

        Self { data: buf.freeze() }
    }

    pub fn try_from_bytes(data: Bytes) -> anyhow::Result<Self> {
        anyhow::ensure!(data.len() == Self::SIZE, "invalid frame size");

        Ok(Self { data })
    }

    pub fn bytes(&self) -> Bytes {
        self.data.clone()
    }
}

/// The owned version of a replication frame.
/// Cloning this is cheap.
#[derive(Clone)]
pub struct Frame {
    data: Bytes,
}
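
// Not part of the original gist: a short sketch of how the `Frame` API above fits together,
// assembling a frame from a header and a 4096-byte page buffer, then reading the header back
// through the `Deref` impl.
#[allow(dead_code)]
fn frame_example(header: &FrameHeader, page: &[u8]) -> u32 {
    let frame = Frame::from_parts(header, page);
    frame.header().page_no
}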

fn main() {
    let path = std::env::args().nth(1).unwrap();
    let file = File::open(path).unwrap();
    let log = LogFile::new(file, 10_000_000_000_000_000).unwrap();
    dbg!(log.header);

    // Count the distinct pages present in the log.
    let mut seen = HashSet::new();
    let count_page = log
        .rev_frames_iter()
        .unwrap()
        .filter(|f| {
            let page_no = f.as_ref().unwrap().header().page_no;
            seen.insert(page_no)
        })
        .count();
    dbg!(log.rev_frames_iter().unwrap().map(|f| f.as_ref().unwrap().header().page_no).min());
    dbg!(count_page);

    // Reconstruct the database file: walk the frames from newest to oldest and write only the
    // most recent version of each page, at its position in the database file.
    let outfile = File::create("out.db").unwrap();
    outfile.set_len(4096 * count_page as u64).unwrap();
    let mut seen = HashSet::new();
    log.rev_frames_iter().unwrap().for_each(|f| {
        let f = f.as_ref().unwrap();
        if seen.insert(f.header().page_no) {
            outfile
                .write_all_at(f.page(), (f.header().page_no as u64 - 1) * 4096)
                .unwrap();
            println!("wrote page {}", f.header().page_no);
        }
    });
}
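
// Example invocation (assumed setup: this file as the `main.rs` of a small binary crate with
// `anyhow`, `bytemuck` (with its `derive` feature), and `bytes` as dependencies):
//
//   cargo run --release -- path/to/wal-log
//
// This prints the parsed `LogFileHeader` and the distinct page count, then writes the
// reconstructed database pages to `out.db` in the current directory.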