Created
September 14, 2019 16:59
-
-
Save jedie/0e20d8d8584f38ba74f54cb08e4a4bfb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import os | |
import random | |
import threading | |
import time | |
# https://github.com/peter-wangxu/persist-queue | |
from pathlib import Path | |
import persistqueue | |
# Module-level logger named after this module, so applications can configure or
# filter its output separately from the root logger.
log = logging.getLogger(__name__)
def scandir_walk(top, skip_dirs=(), on_skip=None):
    """
    Recursively yield an ``os.DirEntry`` for every non-directory entry below *top*.

    Directories are descended into but never yielded themselves. Symlinks to
    directories are not followed; they are yielded like any other
    non-directory entry.

    :param top: directory to scan.
    :param skip_dirs: directory names (compared against ``entry.name``) that
        are not descended into.
    :param on_skip: optional callable invoked with the ``os.DirEntry`` of every
        skipped directory; ``None`` means skipped directories are ignored silently.
    """
    try:
        dir_entry_iterator = os.scandir(top)
    except PermissionError as err:
        # An unreadable directory is logged and skipped instead of aborting the walk.
        log.error('scandir error: %s', err)
        return

    # The context manager closes the scandir handle even when the consumer
    # stops iterating this generator early.
    with dir_entry_iterator:
        for entry in dir_entry_iterator:
            if not entry.is_dir(follow_symlinks=False):
                yield entry
            elif entry.name in skip_dirs:
                # on_skip is optional: only call it when the caller supplied one.
                if on_skip is not None:
                    on_skip(entry)
            else:
                yield from scandir_walk(entry.path, skip_dirs, on_skip)
class ThreadedFilesystemWalker:
    """
    Walk a directory tree and hand every non-directory entry to worker threads.

    The thread calling ``start()`` scans the tree and puts each entry path into
    a ``persistqueue.SQLiteAckQueue``. ``worker_count`` daemon threads take
    paths from that queue and pass them to ``process_path_item()``, which
    subclasses have to override.
    """

    # Class-level defaults; ``self.x += 1`` rebinds the counters per instance.
    next_update = None  # time.time() value after which the next progress line is printed
    total_count = 0  # number of paths queued by start() so far
    processed_count = 0  # number of paths the workers have finished so far
    lock = threading.Lock()  # guards the counters (class attribute: shared by all instances)

    def __init__(self, *, top_path, worker_count, skip_dirs=(), update_interval_sec=3,
                 queue_path='./test'):
        """
        :param top_path: existing directory to walk.
        :param worker_count: number of worker threads started by ``start()``.
        :param skip_dirs: directory names that are not descended into.
        :param update_interval_sec: minimum seconds between two progress lines.
        :param queue_path: storage location handed to ``SQLiteAckQueue``.
        """
        self.top_path = Path(top_path)
        assert self.top_path.is_dir(), f'Given path {top_path} is not a existing directory!'

        self.worker_count = worker_count
        self.skip_dirs = skip_dirs
        self.update_interval_sec = update_interval_sec

        self.queue = persistqueue.SQLiteAckQueue(path=queue_path, multithreading=True)

    def _set_next_update(self):
        """Schedule the next progress line ``update_interval_sec`` from now."""
        self.next_update = time.time() + self.update_interval_sec

    def process_path_item(self, path):
        """Handle one queued path; must be overridden by subclasses."""
        # NotImplemented is a comparison sentinel, not an exception class.
        raise NotImplementedError

    def on_skip(self, dir_entry):
        """Called by the scan for every directory listed in ``skip_dirs``."""
        log.debug('Skip: %s', dir_entry)

    def _worker(self):
        """Worker thread body: process queued paths forever and report progress."""
        while True:
            item = self.queue.get()
            self.process_path_item(path=item)
            # Acknowledge only after successful processing. If processing raised,
            # the item stays unacknowledged in the queue.
            self.queue.ack(item)

            with self.lock:
                self.processed_count += 1
                if time.time() > self.next_update:
                    # total_count can still be 0 here (e.g. the queue already
                    # held items before this scan counted anything), so avoid
                    # dividing by zero.
                    if self.total_count:
                        percent = (self.processed_count * 100) / self.total_count
                    else:
                        percent = 0.0
                    print(f'{self.processed_count}/{self.total_count} - {percent:.2f} percent done.')
                    self._set_next_update()

    def start(self):
        """
        Start the worker threads, then scan ``top_path`` and queue every entry.

        Returns as soon as the scan is finished. The workers are daemon threads
        and may still be busy at that point, so the caller has to keep the
        process alive until ``processed_count`` has caught up with ``total_count``.
        """
        self._set_next_update()

        for _ in range(self.worker_count):
            worker = threading.Thread(target=self._worker)
            worker.daemon = True
            worker.start()

        for dir_entry in scandir_walk(top=self.top_path, skip_dirs=self.skip_dirs, on_skip=self.on_skip):
            # Count before queueing so a fast worker never reports an item
            # as processed before it has been counted.
            with self.lock:
                self.total_count += 1
            self.queue.put(dir_entry.path)

        print(f'readed {self.total_count} done!')
if __name__ == '__main__':
    # Example usage: walk the home directory with three workers.
    logging.basicConfig(level=logging.DEBUG)

    class ExampleThreadedFilesystemWalker(ThreadedFilesystemWalker):
        """Example subclass that only simulates work for every queued path."""

        def process_path_item(self, path):
            # do something with fs_item...
            fs_item = Path(path)
            # if fs_item.is_dir():
            #     print(f'<dir> {fs_item}')
            # elif fs_item.is_file():
            #     print(f'<file> {fs_item} size: {fs_item.stat().st_size}')
            # else:
            #     print(f'<???> {fs_item}')
            time.sleep(random.random())  # 'simulating' a process time

    walker = ExampleThreadedFilesystemWalker(
        top_path=Path('~').expanduser(),
        worker_count=3,
        skip_dirs=(
            '.config',
            '.local',
        ),
        update_interval_sec=1,
    )
    walker.start()

    # start() returns once the scan is finished, but the workers are daemon
    # threads: without this wait the interpreter would exit and kill them
    # while items are still queued.
    while walker.processed_count < walker.total_count:
        time.sleep(0.1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment