Skip to content

Instantly share code, notes, and snippets.

@jedie
Created September 14, 2019 16:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jedie/0e20d8d8584f38ba74f54cb08e4a4bfb to your computer and use it in GitHub Desktop.
Save jedie/0e20d8d8584f38ba74f54cb08e4a4bfb to your computer and use it in GitHub Desktop.
import logging
import os
import random
import threading
import time
# https://github.com/peter-wangxu/persist-queue
from pathlib import Path
import persistqueue
log = logging.getLogger()
def scandir_walk(top, skip_dirs=(), on_skip=None):
    """
    Recursively yield an ``os.DirEntry`` for every non-directory entry below *top*.

    :param top: directory to scan (anything ``os.scandir`` accepts).
    :param skip_dirs: directory names (compared against ``entry.name``, not the
        full path) that are not descended into.
    :param on_skip: optional callable invoked with the ``os.DirEntry`` of each
        skipped directory. With the default ``None`` skipped directories are
        simply ignored.

    A directory whose ``os.scandir`` call raises ``PermissionError`` is logged
    and skipped. Directories are detected with ``follow_symlinks=False``, so a
    symlink pointing to a directory is yielded like a file instead of being
    followed.
    """
    try:
        dir_entry_iterator = os.scandir(top)
    except PermissionError as err:
        log.error('scandir error: %s', err)
        return

    # The context manager closes the scandir handle even when the consumer
    # abandons the generator before it is exhausted.
    with dir_entry_iterator:
        for entry in dir_entry_iterator:
            if not entry.is_dir(follow_symlinks=False):
                yield entry
            elif entry.name not in skip_dirs:
                yield from scandir_walk(entry.path, skip_dirs, on_skip)
            elif on_skip is not None:
                # on_skip is optional: only notify when a callback was given.
                on_skip(entry)
class ThreadedFilesystemWalker:
    """
    Walk a directory tree and hand every found path to worker threads.

    ``start()`` scans ``top_path`` with ``scandir_walk`` and puts the path of
    each yielded entry into a ``persistqueue.SQLiteAckQueue``. ``worker_count``
    daemon threads take the paths out of the queue and pass them to
    ``process_path_item``, which subclasses have to implement.
    """

    # Class-level defaults; __init__ replaces the lock/condition per instance.
    next_update = None  # time.time() value after which the next progress line is printed
    total_count = 0  # number of paths queued by start()
    processed_count = 0  # number of paths handled by the workers (successfully or not)
    lock = threading.Lock()  # guards total_count / processed_count / next_update
    _progress = threading.Condition(lock)  # notified after every handled item

    def __init__(self, *, top_path, worker_count, skip_dirs=(), update_interval_sec=3):
        """
        :param top_path: existing directory to walk.
        :param worker_count: number of worker threads started by ``start()``.
        :param skip_dirs: directory names that are not descended into.
        :param update_interval_sec: minimum seconds between two progress lines.
        """
        self.top_path = Path(top_path)
        assert self.top_path.is_dir(), f'Given path {top_path} is not a existing directory!'

        self.worker_count = worker_count
        self.skip_dirs = skip_dirs
        self.update_interval_sec = update_interval_sec

        # Own lock per instance, so independent walkers do not contend on
        # (or wake each other up through) the class-level default.
        self.lock = threading.Lock()
        self._progress = threading.Condition(self.lock)

        self.queue = persistqueue.SQLiteAckQueue(path='./test', multithreading=True)

    def _set_next_update(self):
        """Schedule the next progress line ``update_interval_sec`` from now."""
        self.next_update = time.time() + self.update_interval_sec

    def process_path_item(self, path):
        """Handle one queued path. Must be overridden by subclasses."""
        raise NotImplementedError

    def on_skip(self, dir_entry):
        """Called by the directory scan for every directory listed in ``skip_dirs``."""
        log.debug('Skip: %s', dir_entry)

    def _handle_item(self, item):
        """Process one queued path, acknowledge it and update the progress state."""
        try:
            self.process_path_item(path=item)
        except Exception:
            # Keep the worker alive: one bad path must not stop the thread.
            log.exception('Error processing %r', item)
            self.queue.ack_failed(item)
        else:
            # Tell the ack queue that the item is done; according to the
            # persist-queue documentation un-acked items are kept for redelivery.
            self.queue.ack(item)
        finally:
            with self._progress:
                self.processed_count += 1
                # total_count may still be 0 here (e.g. an item that was
                # already in the on-disk queue), so guard the division.
                if self.total_count and time.time() > self.next_update:
                    percent = (self.processed_count * 100) / self.total_count
                    print(f'{self.processed_count}/{self.total_count} - {percent:.2f} percent done.')
                    self._set_next_update()
                self._progress.notify_all()

    def _worker(self):
        """Worker thread main loop: take paths from the queue forever."""
        while True:
            self._handle_item(self.queue.get())

    def start(self):
        """
        Start the workers, queue every path below ``top_path`` and wait until
        the workers have handled as many items as were queued.
        """
        self._set_next_update()

        for _ in range(self.worker_count):
            threading.Thread(target=self._worker, daemon=True).start()

        for dir_entry in scandir_walk(top=self.top_path, skip_dirs=self.skip_dirs, on_skip=self.on_skip):
            # Count before queueing, so a fast worker never reports an item
            # that is not part of total_count yet.
            with self.lock:
                self.total_count += 1
            self.queue.put(dir_entry.path)

        print(f'readed {self.total_count} done!')

        # The workers are daemon threads: returning right away would let the
        # interpreter exit and drop everything that is still queued.
        # NOTE(review): items left in the on-disk queue by an earlier run are
        # counted as processed too, so the wait can end early in that case.
        if self.worker_count > 0:
            with self._progress:
                self._progress.wait_for(lambda: self.processed_count >= self.total_count)
if __name__ == '__main__':
    # Demo: walk the current user's home directory with three worker threads.
    logging.basicConfig(level=logging.DEBUG)

    class ExampleThreadedFilesystemWalker(ThreadedFilesystemWalker):
        """Example subclass that only pretends to work on each path."""

        def process_path_item(self, path):
            # A real implementation would do something with fs_item here,
            # e.g. tell directories and files apart via fs_item.is_dir() /
            # fs_item.is_file() or read the size from fs_item.stat().st_size.
            fs_item = Path(path)
            delay = random.random()
            time.sleep(delay)  # 'simulating' a process time

    home_dir = Path('~').expanduser()
    ignored_dir_names = ('.config', '.local')

    walker = ExampleThreadedFilesystemWalker(
        top_path=home_dir,
        worker_count=3,
        skip_dirs=ignored_dir_names,
        update_interval_sec=1,
    )
    walker.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment