directory format toc.dat reader
//! A reader for the toc.dat file of a pg_dump archive (directory-format output only).
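//!
//! Usage sketch (paths are examples only): a directory archive can be produced with
//! `pg_dump -Fd -f /tmp/x5 mydb`, which writes a `toc.dat` into that directory;
//! running this program as `cargo run -- /tmp/x5/toc.dat` prints the parsed TOC as JSON.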
use std::collections::{HashMap, HashSet};
use std::env::args;
use std::io;
use std::process::exit;
use std::str::FromStr;

use bytes::{buf::Buf, BytesMut};
use core::convert::TryInto;
use futures_util::stream::StreamExt;
use nom::bytes::complete::tag;
use nom::{bytes::streaming as bs, number::streaming as ns, IResult, Offset};
use num::ToPrimitive;
use thiserror::Error;
use tokio_util::codec::*;

use crate::ArchiveFormat::{CUSTOM, DIRECTORY, NULL, TAR, UNKNOWN};
#[derive(Debug, PartialEq, PartialOrd, serde::Serialize, serde::Deserialize)]
struct Header {
    magic: String,
    major_ver: u8,
    minor_ver: u8,
    revision: u8,
    int_size: usize,
    off_size: usize,
    format: ArchiveFormat,
    db_name: String,
    entry_count: usize,
}
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
struct TocEntry {
    section: usize,
    dump_id: EntryId,
    had_dumper: bool,
    cat_oid: u32,
    cat_table_oid: u32,
    tag: String,
    desc: String,
    deps: HashSet<EntryId>,
    defn: Option<String>,
    drop_stmt: Option<String>,
    copy_stmt: Option<String>,
    namespace: Option<String>,
    tablespace: Option<String>,
    owner: String,
    with_oids: bool,
    extra_data: ExtraTocData,
}
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
struct ExtraTocData {
    filename: Option<String>,
}
const EMPTY: &'static str = "";
const MAGIC: &'static str = "PGDMP";
const TRUE_STR: &'static str = "true";
#[derive(Error, Debug)]
pub enum FormatError {
    #[error("Invalid header (expected {expected:?}, got {found:?})")]
    InvalidHeader { expected: String, found: String },
    #[error("Missing attribute: {0}")]
    MissingAttribute(String),
    #[error("io error")]
    IOError {
        #[from]
        source: io::Error,
    },
}
#[derive(Debug)]
struct TocCodec;

impl Decoder for TocCodec {
    type Item = DumpArchive;
    type Error = FormatError;
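    // Parse the header from the buffered bytes; a nom `Incomplete` means not enough of
    // toc.dat has been read yet, so return Ok(None) and let FramedRead fetch more.
    // Once the header parses, every TOC entry is decoded from the remaining buffer in
    // a single pass (the entry bytes are expected to be fully buffered by then).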
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let (consumed, header) = match parse_toc_header(src) {
            Ok((remain, header)) => (src.offset(remain), header),
            Err(e) => {
                if e.is_incomplete() {
                    return Ok(None);
                } else {
                    return Err(FormatError::InvalidHeader {
                        expected: "Header".to_string(),
                        found: "?".to_string(),
                    });
                }
            }
        };
        src.advance(consumed);
        let entry_count = header.entry_count;
        let mut archive = DumpArchive { header, entries: HashMap::new() };
        if entry_count > 0 {
            assert!(matches!(parse_toc_entries(src, &mut archive), Ok(_x)));
        }
        Ok(Some(archive))
    }
}
fn parse_toc_entries<'a, 'b>(src: &'a [u8], archive: &'b mut DumpArchive) -> IResult<&'a [u8], ()> {
    let entry_count = archive.header.entry_count;
    let int_size = archive.header.int_size;
    let mut input = src;
    let entries = &mut archive.entries;
    for _i in 0..entry_count {
        let (remain_input, entry) = parse_entry(input, int_size)?;
        input = remain_input;
        entries.insert(entry.dump_id, entry);
    }
    Ok((input, ()))
}
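/// Parse a single TOC entry. The read order below follows what this reader expects on
/// disk (and presumably what pg_dump's ReadToc writes): dump id, had-dumper flag,
/// catalog table oid and oid, tag, description, section, definition / drop / copy
/// statements, namespace, tablespace, owner, with-oids flag, the dependency id list,
/// and finally the per-entry extra data, which for directory archives is the data
/// file name.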
fn parse_entry(src: &[u8], int_size: usize) -> IResult<&[u8], TocEntry> {
    let input = src;
    let (r, dump_id) = parse_int(input, int_size)?;
    assert!(dump_id >= 0);
    let (r, had_dumper_int) = parse_int(r, int_size)?;
    let had_dumper = had_dumper_int > 0;
    let (r, cat_tab_oid_str) = parse_str(r, int_size)?;
    let cat_table_oid = match cat_tab_oid_str {
        Some(t) if !t.is_empty() => u32::from_str(t.as_str()).unwrap(),
        _ => 0,
    };
    let (r, cat_oid_str) = parse_str(r, int_size)?;
    let cat_oid = match cat_oid_str {
        Some(id) if !id.is_empty() => u32::from_str(id.as_str()).unwrap(),
        _ => 0,
    };
    let (r, tag) = parse_str(r, int_size)?;
    let (r, desc) = parse_str(r, int_size)?;
    let (r, section) = parse_int(r, int_size)?;
    let (r, defn) = parse_str(r, int_size)?;
    let (r, drop_stmt) = parse_str(r, int_size)?;
    let (r, copy_stmt) = parse_str(r, int_size)?;
    let (r, namespace) = parse_str(r, int_size)?;
    let (r, tablespace) = parse_str(r, int_size)?;
    let (r, owner) = parse_str(r, int_size)?;
    let (r, with_oids_bool_str) = parse_str(r, int_size)?;
    let with_oids = with_oids_bool_str.as_deref() == Some(TRUE_STR);
    let (r, dep_dump_ids) = parse_entry_deps(r, int_size)?;
    let (r, extra_toc_data) = parse_str(r, int_size)?;
    Ok((r, TocEntry {
        section: section as usize,
        dump_id,
        tag: tag.unwrap(),
        desc: desc.unwrap(),
        cat_oid,
        cat_table_oid,
        defn,
        drop_stmt,
        copy_stmt,
        namespace,
        tablespace,
        owner: owner.unwrap(),
        deps: dep_dump_ids,
        had_dumper,
        with_oids,
        extra_data: ExtraTocData { filename: extra_toc_data },
    }))
}
fn parse_entry_deps(input: &[u8], int_size: usize) -> IResult<&[u8], HashSet<EntryId>> {
    let mut r = input;
    let mut deps = HashSet::new();
    // dependencies are a sequence of dump-id strings, terminated by a NULL string
    // (negative length, which parse_str decodes as None)
    loop {
        let (remain, dump_id) = parse_str(r, int_size)?;
        r = remain;
        if let Some(id_str) = dump_id {
            let dep = i64::from_str(id_str.as_str()).unwrap();
            deps.insert(dep);
        } else {
            break;
        }
    }
    Ok((r, deps))
}
fn parse_toc_magic(input: &[u8]) -> IResult<&[u8], &[u8]> {
    tag(MAGIC)(input)
}
#[derive(Debug, PartialOrd, PartialEq, Eq, Ord, serde::Serialize, serde::Deserialize)]
enum ArchiveFormat {
    UNKNOWN,
    CUSTOM,
    TAR,
    NULL,
    DIRECTORY,
}

impl ArchiveFormat {
    fn from_ordinal(c: u8) -> Self {
        match c {
            1 => CUSTOM,
            3 => TAR,
            4 => NULL,
            5 => DIRECTORY,
            _ => UNKNOWN,
        }
    }
}
/// parse an int from the archive toc file:
/// a sign byte (non-zero means negative) followed by `int_size` little-endian bytes
fn parse_int(input: &[u8], int_size: usize) -> IResult<&[u8], i64> {
    let (r, sign_byte) = ns::u8(input)?;
    let (r, int_bytes) = bs::take(int_size)(r)?;
    // assumes int_size == 4; pg_dump writes sizeof(int), which is 4 on common platforms
    let i = u32::from_le_bytes(int_bytes.try_into().unwrap());
    let n = if sign_byte > 0 {
        -i64::from(i)
    } else {
        i.into()
    };
    Ok((r, n))
}
/// parse a string from the archive toc file:
/// a byte length (encoded like ints above) followed by that many bytes;
/// a negative length denotes a NULL string
fn parse_str(input: &[u8], int_size: usize) -> IResult<&[u8], Option<String>> {
    let (r, len) = parse_int(input, int_size)?;
    if len < 0 {
        Ok((r, None))
    } else if len == 0 {
        Ok((r, Some(EMPTY.to_string())))
    } else {
        let (r, str_bytes) = bs::take(len as usize)(r)?;
        Ok((r, Some(String::from_utf8_lossy(str_bytes).to_string())))
    }
}
fn parse_toc_header(input: &[u8]) -> IResult<&[u8], Header> {
    let (r, magic) = parse_toc_magic(input)?;
    let (r, major_ver) = ns::u8(r)?;
    let (r, minor_ver) = ns::u8(r)?;
    let (r, revision) = ns::u8(r)?;
    let (r, int_size) = ns::u8(r)?;
    let (r, off_size) = ns::u8(r)?;
    let (r, format) = ns::u8(r)?;
    // used throughout parsing
    let int_size: usize = int_size.into();
    let (r, _compression) = parse_int(r, int_size)?;
    let mut remain = r;
    // skip the 7 creation-time fields (sec, min, hour, mday, mon, year, isdst);
    // TODO: turn them into a proper timestamp
    for _i in 0..7usize {
        let (r, _v) = parse_int(remain, int_size)?;
        remain = r;
    }
    let (r, db_name) = parse_str(remain, int_size)?;
    let (r, _arch_remote_ver) = parse_str(r, int_size)?;
    let (r, _arch_dump_ver) = parse_str(r, int_size)?;
    // dbg!(&db_name, &_arch_remote_ver, &_arch_dump_ver);
    let (r, entry_count) = parse_int(r, int_size)?;
    Ok((r, Header {
        magic: String::from_utf8_lossy(magic).to_string(),
        major_ver,
        minor_ver,
        revision,
        int_size,
        off_size: off_size.into(),
        format: ArchiveFormat::from_ordinal(format),
        db_name: db_name.unwrap(),
        entry_count: entry_count.to_usize().unwrap(),
    }))
}
type EntryId = i64;

#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
struct DumpArchive {
    header: Header,
    entries: HashMap<EntryId, TocEntry>,
}
async fn parse_dump_archive(toc_file: &str) -> Result<DumpArchive, anyhow::Error> {
    let source = tokio::fs::File::open(toc_file).await?;
    let stream = FramedRead::new(source, TocCodec);
    let (parsed, _entry_stream) = stream.into_future().await;
    if let Some(Err(err)) = parsed {
        eprintln!("{:?}", err);
        exit(1);
    }
    let archive = parsed.unwrap()?;
    Ok(archive)
}
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let target_toc_file = args().nth(1);
    // dbg!(&target_toc_file);
    if let Some(toc_file) = target_toc_file {
        let archive = parse_dump_archive(toc_file.as_str()).await?;
        // println!("archive = {:#?}", &archive);
        let j = serde_json::to_string(&archive).unwrap();
        println!("{}", j);
        // println!("including entry count = {}", archive.entries.len());
    } else {
        eprintln!("give me a toc.dat file path");
        exit(1);
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    // use tokio::io::{AsyncRead};
    // use tokio::io::{AsyncRead, ReadBuf};
    // use futures_util::io::AllowStdIo;

    const MY_TEST_DUMPED_TOC: &str = "/tmp/x5/toc.dat";

    // type GzEntryStream = FramedRead<Compat<AllowStdIo<GzDecoder<File>>>, HeaderCodec>;

    #[tokio::test]
    async fn test_parse_toc_header() -> Result<(), anyhow::Error> {
        // let mut file = File::open(MY_TEST_DUMPED_TOC).await?;
        // let compat = file.compat();
        // let std_io = AllowStdIo::new(std::fs::File::open(MY_TEST_DUMPED_TOC)?);
        // let file = std_io.compat();
        // let decoder = read::GzDecoder::new(std_io);
        // let decoder = read::GzDecoder::new(std::fs::File::open(MY_TEST_DUMPED_TOC)?);
        // let reader = AllowStdIo::new(decoder);
        // let source = reader.compat();
        let source = tokio::fs::File::open(MY_TEST_DUMPED_TOC).await?;
        let stream = FramedRead::new(source, TocCodec);
        let (header, entry_stream) = stream.into_future().await;
        dbg!(&header, &entry_stream);
        Ok(())
    }
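
    // The tests below are additions for illustration: they exercise the low-level
    // decoders on hand-built byte sequences instead of a real dump, assuming the usual
    // 4-byte int encoding; the version numbers and db name used are made up.
    fn put_int(buf: &mut Vec<u8>, v: u32) {
        buf.push(0); // sign byte: 0 = non-negative
        buf.extend_from_slice(&v.to_le_bytes());
    }

    fn put_str(buf: &mut Vec<u8>, s: &str) {
        put_int(buf, s.len() as u32);
        buf.extend_from_slice(s.as_bytes());
    }

    #[test]
    fn test_parse_int_and_str() {
        // 42 encoded as a sign byte plus 4 little-endian bytes
        let (rest, n) = parse_int(&[0u8, 42, 0, 0, 0], 4).unwrap();
        assert!(rest.is_empty());
        assert_eq!(n, 42);

        // a string is a length (same int encoding) followed by that many bytes
        let (rest, s) = parse_str(&[0u8, 2, 0, 0, 0, b'h', b'i'], 4).unwrap();
        assert!(rest.is_empty());
        assert_eq!(s.as_deref(), Some("hi"));
    }

    #[test]
    fn test_parse_toc_header_synthetic() {
        let mut buf: Vec<u8> = Vec::new();
        buf.extend_from_slice(b"PGDMP");
        buf.extend_from_slice(&[1, 14, 0]); // major, minor, revision
        buf.extend_from_slice(&[4, 8, 5]); // int_size, off_size, format (5 = directory)
        put_int(&mut buf, 0); // compression
        for _ in 0..7 {
            put_int(&mut buf, 0); // creation-time fields
        }
        put_str(&mut buf, "testdb");
        put_str(&mut buf, "14.2"); // remote version
        put_str(&mut buf, "14.2"); // dump version
        put_int(&mut buf, 0); // entry count
        let (rest, header) = parse_toc_header(&buf).unwrap();
        assert!(rest.is_empty());
        assert_eq!(header.format, ArchiveFormat::DIRECTORY);
        assert_eq!(header.db_name, "testdb");
        assert_eq!(header.entry_count, 0);
    }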
}