Skip to content

Instantly share code, notes, and snippets.

@Lokno
Last active October 25, 2021 22:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Lokno/44cf9aeda36ccaadbb2e1c586cacdac1 to your computer and use it in GitHub Desktop.
Save Lokno/44cf9aeda36ccaadbb2e1c586cacdac1 to your computer and use it in GitHub Desktop.
Lightweight file archiver. Store and compare files.
# Archiver
#
# Lightweight file archiver. Store and compare files.
# File names, including their extensions, are used for identification, not the full file path.
#
# Default archive directory in 'default_directory' variable below. To change the
# archive directory, edit archiver_config.ini in your home directory.
#
# archiver.py [-h] [--diff] [--list] [--meta] [--remove] [--file_name [FILE_NAME]] [--modified [MODIFIED]] file_path
#
# Behavior:
# No Flags: Adds the file at file_path to the archive. The file is only physically
# moved to the archive if it has a unique MD5 hash. File is also not added
# to the archive if an existing entry exists in the archive with the same
# file name and modification date.
# May be simply a name of an archived file when used with --list, --remove, or --meta
# --diff : Checks if the file matches MD5 hash with the archived file with the most
# recent modification date. It also lists the modification dates for
# all entries with the same MD5 hash.
# --list : Lists the modification date and MD5 hash for each entry in the archive
# for the name of the file in the given file_path
# If used with 'ALL' entered for the file_path, the script will list all
# of the unique files in the archive, along with the data associated with
# the most recent entry corresponding to each file.
# --meta : When used with no other flags, this means the user has additional meta
# data they wish to associate with file. When used with the --list flag
# all data associated with a file will be printed
# --query : Can be used with the --list or --remove option to add additional search criteria.
# Results must match all the entered fields exactly.
# --remove : Remove all entries for this file path
# --file_name : File name to use instead of actual file name for comparison and storage
# --modified : Override modification time stamp of actual file with entered value
import sys, os
import re
import datetime
import argparse
import hashlib
from pathlib import Path
from tinydb import TinyDB, Query
import configparser
import shutil
from socket import gethostname
# Default archive location; can be overridden via archiver_config.ini (see header).
default_directory = '~/.archiver'
# Matches a non-negative integer, e.g. "42".
int_re = re.compile('^[0-9]+$')
# Matches a non-negative decimal number with an optional dot, e.g. "3.14",
# ".5", "42".  The original pattern used an unescaped '.', which matched any
# character, so strings such as "1a2" were classified as floats and crashed
# float() later.  The dot is now escaped but optional, so plain integers
# still match -- the timestamp-entry code path relies on that.
flt_re = re.compile(r'^[0-9]*\.?[0-9]+$')
def PathExpanded(str_path):
    """Expand '~' and environment variables in *str_path*, returning a Path."""
    with_user = os.path.expanduser(str_path)
    with_vars = os.path.expandvars(with_user)
    return Path(with_vars)
def get_valid_file_path(file_path_str):
    """Validate that *file_path_str* names an existing regular file.

    Prints an error and terminates the process with exit code -1 when the
    path is missing or is not a plain file; otherwise returns it as a Path.
    """
    candidate = Path(file_path_str)
    if not candidate.exists():
        print(f'ERROR: File "{file_path_str:s}" does not exist.')
        sys.exit(-1)
    if not candidate.is_file():
        print(f'ERROR: "{file_path_str:s}" is not a file.')
        sys.exit(-1)
    return candidate
def get_file_data(file_path):
    """Collect the metadata stored for an archive entry of *file_path*.

    Returns a dict with the absolute parent directory, file name, size,
    modification time, owner, MD5 digest of the contents, and host name.
    """
    digest = hashlib.md5(file_path.read_bytes()).hexdigest()
    info = file_path.stat()
    return {
        'path': str(file_path.parent.absolute()),
        'file_name': file_path.name,
        'size': info.st_size,
        'modified': info.st_mtime,
        'owner': file_path.owner(),
        'md5_hash': digest,
        'host_name': gethostname(),
    }
def prompt_for_meta_data():
    """Interactively collect arbitrary field/value pairs from the user.

    Field names are lower-cased.  Values matching int_re/flt_re are coerced
    to int/float respectively; everything else is stored as a string.
    Entering REMOVE as a value deletes an already-entered field.  The whole
    dialogue repeats until the user confirms the summary with 'Y'.

    Returns the collected mapping (possibly empty).
    """
    meta_data = {}
    while True:
        attribute_name = input('Enter the name of a field (or type END to stop): ')
        while attribute_name.upper() != 'END':
            attribute_value = input(f'Enter value of {attribute_name:s} (or REMOVE to remove attribute): ')
            if attribute_value.upper() == 'REMOVE' and attribute_name.lower() in meta_data:
                del meta_data[attribute_name.lower()]
            else:
                # Coerce numeric-looking input so queries compare correctly.
                if int_re.match(attribute_value):
                    attribute_value = int(attribute_value)
                elif flt_re.match(attribute_value):
                    attribute_value = float(attribute_value)
                meta_data[attribute_name.lower()] = attribute_value
            attribute_name = input('Enter the name of a field (or type END to stop): ')
        # Summarize and ask for confirmation; loop again on anything but 'Y'.
        print("You've entered the following:\n\n" + '\n'.join([f'{n:s} : {str(v):s}' for n,v in meta_data.items()]) + '\n')
        if input('Is this correct (Y/N)? ').upper() == 'Y':
            break
    return meta_data
def get_archive_file_name(archive_directory, file_path, file_data):
    """Build the path under *archive_directory* used to store *file_path*.

    The stored name is '<base>_<md5><suffixes>', where <base> is the stem
    truncated at its first interior '.'; a leading dot (hidden files) is
    kept intact.
    """
    stem = file_path.stem
    first_dot = stem.find('.')
    base = stem[:first_dot] if first_dot > 0 else stem
    stored_name = base + '_' + file_data['md5_hash'] + ''.join(file_path.suffixes)
    return archive_directory.joinpath(stored_name)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Lightweight file archiver. Store and compare files.')
parser.add_argument("file_path", help="Path to a file. Will be archived by filename, not by unique path.")
parser.add_argument("--diff", dest='diff', action='store_true', help="Compare to files in archive.")
parser.add_argument("--list", dest='list', action='store_true', help="List the MD5 hash and modification date from the archived entries for this file.")
parser.add_argument("--meta", dest='meta', action='store_true', help="User wishes to add meta data to new entry (not valid with --diff).")
parser.add_argument("--remove", dest='remove', action='store_true', help="Remove a file and all its entries from the archive (not valid with --diff, --list, --meta).")
parser.add_argument('--query', dest='query', action='store_true', help='User wishes to add restrictions to the search query (used with --list)')
parser.add_argument('--file_name', dest='file_name', nargs='?', type=str, help='Override for file name')
parser.add_argument('--modified', dest='modified', nargs='?', type=float, help='Override for modification date (seconds)')
args = parser.parse_args()
config = configparser.ConfigParser()
config_path = Path(os.path.expanduser("~/archiver_config.ini"))
if not config_path.exists():
archive_directory = default_directory
config['SETTINGS'] = {'archive_directory': default_directory}
with config_path.open('w') as configfile:
config.write(configfile)
else:
config.read(str(config_path))
if 'SETTINGS' in config and 'archive_directory' in config['SETTINGS']:
archive_directory = config['SETTINGS']['archive_directory']
else:
sys.stderr.write(f'ERROR: Archive Directory Not Found in {str(config_path):s}. Adding default "{default_directory:s}..."\n')
archive_directory = default_directory
config['SETTINGS']['archive_directory'] = archive_directory
with open(str(config_path), 'w') as configfile:
config.write(configfile)
archive_directory = PathExpanded(archive_directory)
if not archive_directory.exists():
sys.stderr.write(f'ERROR: Archive Directory "{str(archive_directory):s}" does not exist. Check config file "{str(config_path):s}"...')
sys.exit(-1)
if not archive_directory.is_dir():
sys.stderr.write(f'ERROR: Archive Directory "{str(archive_directory):s}" is not a directory. Check config file "{str(config_path):s}"...')
sys.exit(-1)
if args.remove and (args.diff or args.meta or args.list):
sys.stderr.write(f'Warning: --diff, --list, and --meta cannot be combined with --removed. Ignoring...\n')
args.list = False
args.meta = False
args.diff = False
if args.modified is not None and (args.diff or args.list or args.remove):
sys.stderr.write(f'Warning: --modified cannot be combined with --diff, --list, --remove. Ignoring...\n')
args.modified = False
if args.diff and args.list:
sys.stderr.write(f'Warning: --diff and --list cannot be combined. Ignoring "--list"')
args.list = False
if args.query and (not args.list and not args.remove):
sys.stderr.write(f'Warning: the flag --query is meaningless without the flag --list or --remove. Ignoring...')
args.query = False
# only validate file if we need a reference
add_or_mod = not any([args.list,args.remove,args.diff])
data = {}
file_path_exists = Path(args.file_path).exists()
if (not args.list and not args.remove) or args.diff or (add_or_mod and file_path_exists):
print('Calculating MD5 hash...')
file_path = get_valid_file_path(args.file_path)
data = get_file_data(file_path)
print(data['md5_hash'])
if args.modified is not None:
data['modified'] = args.modified
elif args.file_path.upper() == "ALL":
data = {'file_name' : 'ALL'}
else:
file_path = Path(args.file_path)
data['file_name'] = file_path.name
if args.file_name is not None:
data['file_name'] = args.file_name
    if args.diff:
        # DIFF ARCHIVE AGAINST FILE
        # Compare the on-disk file against the most recently modified archived
        # entry with the same file name, then list every archived entry whose
        # MD5 matches the file's.
        db = TinyDB(archive_directory.joinpath('archiver.json'))
        File = Query()
        results = db.search(File.file_name == data['file_name'])
        # Newest entry first.
        results = sorted(results, key=lambda entry: entry['modified'], reverse=True)
        readable = datetime.datetime.fromtimestamp(data['modified']).isoformat()
        print(f'File Modified: {readable:s}')
        if len(results) > 0:
            readable = datetime.datetime.fromtimestamp(results[0]['modified']).isoformat()
            if results[0]['md5_hash'] != data['md5_hash']:
                print(f'File does not match last archived version ({readable:s})')
                archived_file_path = archive_directory.joinpath(results[0]["archive_file"])
                print(f'Archived file: {str(archived_file_path):s}')
            elif results[0]['modified'] == data['modified']:
                # Same content and same timestamp.
                print(f'File is unchanged from last archived version ({readable:s})')
            else:
                # Same content, different timestamp.
                print(f'File is identical to last archived version ({readable:s})')
        print('Matching Entries:')
        for entry in results:
            if entry['md5_hash'] == data['md5_hash']:
                readable = datetime.datetime.fromtimestamp(entry['modified']).isoformat()
                print(f'{readable:s} {entry["md5_hash"]:s} {entry["size"]:d} {entry["host_name"]:s}')
                if args.meta:
                    # With --meta, also dump the entry's extra fields.
                    print('\n'.join([f' {n:s} : {str(v):s}' for n,v in entry.items() if n not in ['size','md5_hash','file_name']]))
elif args.list:
# LIST ENTRIES ASSOCIATED WITH FILE NAME
db = TinyDB(archive_directory.joinpath('archiver.json'))
File = Query()
if data['file_name'] != 'ALL':
if args.query:
print("Please enter field-value pairs for search query:")
query_data = prompt_for_meta_data()
query_data['file_name'] = data['file_name']
results = db.search(Query().fragment(query_data))
else:
results = db.search(File.file_name == data['file_name'])
results = sorted(results, key=lambda entry: entry['modified'], reverse=True)
if len(results) > 0:
for entry in results:
readable = datetime.datetime.fromtimestamp(entry["modified"]).isoformat()
print(f'{readable:s} {entry["md5_hash"]:s} {entry["size"]:d} {entry["host_name"]:s}')
if args.meta:
print('\n'.join([f' {n:s} : {str(v):s}' for n,v in entry.items() if n not in ['size','md5_hash','file_name']]))
else:
print('No entries found.')
else:
# list all unique file names
results = db.all()
if args.query:
print("Please enter field-value pairs for search query:")
query_data = prompt_for_meta_data()
results = db.search(Query().fragment(query_data))
else:
results = db.all()
results = sorted(results, key=lambda entry: entry['modified'], reverse=True)
unique_files = set()
if len(results) > 0:
for entry in results:
if entry['file_name'] not in unique_files:
unique_files.add(entry['file_name'])
readable = datetime.datetime.fromtimestamp(entry["modified"]).isoformat()
print(f'{entry["file_name"]:s} {readable:s} {entry["md5_hash"]:s} {entry["size"]:d}')
if args.meta:
print('\n'.join([f' {n:s} : {str(v):s}' for n,v in entry.items() if n not in ['size','md5_hash','modified','file_name']]))
else:
print('No entries found.')
elif args.remove:
# REMOVE ENTRIES ASSOCIATED WITH FILE NAME
db = TinyDB(archive_directory.joinpath('archiver.json'))
File = Query()
if args.query:
print("Please enter field-value pairs for search query for entries to remove:")
query_data = prompt_for_meta_data()
results = db.search(Query().fragment(query_data))
else:
results = db.search(File.file_name == data['file_name'])
num_results = len(results)
archive_files = set()
if num_results > 0:
if input(f'{num_results:d} files(s) will be deleted. Proceed (Y/N)? ').upper() == 'Y':
# Store unique archive files from entries being removed
for result in results:
if 'archive_file' in result:
archive_files.add(result['archive_file'])
# Remove all entries
if query_data is not None:
db.remove(Query().fragment(query_data))
else:
db.remove(File.file_name == data['file_name'])
# Remove archive files no longer referenced
for archive_file in archive_files:
results = db.search(File.archive_file == archive_file)
if len(results) == 0:
print(f'Removing archive file {archive_file:s}...')
Path(archive_file).unlink(missing_ok=True)
else:
print('No entries found.')
elif data['file_name'] != 'ALL' and not file_path_exists and args.meta:
# MODIFY AN EXISTING ENTRY
if input(f'The file does not exist. Do you wish to modify an existing entry (Y/N)? ').upper() == 'Y':
time_stamp = input('Please enter the time stamp of the entry you would like to modify (or QUIT): ')
while time_stamp != 'QUIT':
entry_found = True
try:
if not flt_re.match(time_stamp):
time_stamp = datetime.datetime.fromisoformat(time_stamp).timestamp()
else:
time_stamp = float(time_stamp)
except:
print('Invalid Time Stamp.')
entry_found = False
if entry_found:
db = TinyDB(archive_directory.joinpath('archiver.json'))
File = Query()
results = db.search((File.file_name == data['file_name']) & (File.modified == time_stamp))
if len(results) > 0:
result_data = results[0]
print('Existing data:')
print('\n'.join([f' {n:s} : {str(v):s}' for n,v in result_data.items() if n not in ['file_name','modified']]))
print('Enter any additional data or update existing entries.')
meta_data = prompt_for_meta_data()
db.update( meta_data, (File.file_name == data['file_name']) & (File.modified == result_data['modified']) )
break
else:
print('No matching entry found.')
entry_found = False
if not entry_found:
time_stamp = input('Please enter the time stamp of the entry you would like to modify (or QUIT): ')
else:
# ADD A NEW ENTRY
# Check for existing version with the same modification date
# Add to archive if genuinely new file.
db = TinyDB(archive_directory.joinpath('archiver.json'))
File = Query()
results = db.search((File.file_name == data['file_name']) & (File.modified == data['modified']))
if len(results) > 0:
print("File already exists in archive with the same name and modification date.")
print("File will not be archived. Instead we will compare your file to the existing archived file.")
print( len(results) )
refdata = results[0]
if refdata['size'] != data['size']:
sys.stderr.write(f' Warning: Archived file size mismatch (Archived {refsize["size"]:d} bytes, New: {data["size"]:d} bytes.')
elif refdata['md5_hash'] != data['md5_hash']:
sys.stderr.write(f' Warning: MD5 Hash of archived file does not match {str(file_path):s}.')
else:
print(' Files are identical')
if args.meta:
print('Existing data:')
print('\n'.join([f' {n:s} : {str(v):s}' for n,v in results[0].items() if n not in ['file_name','md5_hash']]))
print('Enter any additional data or update existing entries.')
meta_data = prompt_for_meta_data()
db.update( meta_data, (File.file_name == data['file_name']) & (File.modified == data['modified']) )
else:
if args.meta:
data.update(prompt_for_meta_data())
archive_file = get_archive_file_name(archive_directory,Path(data['file_name']),data)
if archive_file.exists():
print('Identical file in archive directory. File matches a previous version.')
else:
print(f'Copying file to archive as {archive_file.name:s}')
shutil.copy(str(file_path), str(archive_file))
print('Recording new archive entry...')
data['archive_file'] = str(archive_file)
db.insert(data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment