Outputs a CSV of file information for a provided list of directories
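
Example invocation (the script name file_info.py is assumed here; save the script below under whatever name you prefer):

    python3 file_info.py --output-file files.csv --include-hash ~/photos ~/documents

Each file found under the given directories becomes one CSV row; --include-hash additionally computes MD5 and SHA-1 digests.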
#!/usr/bin/env python3
# Requires Python 3.12+ (uses pathlib.Path.walk).
import argparse
import csv
import hashlib
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from threading import Lock

import tqdm

# BUF_SIZE is fairly arbitrary; tune it for your workload.
BUF_SIZE = 65536  # read files in 64 KiB chunks when hashing
parser = argparse.ArgumentParser(
    description="Outputs information about files in directories"
)
parser.add_argument(
    "directories",
    metavar="directories",
    type=str,
    nargs="+",
    help="Directories to scan",
)
parser.add_argument("--output-file", nargs=1, required=True)
parser.add_argument("--output-format", nargs="?", default="csv", choices=["csv"])
parser.add_argument("--include-hash", action="store_true", default=argparse.SUPPRESS)
parser.add_argument("--parallelism", default=16, type=int)
args = parser.parse_args()

# With default=argparse.SUPPRESS, the attribute only exists when the flag was passed.
include_hash = hasattr(args, "include_hash")
csv_fieldnames = [
    "file_path",
    "file_ext",
    "file_mode",
    "file_uid",
    "file_gid",
    "file_access_time",
    "file_modification_time",
    "file_created_time",
    "file_size_bytes",
    "file_md5",
    "file_sha1",
]
csv_writer = None
csv_writer_lock = Lock()
progress_bar = None
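
# Worker threads share the module-level DictWriter and tqdm bar;
# csv_writer_lock guards both.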
def process_file(root_path: Path, dir_path: Path, dir_file: str):
    if csv_writer is None:
        raise Exception("CSV writer has not been initialised")
    path_f = dir_path / dir_file
    path_stat = path_f.stat()
    file_data = {
        "file_path": path_f.relative_to(root_path),
        "file_ext": path_f.suffix,
        "file_mode": path_stat.st_mode,
        "file_uid": path_stat.st_uid,
        "file_gid": path_stat.st_gid,
        "file_access_time": path_stat.st_atime,
        "file_modification_time": path_stat.st_mtime,
        "file_created_time": path_stat.st_ctime,
        "file_size_bytes": path_stat.st_size,
        "file_md5": None,
        "file_sha1": None,
    }
    if include_hash:
        # Feed both digests from a single pass over the file so large files
        # are never read twice (or held in memory in full).
        md5 = hashlib.md5()
        sha1 = hashlib.sha1()
        with path_f.open("rb") as fb:
            while chunk := fb.read(BUF_SIZE):
                md5.update(chunk)
                sha1.update(chunk)
        file_data["file_md5"] = md5.hexdigest()
        file_data["file_sha1"] = sha1.hexdigest()
    # DictWriter is not thread-safe, so writes (and the progress tick) are
    # serialised across worker threads.
    with csv_writer_lock:
        csv_writer.writerow(file_data)
        progress_bar.update()
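
# main() opens the CSV, enumerates every file up front (so tqdm knows the
# total), then stats/hashes the files on a thread pool.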
def main():
    with open(args.output_file[0], "w", newline="", encoding="utf-8") as out_f:
        global csv_writer
        csv_writer = csv.DictWriter(
            out_f, fieldnames=csv_fieldnames, dialect=csv.unix_dialect
        )
        csv_writer.writeheader()

        print("Gathering files to process...")
        processing_queue = []
        for d in args.directories:
            path_d = Path(d)
            for dirpath, dirnames, filenames in path_d.walk(top_down=True):
                for dir_file in filenames:
                    processing_queue.append((path_d, dirpath, dir_file))
                # In a top-down walk, sorting dirnames in place makes the
                # traversal order deterministic.
                dirnames.sort()

        print("Getting file info...")
        with tqdm.tqdm(
            total=len(processing_queue),
            smoothing=0,
            unit=" files",
        ) as t:
            global progress_bar
            progress_bar = t
            with ThreadPoolExecutor(max_workers=args.parallelism) as pooled_executor:
                # Note: exceptions raised in workers are swallowed unless the
                # futures returned by submit() are inspected, so unreadable
                # files are skipped silently.
                for i in processing_queue:
                    pooled_executor.submit(process_file, *i)


if __name__ == "__main__":
    main()
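
With the field list above, the output file starts with this header row (csv.unix_dialect quotes every field):

    "file_path","file_ext","file_mode","file_uid","file_gid","file_access_time","file_modification_time","file_created_time","file_size_bytes","file_md5","file_sha1"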