#!/usr/bin/env python3
# -*- mode: python -*-

import os
import sys
import sh
import glob
import base64
import signal
import hashlib
import shutil
import configtools
import subprocess
from enum import Enum
from argparse import ArgumentParser
from pbench import init_report_template, report_status, _rename_tb_link, \
    PbenchConfig, BadConfig, get_es, get_pbench_logger, quarantine
from s3backup import S3Config

_NAME_ = "pbench-backup-tarballs"

# The link source and destination for this operation of this script.
_linksrc = "TO-BACKUP"
_linkdest = "BACKED-UP"

# Global logger for the module, set up in main()
_logger = None

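# High-level flow: scan the ARCHIVE hierarchy for tarballs linked under
# TO-BACKUP, copy each one (with its .md5 file) into the local backup
# directory, upload it to the configured S3 bucket, and move the link to
# BACKED-UP on success. Anything that fails validation is quarantined, and a
# summary report of the counters is written and shipped via report_status()
# at the end.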
class Status(Enum):
    TRY_S3 = 10
    NEXT_RESULT = 20
    STATUS_OK = 30


class Results(object):
    def __init__(self, status, ntotal=0, nbackup_success=0, nbackup_fail=0,
                 ns3_success=0, ns3_fail=0, nquaran=0):
        self.status = status
        self.ntotal = ntotal
        self.nbackup_success = nbackup_success
        self.nbackup_fail = nbackup_fail
        self.ns3_success = ns3_success
        self.ns3_fail = ns3_fail
        self.nquaran = nquaran

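# sanity_check() validates the environment before any work starts: the
# ARCHIVE, BACKUP and QUARANTINE directories must resolve to real directories
# (the BACKUP directory is created if missing) and the S3 bucket must be
# reachable. Any failure here is fatal and exits the process immediately.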
def sanity_check(s3_obj, config):
    # make sure the archive directory is present
    archive = config.ARCHIVE
    if not os.path.exists(os.path.realpath(archive)):
        _logger.error(
            'The ARCHIVE directory {} does not resolve to a real location'.format(archive))
        os._exit(1)
    if not os.path.isdir(archive):
        _logger.error(
            'The ARCHIVE directory {} does not resolve to a directory ({})'.format(archive, os.path.realpath(archive)))
        os._exit(1)

    # make sure the local backup directory is present
    backup = config.BACKUP
    if len(backup) == 0:
        _logger.error(
            'Unspecified backup directory, no pbench-backup-dir config in pbench-server section')
        os._exit(1)
    try:
        os.mkdir(backup)
    except FileExistsError:
        # directory already exists, ignore
        pass
    except Exception:
        _logger.exception(
            "os.mkdir: Unable to create backup destination directory: {}\n".format(backup))
        os._exit(1)
    if not os.path.exists(os.path.realpath(backup)):
        _logger.error(
            'The BACKUP directory {} does not resolve to a real location'.format(backup))
        os._exit(1)
    if not os.path.isdir(backup):
        _logger.error(
            'The BACKUP directory {} does not resolve to a directory ({})'.format(backup, os.path.realpath(backup)))
        os._exit(1)

    # make sure the quarantine directory is present
    qdir = config.QDIR
    if len(qdir) == 0:
        _logger.error(
            'Unspecified quarantine directory, no pbench-quarantine-dir config in pbench-server section')
        os._exit(1)
    if not os.path.exists(os.path.realpath(qdir)):
        _logger.error(
            'The QUARANTINE directory {} does not resolve to a real location'.format(qdir))
        os._exit(1)
    if not os.path.isdir(qdir):
        _logger.error(
            'The QUARANTINE directory {} does not resolve to a directory ({})'.format(qdir, os.path.realpath(qdir)))
        os._exit(1)

    # make sure the S3 bucket exists
    try:
        s3_obj.connector.head_bucket(Bucket='{}'.format(s3_obj.bucket_name))
    except Exception as e:
        _logger.exception(
            "Bucket: {} does not exist or you have no access. {}\n".format(s3_obj.bucket_name, e))
        os._exit(1)

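# prepare_report() appends a one-line summary of the final Results counters
# to the report file, which main() later hands to report_status().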
def prepare_report(report, TS, prog, config, fresults):
    with open(report, 'a') as f:
        f.write("{}.{}({})\n".format(prog, TS, config.PBENCH_ENV))
        f.write("Status: {}, Total processed: {}, "
                "Locally backed up successfully: {}, "
                "Failed to back up locally: {}, "
                "Uploaded to S3 successfully: {}, "
                "Failed to upload to S3: {}, Quarantined: {}"
                .format(fresults.status.name,
                        fresults.ntotal,
                        fresults.nbackup_success,
                        fresults.nbackup_fail,
                        fresults.ns3_success,
                        fresults.ns3_fail,
                        fresults.nquaran))

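# backup_to_local() copies a single tarball and its .md5 file into the local
# backup tree under <BACKUP>/<controller>/. A copy that is already present
# with a matching md5 counts as a success; a copy with a missing, unreadable,
# or mismatching md5 quarantines the link. The returned status is always
# TRY_S3, so the caller proceeds to the S3 stage regardless of the local
# outcome.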
def backup_to_local(config, logger, qdir, controller_path, controller, tb, tar,
                    resultname, archive_md5, archive_md5_hex_value):
    nbackup_success = nbackup_fail = nquaran = 0
    backup_controller_path = "{}/{}".format(config.BACKUP, controller)

    # make sure the controller directory is present in the local backup directory
    try:
        os.mkdir(backup_controller_path)
    except FileExistsError:
        # directory already exists, ignore
        pass
    except Exception:
        logger.exception(
            "os.mkdir: Unable to create backup destination directory: {}\n".format(backup_controller_path))
        return Results(Status.TRY_S3)

    # Check whether the tarball already exists in the local backup
    backup_tar = os.path.join(backup_controller_path, resultname)
    if os.path.exists(backup_tar) and os.path.isfile(backup_tar):
        backup_md5 = (
            "{}/{}.md5".format(backup_controller_path, resultname))
        # check that the backup md5 file exists and is a regular file
        if os.path.exists(backup_md5) and os.path.isfile(backup_md5):
            pass
        else:
            # the backup md5 file does not exist or is not a regular file
            quarantine(qdir, logger, tb)
            nquaran += 1
            logger.error(
                "Quarantine: {}, {} does not exist or is not a regular file\n".format(tb, backup_md5))
            return Results(Status.TRY_S3, nquaran=nquaran)
        # read the backup md5 file
        try:
            with open(backup_md5) as f:
                backup_md5_hex_value = f.readline().split(" ")[0]
        except (OSError, IOError) as e:
            # could not read the file
            quarantine(qdir, logger, tb)
            nquaran += 1
            logger.error(
                "Quarantine: {}, could not read file {}, {}\n".format(tb, backup_md5, e))
            return Results(Status.TRY_S3, nquaran=nquaran)
        else:
            # These asserts are just for testing purposes; they will not be
            # in production. At this point we know that the backed-up
            # tarball and its md5 file exist and both are regular files.
            assert os.path.exists(os.path.join(
                backup_controller_path, resultname))
            assert os.path.exists(
                "{}/{}.md5".format(backup_controller_path, resultname))
            if archive_md5_hex_value == backup_md5_hex_value:
                # declare success
                nbackup_success += 1
                logger.info(
                    "Already locally backed up: {}\n".format(resultname))
                return Results(Status.TRY_S3, nbackup_success=nbackup_success)
            else:
                # md5 sums of the archive and the backup do not match
                quarantine(qdir, logger, tb)
                nquaran += 1
                logger.info(
                    "{} already exists in backup but md5 sums of archive and backup disagree\n".format(resultname))
                return Results(Status.TRY_S3, nquaran=nquaran)
    else:
        md5_done = tar_done = False
        # copy the md5 file from the archive to the backup
        try:
            shutil.copy(archive_md5, backup_controller_path)
            md5_done = True
        except Exception:
            # couldn't copy the md5 file
            nbackup_fail += 1
            logger.exception(
                "shutil.copy: Unable to copy {} from archive to backup: {}\n".format(archive_md5, backup_controller_path))
        # copy the tarball from the archive to the backup
        if md5_done:
            try:
                shutil.copy(tar, backup_controller_path)
                tar_done = True
            except Exception:
                # couldn't copy the tarball
                nbackup_fail += 1
                logger.exception(
                    "shutil.copy: Unable to copy {} from archive to backup: {}\n".format(tar, backup_controller_path))
                # remove the copied md5 file from the backup so a failed
                # copy does not leave a partial backup behind
                if md5_done:
                    bmd5_file = "{}/{}.md5".format(
                        backup_controller_path, resultname)
                    if os.path.exists(bmd5_file):
                        os.remove(bmd5_file)
        if md5_done and tar_done:
            nbackup_success += 1
            logger.info(
                "Locally backed up successfully: {}\n".format(resultname))
            return Results(Status.TRY_S3, nbackup_success=nbackup_success)
        else:
            return Results(Status.TRY_S3, nbackup_fail=nbackup_fail)

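# backup_to_s3() uploads a single tarball to the S3 bucket under the key
# <controller>/<resultname>. If the key already exists, its ETag is compared
# with the archived md5: a match renames the link to BACKED-UP, a mismatch
# quarantines it. New uploads pass the md5 as ContentMD5 so the S3 service
# verifies the body on receipt. The returned status is always NEXT_RESULT.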
def backup_to_s3(s3_obj, logger, qdir, controller_path, controller, tb, tar,
                 resultname, archive_md5, archive_md5_hex_value):
    ns3_success = ns3_fail = nquaran = 0
    s3_resultname = "{}/{}".format(controller, resultname)

    # Check whether the result is already present in S3
    try:
        obj = s3_obj.connector.get_object(Bucket='{}'.format(
            s3_obj.bucket_name), Key='{}'.format(s3_resultname))
        in_s3 = True
    except Exception:
        in_s3 = False

    if in_s3:
        # compare against the md5 we already have, so no need to recalculate
        s3_md5 = obj['ETag'].strip("\"")
        if archive_md5_hex_value == s3_md5:
            # declare success
            _rename_tb_link(tb, os.path.join(
                controller_path, _linkdest), logger)
            ns3_success += 1
            logger.info(
                "The tarball {} is already present in the S3 bucket, with the same md5\n".format(s3_resultname))
            return Results(Status.NEXT_RESULT, ns3_success=ns3_success)
        else:
            quarantine(qdir, logger, tb)
            nquaran += 1
            logger.error(
                "Quarantine: {}, the tarball is present in S3 but with a different MD5\n".format(s3_resultname))
            return Results(Status.NEXT_RESULT, nquaran=nquaran)
    else:
        md5_base64_value = (base64.b64encode(
            bytes.fromhex(archive_md5_hex_value))).decode()
        try:
            with open(tar, 'rb') as data:
                try:
                    s3_obj.connector.put_object(
                        Bucket=s3_obj.bucket_name, Key=s3_resultname, Body=data,
                        ContentMD5=md5_base64_value)
                except Exception as e:
                    quarantine(qdir, logger, tb)
                    nquaran += 1
                    ns3_fail += 1
                    logger.exception(
                        "Upload to S3 failed, bad md5 for: {}, {}\n".format(s3_resultname, e))
                    return Results(Status.NEXT_RESULT,
                                   nquaran=nquaran, ns3_fail=ns3_fail)
                else:
                    ns3_success += 1
                    _rename_tb_link(tb, os.path.join(
                        controller_path, _linkdest), logger)
                    logger.info(
                        "Upload to S3 succeeded: {}\n".format(s3_resultname))
                    return Results(Status.NEXT_RESULT,
                                   ns3_success=ns3_success)
        except (OSError, IOError) as e:
            # could not read the tarball
            quarantine(qdir, logger, tb)
            nquaran += 1
            logger.error(
                "Quarantine: {}, failed to open tarball, {}\n".format(tar, e))
            return Results(Status.NEXT_RESULT, nquaran=nquaran)

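# backup_data() is the main work loop. It walks every */TO-BACKUP/*.tar.xz
# link under ARCHIVE, verifies that the tarball and its .md5 file exist and
# that the recomputed md5 of the tarball matches the .md5 file, quarantines
# anything that does not check out, and then runs the local and S3 backup
# stages, accumulating the per-tarball counters into the final Results.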
def backup_data(s3_obj, config, logger):
    qdir = config.QDIR
    tarlist = glob.iglob('{}/*/{}/*.tar.xz'.format(config.ARCHIVE, _linksrc))
    fntotal = fnbackup_success = fnbackup_fail = \
        fns3_success = fns3_fail = fnquaran = quaran = 0

    for tb in sorted(tarlist):
        fntotal += 1

        # resolve the link
        tar = os.path.realpath(tb)

        # check that the tarball exists and is a regular file
        if os.path.exists(tar) and os.path.isfile(tar):
            pass
        else:
            # the tarball does not exist or is not a regular file
            quarantine(qdir, logger, tb)
            quaran += 1
            logger.error(
                "Quarantine: {}, {} does not exist or is not a regular file\n".format(tb, tar))
            continue

        archive_md5 = ("{}.md5".format(tar))

        # check that the md5 file exists and is a regular file
        if os.path.exists(archive_md5) and os.path.isfile(archive_md5):
            pass
        else:
            # the md5 file does not exist or is not a regular file
            quarantine(qdir, logger, tb)
            quaran += 1
            logger.error(
                "Quarantine: {}, {} does not exist or is not a regular file\n".format(tb, archive_md5))
            continue

        # read the md5sum from the md5 file
        try:
            with open(archive_md5) as f:
                archive_md5_hex_value = f.readline().split(" ")[0]
        except (OSError, IOError) as e:
            # could not read the file
            quarantine(qdir, logger, tb)
            quaran += 1
            logger.error(
                "Quarantine: {}, could not read {}, {}\n".format(tb, archive_md5, e))
            continue

        # match the md5sum of the tarball against its md5 file
        try:
            with open(tar, 'rb') as f:
                adata = f.read()
                archive_tar_hex_value = hashlib.md5(adata).hexdigest()
        except (OSError, IOError) as e:
            quarantine(qdir, logger, tb)
            quaran += 1
            logger.error(
                "Quarantine: {}, could not read {}, {}\n".format(tb, tar, e))
            continue
        else:
            if archive_tar_hex_value == archive_md5_hex_value:
                pass
            else:
                quarantine(qdir, logger, tb)
                quaran += 1
                logger.error(
                    "Quarantine: {}, md5sum of {} does not match its md5 file {}\n".format(tb, tar, archive_md5))
                continue

        # These asserts are just for testing purposes; they will not be in
        # production. At this point we know that the tarball and its md5 file
        # exist and both are regular files. We also calculated the md5sum of
        # the tarball and it matches the md5 file.
        assert os.path.exists(tar)
        assert os.path.exists(archive_md5)

        resultname = os.path.basename(tar)
        controller_path = os.path.dirname(tar)
        controller = os.path.basename(controller_path)
        backup_controller_path = "{}/{}".format(config.BACKUP, controller)

        # This function call handles all the local backup related
        # operations and counts the number of successes and failures.
        local_backup_result = backup_to_local(
            config, logger, qdir, controller_path, controller, tb, tar,
            resultname, archive_md5, archive_md5_hex_value)
        fnbackup_success = fnbackup_success + \
            local_backup_result.nbackup_success
        fnbackup_fail = fnbackup_fail + local_backup_result.nbackup_fail
        fnquaran = fnquaran + local_backup_result.nquaran

        # This function call handles all the S3 bucket related
        # operations and counts the number of successes and failures.
        if local_backup_result.status == Status.TRY_S3:
            s3_backup_result = backup_to_s3(
                s3_obj, logger, qdir, controller_path, controller, tb, tar,
                resultname, archive_md5, archive_md5_hex_value)
            fns3_success = fns3_success + s3_backup_result.ns3_success
            fns3_fail = fns3_fail + s3_backup_result.ns3_fail
            fnquaran = fnquaran + s3_backup_result.nquaran
            if s3_backup_result.status == Status.NEXT_RESULT:
                continue

    return Results(Status.STATUS_OK, ntotal=fntotal,
                   nbackup_success=fnbackup_success, nbackup_fail=fnbackup_fail,
                   ns3_success=fns3_success, ns3_fail=fns3_fail,
                   nquaran=quaran + fnquaran)

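# main() wires everything together: load the configuration, run the sanity
# checks, back up every eligible tarball, then write the summary report,
# ship it via report_status(), and log the final counters.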
def main(parsed):
    if not parsed.cfg_name:
        print("{}: ERROR: No config file specified; set CONFIG env variable or"
              " use --config <file> on the command line".format(_NAME_),
              file=sys.stderr)
        return 2

    try:
        config = PbenchConfig(parsed.cfg_name)
    except BadConfig as e:
        print("{}: {}".format(_NAME_, e), file=sys.stderr)
        return 1

    global _logger
    _logger = get_pbench_logger(_NAME_, config)

    # construct the S3Config object
    s3_obj = S3Config(config)
    _logger.info('start-{}'.format(s3_obj.ts()))

    # add BACKUP and QDIR fields to the config object
    config.BACKUP = config.conf.get("pbench-server", "pbench-backup-dir")
    config.QDIR = config.get('pbench-server', 'pbench-quarantine-dir')

    # call the sanity check function
    sanity_check(s3_obj, config)

    prog = os.path.basename(sys.argv[0])
    report_dir = "{}/{}.{}".format(config.TMP, prog, os.getpid())
    try:
        os.mkdir(report_dir)
    except FileExistsError:
        # directory already exists, ignore
        pass
    report = "{}/report".format(report_dir)

    # set up signal handlers
    def signal_handler(signum, frame):
        sh.rm('-rf', report_dir)
        os._exit(1)

    sig_type = ['SIGINT', 'SIGQUIT', 'SIGTERM']
    for i in sig_type:
        signum = getattr(signal, i)
        signal.signal(signum, signal_handler)

    # Initiate the backup
    fresults = backup_data(s3_obj, config, _logger)

    # call the prepare report function
    TS = s3_obj.ts()
    prepare_report(report, TS, prog, config, fresults)

    es, idx_prefix = init_report_template(config, _logger)

    # Call report-status
    report_status(es, _logger, config.LOGSDIR,
                  idx_prefix, _NAME_, TS, "status", report)

    _logger.info("Status: {}, Total processed: {}, "
                 "Locally backed up successfully: {}, "
                 "Failed to back up locally: {}, "
                 "Uploaded to S3 successfully: {}, "
                 "Failed to upload to S3: {}, Quarantined: {}"
                 .format(fresults.status.name,
                         fresults.ntotal,
                         fresults.nbackup_success,
                         fresults.nbackup_fail,
                         fresults.ns3_success,
                         fresults.ns3_fail,
                         fresults.nquaran))
    _logger.info('end-{}'.format(s3_obj.ts()))

    # Clean up report_dir before exiting
    sh.rm('-rf', report_dir)
    return 0

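# Entry point: parse the command line (with defaults taken from the CONFIG
# and TMPDIR environment variables) and exit with main()'s return code.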
if __name__ == '__main__':
    parser = ArgumentParser("""Usage: pbench-backup""")
    parser.add_argument(
        "-C", "--config", dest="cfg_name",
        help="Specify the config file")
    parser.set_defaults(cfg_name=os.environ.get("CONFIG"))
    parser.set_defaults(tmpdir=os.environ.get("TMPDIR"))
    parsed = parser.parse_args()
    status = main(parsed)
    sys.exit(status)