#!/usr/bin/env python3
# -*- mode: python -*-
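"""pbench-backup-tarballs

Walk the ARCHIVE hierarchy for result tarballs linked under TO-BACKUP,
copy each tarball and its .md5 file to the local backup directory, upload
it to the configured S3 bucket, and quarantine anything whose MD5 does not
check out. A summary report is written and shipped via report_status().
"""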
import os
import sys
import sh
import glob
import base64
import signal
import hashlib
import shutil
from enum import Enum
from argparse import ArgumentParser
from pbench import init_report_template, report_status, _rename_tb_link, \
PbenchConfig, BadConfig, get_es, get_pbench_logger, quarantine
from s3backup import S3Config
_NAME_ = "pbench-backup-tarballs"
# The link source and destination directories for this script's operation.
_linksrc = "TO-BACKUP"
_linkdest = "BACKED-UP"
# Global logger for the module, set up in main()
_logger = None
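# Status values drive the per-tarball control flow: TRY_S3 means the local
# backup step is done and the S3 upload should still be attempted;
# NEXT_RESULT means processing of this tarball is complete; STATUS_OK is
# the overall status of a full backup_data() run.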
class Status(Enum):
TRY_S3 = 10
NEXT_RESULT = 20
STATUS_OK = 30
class Results(object):
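    """Aggregate counters describing the outcome of the backup steps for
    one tarball, or for a whole run.
    """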
    def __init__(self, status, ntotal=0, nbackup_success=0, nbackup_fail=0,
                 ns3_success=0, ns3_fail=0, nquaran=0):
self.status = status
self.ntotal = ntotal
self.nbackup_success = nbackup_success
self.nbackup_fail = nbackup_fail
self.ns3_success = ns3_success
self.ns3_fail = ns3_fail
self.nquaran = nquaran
def sanity_check(s3_obj, config):
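    """Verify that the ARCHIVE, BACKUP, and QUARANTINE directories exist
    and that the S3 bucket is reachable; exit the process on any failure.
    """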
# make sure archive is present
archive = config.ARCHIVE
    if not os.path.exists(os.path.realpath(archive)):
        _logger.error(
            'The ARCHIVE directory {} does not resolve to a real location'.format(archive))
os._exit(1)
if not os.path.isdir(archive):
        _logger.error(
            'The ARCHIVE directory {} resolves to {}, which is not a directory'.format(archive, os.path.realpath(archive)))
os._exit(1)
# make sure the local backup directory is present
backup = config.BACKUP
if len(backup) == 0:
_logger.error(
'Unspecified backup directory, no pbench-backup-dir config in pbench-server section')
os._exit(1)
try:
os.mkdir(backup)
except FileExistsError:
# directory already exists, ignore
pass
except Exception:
_logger.exception(
"os.mkdir: Unable to create backup destination directory: {}\n".format(backup))
os._exit(1)
    if not os.path.exists(os.path.realpath(backup)):
        _logger.error(
            'The BACKUP directory {} does not resolve to a real location'.format(backup))
os._exit(1)
if not os.path.isdir(backup):
        _logger.error(
            'The BACKUP directory {} resolves to {}, which is not a directory'.format(backup, os.path.realpath(backup)))
os._exit(1)
# make sure the quarantine directory is present
qdir = config.QDIR
if len(qdir) == 0:
_logger.error(
'Unspecified quarantine directory, no pbench-quarantine-dir config in pbench-server section')
os._exit(1)
    if not os.path.exists(os.path.realpath(qdir)):
        _logger.error(
            'The QUARANTINE directory {} does not resolve to a real location'.format(qdir))
os._exit(1)
if not os.path.isdir(qdir):
        _logger.error(
            'The QUARANTINE directory {} resolves to {}, which is not a directory'.format(qdir, os.path.realpath(qdir)))
os._exit(1)
# make sure the S3 bucket exists
try:
        s3_obj.connector.head_bucket(Bucket=s3_obj.bucket_name)
except Exception as e:
_logger.exception(
"Bucket: {} does not exist or you have no access. {}\n".format(s3_obj.bucket_name, e))
os._exit(1)
def prepare_report(report, TS, prog, config, fresults):
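    """Append a one-line summary of the final counters to the report file."""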
with open(report, 'a') as f:
f.write("{}.{}({})\n".format(prog, TS, config.PBENCH_ENV))
f.write("Status: {}, Total processed: {}, "
"Locally backed-ed succesfully: {}, "
"Failed to locally backed-up: {}, "
"Uploaded to S3 succesfully: {}, "
"Failed to upload to S3: {}, Quarantine: {}"
.format(fresults.status.name,
fresults.ntotal,
fresults.nbackup_success,
fresults.nbackup_fail,
fresults.ns3_success,
fresults.ns3_fail,
fresults.nquaran))
def backup_to_local(config, logger, qdir, controller_path, controller, tb,
                    tar, resultname, archive_md5, archive_md5_hex_value):
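    """Copy the tarball and its .md5 file into the local backup tree, or
    verify them against an existing backup copy. Returns a Results whose
    status is always Status.TRY_S3 so the caller proceeds to the S3 step.
    """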
nbackup_success = nbackup_fail = nquaran = 0
backup_controller_path = "{}/{}".format(config.BACKUP, controller)
# make sure the controller is present in local backup directory
try:
os.mkdir(backup_controller_path)
except FileExistsError:
# directory already exists, ignore
pass
except Exception:
logger.exception(
"os.mkdir: Unable to create backup destination directory: {}\n".format(backup_controller_path))
return Results(Status.TRY_S3)
    # Check whether the tarball already exists in the local backup
    backup_tar = os.path.join(backup_controller_path, resultname)
    if os.path.isfile(backup_tar):
        backup_md5 = "{}/{}.md5".format(backup_controller_path, resultname)
        # make sure the backup md5 file exists and is a regular file
        if not os.path.isfile(backup_md5):
            # backup md5 file does not exist or is not a regular file
            quarantine(qdir, logger, tb)
            nquaran += 1
            logger.error(
                "Quarantine: {}, {} does not exist or is not a regular file\n".format(tb, backup_md5))
            return Results(Status.TRY_S3, nquaran=nquaran)
# read backup md5 file
try:
with open(backup_md5) as f:
backup_md5_hex_value = f.readline().split(" ")[0]
except (OSError, IOError) as e:
# Could not read file
quarantine(qdir, logger, tb)
nquaran += 1
logger.error(
"Quarantine: {}, could not read file {}, {}\n".format(tb, backup_md5, e))
return Results(Status.TRY_S3, nquaran=nquaran)
else:
            # These asserts are for testing purposes only; they will not be
            # in production. By this point we know that the backed-up
            # tarball and its md5 file exist and both are regular files.
assert os.path.exists(os.path.join(
backup_controller_path, resultname))
assert os.path.exists(
"{}/{}.md5".format(backup_controller_path, resultname))
if archive_md5_hex_value == backup_md5_hex_value:
# declare success
nbackup_success += 1
logger.info(
"Already locally backed-up: {}\n".format(resultname))
return Results(Status.TRY_S3, nbackup_success=nbackup_success)
            else:
                # md5 values of the archive and the backup do not match
                quarantine(qdir, logger, tb)
                nquaran += 1
                logger.error(
                    "{} already exists in backup but the md5 sums of archive and backup disagree\n".format(resultname))
                return Results(Status.TRY_S3, nquaran=nquaran)
else:
md5_done = tar_done = False
# copy the md5 file from archive to backup
try:
shutil.copy(archive_md5, backup_controller_path)
md5_done = True
except Exception:
# couldn't copy md5 file
nbackup_fail += 1
logger.exception(
"shutil.copy: Unable to copy {} from archive to backup: {}\n".format(archive_md5, backup_controller_path))
# copy the tarball from archive to backup
if md5_done:
try:
shutil.copy(tar, backup_controller_path)
tar_done = True
except Exception:
# couldn't copy tarball
nbackup_fail += 1
logger.exception(
"shutil.copy: Unable to copy {} from archive to backup: {}\n".format(tar, backup_controller_path))
                # roll back: remove the copied md5 file from backup
if md5_done:
bmd5_file = "{}/{}.md5".format(
backup_controller_path, resultname)
if os.path.exists(bmd5_file):
os.remove(bmd5_file)
if md5_done and tar_done:
nbackup_success += 1
            logger.info(
                "Locally backed up successfully: {}\n".format(resultname))
return Results(Status.TRY_S3, nbackup_success=nbackup_success)
else:
return Results(Status.TRY_S3, nbackup_fail=nbackup_fail)
def backup_to_s3(s3_obj, logger, qdir, controller_path, controller, tb,
                 tar, resultname, archive_md5, archive_md5_hex_value):
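    """Upload the tarball to the S3 bucket, unless an object with the same
    key and MD5 is already present, and advance the TO-BACKUP link to
    BACKED-UP on success. Returns a Results whose status is always
    Status.NEXT_RESULT.
    """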
ns3_success = ns3_fail = nquaran = 0
s3_resultname = "{}/{}".format(controller, resultname)
    # Check whether the result is already present in S3
try:
        obj = s3_obj.connector.get_object(
            Bucket=s3_obj.bucket_name, Key=s3_resultname)
in_s3 = True
except Exception:
in_s3 = False
if in_s3:
        # For a single-part upload, the S3 ETag is the hex MD5 of the
        # object, so we can compare it against the md5 we already have
        # without recalculating anything
        s3_md5 = obj['ETag'].strip("\"")
        if archive_md5_hex_value == s3_md5:
# declare success
_rename_tb_link(tb, os.path.join(
controller_path, _linkdest), logger)
ns3_success += 1
            logger.info(
                "The tarball {} is already present in the s3 bucket with the same md5\n".format(s3_resultname))
return Results(Status.NEXT_RESULT, ns3_success=ns3_success)
else:
quarantine(qdir, logger, tb)
nquaran += 1
            logger.error(
                "Quarantine: {}, the tarball is present in S3 but with a different MD5\n".format(s3_resultname))
return Results(Status.NEXT_RESULT, nquaran=nquaran)
else:
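        # The ContentMD5 header expects the base64 encoding of the raw
        # 16-byte digest (per RFC 1864), not the hex string, so convert
        # the hex value read from the md5 file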
md5_base64_value = (base64.b64encode(
bytes.fromhex(archive_md5_hex_value))).decode()
try:
with open(tar, 'rb') as data:
try:
s3_obj.connector.put_object(
Bucket=s3_obj.bucket_name, Key=s3_resultname, Body=data, ContentMD5=md5_base64_value)
except Exception as e:
quarantine(qdir, logger, tb)
nquaran += 1
ns3_fail += 1
                    logger.exception(
                        "Upload to s3 failed: {}, {}\n".format(s3_resultname, e))
                    return Results(Status.NEXT_RESULT, ns3_fail=ns3_fail, nquaran=nquaran)
else:
ns3_success += 1
_rename_tb_link(tb, os.path.join(
controller_path, _linkdest), logger)
logger.info(
"Upload to s3 succeeded: {}\n".format(s3_resultname))
return Results(Status.NEXT_RESULT,
ns3_success=ns3_success)
except (OSError, IOError) as e:
# could not read tarball
quarantine(qdir, logger, tb)
nquaran += 1
logger.error(
"Quarantine: {}, Failed to open tarball, {}\n".format(tar, e))
return Results(Status.NEXT_RESULT, nquaran=nquaran)
def backup_data(s3_obj, config, logger):
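    """Walk every tarball linked under TO-BACKUP in the archive tree,
    verify its MD5, then run the local backup and S3 upload steps,
    accumulating counters for the final report.
    """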
qdir = config.QDIR
tarlist = glob.iglob('{}/*/{}/*.tar.xz'.format(config.ARCHIVE, _linksrc))
fntotal = fnbackup_success = fnbackup_fail = \
fns3_success = fns3_fail = fnquaran = quaran = 0
for tb in sorted(tarlist):
fntotal += 1
# resolve the link
tar = os.path.realpath(tb)
        # make sure the tarball exists and is a regular file
        if not os.path.isfile(tar):
            # tarball does not exist or is not a regular file
            quarantine(qdir, logger, tb)
            quaran += 1
            logger.error(
                "Quarantine: {}, {} does not exist or is not a regular file\n".format(tb, tar))
            continue
        archive_md5 = "{}.md5".format(tar)
        # make sure the md5 file exists and is a regular file
        if not os.path.isfile(archive_md5):
            # md5 file does not exist or is not a regular file
            quarantine(qdir, logger, tb)
            quaran += 1
            logger.error(
                "Quarantine: {}, {} does not exist or is not a regular file\n".format(tb, archive_md5))
            continue
        # read the md5sum from the md5 file
try:
with open(archive_md5) as f:
archive_md5_hex_value = f.readline().split(" ")[0]
except (OSError, IOError) as e:
# Could not read file.
quarantine(qdir, logger, tb)
quaran += 1
logger.error(
"Quarantine: {}, Could not read {}, {}\n".format(tb, archive_md5, e))
continue
        # verify the tarball's computed md5sum against its md5 file,
        # reading the tarball in chunks to avoid holding the whole file
        # in memory
        try:
            md5 = hashlib.md5()
            with open(tar, 'rb') as f:
                for chunk in iter(lambda: f.read(2 ** 20), b""):
                    md5.update(chunk)
            archive_tar_hex_value = md5.hexdigest()
        except (OSError, IOError) as e:
            quarantine(qdir, logger, tb)
            quaran += 1
            logger.error(
                "Quarantine: {}, Could not read {}, {}\n".format(tb, tar, e))
            continue
        else:
            if archive_tar_hex_value != archive_md5_hex_value:
                quarantine(qdir, logger, tb)
                quaran += 1
                logger.error(
                    "Quarantine: {}, md5sum of {} does not match its md5 file {}\n".format(tb, tar, archive_md5))
                continue
        # These asserts are for testing purposes only; they will not be in
        # production. By this point we know that the tarball and its md5
        # file exist, both are regular files, and the md5sum computed from
        # the tarball matches the md5 file.
assert os.path.exists(tar)
assert os.path.exists(archive_md5)
resultname = os.path.basename(tar)
controller_path = os.path.dirname(tar)
controller = os.path.basename(controller_path)
        # Handle all the local-backup operations, counting successes and
        # failures.
local_backup_result = backup_to_local(
config, logger, qdir, controller_path, controller, tb, tar, resultname, archive_md5, archive_md5_hex_value)
fnbackup_success = fnbackup_success + \
local_backup_result.nbackup_success
fnbackup_fail = fnbackup_fail + local_backup_result.nbackup_fail
fnquaran = fnquaran + local_backup_result.nquaran
        # Handle all the S3 bucket operations, counting successes and
        # failures.
        if local_backup_result.status == Status.TRY_S3:
s3_backup_result = backup_to_s3(
s3_obj, logger, qdir, controller_path, controller, tb, tar, resultname, archive_md5, archive_md5_hex_value)
fns3_success = fns3_success + s3_backup_result.ns3_success
fns3_fail = fns3_fail + s3_backup_result.ns3_fail
fnquaran = fnquaran + s3_backup_result.nquaran
            if s3_backup_result.status == Status.NEXT_RESULT:
                continue
    return Results(Status.STATUS_OK, ntotal=fntotal,
                   nbackup_success=fnbackup_success,
                   nbackup_fail=fnbackup_fail,
                   ns3_success=fns3_success,
                   ns3_fail=fns3_fail,
                   nquaran=quaran + fnquaran)
def main(parsed):
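    """Load the configuration, sanity-check the environment, run the
    backup, and write and ship the summary report.
    """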
if not parsed.cfg_name:
print("{}: ERROR: No config file specified; set CONFIG env variable or"
" use --config <file> on the command line".format(_NAME_),
file=sys.stderr)
return 2
try:
config = PbenchConfig(parsed.cfg_name)
except BadConfig as e:
print("{}: {}".format(_NAME_, e), file=sys.stderr)
return 1
global _logger
_logger = get_pbench_logger(_NAME_, config)
    # construct the S3 configuration object
s3_obj = S3Config(config)
_logger.info('start-{}'.format(s3_obj.ts()))
    # add BACKUP and QDIR fields to the config object
    config.BACKUP = config.conf.get("pbench-server", "pbench-backup-dir")
    config.QDIR = config.conf.get("pbench-server", "pbench-quarantine-dir")
    # validate the directories and the S3 bucket before doing any work
sanity_check(s3_obj, config)
prog = os.path.basename(sys.argv[0])
report_dir = "{}/{}.{}".format(config.TMP, prog, os.getpid())
try:
os.mkdir(report_dir)
except FileExistsError:
# directory already exists, ignore
pass
report = "{}/report".format(report_dir)
# set up signal handlers.
    def signal_handler(sig, frame):
        # remove the partial report directory, then exit immediately;
        # os._exit skips normal interpreter cleanup, which is what we
        # want inside a signal handler
        sh.rm('-rf', report_dir)
        os._exit(1)
sig_type = ['SIGINT', 'SIGQUIT', 'SIGTERM']
for i in sig_type:
signum = getattr(signal, i)
signal.signal(signum, signal_handler)
# Initiate the backup
fresults = backup_data(s3_obj, config, _logger)
    # write the summary report
TS = s3_obj.ts()
prepare_report(report, TS, prog, config, fresults)
es, idx_prefix = init_report_template(config, _logger)
# Call report-status
report_status(es, _logger, config.LOGSDIR,
idx_prefix, _NAME_, TS, "status", report)
_logger.info("Status: {}, Total processed: {}, "
"Locally backed-ed succesfully: {}, "
"Failed to locally backed-up: {}, "
"Uploaded to S3 succesfully: {}, "
"Failed to upload to S3: {}, Quarantine: {}"
.format(fresults.status.name,
fresults.ntotal,
fresults.nbackup_success,
fresults.nbackup_fail,
fresults.ns3_success,
fresults.ns3_fail,
fresults.nquaran))
_logger.info('end-{}'.format(s3_obj.ts()))
    # Clean up the report directory before exiting
sh.rm('-rf', report_dir)
if __name__ == '__main__':
    parser = ArgumentParser(description="Usage: pbench-backup")
    parser.add_argument(
        "--config", dest="cfg_name", default=os.environ.get("CONFIG"),
        help="pbench server config file (defaults to the CONFIG env variable)")
    parsed = parser.parse_args()
status = main(parsed)
sys.exit(status)