Last active
October 25, 2021 22:38
-
-
Save Lokno/44cf9aeda36ccaadbb2e1c586cacdac1 to your computer and use it in GitHub Desktop.
Lightweight file archiver. Store and compare files.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Archiver
#
# Lightweight file archiver. Store and compare files.
# File names, including their extensions, are used for identification, not the full file path.
#
# Default archive directory in 'default_directory' variable below. To change the
# archive directory, edit archiver_config.ini in your home directory.
#
# archiver.py [-h] [--diff] [--list] [--meta] [--query] [--remove] [--file_name [FILE_NAME]] [--modified [MODIFIED]] file_path
#
# Behavior:
# No Flags: Adds the file at file_path to the archive. The file is only physically
#           moved to the archive if it has a unique MD5 hash. The file is also not added
#           to the archive if an existing entry exists in the archive with the same
#           file name and modification date.
#           file_path may be simply the name of an archived file when used with --list, --remove, or --meta
# --diff : Checks if the file matches the MD5 hash of the archived file with the most
#          recent modification date. It also lists the modification dates of
#          all entries with the same MD5 hash.
# --list : Lists the modification date and MD5 hash for each entry in the archive
#          for the name of the file in the given file_path.
#          If used with 'ALL' entered for the file_path, the script will list all
#          of the unique files in the archive, along with the data associated with
#          the most recent entry corresponding to each file.
# --meta : When used with no other flags, this means the user has additional meta
#          data they wish to associate with the file. When used with the --list flag,
#          all data associated with a file will be printed.
# --query : Can be used with the --list or --remove option to add additional search criteria.
#           Results must match all the entered fields exactly.
# --remove : Remove all entries for this file path.
# --file_name : File name to use instead of the actual file name for comparison and storage.
# --modified : Override modification time stamp of the actual file with the entered value.
import sys, os | |
import re | |
import datetime | |
import argparse | |
import hashlib | |
from pathlib import Path | |
from tinydb import TinyDB, Query | |
import configparser | |
import shutil | |
from socket import gethostname | |
# Default location of the archive; overridable via ~/archiver_config.ini.
default_directory = '~/.archiver'

# Patterns used to coerce user-entered metadata values to int/float.
int_re = re.compile(r'^[0-9]+$')
# BUG FIX: the dot was unescaped ('^[0-9]*.[0-9]+$'), so it matched ANY
# character — e.g. '1a23' matched and float('1a23') then crashed. The dot is
# now literal and optional, so plain integer strings (used as timestamps in
# the modify-entry flow) still match and parse as floats.
flt_re = re.compile(r'^[0-9]*\.?[0-9]+$')
def PathExpanded(str_path):
    """Return a Path for str_path with '~' and environment variables expanded."""
    expanded = os.path.expanduser(str_path)
    expanded = os.path.expandvars(expanded)
    return Path(expanded)
def get_valid_file_path(file_path_str):
    """Return a Path for file_path_str, or exit the program if the path
    does not name an existing regular file."""
    candidate = Path(file_path_str)
    if candidate.is_file():
        return candidate
    # Distinguish "missing" from "exists but is a directory/other".
    if not candidate.exists():
        print(f'ERROR: File "{file_path_str:s}" does not exist.')
    else:
        print(f'ERROR: "{file_path_str:s}" is not a file.')
    sys.exit(-1)
def get_file_data(file_path):
    """Build the metadata record stored for an archive entry.

    Returns a dict with the file's absolute parent path, name, size,
    modification time, owner, MD5 content hash, and the local host name.
    """
    digest = hashlib.md5(file_path.read_bytes()).hexdigest()
    stat_info = file_path.stat()
    return {
        'path': str(file_path.parent.absolute()),
        'file_name': file_path.name,
        'size': stat_info.st_size,
        'modified': stat_info.st_mtime,
        'owner': file_path.owner(),
        'md5_hash': digest,
        'host_name': gethostname(),
    }
def prompt_for_meta_data():
    """Interactively collect a dict of metadata field/value pairs.

    Field names are lower-cased; values matching int_re/flt_re are coerced to
    int/float. Entering REMOVE for an existing field deletes it. The whole
    dialogue repeats until the user confirms the summary with 'Y'.
    """
    meta_data = {}
    while True:
        field = input('Enter the name of a field (or type END to stop): ')
        while field.upper() != 'END':
            value = input(f'Enter value of {field:s} (or REMOVE to remove attribute): ')
            key = field.lower()
            if value.upper() == 'REMOVE' and key in meta_data:
                del meta_data[key]
            else:
                # Coerce numeric-looking strings; ints take priority.
                if int_re.match(value):
                    value = int(value)
                elif flt_re.match(value):
                    value = float(value)
                meta_data[key] = value
            field = input('Enter the name of a field (or type END to stop): ')
        summary = '\n'.join([f'{n:s} : {str(v):s}' for n, v in meta_data.items()])
        print("You've entered the following:\n\n" + summary + '\n')
        if input('Is this correct (Y/N)? ').upper() == 'Y':
            break
    return meta_data
def get_archive_file_name(archive_directory, file_path, file_data):
    """Return the path inside archive_directory where this file is stored.

    The archived name is '<stem-prefix>_<md5><suffixes>', where the stem
    prefix is the part of the stem before its first '.' (the full stem when
    there is no dot, or when the name starts with a dot, e.g. '.bashrc').
    """
    hashed_name = file_data['md5_hash'] + ''.join(file_path.suffixes)
    stem = file_path.stem
    dot_pos = stem.find('.')
    prefix = stem[:dot_pos] if dot_pos > 0 else stem
    return archive_directory.joinpath(prefix + '_' + hashed_name)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Lightweight file archiver. Store and compare files.')
    parser.add_argument("file_path", help="Path to a file. Will be archived by filename, not by unique path.")
    parser.add_argument("--diff", dest='diff', action='store_true', help="Compare to files in archive.")
    parser.add_argument("--list", dest='list', action='store_true', help="List the MD5 hash and modification date from the archived entries for this file.")
    parser.add_argument("--meta", dest='meta', action='store_true', help="User wishes to add meta data to new entry (not valid with --diff).")
    parser.add_argument("--remove", dest='remove', action='store_true', help="Remove a file and all its entries from the archive (not valid with --diff, --list, --meta).")
    parser.add_argument('--query', dest='query', action='store_true', help='User wishes to add restrictions to the search query (used with --list)')
    parser.add_argument('--file_name', dest='file_name', nargs='?', type=str, help='Override for file name')
    parser.add_argument('--modified', dest='modified', nargs='?', type=float, help='Override for modification date (seconds)')
    args = parser.parse_args()

    # Locate the archive directory from the user's config file, creating the
    # config with the default location on first run.
    config = configparser.ConfigParser()
    config_path = Path(os.path.expanduser("~/archiver_config.ini"))
    if not config_path.exists():
        archive_directory = default_directory
        config['SETTINGS'] = {'archive_directory': default_directory}
        with config_path.open('w') as configfile:
            config.write(configfile)
    else:
        config.read(str(config_path))
        if 'SETTINGS' in config and 'archive_directory' in config['SETTINGS']:
            archive_directory = config['SETTINGS']['archive_directory']
        else:
            sys.stderr.write(f'ERROR: Archive Directory Not Found in {str(config_path):s}. Adding default "{default_directory:s}..."\n')
            archive_directory = default_directory
            # BUG FIX: assigning config['SETTINGS'][...] raised KeyError when
            # the SETTINGS section itself was missing; create the section first.
            if 'SETTINGS' not in config:
                config['SETTINGS'] = {}
            config['SETTINGS']['archive_directory'] = archive_directory
            with open(str(config_path), 'w') as configfile:
                config.write(configfile)

    archive_directory = PathExpanded(archive_directory)
    if not archive_directory.exists():
        sys.stderr.write(f'ERROR: Archive Directory "{str(archive_directory):s}" does not exist. Check config file "{str(config_path):s}"...')
        sys.exit(-1)
    if not archive_directory.is_dir():
        sys.stderr.write(f'ERROR: Archive Directory "{str(archive_directory):s}" is not a directory. Check config file "{str(config_path):s}"...')
        sys.exit(-1)

    # Resolve conflicting flag combinations with warnings rather than errors.
    if args.remove and (args.diff or args.meta or args.list):
        # BUG FIX: the message previously referred to a nonexistent "--removed" flag.
        sys.stderr.write('Warning: --diff, --list, and --meta cannot be combined with --remove. Ignoring...\n')
        args.list = False
        args.meta = False
        args.diff = False
    if args.modified is not None and (args.diff or args.list or args.remove):
        sys.stderr.write('Warning: --modified cannot be combined with --diff, --list, --remove. Ignoring...\n')
        # BUG FIX: was reset to False; later code tests `args.modified is not None`,
        # so False would have been treated as a real timestamp override.
        args.modified = None
    if args.diff and args.list:
        sys.stderr.write('Warning: --diff and --list cannot be combined. Ignoring "--list"')
        args.list = False
    if args.query and (not args.list and not args.remove):
        sys.stderr.write('Warning: the flag --query is meaningless without the flag --list or --remove. Ignoring...')
        args.query = False

    # only validate file if we need a reference
    add_or_mod = not any([args.list, args.remove, args.diff])
    data = {}
    file_path_exists = Path(args.file_path).exists()
    if (not args.list and not args.remove) or args.diff or (add_or_mod and file_path_exists):
        print('Calculating MD5 hash...')
        file_path = get_valid_file_path(args.file_path)
        data = get_file_data(file_path)
        print(data['md5_hash'])
        if args.modified is not None:
            data['modified'] = args.modified
    elif args.file_path.upper() == "ALL":
        data = {'file_name': 'ALL'}
    else:
        file_path = Path(args.file_path)
        data['file_name'] = file_path.name
    if args.file_name is not None:
        data['file_name'] = args.file_name

    if args.diff:
        # DIFF ARCHIVE AGAINST FILE
        db = TinyDB(archive_directory.joinpath('archiver.json'))
        File = Query()
        results = db.search(File.file_name == data['file_name'])
        results = sorted(results, key=lambda entry: entry['modified'], reverse=True)
        readable = datetime.datetime.fromtimestamp(data['modified']).isoformat()
        print(f'File Modified: {readable:s}')
        if len(results) > 0:
            readable = datetime.datetime.fromtimestamp(results[0]['modified']).isoformat()
            if results[0]['md5_hash'] != data['md5_hash']:
                print(f'File does not match last archived version ({readable:s})')
                archived_file_path = archive_directory.joinpath(results[0]["archive_file"])
                print(f'Archived file: {str(archived_file_path):s}')
            elif results[0]['modified'] == data['modified']:
                print(f'File is unchanged from last archived version ({readable:s})')
            else:
                # Same hash, different mtime: content identical, metadata differs.
                print(f'File is identical to last archived version ({readable:s})')
            print('Matching Entries:')
            for entry in results:
                if entry['md5_hash'] == data['md5_hash']:
                    readable = datetime.datetime.fromtimestamp(entry['modified']).isoformat()
                    print(f'{readable:s} {entry["md5_hash"]:s} {entry["size"]:d} {entry["host_name"]:s}')
                    if args.meta:
                        print('\n'.join([f' {n:s} : {str(v):s}' for n, v in entry.items() if n not in ['size', 'md5_hash', 'file_name']]))
    elif args.list:
        # LIST ENTRIES ASSOCIATED WITH FILE NAME
        db = TinyDB(archive_directory.joinpath('archiver.json'))
        File = Query()
        if data['file_name'] != 'ALL':
            if args.query:
                print("Please enter field-value pairs for search query:")
                query_data = prompt_for_meta_data()
                query_data['file_name'] = data['file_name']
                results = db.search(Query().fragment(query_data))
            else:
                results = db.search(File.file_name == data['file_name'])
            results = sorted(results, key=lambda entry: entry['modified'], reverse=True)
            if len(results) > 0:
                for entry in results:
                    readable = datetime.datetime.fromtimestamp(entry["modified"]).isoformat()
                    print(f'{readable:s} {entry["md5_hash"]:s} {entry["size"]:d} {entry["host_name"]:s}')
                    if args.meta:
                        print('\n'.join([f' {n:s} : {str(v):s}' for n, v in entry.items() if n not in ['size', 'md5_hash', 'file_name']]))
            else:
                print('No entries found.')
        else:
            # list all unique file names, showing the most recent entry of each
            # (CLEANUP: removed a redundant db.all() that was immediately overwritten)
            if args.query:
                print("Please enter field-value pairs for search query:")
                query_data = prompt_for_meta_data()
                results = db.search(Query().fragment(query_data))
            else:
                results = db.all()
            results = sorted(results, key=lambda entry: entry['modified'], reverse=True)
            unique_files = set()
            if len(results) > 0:
                for entry in results:
                    if entry['file_name'] not in unique_files:
                        unique_files.add(entry['file_name'])
                        readable = datetime.datetime.fromtimestamp(entry["modified"]).isoformat()
                        print(f'{entry["file_name"]:s} {readable:s} {entry["md5_hash"]:s} {entry["size"]:d}')
                        if args.meta:
                            print('\n'.join([f' {n:s} : {str(v):s}' for n, v in entry.items() if n not in ['size', 'md5_hash', 'modified', 'file_name']]))
            else:
                print('No entries found.')
    elif args.remove:
        # REMOVE ENTRIES ASSOCIATED WITH FILE NAME
        db = TinyDB(archive_directory.joinpath('archiver.json'))
        File = Query()
        # BUG FIX: query_data was undefined below when --query was not given,
        # raising NameError at the removal step.
        query_data = None
        if args.query:
            print("Please enter field-value pairs for search query for entries to remove:")
            query_data = prompt_for_meta_data()
            results = db.search(Query().fragment(query_data))
        else:
            results = db.search(File.file_name == data['file_name'])
        num_results = len(results)
        archive_files = set()
        if num_results > 0:
            if input(f'{num_results:d} files(s) will be deleted. Proceed (Y/N)? ').upper() == 'Y':
                # Store unique archive files from entries being removed
                for result in results:
                    if 'archive_file' in result:
                        archive_files.add(result['archive_file'])
                # Remove all entries
                if query_data is not None:
                    db.remove(Query().fragment(query_data))
                else:
                    db.remove(File.file_name == data['file_name'])
                # Remove archive files no longer referenced
                for archive_file in archive_files:
                    results = db.search(File.archive_file == archive_file)
                    if len(results) == 0:
                        print(f'Removing archive file {archive_file:s}...')
                        Path(archive_file).unlink(missing_ok=True)
        else:
            print('No entries found.')
    elif data['file_name'] != 'ALL' and not file_path_exists and args.meta:
        # MODIFY AN EXISTING ENTRY
        if input('The file does not exist. Do you wish to modify an existing entry (Y/N)? ').upper() == 'Y':
            time_stamp = input('Please enter the time stamp of the entry you would like to modify (or QUIT): ')
            while time_stamp != 'QUIT':
                entry_found = True
                try:
                    # Accept either an ISO-8601 date string or a raw numeric timestamp.
                    if not flt_re.match(time_stamp):
                        time_stamp = datetime.datetime.fromisoformat(time_stamp).timestamp()
                    else:
                        time_stamp = float(time_stamp)
                except ValueError:
                    # BUG FIX (idiom): narrowed a bare `except:` — only the
                    # parse can fail here, and it raises ValueError.
                    print('Invalid Time Stamp.')
                    entry_found = False
                if entry_found:
                    db = TinyDB(archive_directory.joinpath('archiver.json'))
                    File = Query()
                    results = db.search((File.file_name == data['file_name']) & (File.modified == time_stamp))
                    if len(results) > 0:
                        result_data = results[0]
                        print('Existing data:')
                        print('\n'.join([f' {n:s} : {str(v):s}' for n, v in result_data.items() if n not in ['file_name', 'modified']]))
                        print('Enter any additional data or update existing entries.')
                        meta_data = prompt_for_meta_data()
                        db.update(meta_data, (File.file_name == data['file_name']) & (File.modified == result_data['modified']))
                        break
                    else:
                        print('No matching entry found.')
                        entry_found = False
                if not entry_found:
                    time_stamp = input('Please enter the time stamp of the entry you would like to modify (or QUIT): ')
    else:
        # ADD A NEW ENTRY
        # Check for existing version with the same modification date
        # Add to archive if genuinely new file.
        db = TinyDB(archive_directory.joinpath('archiver.json'))
        File = Query()
        results = db.search((File.file_name == data['file_name']) & (File.modified == data['modified']))
        if len(results) > 0:
            print("File already exists in archive with the same name and modification date.")
            print("File will not be archived. Instead we will compare your file to the existing archived file.")
            print(len(results))
            refdata = results[0]
            if refdata['size'] != data['size']:
                # BUG FIX: previously referenced the undefined name 'refsize'.
                sys.stderr.write(f' Warning: Archived file size mismatch (Archived {refdata["size"]:d} bytes, New: {data["size"]:d} bytes.')
            elif refdata['md5_hash'] != data['md5_hash']:
                sys.stderr.write(f' Warning: MD5 Hash of archived file does not match {str(file_path):s}.')
            else:
                print(' Files are identical')
            if args.meta:
                print('Existing data:')
                print('\n'.join([f' {n:s} : {str(v):s}' for n, v in results[0].items() if n not in ['file_name', 'md5_hash']]))
                print('Enter any additional data or update existing entries.')
                meta_data = prompt_for_meta_data()
                db.update(meta_data, (File.file_name == data['file_name']) & (File.modified == data['modified']))
        else:
            if args.meta:
                data.update(prompt_for_meta_data())
            # Archive files are named by content hash, so identical content
            # from a previous version reuses the same on-disk file.
            archive_file = get_archive_file_name(archive_directory, Path(data['file_name']), data)
            if archive_file.exists():
                print('Identical file in archive directory. File matches a previous version.')
            else:
                print(f'Copying file to archive as {archive_file.name:s}')
                shutil.copy(str(file_path), str(archive_file))
            print('Recording new archive entry...')
            data['archive_file'] = str(archive_file)
            db.insert(data)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment