Created
July 1, 2023 07:22
-
-
Save MarinPostma/ce3d7bfe3f1e7f937f009b6570a04f64 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::borrow::Cow;
use std::collections::HashSet;
use std::fs::File;
use std::mem::{size_of, transmute};
use std::ops::Deref;
use std::os::unix::prelude::FileExt;

use bytemuck::{bytes_of, pod_read_unaligned, try_from_bytes, Pod, Zeroable};
use bytes::{Bytes, BytesMut};
/// Frame number: the incremental index stored in `FrameHeader::frame_no`.
type FrameNo = u64;
#[derive(Debug, Clone, Copy, Zeroable, Pod)] | |
#[repr(C)] | |
pub struct LogFileHeader { | |
/// magic number: b"SQLDWAL\0" as u64 | |
pub magic: u64, | |
/// Initial checksum value for the rolling CRC checksum | |
/// computed with the 64 bits CRC_64_GO_ISO | |
pub start_checksum: u64, | |
/// Uuid of the database associated with this log. | |
pub db_id: u128, | |
/// Frame_no of the first frame in the log | |
pub start_frame_no: u64, | |
/// entry count in file | |
pub frame_count: u64, | |
/// Wal file version number, currently: 2 | |
pub version: u32, | |
/// page size: 4096 | |
pub page_size: i32, | |
/// sqld version when creating this log | |
pub sqld_version: [u16; 4], | |
} | |
#[derive(Debug)] | |
pub struct LogFile { | |
file: File, | |
pub header: LogFileHeader, | |
/// the maximum number of frames this log is allowed to contain before it should be compacted. | |
max_log_frame_count: u64, | |
/// number of frames in the log that have not been commited yet. On commit the header's frame | |
/// count is incremented by that ammount. New pages are written after the last | |
/// header.frame_count + uncommit_frame_count. | |
/// On rollback, this is reset to 0, so that everything that was written after the previous | |
/// header.frame_count is ignored and can be overwritten | |
uncommitted_frame_count: u64, | |
uncommitted_checksum: u64, | |
/// checksum of the last commited frame | |
commited_checksum: u64, | |
} | |
#[repr(C)] | |
#[derive(Debug, Clone, Copy, Zeroable, Pod)] | |
pub struct FrameHeader { | |
/// Incremental frame number | |
pub frame_no: u64, | |
/// Rolling checksum of all the previous frames, including this one. | |
pub checksum: u64, | |
/// page number, if frame_type is FrameType::Page | |
pub page_no: u32, | |
/// Size of the database (in page) after commiting the transaction. This is passed from sqlite, | |
/// and serves as commit transaction boundary | |
pub size_after: u32, | |
} | |
/// Size in bytes of the page payload that follows each frame header.
const WAL_PAGE_SIZE: u64 = 4096;
impl LogFile { | |
/// size of a single frame | |
pub const FRAME_SIZE: usize = size_of::<FrameHeader>() + WAL_PAGE_SIZE as usize; | |
pub fn new(file: File, max_log_frame_count: u64) -> anyhow::Result<Self> { | |
// FIXME: we should probably take a lock on this file, to prevent anybody else to write to | |
// it. | |
let file_end = file.metadata()?.len(); | |
let header = Self::read_header(&file)?; | |
let mut this = Self { | |
file, | |
header, | |
max_log_frame_count, | |
uncommitted_frame_count: 0, | |
uncommitted_checksum: 0, | |
commited_checksum: 0, | |
}; | |
this.commited_checksum = this.header.start_checksum; | |
this.uncommitted_checksum = this.header.start_checksum; | |
Ok(this) | |
} | |
pub fn read_header(file: &File) -> anyhow::Result<LogFileHeader> { | |
let mut buf = [0; size_of::<LogFileHeader>()]; | |
file.read_exact_at(&mut buf, 0)?; | |
let header: LogFileHeader = pod_read_unaligned(&buf); | |
Ok(header) | |
} | |
pub fn header(&self) -> &LogFileHeader { | |
&self.header | |
} | |
/// Returns an iterator over the WAL frame headers | |
#[allow(dead_code)] | |
fn frames_iter(&self) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Frame>> + '_> { | |
let mut current_frame_offset = 0; | |
Ok(std::iter::from_fn(move || { | |
if current_frame_offset >= self.header.frame_count { | |
return None; | |
} | |
let read_byte_offset = Self::absolute_byte_offset(current_frame_offset); | |
current_frame_offset += 1; | |
Some(self.read_frame_byte_offset(read_byte_offset)) | |
})) | |
} | |
/// Returns an iterator over the WAL frame headers | |
pub fn rev_frames_iter( | |
&self, | |
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Frame>> + '_> { | |
let mut current_frame_offset = self.header.frame_count; | |
Ok(std::iter::from_fn(move || { | |
if current_frame_offset == 0 { | |
return None; | |
} | |
current_frame_offset -= 1; | |
let read_byte_offset = Self::absolute_byte_offset(current_frame_offset); | |
let frame = self.read_frame_byte_offset(read_byte_offset); | |
Some(frame) | |
})) | |
} | |
/// offset in bytes at which to write the next frame | |
fn next_byte_offset(&self) -> u64 { | |
Self::absolute_byte_offset(self.header().frame_count + self.uncommitted_frame_count) | |
} | |
fn next_frame_no(&self) -> FrameNo { | |
self.header().start_frame_no + self.header().frame_count + self.uncommitted_frame_count | |
} | |
/// Returns the bytes position of the `nth` entry in the log | |
fn absolute_byte_offset(nth: u64) -> u64 { | |
std::mem::size_of::<LogFileHeader>() as u64 + nth * Self::FRAME_SIZE as u64 | |
} | |
fn byte_offset(&self, id: FrameNo) -> anyhow::Result<Option<u64>> { | |
if id < self.header.start_frame_no | |
|| id > self.header.start_frame_no + self.header.frame_count | |
{ | |
return Ok(None); | |
} | |
Ok(Self::absolute_byte_offset(id - self.header.start_frame_no).into()) | |
} | |
/// Returns bytes represening a WalFrame for frame `frame_no` | |
/// | |
/// If the requested frame is before the first frame in the log, or after the last frame, | |
/// Ok(None) is returned. | |
fn read_frame_byte_offset(&self, offset: u64) -> anyhow::Result<Frame> { | |
let mut buffer = BytesMut::zeroed(LogFile::FRAME_SIZE); | |
self.file.read_exact_at(&mut buffer, offset)?; | |
let buffer = buffer.freeze(); | |
Frame::try_from_bytes(buffer) | |
} | |
} | |
/// Borrowed view over the raw bytes of a frame (frame header followed by page data).
///
/// `repr(transparent)` over `[u8]`, so a `&FrameBorrowed` has the same layout as a `&[u8]`.
#[repr(transparent)]
pub struct FrameBorrowed {
    data: [u8],
}
impl FrameBorrowed { | |
pub fn header(&self) -> Cow<FrameHeader> { | |
let data = &self.data[..size_of::<FrameHeader>()]; | |
try_from_bytes(data) | |
.map(Cow::Borrowed) | |
.unwrap_or_else(|_| Cow::Owned(pod_read_unaligned(data))) | |
} | |
/// Returns the bytes for this frame. Includes the header bytes. | |
pub fn as_slice(&self) -> &[u8] { | |
&self.data | |
} | |
pub fn from_bytes(data: &[u8]) -> &Self { | |
assert_eq!(data.len(), Frame::SIZE); | |
// SAFETY: &FrameBorrowed is equivalent to &[u8] | |
unsafe { transmute(data) } | |
} | |
/// returns this frame's page data. | |
pub fn page(&self) -> &[u8] { | |
&self.data[size_of::<FrameHeader>()..] | |
} | |
} | |
impl Deref for Frame { | |
type Target = FrameBorrowed; | |
fn deref(&self) -> &Self::Target { | |
FrameBorrowed::from_bytes(&self.data) | |
} | |
} | |
impl Frame { | |
/// size of a single frame | |
pub const SIZE: usize = size_of::<FrameHeader>() + WAL_PAGE_SIZE as usize; | |
pub fn from_parts(header: &FrameHeader, data: &[u8]) -> Self { | |
assert_eq!(data.len(), WAL_PAGE_SIZE as usize); | |
let mut buf = BytesMut::with_capacity(Self::SIZE); | |
buf.extend_from_slice(bytes_of(header)); | |
buf.extend_from_slice(data); | |
Self { data: buf.freeze() } | |
} | |
pub fn try_from_bytes(data: Bytes) -> anyhow::Result<Self> { | |
anyhow::ensure!(data.len() == Self::SIZE, "invalid frame size"); | |
Ok(Self { data }) | |
} | |
pub fn bytes(&self) -> Bytes { | |
self.data.clone() | |
} | |
} | |
#[derive(Clone)] | |
/// The owned version of a replication frame. | |
/// Cloning this is cheap. | |
pub struct Frame { | |
data: Bytes, | |
} | |
fn main() { | |
let path = std::env::args().nth(1).unwrap(); | |
let file = File::open(path).unwrap(); | |
let log = LogFile::new(file, 10000000000000000).unwrap(); | |
dbg!(log.header); | |
let mut seen = HashSet::new(); | |
let count_page = log.rev_frames_iter().unwrap().filter(|f| { | |
let frame_no = f.as_ref().unwrap().header().page_no; | |
seen.insert(frame_no) | |
}).count(); | |
dbg!(log.rev_frames_iter().unwrap().map(|f| f.as_ref().unwrap().header().page_no).min()); | |
dbg!(count_page); | |
let outfile = File::create("out.db").unwrap(); | |
outfile.set_len(4096 * count_page as u64).unwrap(); | |
let mut seen = HashSet::new(); | |
log.rev_frames_iter().unwrap().for_each(|f| { | |
let f = f.as_ref().unwrap(); | |
if seen.insert(f.header().page_no) { | |
outfile.write_all_at(f.page(), (f.header().page_no as u64 - 1) * 4096).unwrap(); | |
println!("wrote page {}", f.header().page_no); | |
} | |
}); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment