Skip to content

Instantly share code, notes, and snippets.

@daaku
Last active March 22, 2021 19:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daaku/58557e2545612df8f40b13b66b7d3bd0 to your computer and use it in GitHub Desktop.
Save daaku/58557e2545612df8f40b13b66b7d3bd0 to your computer and use it in GitHub Desktop.
Mmap-based, parallel, zero-copy XML parsing of Wikipedia articles
use aho_corasick::AhoCorasick;
use anyhow::Result;
use crossbeam_channel::{bounded, Receiver, Sender};
use crossbeam_utils::thread;
use itertools::Itertools;
use memmap::Mmap;
use std::convert::TryInto;
use std::fs::File;
use std::str::from_utf8;
/// Number of chunk-processing worker threads spawned by `main`; also used as
/// the capacity of the bounded channel that feeds them.
const NUM_WORKERS: usize = 32;
/// Minimum distance, in bytes, that `read_dump` advances before it looks for
/// the next `</page>` boundary (10 MiB).
const CHUNK_SIZE: usize = 10 * 1024 * 1024;
/// Path of the uncompressed Wikipedia XML dump that `main` opens.
const DUMP_FILE: &str =
    "/home/naitik/Downloads/enwiki-20210101-pages-articles-multistream.xml";
/// When `true`, `read_dump` stops producing chunks once the next chunk would
/// start more than 1,000,000 bytes into the file.
const DEBUG: bool = true;
/// Returns the text of the first descendant of `doc` whose tag name is `tag`.
///
/// # Panics
///
/// Panics if no descendant carries that tag name, or if the matching node has
/// no text.
fn tag_text<'a>(doc: &'a roxmltree::Document, tag: &'static str) -> &'a str {
    let node = doc
        .descendants()
        .find(|candidate| candidate.has_tag_name(tag))
        .expect("tag to be found");
    node.text().expect("tag to have text")
}
fn process_chunk(data: &Mmap, receiver: &Receiver<(usize, usize)>) -> Result<()> {
let ac = AhoCorasick::new_auto_configured(&["<page>", "</page>"]);
for (start, end) in receiver {
let chunk = &data[start..end];
for (start, end) in ac.find_iter(&chunk).tuples() {
assert_eq!(start.pattern(), 0);
assert_eq!(end.pattern(), 1);
let text = from_utf8(&chunk[start.start()..end.end()])?;
let doc = roxmltree::Document::parse(text)?;
let id: u64 = tag_text(&doc, "id").parse()?;
let title = tag_text(&doc, "title");
println!("id: {}, title: {}", id, title);
}
}
Ok(())
}
fn read_dump(data: &Mmap, len: usize, sender: Sender<(usize, usize)>) -> Result<()> {
let ac = AhoCorasick::new_auto_configured(&["<page>"]);
let mut start = ac
.find(data.as_ref())
.expect("to find start of page")
.start();
let ac = AhoCorasick::new_auto_configured(&["</page>"]);
loop {
if DEBUG && start > 1_000_000 {
break;
}
if start >= len {
break;
}
let mut end = start + CHUNK_SIZE;
if end >= len {
end = len;
}
end = match ac.find(&data[end..]) {
Some(m) => end + m.end(),
None => match twoway::rfind_bytes(&data[start..], b"</page>") {
Some(v) => start + v + 7,
None => break,
},
};
sender.send((start, end))?;
start = end;
}
Ok(())
}
/// Memory-maps `DUMP_FILE`, then runs one chunk-producing thread and
/// `NUM_WORKERS` chunk-processing threads over the mapping.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, its metadata cannot be read,
/// its length does not fit in `usize`, or the mapping fails.
///
/// # Panics
///
/// Panics if `read_dump` or any `process_chunk` worker returns an error or
/// panics.
fn main() -> Result<()> {
    let dump = File::open(DUMP_FILE)?;
    let byte_len = dump.metadata()?.len();
    let len: usize = byte_len.try_into()?;
    // SAFETY: NOTE(review) — presumably relies on the dump file not being
    // modified or truncated while it is mapped; confirm.
    let data = unsafe { Mmap::map(&dump) }?;

    // Bounded channel so the producer cannot run arbitrarily far ahead.
    let (sender, receiver) = bounded(NUM_WORKERS);

    thread::scope(|scope| {
        // Producer: `sender` is moved into `read_dump` and dropped when it
        // returns, which lets the workers' receive loops finish.
        scope.spawn(|_| read_dump(&data, len, sender).unwrap());
        for _ in 0..NUM_WORKERS {
            scope.spawn(|_| process_chunk(&data, &receiver).unwrap());
        }
    })
    .unwrap();

    Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment