Created
June 4, 2014 19:56
-
-
Save zyga/0b877203778e0d1a59a0 to your computer and use it in GitHub Desktop.
Directory scanner / hash collector
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# | |
# Copyright (c) Zygmunt Krynicki | |
# All rights reserved. | |
# | |
# Redistribution and use in source and binary forms, with or without | |
# modification, are permitted provided that the following conditions | |
# are met: | |
# 1. Redistributions of source code must retain the above copyright | |
# notice, this list of conditions and the following disclaimer. | |
# 2. Redistributions in binary form must reproduce the above copyright | |
# notice, this list of conditions and the following disclaimer in the | |
# documentation and/or other materials provided with the distribution. | |
# 3. Neither the name of the University nor the names of its contributors | |
# may be used to endorse or promote products derived from this software | |
# without specific prior written permission. | |
# | |
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND | |
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |
# ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE | |
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | |
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS | |
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) | |
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT | |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY | |
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF | |
# SUCH DAMAGE. | |
import argparse | |
import hashlib | |
import logging | |
import os | |
def get_hash_of_file(filename, hash_func, chunk_size=2 << 18):
    """
    Compute selected hash of a specified file

    The file is read in binary mode, ``chunk_size`` bytes at a time, into a
    single reusable buffer so that large files are never loaded whole.

    :param filename:
        name of the file to hash
    :param hash_func:
        name of the hash function applicable to hashlib.new
        (see hashlib.algorithms_available)
    :param chunk_size:
        size (in bytes) of the buffer used to read the file; must be positive
    :returns:
        hexadecimal digest of the file
    :raises ValueError:
        if chunk_size is not a positive number
    :raises OSError:
        if the file cannot be opened or read
    """
    if chunk_size <= 0:
        # A zero-sized buffer makes readinto() return 0 immediately, which
        # would silently yield the digest of empty input for every file.
        raise ValueError("chunk_size must be a positive integer")
    logging.debug("Hashing %r with %r", filename, hash_func)
    m = hashlib.new(hash_func)
    buf = bytearray(chunk_size)
    # Zero-copy window over buf, used to hash only the bytes actually read.
    view = memoryview(buf)
    with open(filename, 'rb') as stream:
        while True:
            num_read = stream.readinto(buf)
            if not num_read:
                break
            # A read may fill only part of the buffer (always true for the
            # last chunk of a file whose size is not a multiple of
            # chunk_size); hashing the whole buffer would mix in stale data.
            m.update(view[:num_read])
    digest = m.hexdigest()
    logging.debug("Done hashing %r %s", filename, digest)
    return digest
class Scanner:
    """
    Scanner for computing and storing digests of a number of files

    Files scanned with :meth:`scan_file()` or :meth:`scan_dir()` are collected
    in the seen_hashes instance attribute. New hashes (seen for the first time)
    are reported via the new_hash_cb callback.
    """

    def __init__(self, hash_func, chunk_size, new_hash_cb=None):
        """
        Initialize a scanner with an empty set of seen hashes

        :param hash_func:
            name of the hash function, passed on to get_hash_of_file()
        :param chunk_size:
            read buffer size, passed on to get_hash_of_file()
        :param new_hash_cb:
            optional callable invoked with each digest the first time it is
            seen; None disables reporting
        """
        self.seen_hashes = set()
        self.hash_func = hash_func
        self.chunk_size = chunk_size
        self.new_hash_cb = new_hash_cb

    def scan_dir(self, root):
        """
        Scan a directory recursively and record hashes of all the files there

        Entries that are symbolic links are skipped.

        :param root:
            directory to start walking from
        """
        for dirpath, _dirnames, filenames in os.walk(root):
            for filename in filenames:
                full_filename = os.path.join(dirpath, filename)
                if not os.path.islink(full_filename):
                    self.scan_file(full_filename)

    def scan_file(self, filename):
        """
        Scan a single file and record the hash

        A file that cannot be opened or read is logged as a warning and
        skipped, so that one bad entry does not abort a whole directory scan.

        :param filename:
            name of the file to hash
        """
        try:
            digest = get_hash_of_file(
                filename, self.hash_func, self.chunk_size)
        except FileNotFoundError as exc:
            # Typically a file removed between directory listing and open.
            logging.warning("File not found %r: %r", filename, exc)
        except OSError as exc:
            # Permission problems, I/O errors and similar: skip this file.
            logging.warning("Cannot read %r: %r", filename, exc)
        else:
            # Report before recording, so the membership test still reflects
            # whether this digest is new.
            if (self.new_hash_cb is not None
                    and digest not in self.seen_hashes):
                self.new_hash_cb(digest)
            self.seen_hashes.add(digest)
def main(argv=None):
    """
    Command line entry point: print each distinct file digest once

    Every DIR given on the command line is scanned recursively with a single
    Scanner, so a digest already printed for one directory is not printed
    again for another.

    :param argv:
        list of command line arguments (without the program name); when None
        argparse falls back to sys.argv[1:]
    """
    logging.basicConfig(level=logging.INFO)
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--hash-func', action='store',
        choices=sorted(hashlib.algorithms_available), default='sha1')
    parser.add_argument(
        '--chunk-size', type=int, action='store', default=2 << 18)
    parser.add_argument('dir_list', action='store', metavar='DIR', nargs='+')
    ns = parser.parse_args(argv)
    if ns.chunk_size <= 0:
        # A zero-sized read buffer would make every file hash like empty
        # input and a negative size cannot be allocated at all; reject both
        # with a usage error instead of wrong output or a traceback.
        parser.error("--chunk-size must be a positive integer")
    scanner = Scanner(ns.hash_func, ns.chunk_size, print)
    for dirname in ns.dir_list:
        scanner.scan_dir(dirname)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment