Created
July 22, 2020 19:28
-
-
Save jctanner/db5a4d5e6a054ac0468ef09ed42ea276 to your computer and use it in GitHub Desktop.
split bundles and ship
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import glob | |
import inspect | |
import json | |
import logging | |
import os | |
import os.path | |
import tarfile | |
import tempfile | |
import shutil | |
import requests | |
from django.conf import settings | |
from django.utils.timezone import now, timedelta | |
from rest_framework.exceptions import PermissionDenied | |
from awx.conf.license import get_license | |
from awx.main.models import Job | |
from awx.main.access import access_registry | |
from awx.main.models.ha import TowerAnalyticsState | |
from awx.main.utils import get_awx_http_client_headers, set_environ | |
__all__ = ['register', 'gather', 'ship', 'table_version']

logger = logging.getLogger('awx.main.analytics')

# Maps an output file name (e.g. 'events_table.csv' or '<key>.json') to its
# schema version. Populated at import time by table_version() and while
# collectors run in gather(); serialized as manifest.json into the bundle.
manifest = dict()
def _valid_license():
    """Return True when a usable (non-'open') license is installed."""
    try:
        license_info = get_license(show_key=False)
        if license_info.get('license_type', 'UNLICENSED') == 'open':
            return False
        # Raises PermissionDenied when the license does not permit this use.
        access_registry[Job](None).check_license()
    except PermissionDenied:
        logger.exception("A valid license was not found:")
        return False
    return True
def register(key, version):
    """
    A decorator used to register a function as a metric collector.

    Decorated functions should return JSON-serializable objects.

    @register('projects_by_scm_type', 1)
    def projects_by_scm_type():
        return {'git': 5, 'svn': 1, 'hg': 0}
    """
    def _tag(collector):
        # Stash the key/version on the function itself; gather() later finds
        # collectors by probing for these attributes.
        setattr(collector, '__awx_analytics_key__', key)
        setattr(collector, '__awx_analytics_version__', version)
        return collector
    return _tag
def table_version(file_name, version):
    """
    A decorator that records *version* for *file_name* in the module-level
    manifest. The decorated function itself is returned unmodified.
    """
    # Mutating the shared dict needs no `global` declaration; the entry is
    # recorded at decoration (import) time.
    manifest[file_name] = version

    def _identity(f):
        return f
    return _identity
def gather(dest=None, module=None, collection_type='scheduled'):
    """
    Gather all defined metrics and write them as JSON files in a .tgz

    :param dest: the (optional) absolute path to write a compressed tarball
    :param module: the module to search for registered analytic collector
                   functions; defaults to awx.main.analytics.collectors
    :param collection_type: 'dry-run' gathers locally without requiring
                            INSIGHTS_TRACKING_STATE to be enabled
    :return: the path of the .tar.gz on success, an "Error: ..." string on a
             license failure, or None when analytics is disabled / archiving
             fails
    """
    run_now = now()
    state = TowerAnalyticsState.get_solo()
    last_run = state.last_run
    logger.debug("Last analytics run was: {}".format(last_run))

    # Cap collection at four weeks of data. Check for a missing last_run
    # (first ever run) BEFORE comparing; comparing None to a datetime raises
    # TypeError.
    max_interval = now() - timedelta(weeks=4)
    if not last_run or last_run < max_interval:
        last_run = max_interval

    if _valid_license() is False:
        logger.exception("Invalid License provided, or No License Provided")
        return "Error: Invalid License provided, or No License Provided"

    if collection_type != 'dry-run' and not settings.INSIGHTS_TRACKING_STATE:
        logger.error("Automation Analytics not enabled. Use --dry-run to gather locally without sending.")
        return

    if module is None:
        from awx.main.analytics import collectors
        module = collectors

    dest = dest or tempfile.mkdtemp(prefix='awx_analytics')
    for name, func in inspect.getmembers(module):
        if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__'):
            key = func.__awx_analytics_key__
            manifest['{}.json'.format(key)] = func.__awx_analytics_version__
            path = '{}.json'.format(os.path.join(dest, key))
            with open(path, 'w', encoding='utf-8') as f:
                try:
                    # query_info is the only collector that needs to know how
                    # the collection was triggered.
                    if func.__name__ == 'query_info':
                        json.dump(func(last_run, collection_type=collection_type), f)
                    else:
                        json.dump(func(last_run), f)
                except Exception:
                    # A failing collector must not abort the whole gather;
                    # discard its partial output and continue.
                    logger.exception("Could not generate metric {}.json".format(key))
                    f.close()
                    os.remove(f.name)

    path = os.path.join(dest, 'manifest.json')
    with open(path, 'w', encoding='utf-8') as f:
        try:
            json.dump(manifest, f)
        except Exception:
            logger.exception("Could not generate manifest.json")
            f.close()
            os.remove(f.name)

    try:
        # Use the module we actually searched; the original referenced the
        # default `collectors` name, a NameError when a custom module was
        # passed in.
        module.copy_tables(since=last_run, full_path=dest)
    except Exception:
        logger.exception("Could not copy tables")

    # can't use isoformat() since it has colons, which GNU tar doesn't like
    tarname = '_'.join([
        settings.SYSTEM_UUID,
        run_now.strftime('%Y-%m-%d-%H%M%S%z')
    ])
    try:
        tgz = shutil.make_archive(
            os.path.join(os.path.dirname(dest), tarname),
            'gztar',
            dest
        )
        return tgz
    except Exception:
        logger.exception("Failed to write analytics archive file")
    finally:
        # Always drop the staging directory; only the archive survives.
        shutil.rmtree(dest)
def ship(path):
    """
    Ship gathered metrics to the Insights API

    :param path: path of the .tar.gz produced by gather(), or the
                 "Error: ..." string gather() returns on license failure
    """
    if not path:
        logger.error('Automation Analytics TAR not found')
        return
    # gather() signals license problems by returning an "Error: ..." string
    # instead of a path; nothing to upload in that case.
    if "Error:" in str(path):
        return
    try:
        logger.debug('shipping analytics file: {}'.format(path))
        url = getattr(settings, 'AUTOMATION_ANALYTICS_URL', None)
        if not url:
            logger.error('AUTOMATION_ANALYTICS_URL is not set')
            return
        rh_user = getattr(settings, 'REDHAT_USERNAME', None)
        rh_password = getattr(settings, 'REDHAT_PASSWORD', None)
        if not rh_user:
            return logger.error('REDHAT_USERNAME is not set')
        if not rh_password:
            return logger.error('REDHAT_PASSWORD is not set')

        # Break an oversized bundle into multiple tarballs; when nothing
        # needed splitting, ship the original archive as-is.
        tarfiles = split_tar(path)
        if len(tarfiles) == 0:
            tarfiles = [path]
        for tarfilen in tarfiles:
            with open(tarfilen, 'rb') as f:
                # NOTE(review): every chunk is uploaded under the ORIGINAL
                # archive's basename — confirm the receiving service does not
                # key uploads by filename.
                files = {'file': (os.path.basename(path), f, settings.INSIGHTS_AGENT_MIME)}
                s = requests.Session()
                s.headers = get_awx_http_client_headers()
                # requests must generate the multipart Content-Type itself.
                s.headers.pop('Content-Type')
                with set_environ(**settings.AWX_TASK_ENV):
                    response = s.post(url,
                                      files=files,
                                      verify="/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem",
                                      auth=(rh_user, rh_password),
                                      headers=s.headers,
                                      timeout=(31, 31))
                if response.status_code != 202:
                    return logger.exception('Upload failed with status {}, {}'.format(response.status_code,
                                                                                      response.text))
        # Only advance the high-water mark after every chunk was accepted.
        run_now = now()
        state = TowerAnalyticsState.get_solo()
        state.last_run = run_now
        state.save()
    finally:
        # cleanup tar.gz
        os.remove(path)
def split_tar(filename, max_bytes_size=100000):
    """
    Split a .tar.gz into several smaller archives when any member exceeds
    max_bytes_size bytes.

    Oversized members (CSVs with a header row) are divided across the new
    archives via split_csv(); every other member is copied into each archive
    unchanged.

    :param filename: path of the source .tar.gz
    :param max_bytes_size: per-member size threshold in bytes
    :return: list of paths of the new archives, or [] when no member was
             large enough to warrant splitting
    """
    basefn = filename.replace('.tar.gz', '')
    new_filenames = []
    flagged_files = set()
    tdir = tempfile.mkdtemp()
    try:
        with tarfile.open(filename, "r:gz") as tar:
            for tarinfo in tar:
                if tarinfo.size > max_bytes_size:
                    flagged_files.add((tarinfo.name, tarinfo.size))
                    tar.extract(tarinfo.name, tdir)

            # One extra archive per max_bytes_size worth of oversized data.
            flagged_total = sum([x[1] for x in flagged_files])
            N_splits = int(flagged_total / max_bytes_size)

            for x in range(0, N_splits):
                newfn = basefn + '_' + str(x) + '.tar.gz'
                new_filenames.append(newfn)
                if os.path.exists(newfn):
                    os.remove(newfn)
                # Stage a full extraction, then overwrite each flagged file
                # with only this split's share of its rows.
                edir = os.path.join(tdir, str(x))
                tar.extractall(edir)
                for ff in flagged_files:
                    cfile = os.path.join(edir, os.path.basename(ff[0]))
                    chunks = split_csv(cfile, N_splits)
                    with open(cfile, 'w') as f:
                        f.writelines(chunks[x])
                newfiles = glob.glob('%s/*' % edir)
                with tarfile.open(newfn, "w:gz") as newtar:
                    for nf in newfiles:
                        newtar.add(nf, arcname=os.path.basename(nf))
    finally:
        # The scratch directory (and the per-split extractions inside it)
        # was previously leaked; drop it once the archives are written.
        shutil.rmtree(tdir)
    return new_filenames
def split_csv(filename, splits):
    """
    Split a CSV file's lines into at least `splits` chunks, repeating the
    header (first line) at the top of every chunk.

    :param filename: path of the CSV file to read
    :param splits: minimum number of chunks to produce
    :return: list of lists of lines; chunks beyond the data are header-only
             (or empty when the file itself is empty)
    """
    with open(filename, 'r') as f:
        flines = f.readlines()

    if not flines:
        # An empty file has no header to replicate; previously this crashed.
        return [[] for _ in range(splits)]

    # Rows per chunk. Guard against N == 0 (more requested splits than
    # lines), which made the modulo below raise ZeroDivisionError.
    N = max(1, int(len(flines) / splits))
    if len(flines) % N != 0:
        # NOTE(review): bumping the chunk size by a flat 100 rows on uneven
        # division is inherited behavior — it can leave fewer populated
        # chunks than `splits` (the padding loop below fills the gap with
        # header-only chunks). Confirm this is intended.
        N += 100

    chunks = [flines[i:i + N] for i in range(0, len(flines), N)]

    # Every chunk after the first gets its own copy of the header row.
    for idx in range(1, len(chunks)):
        chunks[idx].insert(0, flines[0])

    # Pad so callers can safely index chunks[0..splits-1].
    while len(chunks) < splits:
        chunks.append([flines[0]])
    return chunks
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment