split bundles and ship
@jctanner · created July 22, 2020

import glob
import inspect
import json
import logging
import math
import os
import os.path
import shutil
import tarfile
import tempfile

import requests
from django.conf import settings
from django.utils.timezone import now, timedelta
from rest_framework.exceptions import PermissionDenied
from awx.conf.license import get_license
from awx.main.models import Job
from awx.main.access import access_registry
from awx.main.models.ha import TowerAnalyticsState
from awx.main.utils import get_awx_http_client_headers, set_environ

__all__ = ['register', 'gather', 'ship', 'table_version']

logger = logging.getLogger('awx.main.analytics')
manifest = dict()

def _valid_license():
    try:
        if get_license(show_key=False).get('license_type', 'UNLICENSED') == 'open':
            return False
        access_registry[Job](None).check_license()
    except PermissionDenied:
        logger.exception("A valid license was not found:")
        return False
    return True

def register(key, version):
    """
    A decorator used to register a function as a metric collector.

    Decorated functions should return JSON-serializable objects.

    @register('projects_by_scm_type', 1)
    def projects_by_scm_type():
        return {'git': 5, 'svn': 1, 'hg': 0}
    """
    def decorate(f):
        f.__awx_analytics_key__ = key
        f.__awx_analytics_version__ = version
        return f
    return decorate

def table_version(file_name, version):
    """Record the schema version of a table dump (e.g. a CSV written by
    copy_tables) in the global manifest."""
    global manifest
    manifest[file_name] = version

    def decorate(f):
        return f
    return decorate

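# Illustrative usage (assumed, not shown in this gist): stack the decorator on
# the function that writes the table dumps so each CSV gets a manifest entry:
#
#   @table_version('events_table.csv', '1.0')
#   @table_version('unified_jobs_table.csv', '1.0')
#   def copy_tables(since, full_path):
#       ...
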
def gather(dest=None, module=None, collection_type='scheduled'):
    """
    Gather all defined metrics and write them as JSON files in a .tgz

    :param dest: the (optional) absolute path to write a compressed tarball
    :param module: the module to search for registered analytic collector
                   functions; defaults to awx.main.analytics.collectors
    """
    run_now = now()
    state = TowerAnalyticsState.get_solo()
    last_run = state.last_run
    logger.debug("Last analytics run was: {}".format(last_run))

    # never look back more than four weeks, and handle a first run (no
    # last_run at all) before comparing against it
    max_interval = now() - timedelta(weeks=4)
    if not last_run or last_run < max_interval:
        last_run = max_interval

    if _valid_license() is False:
        logger.exception("Invalid license provided, or no license provided")
        return "Error: Invalid license provided, or no license provided"

    if collection_type != 'dry-run' and not settings.INSIGHTS_TRACKING_STATE:
        logger.error("Automation Analytics not enabled. Use --dry-run to gather locally without sending.")
        return

    if module is None:
        from awx.main.analytics import collectors
        module = collectors

    dest = dest or tempfile.mkdtemp(prefix='awx_analytics')
    for name, func in inspect.getmembers(module):
        if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__'):
            key = func.__awx_analytics_key__
            manifest['{}.json'.format(key)] = func.__awx_analytics_version__
            path = '{}.json'.format(os.path.join(dest, key))
            with open(path, 'w', encoding='utf-8') as f:
                try:
                    # query_info also needs to know how this collection was
                    # triggered
                    if func.__name__ == 'query_info':
                        json.dump(func(last_run, collection_type=collection_type), f)
                    else:
                        json.dump(func(last_run), f)
                except Exception:
                    logger.exception("Could not generate metric {}.json".format(key))
                    f.close()
                    os.remove(f.name)

    path = os.path.join(dest, 'manifest.json')
    with open(path, 'w', encoding='utf-8') as f:
        try:
            json.dump(manifest, f)
        except Exception:
            logger.exception("Could not generate manifest.json")
            f.close()
            os.remove(f.name)

    try:
        # use module (not a hardcoded collectors reference) so a
        # caller-supplied module works too
        module.copy_tables(since=last_run, full_path=dest)
    except Exception:
        logger.exception("Could not copy tables")

    # can't use isoformat() since it has colons, which GNU tar doesn't like
    tarname = '_'.join([
        settings.SYSTEM_UUID,
        run_now.strftime('%Y-%m-%d-%H%M%S%z')
    ])
    try:
        tgz = shutil.make_archive(
            os.path.join(os.path.dirname(dest), tarname),
            'gztar',
            dest
        )
        return tgz
    except Exception:
        logger.exception("Failed to write analytics archive file")
    finally:
        shutil.rmtree(dest)

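# Usage sketch (hypothetical paths): gather locally without shipping and
# inspect the result; the tarball lands next to the temporary collection dir:
#
#   tgz = gather(collection_type='dry-run')
#   # e.g. /tmp/<SYSTEM_UUID>_2020-07-22-192800+0000.tar.gz
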
def ship(path):
    """
    Ship gathered metrics to the Insights API, splitting oversized bundles
    into multiple archives first.
    """
    if not path:
        logger.error('Automation Analytics TAR not found')
        return
    if "Error:" in str(path):
        # gather() returned an error string instead of a tarball path
        return
    try:
        logger.debug('shipping analytics file: {}'.format(path))
        url = getattr(settings, 'AUTOMATION_ANALYTICS_URL', None)
        if not url:
            logger.error('AUTOMATION_ANALYTICS_URL is not set')
            return
        rh_user = getattr(settings, 'REDHAT_USERNAME', None)
        rh_password = getattr(settings, 'REDHAT_PASSWORD', None)
        if not rh_user:
            return logger.error('REDHAT_USERNAME is not set')
        if not rh_password:
            return logger.error('REDHAT_PASSWORD is not set')
        # split oversized bundles; if nothing needed splitting, ship the
        # original archive as-is
        tarfiles = split_tar(path)
        if len(tarfiles) == 0:
            tarfiles = [path]
        for tarfilen in tarfiles:
            with open(tarfilen, 'rb') as f:
                # name each upload after its own chunk, not the original bundle
                files = {'file': (os.path.basename(tarfilen), f, settings.INSIGHTS_AGENT_MIME)}
                s = requests.Session()
                s.headers = get_awx_http_client_headers()
                s.headers.pop('Content-Type')
                with set_environ(**settings.AWX_TASK_ENV):
                    response = s.post(url,
                                      files=files,
                                      verify="/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem",
                                      auth=(rh_user, rh_password),
                                      headers=s.headers,
                                      timeout=(31, 31))
                if response.status_code != 202:
                    return logger.error('Upload failed with status {}, {}'.format(response.status_code,
                                                                                  response.text))
        run_now = now()
        state = TowerAnalyticsState.get_solo()
        state.last_run = run_now
        state.save()
    finally:
        # cleanup the tar.gz and any split chunks written next to it
        os.remove(path)
        for fn in glob.glob('{}_*.tar.gz'.format(path.replace('.tar.gz', ''))):
            os.remove(fn)

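# Typical call chain (sketch, assuming the AUTOMATION_ANALYTICS_URL and
# REDHAT_* settings are configured): ship() only records a successful run
# after every chunk is accepted with a 202:
#
#   ship(gather(collection_type='scheduled'))
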
def split_tar(filename, max_bytes_size=100000):
    """Split a .tar.gz containing members larger than max_bytes_size into
    multiple archives, chunking the oversized CSVs across them."""
    basefn = filename.replace('.tar.gz', '')
    new_filenames = []
    total_bytes = 0
    flagged_files = set()
    tdir = tempfile.mkdtemp()
    try:
        with tarfile.open(filename, "r:gz") as tar:
            # flag any member too large to ship in one piece
            for tarinfo in tar:
                total_bytes += tarinfo.size
                if tarinfo.size > max_bytes_size:
                    flagged_files.add((tarinfo.name, tarinfo.size))
            logger.debug('total bytes: %s flagged: %s', total_bytes, flagged_files)

            flagged_total = sum([x[1] for x in flagged_files])
            # round up so each chunk of the flagged data fits under the limit
            N_splits = math.ceil(flagged_total / max_bytes_size)

            for x in range(0, N_splits):
                newfn = basefn + '_' + str(x) + '.tar.gz'
                new_filenames.append(newfn)
                if os.path.exists(newfn):
                    os.remove(newfn)
                # each split starts as a full copy of the original bundle ...
                edir = os.path.join(tdir, str(x))
                tar.extractall(edir)
                # ... then every flagged file is overwritten with its x-th chunk
                for ff in flagged_files:
                    cfile = os.path.join(edir, os.path.basename(ff[0]))
                    chunks = split_csv(cfile, N_splits)
                    with open(cfile, 'w') as f:
                        f.writelines(chunks[x])
                newfiles = glob.glob('%s/*' % edir)
                with tarfile.open(newfn, "w:gz") as newtar:
                    for nf in newfiles:
                        newtar.add(nf, arcname=os.path.basename(nf))
    finally:
        shutil.rmtree(tdir)
    return new_filenames

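# Worked example (hypothetical sizes): with max_bytes_size=100000 and a single
# flagged 250 kB CSV, flagged_total = 250000, so N_splits = ceil(2.5) = 3 and
# split_tar('/tmp/uuid_date.tar.gz') returns
# ['/tmp/uuid_date_0.tar.gz', '/tmp/uuid_date_1.tar.gz', '/tmp/uuid_date_2.tar.gz'],
# each carrying the small members unchanged plus one ~83 kB slice of the CSV.
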
def split_csv(filename, splits):
    """Split a CSV's lines into at most `splits` chunks, replicating the
    header row at the top of every chunk."""
    with open(filename, 'r') as f:
        flines = f.readlines()

    # round up so we never produce more than `splits` chunks
    N = max(1, math.ceil(len(flines) / splits))
    chunks = []
    for i in range(0, len(flines), N):
        chunks.append(flines[i:i + N])

    # every chunk after the first needs its own copy of the header row
    for idx, x in enumerate(chunks):
        if idx == 0:
            continue
        chunks[idx].insert(0, flines[0])

    # pad with header-only chunks so callers can index chunks[x] for any split
    while len(chunks) < splits:
        chunks.append([flines[0]])
    return chunks
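
# Worked example: 9 lines (1 header + 8 rows) split 3 ways gives
# N = ceil(9 / 3) = 3, i.e. slices [0:3], [3:6], [6:9]; the header (flines[0])
# is then prepended to chunks 1 and 2 so every chunk parses as a standalone CSV.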