""" | |
Pull indices, dump a sample of docs to corpus file. Strip standard metadata | |
as that is unimportant and can be faked later on. Aliases with no documents are | |
omitted (can't get something from literally nothing). | |
Note: High sample counts may error out, as this script is simple and only makes | |
search requests without using the scrolling API. Default of 1000 should be a safe | |
bet however. | |
""" | |
import argparse
import json
import random
import re
from collections import Counter
from getpass import getpass, getuser

import requests
import tabulate
# Keep only these fields; the remainder can be generated on bulk creation.
KEEP_FIELDS = [
    'context',
    'extra',
    'message',
    'channel',
    'elasticsearch_index',
]
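# Each sampled document is written as one compact NDJSON line containing only
# the kept fields, e.g. (illustrative values):
#   {"message":"timeout contacting upstream","channel":"app","context":{}}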


def main(args):
    # Setup
    auth = (args.es_user, args.es_pass or getpass())
    corpus_limit = args.corpus_limit
    api = args.api.rstrip('/')
    out = args.out
    keepers = set(args.keepers.strip().split(',')) if args.keepers else set(KEEP_FIELDS)
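    # Rollover indices carry a date/serial suffix such as "-2024.01.02-000001";
    # stripping it collapses them back into a single alias name.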
    ro_pattern = re.compile(r'-\d{4}\.\d{2}\.\d{2}-\d{6}$')
    # Get the indices list and the per-alias document distribution
    counts = Counter()
    indices = requests.get(
        f'{api}/_cat/indices',
        auth=auth,
        params={'h': 'index,docs.count', 'format': 'json'}
    ).json()
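    # _cat/indices with format=json returns a list of objects whose values are
    # strings, e.g. [{"index": "app-2024.01.02-000001", "docs.count": "123"}, ...]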
    for entry in indices:
        if not entry['docs.count']:
            continue
        index = ro_pattern.sub('', entry['index'])
        docs = int(entry['docs.count'])
        # Ignore kibana/tasks indices specifically, and dot-prefixed indices in general
        if any(x in index for x in ['.kibana', '.tasks']):
            continue
        if index.startswith('.'):
            continue
        counts[index] += docs
    total_docs = sum(counts.values())
    omitted = 0
    dist = {}
    for alias, total in counts.items():
        if total == 0:
            continue
        dist[alias] = total / total_docs
    if total_docs < corpus_limit:
        corpus_limit = total_docs
    sample_counts = Counter(random.choices(
        list(dist.keys()),
        weights=list(dist.values()),
        k=corpus_limit
    ))
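    # Weighted sampling with replacement: each alias is drawn in proportion to
    # its share of total documents, so the corpus mirrors the cluster's mix.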
    lines = []
    bar_maxlen = args.bar_max_len
    for alias, perc in sorted(dist.items(), key=lambda x: (x[1], x[0])):
        corpus_count = sample_counts[alias]
        if corpus_count == 0:
            omitted += 1
            continue
        # Fixed-width bar proportional to the alias's share of all documents
        bar = '[' + '#' * round(perc * bar_maxlen) + ' ' * (bar_maxlen - round(perc * bar_maxlen)) + ']'
        lines.append([alias, f'{perc * 100:.2f}%', f'{counts[alias]:,}', corpus_count, bar])
    print(tabulate.tabulate(
        lines,
        headers=['Alias', 'Percentage', 'Doc Count', 'Corpus Count\n(Sampled)', ''],
        tablefmt='psql'
    ))
    if omitted > 0:
        print(f'{omitted} aliases omitted because they sampled at zero documents. '
              'Try increasing the limit to incorporate more aliases.')
    # Query each alias, pull in the sampled number of documents, strip junk
    # fields, and save to the corpus
    fout = open(out, 'a')
    fetched_total = 0
    for i, (alias, n) in enumerate(sample_counts.items()):
        print(f'[{i + 1}/{len(sample_counts)}] Working on {alias} ({n} docs)', flush=True, end='\r')
        # Get & clean fields
        resp = requests.get(
            f'{api}/{alias}-*/_search',
            params={
                'size': n,
                'filter_path': ','.join([f'hits.hits._source.{field}' for field in keepers]),
                'expand_wildcards': 'open'
            },
            auth=auth,
        ).json()
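        # filter_path trims the response to just the kept _source fields; keys
        # that match nothing are dropped entirely, hence the KeyError guard below.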
        try:
            for doc in resp['hits']['hits']:
                doc = doc.get('_source')
                if doc:
                    fout.write(json.dumps(doc, separators=(',', ':')) + '\n')
                    fetched_total += 1
        except KeyError:
            print(f'\033[1;33;40mWARNING: {alias} has no results, check that the target fields are present in the pattern!\033[0;0m')
    print(f'Completed, fetched {fetched_total} docs across {len(sample_counts)} aliases.', flush=True)
    fout.close()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument('-u', '--es-user', default=getuser())
    parser.add_argument('-p', '--es-pass', default=None)
    parser.add_argument('-t', '--corpus-limit', type=int, default=1000, help='total maximum documents to keep from target cluster')
    parser.add_argument('--bar-max-len', type=int, default=30, help='bar viz max length')
    parser.add_argument('-k', '--keepers', type=str, default='', help='fields to keep, comma delimited')
    parser.add_argument('api')
    parser.add_argument('out')
    args = parser.parse_args()
    main(args)