Skip to content

Instantly share code, notes, and snippets.

@Battleroid

Battleroid/corpus_sample.py Secret

Created Feb 14, 2020
Embed
What would you like to do?
"""
Pull indices, dump a sample of docs to corpus file. Strip standard metadata
as that is unimportant and can be faked later on. Aliases with no documents are
omitted (can't get something from literally nothing).
Note: High sample counts may error out, as this script is simple and only makes
search requests without using the scrolling API. Default of 1000 should be a safe
bet however.
"""
import argparse
import json
import random
import re
from collections import Counter
from getpass import getpass, getuser
import requests
import tabulate
# Source fields retained for every sampled document. Anything not listed here
# is filtered out of the search response and can be regenerated when the bulk
# corpus is built.
KEEP_FIELDS = [
    'context', 'extra', 'message', 'channel', 'elasticsearch_index',
]
def _collect_doc_counts(api, auth):
    """Return a Counter of total documents per alias, read from ``_cat/indices``.

    The rollover suffix (``-YYYY.MM.DD-NNNNNN``) is stripped from each index
    name so that all generations of one alias are summed together. Entries
    with an empty ``docs.count``, ``.kibana``/``.tasks`` indices and any
    ``.``-prefixed index are skipped.

    Raises:
        requests.HTTPError: if the ``_cat/indices`` request fails (e.g. bad
            credentials), rather than failing later on an unexpected payload.
    """
    # Raw string: the pattern previously relied on invalid escape sequences
    # ("\-", "\d", "\.") in a plain literal, which Python warns about.
    ro_pattern = re.compile(r'-\d{4}\.\d{2}\.\d{2}-\d{6}$')
    resp = requests.get(
        f'{api}/_cat/indices',
        auth=auth,
        params={'h': 'index,docs.count', 'format': 'json'},
    )
    resp.raise_for_status()
    counts = Counter()
    for entry in resp.json():
        if not entry['docs.count']:
            continue
        index = ro_pattern.sub('', entry['index'])
        # Ignore kibana/tasks indices specifically, and "."-prefixed ones in general.
        if any(x in index for x in ('.kibana', '.tasks')) or index.startswith('.'):
            continue
        counts[index] += int(entry['docs.count'])
    return counts


def _print_summary(dist, counts, sample_counts, bar_maxlen):
    """Print a table of each sampled alias's document share and sample size.

    Aliases that drew zero samples are left out of the table. Their number is
    reported afterwards with a hint to raise the limit.
    """
    lines = []
    omitted = 0
    # Ascending by share, ties broken by alias name.
    for alias, perc in sorted(dist.items(), key=lambda kv: (kv[1], kv[0])):
        corpus_count = sample_counts[alias]
        if corpus_count == 0:
            omitted += 1
            continue
        filled = round(perc * bar_maxlen)
        bar = '[' + '#' * filled + ' ' * (bar_maxlen - filled) + ']'
        lines.append([alias, f'{perc * 100:.2f}%', f'{counts[alias]:,}', corpus_count, bar])
    print(tabulate.tabulate(
        lines,
        headers=['Alias', 'Percentage', 'Doc Count', 'Corpus Count\n(Sampled)', ''],
        tablefmt='psql',
    ))
    if omitted > 0:
        print(f'{omitted} omitted aliases that are sampled at zero documents. '
              'Try increasing the limit to incorporate more aliases.')


def _write_corpus(api, auth, sample_counts, keepers, out):
    """Fetch ``n`` documents for each sampled alias and append them to ``out``.

    Only the ``keepers`` source fields are requested (via ``filter_path``).
    Each non-empty ``_source`` is written as one compact JSON object per line.
    No sort is requested, so the documents are whichever ones the search
    returns, not necessarily the most recent.

    Returns:
        The number of documents written.
    """
    filter_path = ','.join(f'hits.hits._source.{field}' for field in keepers)
    fetched_total = 0
    # ``with`` guarantees the corpus file is closed even if a request fails.
    with open(out, 'a') as fout:
        for i, (alias, n) in enumerate(sample_counts.items(), start=1):
            print(f'[{i}/{len(sample_counts)}] Working on {alias} ({n} docs)', flush=True, end='\r')
            # NOTE(review): "{alias}-*" only matches indices named "<alias>-...";
            # an index whose name had no rollover suffix stripped is not matched
            # by this pattern and will end up in the warning branch below.
            resp = requests.get(
                f'{api}/{alias}-*/_search',
                params={
                    'size': n,
                    'filter_path': filter_path,
                    'expand_wildcards': 'open',
                },
                auth=auth,
            ).json()
            try:
                hits = resp['hits']['hits']
            except KeyError:
                # No "hits" left in the filtered response; per the message,
                # most likely none of the kept fields exist in these indices.
                print(f'\033[1;33;40mWARNING: {alias} has no results, check to see if the target fields are present in the pattern!\033[0;0m')
                continue
            for hit in hits:
                source = hit.get('_source')
                if source:
                    fout.write(json.dumps(source, separators=(',', ':')) + '\n')
                    fetched_total += 1
    return fetched_total


def main(args):
    """Sample documents from an Elasticsearch cluster into a JSON-lines corpus file.

    Per-alias document counts from ``_cat/indices`` weight a random sample of
    at most ``args.corpus_limit`` documents. A summary table is printed. Then
    each sampled alias is queried for its share of documents, keeping only the
    selected source fields, and the results are appended to ``args.out``.

    Args:
        args: parsed command-line namespace providing ``es_user``, ``es_pass``,
            ``corpus_limit``, ``bar_max_len``, ``keepers``, ``api`` and ``out``.
    """
    # Setup
    auth = (args.es_user, args.es_pass or getpass())
    api = args.api.rstrip('/')
    # Strip each requested field so "a, b" works, and drop empties from stray
    # commas; fall back to the built-in list if nothing usable was given.
    custom = {f.strip() for f in (args.keepers or '').split(',') if f.strip()}
    keepers = custom or KEEP_FIELDS

    # Distribution of documents across aliases; empty aliases are dropped.
    counts = _collect_doc_counts(api, auth)
    total_docs = sum(counts.values())
    dist = {alias: total / total_docs for alias, total in counts.items() if total}
    if not dist:
        # random.choices would raise IndexError on an empty population.
        print('No documents found in any alias; nothing to sample.')
        return

    # Can't sample more documents than the cluster holds.
    corpus_limit = min(args.corpus_limit, total_docs)
    sample_counts = Counter(random.choices(
        list(dist),
        weights=list(dist.values()),
        k=corpus_limit,
    ))

    _print_summary(dist, counts, sample_counts, args.bar_max_len)
    fetched_total = _write_corpus(api, auth, sample_counts, keepers, args.out)
    print(f'Completed, fetched {fetched_total} docs across {len(sample_counts)} aliases.', flush=True, end='\n\r')
if __name__ == '__main__':
    # Command-line entry point: the module docstring doubles as --help text.
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument('-u', '--es-user', default=getuser(),
                        help='Elasticsearch username (default: current login name)')
    parser.add_argument('-p', '--es-pass', default=None,
                        help='Elasticsearch password (prompted for if omitted)')
    parser.add_argument('-t', '--corpus-limit', type=int, default=1000,
                        help='total maximum documents to keep from target cluster')
    parser.add_argument('--bar-max-len', type=int, default=30, help='bar viz max length')
    parser.add_argument('-k', '--keepers', type=str, default='',
                        help='fields to keep, comma delimited (default: built-in field list)')
    parser.add_argument('api', help='base URL of the Elasticsearch cluster (trailing slash optional)')
    parser.add_argument('out', help='corpus file to append sampled documents to, one JSON object per line')
    args = parser.parse_args()
    main(args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment