Skip to content

Instantly share code, notes, and snippets.

@jessegrosjean
Last active November 7, 2018 23:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jessegrosjean/6b3ca3a52249b65f58294dfafca12c91 to your computer and use it in GitHub Desktop.
Save jessegrosjean/6b3ca3a52249b65f58294dfafca12c91 to your computer and use it in GitHub Desktop.
// Originally based off https://github.com/BurntSushi/ripgrep/tree/master/ignore
// Use as you see fit.
use std::fs;
use std::fs::{Metadata, FileType};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering};
use std::collections::BinaryHeap;
use std::path::{Path, PathBuf};
use std::time::Duration;
use std::cmp;
use std::cmp::{Ordering};
use std::thread;
use std::vec;
use rand::Rng;
use rand::prelude::*;
use num_cpus;
use alphanumeric_sort;
pub struct WalkBuilder {
paths: Vec<PathBuf>,
max_depth: Option<usize>,
max_filesize: Option<u64>,
include_metadata: bool,
ignore_hidden: bool,
threads: usize,
}
pub struct Walk {
paths: vec::IntoIter<PathBuf>,
max_filesize: Option<u64>,
max_depth: Option<usize>,
include_metadata: bool,
ignore_hidden: bool,
threads: usize,
}
pub struct WalkEntry {
pub name: String,
pub path: PathBuf,
pub file_type: FileType,
pub metadata: Option<Metadata>,
index_path: Vec<usize>,
child_count: usize,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WalkState {
Continue,
Skip,
Quit,
}
struct Worker {
callback: Box<FnMut(WalkEntry) -> WalkState + Send + 'static>,
work_senders: Vec<Sender<WalkEntry>>,
work_receiver: Receiver<WalkEntry>,
steal_from_queues: Vec<Arc<Mutex<BinaryHeap<WalkEntry>>>>,
work_queue: Arc<Mutex<BinaryHeap<WalkEntry>>>,
active_work_count: Arc<AtomicUsize>,
max_depth: Option<usize>,
max_filesize: Option<u64>,
quit_now: Arc<AtomicBool>,
threads: usize,
include_metadata: bool,
ignore_hidden: bool,
id: usize,
}
struct NextEntry {
index_path: Vec<usize>,
remaining_siblings: Vec<usize>,
}
impl WalkBuilder {
pub fn new<P: AsRef<Path>>(path: P) -> WalkBuilder {
WalkBuilder {
paths: vec![path.as_ref().to_path_buf()],
max_depth: None,
max_filesize: None,
include_metadata: false,
ignore_hidden: true,
threads: cmp::min(12, num_cpus::get()),
}
}
pub fn build(&self) -> Walk {
Walk {
paths: self.paths.clone().into_iter(),
max_depth: self.max_depth,
max_filesize: self.max_filesize,
include_metadata: self.include_metadata,
ignore_hidden: self.ignore_hidden,
threads: self.threads,
}
}
pub fn add<P: AsRef<Path>>(&mut self, path: P) -> &mut WalkBuilder {
self.paths.push(path.as_ref().to_path_buf());
self
}
pub fn max_depth(&mut self, depth: Option<usize>) -> &mut WalkBuilder {
self.max_depth = depth;
self
}
pub fn include_metadata(&mut self, yes: bool) -> &mut WalkBuilder {
self.include_metadata = yes;
self
}
pub fn ignore_hidden(&mut self, yes: bool) -> &mut WalkBuilder {
self.ignore_hidden = yes;
self
}
pub fn max_filesize(&mut self, filesize: Option<u64>) -> &mut WalkBuilder {
self.max_filesize = filesize;
self
}
pub fn threads(&mut self, n: usize) -> &mut WalkBuilder {
self.threads = n;
self
}
}
impl Walk {
pub fn entries(self) -> Receiver<WalkEntry> {
let (sx, rx) = channel::<WalkEntry>();
let (ordered_sx, ordered_rx) = channel::<WalkEntry>();
thread::spawn(move || {
let mut receive_buffer = BinaryHeap::new();
let mut next_entry = NextEntry::new();
while let Ok(entry) = rx.recv() {
receive_buffer.push(entry);
while next_entry.matches(receive_buffer.peek()) {
let send_entry = receive_buffer.pop().unwrap();
next_entry.advance_past(&send_entry);
if ordered_sx.send(send_entry).is_err() {
return
}
}
}
while let Some(entry) = receive_buffer.pop() {
if ordered_sx.send(entry).is_err() {
return
}
}
});
self.run_async(move || {
let sx = sx.clone();
Box::new(move |entry| {
if sx.send(entry).is_err() {
WalkState::Quit
} else {
WalkState::Continue
}
})
});
ordered_rx
}
pub fn run<F>(self, make_callback: F)
where F: FnMut() -> Box<FnMut(WalkEntry) -> WalkState + Send + 'static>
{
self.inner_run(false, make_callback);
}
pub fn run_async<F>(self, make_callback: F)
where F: FnMut() -> Box<FnMut(WalkEntry) -> WalkState + Send + 'static>
{
self.inner_run(true, make_callback);
}
fn inner_run<F>(self, async: bool, mut make_callback: F)
where F: FnMut() -> Box<FnMut(WalkEntry) -> WalkState + Send + 'static>
{
let quit_now = Arc::new(AtomicBool::new(false));
let worker_count = cmp::max(1, self.threads);
let mut active_work_count = 0;
let mut handles = vec![];
let mut work_queues = Vec::new();
let mut work_senders = Vec::new();
let mut work_receivers = Vec::new();
for _ in 0..worker_count {
let (sender, reciver) = channel::<WalkEntry>();
work_senders.push(sender);
work_receivers.push(reciver);
work_queues.push(Arc::new(Mutex::new(BinaryHeap::new())));
}
for each in self.paths {
let mut rng = thread_rng();
if let Some(each_entry) = WalkEntry::new(&each) {
if rng.choose(&work_senders).unwrap().send(each_entry).is_ok() {
active_work_count += 1;
}
}
}
let active_work_count = Arc::new(AtomicUsize::new(active_work_count));
for i in 0..worker_count {
let mut worker = Worker {
callback: make_callback(),
max_depth: self.max_depth,
max_filesize: self.max_filesize,
ignore_hidden: self.ignore_hidden,
steal_from_queues: work_queues.clone(),
work_senders: work_senders.clone(),
work_receiver: work_receivers.pop().unwrap(),
work_queue: work_queues[i].clone(),
active_work_count: active_work_count.clone(),
quit_now: quit_now.clone(),
include_metadata: self.include_metadata,
threads: worker_count,
id: i,
};
handles.push(thread::spawn(move || worker.run()));
}
let sync = !async;
if sync {
for handle in handles {
handle.join().unwrap();
}
assert!(active_work_count.load(AtomicOrdering::SeqCst) == 0);
}
}
}
impl WalkState {
fn is_quit(self) -> bool {
self == WalkState::Quit
}
}
impl Worker {
fn run(&mut self) {
self.steal_from_queues.remove(self.id); // dont' steal from self!
while let Some(mut entry) = self.get_work() {
let should_skip_child_entries =
entry.is_symlink() ||
!entry.is_dir() ||
self.max_depth.map_or(false, |max_depth| {
entry.depth() >= max_depth
});
if should_skip_child_entries {
if (self.callback)(entry).is_quit() {
self.quit_now();
return
}
self.completed_work();
continue;
}
let readdir = match fs::read_dir(&entry.path) {
Ok(readdir) => readdir,
Err(_err) => {
self.completed_work();
continue;
}
};
let entry_index_path = entry.index_path.clone();
let mut child_entries: Vec<_> = readdir.filter_map(|entry_result| {
let entry = match entry_result {
Ok(entry) => entry,
Err(_err) => {
return None
}
};
let name = match entry.file_name().to_str() {
Some(name) => name.to_string(),
None => return None
};
let file_type = match entry.file_type() {
Ok(file_type) => file_type,
Err(_err) => {
return None
}
};
let path = entry.path();
let metadata = if self.include_metadata { entry.metadata().ok() } else { None };
if self.should_skip_entry(&name, file_type, &path, &metadata) {
return None
}
let mut index_path = Vec::with_capacity(entry_index_path.len() + 1);
index_path.extend_from_slice(&entry_index_path[..]);
Some(WalkEntry {
name,
path,
index_path,
file_type,
metadata,
child_count: 0
})
}).collect();
child_entries.sort_by(|a, b| {
alphanumeric_sort::compare_str(a.name(), b.name())
});
child_entries.iter_mut().enumerate().for_each(|(i, each)| {
each.index_path.push(i);
});
entry.child_count = child_entries.len();
match (self.callback)(entry) {
WalkState::Continue => (),
WalkState::Skip => {
self.completed_work();
continue;
}
WalkState::Quit => {
self.quit_now();
return
}
}
self.schedule_work(child_entries);
self.completed_work();
}
}
fn should_skip_entry(&self, name: &str, _file_type: FileType, _path: &Path, metadata: &Option<Metadata>) -> bool {
if self.ignore_hidden && name.chars().next() == Some('.') {
return true
}
if let (Some(metadata), Some(max_filesize)) = (metadata, self.max_filesize) {
if metadata.len() > max_filesize {
return true
}
}
false
}
fn schedule_work(&mut self, entries: Vec<WalkEntry>) {
let entries_count = entries.len();
self.active_work_count.fetch_add(entries_count, AtomicOrdering::SeqCst);
let mut rng = thread_rng();
for entry in entries {
let _ = rng.choose(&self.work_senders).unwrap().send(entry);
}
}
fn get_work(&mut self) -> Option<WalkEntry> {
loop {
if self.is_quit_now() {
return None
}
let mut work_queue = self.work_queue.lock().unwrap();
while let Ok(entry) = self.work_receiver.try_recv() {
work_queue.push(entry);
}
let next_entry = work_queue.pop();
match next_entry {
Some(entry) => {
return Some(entry)
},
None => {
if self.active_work_count() == 0 {
return None
} else {
if let Some(entries) = self.steal_work() {
for each in entries {
work_queue.push(each);
}
}
if work_queue.len() == 0 {
thread::yield_now();
}
}
}
}
}
}
fn steal_work(&self) -> Option<Vec<WalkEntry>> {
if self.steal_from_queues.len() == 0 {
return None
}
let steal_from = thread_rng().choose(&self.steal_from_queues).unwrap();
if let Ok(mut steal_from) = steal_from.try_lock() {
if steal_from.len() > 0 {
let half = steal_from.len() / 2;
let mut stolen_work = Vec::with_capacity(half);
let mut return_work = Vec::with_capacity(half);
for (i, each) in steal_from.drain().enumerate() {
if i % 2 == 0 {
stolen_work.push(each);
} else {
return_work.push(each);
}
}
for each in return_work {
steal_from.push(each);
}
return Some(stolen_work)
}
}
None
}
fn completed_work(&self) {
self.active_work_count.fetch_sub(1, AtomicOrdering::SeqCst);
}
fn quit_now(&self) {
self.quit_now.store(true, AtomicOrdering::SeqCst);
}
fn is_quit_now(&self) -> bool {
self.quit_now.load(AtomicOrdering::SeqCst)
}
fn active_work_count(&self) -> usize {
self.active_work_count.load(AtomicOrdering::SeqCst)
}
}
impl WalkEntry {
fn new(path: &Path) -> Option<WalkEntry> {
if let (Some(Some(name)), Ok(metadata)) = (path.file_name().map(|n| n.to_str()), fs::metadata(path)) {
Some(WalkEntry {
name: name.to_string(),
path: path.to_path_buf(),
index_path: vec![0],
child_count: 0,
file_type: metadata.file_type(),
metadata: Some(metadata),
})
} else {
None
}
}
pub fn is_dir(&self) -> bool {
self.file_type.is_dir()
}
pub fn is_file(&self) -> bool {
self.file_type.is_file()
}
pub fn is_symlink(&self) -> bool {
self.file_type.is_symlink()
}
pub fn name(&self) -> &str {
&self.name
}
pub fn path(&self) -> &Path {
&self.path
}
pub fn depth(&self) -> usize {
self.index_path.len() - 1
}
pub fn metadata(&self) -> Option<&Metadata> {
match &self.metadata {
Some(metadata) => Some(metadata),
None => None
}
}
}
impl PartialEq for WalkEntry {
fn eq(&self, o: &Self) -> bool {
self.index_path.eq(&o.index_path)
}
}
impl Eq for WalkEntry {}
impl PartialOrd for WalkEntry {
fn partial_cmp(&self, o: &Self) -> Option<Ordering> {
o.index_path.partial_cmp(&self.index_path)
}
}
impl Ord for WalkEntry {
fn cmp(&self, o: &Self) -> Ordering {
o.index_path.cmp(&self.index_path)
}
}
impl NextEntry {
fn new() -> NextEntry {
NextEntry {
index_path: vec![0],
remaining_siblings: vec![1],
}
}
fn matches(&self, entry: Option<&WalkEntry>) -> bool {
entry.map_or(false, |e| { e.index_path == self.index_path })
}
fn advance_past(&mut self, entry: &WalkEntry) {
// Decrement remaining siblings at this level
*self.remaining_siblings.last_mut().unwrap() -= 1;
if entry.child_count > 0 {
// If visited item has children then push 0 index path, since we are now
// looking for the first child.
self.index_path.push(0);
self.remaining_siblings.push(entry.child_count);
} else {
// Incrememnt sibling index
*self.index_path.last_mut().unwrap() += 1;
// If no siblings remain at this level unwind stacks
while !self.remaining_siblings.is_empty() && *self.remaining_siblings.last().unwrap() == 0 {
self.index_path.pop();
self.remaining_siblings.pop();
// Finished processing level, so increment sibling index
if !self.index_path.is_empty() {
*self.index_path.last_mut().unwrap() += 1;
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment