Last active
April 14, 2022 17:52
-
-
Save songpp/332f499db9fa77168cd2549dea9af9bf to your computer and use it in GitHub Desktop.
pg_dump directory-format toc.dat reader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//! this is a pg_dump archive (only 'directory' output) toc.dat reader. | |
use bytes::{buf::Buf, BytesMut}; | |
use futures_util::stream::StreamExt; | |
use tokio_util::codec::*; | |
use nom::{IResult, | |
number::streaming as ns, | |
bytes::streaming as bs, | |
Offset}; | |
use std::io; | |
use nom::bytes::complete::{tag}; | |
use crate::ArchiveFormat::{UNKNOWN, CUSTOM, TAR, NULL, DIRECTORY}; | |
use core::convert::TryInto; | |
use num::ToPrimitive; | |
use std::collections::{HashMap, HashSet}; | |
use std::process::exit; | |
use std::str::FromStr; | |
use std::env::args; | |
use thiserror::Error; | |
#[derive(Debug, PartialEq, PartialOrd, serde::Serialize, serde::Deserialize)] | |
struct Header { | |
magic: String, | |
major_ver: u8, | |
minor_ver: u8, | |
revision: u8, | |
int_size: usize, | |
off_size: usize, | |
format: ArchiveFormat, | |
db_name: String, | |
entry_count: usize, | |
} | |
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)] | |
struct TocEntry { | |
section: usize, | |
dump_id: EntryId, | |
had_dumper: bool, | |
cat_oid: u32, | |
cat_table_oid: u32, | |
tag: String, | |
desc: String, | |
deps: HashSet<EntryId>, | |
defn: Option<String>, | |
drop_stmt: Option<String>, | |
copy_stmt: Option<String>, | |
namespace: Option<String>, | |
tablespace: Option<String>, | |
owner: String, | |
with_oids: bool, | |
extra_data: ExtraTocData, | |
} | |
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)] | |
struct ExtraTocData { | |
filename: Option<String> | |
} | |
/// Value returned by `parse_str` for a zero-length string.
const EMPTY: &str = "";
/// Magic bytes every pg_dump archive header starts with.
const MAGIC: &str = "PGDMP";
/// Text pg_dump stores for a true boolean flag (e.g. the WITH OIDS marker).
const TRUE_STR: &str = "true";
#[derive(Error, Debug)] | |
pub enum FormatError { | |
#[error("Invalid header (expected {expected:?}, got {found:?})")] | |
InvalidHeader { | |
expected: String, | |
found: String, | |
}, | |
#[error("Missing attribute: {0}")] | |
MissingAttribute(String), | |
#[error("io error")] | |
IOError { | |
#[from] | |
source: io::Error, | |
}, | |
} | |
/// Stateless `tokio_util` decoder that turns a whole `toc.dat` byte stream
/// into a single `DumpArchive` frame.
#[derive(Debug, Clone, Copy, Default)]
struct TocCodec;
impl Decoder for TocCodec { | |
type Item = DumpArchive; | |
type Error = FormatError; | |
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { | |
let (consumed, header) = match parse_toc_header(src) { | |
Ok((remain, header)) => (src.offset(remain), header), | |
Err(e) => { | |
if e.is_incomplete() { | |
return Ok(None); | |
} else { | |
return Err(FormatError::InvalidHeader{ | |
expected: "Header".to_string(), found: "?".to_string() | |
}); | |
} | |
} | |
}; | |
src.advance(consumed); | |
let entry_count = header.entry_count; | |
let mut archive = DumpArchive { header, entries: HashMap::new() }; | |
if entry_count > 0 { | |
assert!(matches!(parse_toc_entries(src, &mut archive), Ok(_x))); | |
} | |
Ok(Some(archive)) | |
} | |
} | |
fn parse_toc_entries<'a, 'b>(src: &'a [u8], archive: &'b mut DumpArchive) -> IResult<&'a [u8], ()> { | |
let entry_count = archive.header.entry_count; | |
let int_size = archive.header.int_size; | |
let mut input = src; | |
let entries = &mut archive.entries; | |
for _i in 0..entry_count { | |
let (remain_input, entry) = parse_entry(input, int_size)?; | |
input = remain_input; | |
entries.insert(entry.dump_id, entry); | |
} | |
Ok((input, ())) | |
} | |
fn parse_entry(src: &[u8], int_size: usize) -> IResult<&[u8], TocEntry> { | |
let input = src; | |
let (r, dump_id) = parse_int(input, int_size)?; | |
assert!(dump_id >= 0); | |
let (r, had_dumper_int) = parse_int(r, int_size)?; | |
let had_dumper = had_dumper_int > 0; | |
let (r, cat_tab_oid_str) = parse_str(r, int_size)?; | |
let cat_table_oid = match cat_tab_oid_str { | |
Some(t) if !t.is_empty() => u32::from_str(t.as_str()).unwrap(), | |
_ => 0 | |
}; | |
let (r, cat_oid_str) = parse_str(r, int_size)?; | |
let cat_oid = match cat_oid_str { | |
Some(id) if !id.is_empty() => u32::from_str(id.as_str()).unwrap(), | |
_ => 0 | |
}; | |
let (r, tag) = parse_str(r, int_size)?; | |
let (r, desc) = parse_str(r, int_size)?; | |
let (r, section) = parse_int(r, int_size)?; | |
let (r, defn) = parse_str(r, int_size)?; | |
let (r, drop_stmt) = parse_str(r, int_size)?; | |
let (r, copy_stmt) = parse_str(r, int_size)?; | |
let (r, namespace) = parse_str(r, int_size)?; | |
let (r, tablespace) = parse_str(r, int_size)?; | |
let (r, owner) = parse_str(r, int_size)?; | |
let (r, with_oids_bool_str) = parse_str(r, int_size)?; | |
let with_oids = if let Some(ref str) = with_oids_bool_str { str == TRUE_STR } else { false }; | |
let (r, dep_dump_ids) = parse_entry_deps(r, int_size)?; | |
let (r, extra_toc_data) = parse_str(r, int_size)?; | |
Ok((r, TocEntry { | |
section: section as usize, | |
dump_id, | |
tag: tag.unwrap(), | |
desc: desc.unwrap(), | |
cat_oid: cat_oid.into(), | |
cat_table_oid: cat_table_oid.into(), | |
defn, | |
drop_stmt, | |
copy_stmt, | |
namespace, | |
tablespace, | |
owner: owner.unwrap(), | |
deps: dep_dump_ids, | |
had_dumper, | |
with_oids, | |
extra_data: ExtraTocData { filename: extra_toc_data }, | |
})) | |
} | |
fn parse_entry_deps(input: &[u8], int_size: usize) -> IResult<&[u8], HashSet<EntryId>> { | |
let mut r = input; | |
let mut deps = HashSet::new(); | |
loop { | |
let (remain, dump_id) = parse_str(r, int_size)?; | |
r = remain; | |
if let Some(id_str) = dump_id { | |
let dep = i64::from_str(id_str.as_str()).unwrap(); | |
deps.insert(dep); | |
} else { | |
break; | |
} | |
} | |
Ok((r, deps)) | |
} | |
fn parse_toc_magic(input: &[u8]) -> IResult<&[u8], &[u8]> { | |
tag(MAGIC)(input) | |
} | |
#[derive(Debug, PartialOrd, PartialEq, Eq, Ord, serde::Serialize, serde::Deserialize)] | |
enum ArchiveFormat { | |
UNKNOWN, | |
CUSTOM, | |
TAR, | |
NULL, | |
DIRECTORY, | |
} | |
impl ArchiveFormat { | |
fn from_ordinal(c: u8) -> Self { | |
if c == 1 { | |
CUSTOM | |
} else if c == 3 { | |
TAR | |
} else if c == 4 { | |
NULL | |
} else if c == 5 { | |
DIRECTORY | |
} else { | |
UNKNOWN | |
} | |
} | |
} | |
/// parse a int number from archive toc file | |
fn parse_int(input: &[u8], int_size: usize) -> IResult<&[u8], i64> { | |
let (r, sign_byte) = ns::u8(input)?; | |
let (r, int_bytes) = bs::take(int_size)(r)?; | |
let i = u32::from_le_bytes(int_bytes.try_into().unwrap()); | |
let n = if sign_byte > 0 { | |
0 - i64::from(i) | |
} else { | |
i.into() | |
}; | |
Ok((r, n)) | |
} | |
/// parse a string from archive toc file | |
/// str byte length (int) + str bytes | |
fn parse_str(input: &[u8], int_size: usize) -> IResult<&[u8], Option<String>> { | |
let (r, len) = parse_int(input, int_size)?; | |
if len < 0 { | |
Ok((r, None)) | |
} else if len == 0 { | |
Ok((r, Some(EMPTY.to_string()))) | |
} else { | |
let (r, str_bytes) = bs::take(len as usize)(r)?; | |
Ok((r, Some(String::from_utf8_lossy(str_bytes).to_string()))) | |
} | |
} | |
fn parse_toc_header(input: &[u8]) -> IResult<&[u8], Header> { | |
let (r, magic) = parse_toc_magic(input)?; | |
let (r, major_ver) = ns::u8(r)?; | |
let (r, minor_ver) = ns::u8(r)?; | |
let (r, revision) = ns::u8(r)?; | |
let (r, int_size) = ns::u8(r)?; | |
let (r, off_size) = ns::u8(r)?; | |
let (r, format) = ns::u8(r)?; | |
// used through parsing | |
let int_size: usize = int_size.into(); | |
let (r, _compression) = parse_int(r, int_size)?; | |
let mut remain = r; | |
// todo time | |
for _i in 0..7usize { | |
let (r, _v) = parse_int(remain, int_size)?; | |
remain = r; | |
} | |
let (r, db_name) = parse_str(remain, int_size)?; | |
let (r, _arch_remove_ver) = parse_str(r, int_size)?; | |
let (r, _arch_dump_ver) = parse_str(r, int_size)?; | |
// dbg!(&db_name, &arch_remove_ver, &arch_dump_ver); | |
let (r, entry_count) = parse_int(r, int_size)?; | |
Ok((r, Header { | |
magic: String::from_utf8_lossy(magic).to_string(), | |
major_ver: major_ver.into(), | |
minor_ver: minor_ver.into(), | |
revision: revision.into(), | |
int_size: int_size.into(), | |
off_size: off_size.into(), | |
format: ArchiveFormat::from_ordinal(format), | |
db_name: db_name.unwrap().to_owned(), | |
entry_count: entry_count.to_usize().unwrap(), | |
})) | |
} | |
/// Identifier of a TOC entry: the key of `DumpArchive::entries` and the
/// element type of `TocEntry::deps`.
type EntryId = i64;
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)] | |
struct DumpArchive { | |
header: Header, | |
entries: HashMap<EntryId, TocEntry>, | |
} | |
async fn parse_dump_archive(toc_file: &str) -> Result<DumpArchive, anyhow::Error> { | |
let source = tokio::fs::File::open(toc_file).await?; | |
let stream = FramedRead::new(source, TocCodec); | |
let (parsed, _entry_stream) = stream.into_future().await; | |
if let Some(Err(err)) = parsed { | |
eprintln!("{:?}", err); | |
exit(1); | |
} | |
let archive = parsed.unwrap()?; | |
Ok(archive) | |
} | |
#[tokio::main] | |
async fn main() -> Result<(), anyhow::Error> { | |
let target_toc_file = args().nth(1); | |
// dbg!(&target_toc_file); | |
if let Some(toc_file) = target_toc_file { | |
let archive = parse_dump_archive(toc_file.as_str()).await?; | |
// println!("archive = {:#?}", &archive); | |
let j = serde_json::to_string(&archive).unwrap(); | |
println!("{}", j); | |
// println!("including entry count = {}", archive.entries.len()); | |
} else { | |
eprintln!("give me a file toc.dat file path"); | |
exit(1); | |
} | |
Ok(()) | |
} | |
#[cfg(test)]
mod tests {
    use super::*;

    /// Default location of a pg_dump directory-format `toc.dat` for the
    /// integration test; override it with the `TOC_DAT` environment variable.
    const MY_TEST_DUMPED_TOC: &str = "/tmp/x5/toc.dat";

    /// Decode a real `toc.dat` end to end and check basic invariants.
    ///
    /// Ignored by default because it needs a dump on the local filesystem;
    /// run it with `cargo test -- --ignored`.
    #[tokio::test]
    #[ignore = "requires a pg_dump directory archive (set TOC_DAT or create /tmp/x5/toc.dat)"]
    async fn test_parse_toc_header() -> Result<(), anyhow::Error> {
        let path = std::env::var("TOC_DAT").unwrap_or_else(|_| MY_TEST_DUMPED_TOC.to_string());
        let source = tokio::fs::File::open(&path).await?;
        let stream = FramedRead::new(source, TocCodec);
        let (first, _rest) = stream.into_future().await;
        let archive = first.ok_or_else(|| anyhow::anyhow!("{}: no archive decoded", path))??;
        assert_eq!(archive.header.magic, MAGIC);
        assert_eq!(archive.entries.len(), archive.header.entry_count);
        Ok(())
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment