""" | |
Pull indices, dump a sample of docs to corpus file. Strip standard metadata | |
as that is unimportant and can be faked later on. Aliases with no documents are | |
omitted (can't get something from literally nothing). | |
Note: High sample counts may error out, as this script is simple and only makes | |
search requests without using the scrolling API. Default of 1000 should be a safe | |
bet however. | |
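
Example invocation (the script name, host URL, and output path below are
placeholders, not fixed values); omit -p to be prompted for the password:

    python dump_corpus.py https://elastic.example.com:9200 corpus.ndjson -u elastic -t 1000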
""" | |
import argparse
import json
import random
import re
from collections import Counter
from getpass import getpass, getuser

import requests
import tabulate

# Keep only these fields; the remainder can be generated on bulk creation.
KEEP_FIELDS = [
    'context',
    'extra',
    'message',
    'channel',
    'elasticsearch_index',
]


def main(args):
    # Setup
    auth = (args.es_user, args.es_pass or getpass())
    corpus_limit = args.corpus_limit
    api = args.api.rstrip('/')
    out = args.out
    keepers = set(args.keepers.strip().split(',')) if args.keepers else set(KEEP_FIELDS)
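    # Rollover/time-based indices carry a '-YYYY.MM.DD-NNNNNN' style suffix; stripping it
    # collapses them onto their base alias name so document counts aggregate per alias.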
    ro_pattern = re.compile(r'-\d{4}\.\d{2}\.\d{2}-\d{6}$')
    # Get the list of indices and the per-alias distribution of documents.
    counts = Counter()
    indices = requests.get(
        f'{api}/_cat/indices',
        auth=auth,
        params={'h': 'index,docs.count', 'format': 'json'}
    ).json()
    for entry in indices:
        if not entry['docs.count']:
            continue
        index = ro_pattern.sub('', entry['index'])
        docs = int(entry['docs.count'])
        # Skip kibana/tasks indices specifically, and '.'-prefixed indices in general.
        if any(x in index for x in ['.kibana', '.tasks']):
            continue
        if index.startswith('.'):
            continue
        counts[index] += docs

    total_docs = sum(counts.values())
    omitted = 0
    dist = {}
    for alias, total in counts.items():
        if total == 0:
            continue
        dist[alias] = total / total_docs

    # Can't sample more documents than actually exist in the cluster.
    if total_docs < corpus_limit:
        corpus_limit = total_docs
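    # Draw a weighted random sample of alias names: each of the corpus_limit draws picks
    # an alias with probability proportional to its share of the total document count,
    # then Counter tallies how many documents to pull from each alias.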
    sample_counts = Counter(random.choices(
        list(dist.keys()),
        weights=list(dist.values()),
        k=corpus_limit
    ))
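    # Print a summary table: each alias's share of the cluster, its raw document count,
    # how many documents were sampled from it, and a proportional bar.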
    lines = []
    bar_maxlen = args.bar_max_len
    for alias, perc in sorted(dist.items(), key=lambda x: (x[1], x[0])):
        corpus_count = sample_counts[alias]
        if corpus_count == 0:
            omitted += 1
            continue
        filled = round(perc * bar_maxlen)
        bar = '[' + '#' * filled + ' ' * (bar_maxlen - filled) + ']'
        lines.append([alias, f'{perc * 100:.2f}%', f'{counts[alias]:,}', corpus_count, bar])
    print(tabulate.tabulate(
        lines,
        headers=['Alias', 'Percentage', 'Doc Count', 'Corpus Count\n(Sampled)', ''],
        tablefmt='psql'
    ))
    if omitted > 0:
        print(f'{omitted} aliases were omitted because they sampled zero documents. '
              'Try increasing the limit to incorporate more aliases.')
    # Query each alias, pull n documents from it, drop junk fields, and append to the corpus file.
    fetched_total = 0
    with open(out, 'a') as fout:
        for i, (alias, n) in enumerate(sample_counts.items()):
            print(f'[{i + 1}/{len(sample_counts)}] Working on {alias} ({n} docs)', flush=True, end='\r')
            # Fetch documents, keeping only the wanted fields via filter_path.
            resp = requests.get(
                f'{api}/{alias}-*/_search',
                params={
                    'size': n,
                    'filter_path': ','.join([f'hits.hits._source.{field}' for field in keepers]),
                    'expand_wildcards': 'open'
                },
                auth=auth,
            ).json()
            try:
                for doc in resp['hits']['hits']:
                    doc = doc['_source']
                    if doc:
                        fout.write(json.dumps(doc, separators=(',', ':')) + '\n')
                        fetched_total += 1
            except KeyError:
                print(f'\033[1;33;40mWARNING: {alias} has no results; check whether the target fields are present in the index pattern!\033[0;0m')
    print(f'\nCompleted, fetched {fetched_total} docs across {len(sample_counts)} aliases.', flush=True)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument('-u', '--es-user', default=getuser())
    parser.add_argument('-p', '--es-pass', default=None)
    parser.add_argument('-t', '--corpus-limit', type=int, default=1000, help='total maximum documents to keep from target cluster')
    parser.add_argument('--bar-max-len', type=int, default=30, help='bar viz max length')
    parser.add_argument('-k', '--keepers', type=str, default='', help='fields to keep, comma delimited')
    parser.add_argument('api', help='base URL of the target Elasticsearch cluster')
    parser.add_argument('out', help='path of the corpus file to append to')
    args = parser.parse_args()
    main(args)
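
# Each line of the output corpus is a compact JSON document containing only the kept
# fields. A hypothetical example line (field names from KEEP_FIELDS, values illustrative):
# {"context":{"user_id":42},"extra":{},"message":"example log line","channel":"app","elasticsearch_index":"app-logs"}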