Determine which of 3 general types a JSON result is, out of the results that over 12 different ElasticSearch queries can return. Parse based on the general type, and put the results into a readable format in Excel.
# -*- coding: utf-8 -*-
# import json
# import openpyxl
# This next section can be its own script if necessary
# begin dont_overwrite ----
ret_values = ['unknown','level_1','level_2','multi_s']
def detectStructure(jsondata):
    """Returns a value indicating if the ElasticSearch output has
    1-level or 2-levels of aggregation buckets, or if the output is from a
    multi-search."""
    structure = ret_values[0]
    msearch_indicators = (u'responses',)
    leveled_indicators = (u'hits', u'timed_out', u'took',
                          u'aggregations', u'_shards')
    t0_keys = jsondata.keys()

    def el_one_or_two(somejson):
        """Called when we know we're not dealing with multi-search results,
        and we are definitely dealing with tiered results."""
        ret = ''
        t1 = somejson[leveled_indicators[3]]
        for key in t1:
            for item in t1[key]['buckets']:
                for subkey in item:
                    if type(item[subkey]) == type({}):
                        if 'buckets' in item[subkey].keys():
                            ret = ret_values[2]
        if ret != ret_values[2]:
            ret = ret_values[1]
        return ret

    if any(k in t0_keys for k in msearch_indicators):
        structure = ret_values[3]
    elif any(k in t0_keys for k in leveled_indicators):
        structure = el_one_or_two(jsondata)
    return structure
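# For reference, minimal sketches of the three shapes detectStructure
# distinguishes. These are hypothetical examples for illustration, not
# actual query output:
#
#   level_1 - the usual top-level ES keys, buckets with no nested aggs:
#       {"took": 3, "timed_out": false, "_shards": {}, "hits": {},
#        "aggregations": {"by_city": {"buckets": [
#            {"key": "NYC", "doc_count": 42}]}}}
#
#   level_2 - same, but a bucket item carries a nested agg of its own:
#       {"key": "NYC", "doc_count": 42,
#        "by_room_type": {"buckets": [{"key": "suite", "doc_count": 7}]}}
#
#   multi_s - a multi-search wrapper around several such responses:
#       {"responses": [{...}, {...}]}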
def handleStructuredData(jsondata, structure):
    """After we detectStructure, we call this function on the same jsondata
    and pass in the structure type. This function will print to one of
    csv (or excel). All json of a certain structure can be printed to
    csv/excel in the same way."""
    k1 = 'aggregations'
    k2 = 'key'
    k3 = 'key_as_string'

    def handle_unknown(jsondata, structure):
        return 'unknown'

    def handle_level_1(jsondata, structure):
        hs = ''
        print 'handle_level_1 called with "{}".'.format(structure)
        if len(jsondata[k1].keys()) == 1:
            hs = 'level_1_type_b'
        else:
            hs = 'level_1_type_a'
        return hs

    def handle_level_2(jsondata, structure):
        print 'handle_level_2 called with "{}".'.format(structure)
        return 'level_2_type_a'

    def handle_multi_s(jsondata, structure):
        print 'handle_multi_s called with "{}".'.format(structure)
        return 'multi_s_type_a'

    function_pick = {
        ret_values[0]: handle_unknown,
        ret_values[1]: handle_level_1,
        ret_values[2]: handle_level_2,
        ret_values[3]: handle_multi_s
        }
    return function_pick[structure](jsondata, structure)
def printInExcel(jsondata, handled_structure, out_to, print_mode='new_sheet'):
    import openpyxl
    print 'I will make an Excel file for :', handled_structure
    print 'I will write to :', out_to
    #wb = None
    #if print_mode == 'new_sheet':
    #    wb = openpyxl.load_workbook(filename = out_to)
    #else:
    #    wb = openpyxl.Workbook()
    wb = openpyxl.load_workbook(filename = out_to)
    if handled_structure == 'level_1_type_a':
        # multiple 1st-level aggregations
        # simple bucket items
        ws = wb.create_sheet()
        for keyname in jsondata['aggregations']:
            ws.append([])  # force a blank row
            ws.append([keyname, 'doc_count'])
            bucket_list = jsondata['aggregations'][keyname]['buckets']
            for dict_item in bucket_list:
                ul1 = dict_item['key']
                ul2 = dict_item['doc_count']
                try:
                    ul = [ul1, ul2]
                    ws.append(ul)
                except openpyxl.utils.exceptions.IllegalCharacterError:
                    ul = [ul1.__repr__(), ul2]
                    ws.append(ul)
        wb.save(filename = out_to)
    if handled_structure == 'level_1_type_b':
        # single 1st-level aggregation, complex bucket items
        ws = wb.create_sheet()
        only_keyname = jsondata['aggregations'].keys()[0]
        ws.title = only_keyname
        bucket_list = jsondata['aggregations'][only_keyname]['buckets']
        headers = bucket_list[0].keys()
        # we keep either key or key_as_string (well-formatted date)
        # we force that to be the first item
        # we force the overall doc count to be the last item
        if 'key_as_string' in headers:
            headers.remove('key')
            headers.remove('key_as_string')
            headers.insert(0, 'key_as_string')
        else:
            headers.remove('key')
            headers.insert(0, 'key')
        headers.remove('doc_count')
        headers.append('doc_count')
        ws.append(headers)
        for dict_item in bucket_list:
            main_key = ''
            main_count = dict_item['doc_count']
            if 'key_as_string' in headers:
                main_key = 'key_as_string'
            else:
                main_key = 'key'
            coll = {}
            for kn in headers:
                if (kn == main_key) or (kn == 'doc_count'):
                    coll[kn] = dict_item[kn]
                else:
                    coll[kn] = dict_item[kn]['doc_count']
            row_to_print = [coll[kn] for kn in headers]
            ws.append(row_to_print)
        wb.save(filename = out_to)
    if handled_structure == 'level_2_type_a':
        keylist = jsondata['aggregations'].keys()
        header_set = set()
        for keyname in keylist:
            bucket_list = jsondata['aggregations'][keyname]['buckets']
            for l1_dict in bucket_list:
                for l1_key in l1_dict.keys():
                    header_set.add(l1_key)
                    # our second-level agg will be a dict under one key
                    if type(l1_dict[l1_key]) == dict:
                        if 'buckets' in l1_dict[l1_key]:
                            l2_bucket_list = l1_dict[l1_key]['buckets']
                            for item in l2_bucket_list:
                                header_set.add(item['key'])
        # listify header_set and order it
        header_list = list(header_set)
        if 'key_as_string' in header_set:
            header_list.remove('key')
            header_list.remove('key_as_string')
            header_list.insert(0, 'key_as_string')
        else:
            header_list.remove('key')
            header_list.insert(0, 'key')
        header_list.remove('doc_count')
        header_list.append('doc_count')
        # create a dict of what to write for a certain row
        for keyname in keylist:
            ws = wb.create_sheet(title = keyname)
            ws.append(header_list)
            bucket_list = jsondata['aggregations'][keyname]['buckets']
            for l1_dict in bucket_list:
                row_data = {}
                for heading in header_list:
                    if heading in l1_dict.keys():
                        if type(l1_dict[heading]) == dict:
                            if 'buckets' in l1_dict[heading]:
                                l2_bucket_list = l1_dict[heading]['buckets']
                                for item in l2_bucket_list:
                                    row_data[item['key']] = item['doc_count']
                        else:
                            row_data[heading] = l1_dict[heading]
                row_to_print = ['' if kn not in row_data
                                else row_data[kn] for kn in header_list]
                ws.append(row_to_print)
        wb.save(filename = out_to)
    if handled_structure == 'multi_s_type_a':
        # sample multi-search hierarchy:
        # {"responses":
        #     [ {"hits":
        #         {"hits":
        #             [ {reservation json}, {reservation json} ] }},
        #       ... ] }
        ws = wb.create_sheet(title = 'responses')
        resp_list = jsondata['responses']
        headers_set = set()
        for a in resp_list:
            if a['hits']['hits']:
                for b in a['hits']['hits']:
                    headers_set.update(b['_source'].keys())
        headers = list(headers_set)
        ws.append(headers)
        for hit_set in resp_list:
            resv_list = hit_set['hits']['hits']
            # if it's not an empty list []
            if resv_list:
                # now we're at the json object for the reservation
                for resv in resv_list:
                    row_data = resv['_source']
                    row_to_print = []
                    for a_key in headers:
                        # blank cell when a reservation lacks a field
                        row_to_print.append(row_data.get(a_key, ''))
                    ws.append(row_to_print)
        wb.save(filename = out_to)
    print 'saved to : ', out_to, '\n'
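# A minimal end-to-end sketch of how the three functions chain together.
# The file names below are assumptions for illustration. Note that out_to
# must already exist, because printInExcel load_workbook()s it rather than
# creating a fresh workbook.
if __name__ == '__main__':
    import json
    with open('query_result.json') as f:          # assumed saved ES output
        data = json.load(f)
    shape = detectStructure(data)                 # 'level_1', 'level_2', ...
    handled = handleStructuredData(data, shape)   # e.g. 'level_1_type_a'
    printInExcel(data, handled, 'audit_output.xlsx')  # assumed workbook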
QuantVI commented Jul 23, 2019

"Excel Printer" was a huge time-saver in the "data audit" process used to check data quality in this role. The data were thousands of rows of hotel reservation information, with 12-30 columns per row. This information was stored in an ElasticSearch cluster rather than in SQL. Because of that storage format, there was no front-end way for people outside of Engineering Integrations to view and assess the data quality.

"Excel Printer" combined with my other custom script "Scribe Report" not only brought some automation to this process for myself and my team, but allowed us to create basic XLSX files of the results for counterparts in Customer Success and Account Representative roles. That doubled the amount of people who could help audit data and spot problems. We could even send certain results back to the clients to verify.

The benefits of this pair of scripts (along with the stored .json files containing each ElasticSearch query) were eventually recognized, and "story points" (i.e., Agile Scrum) were allocated to a software developer to add this functionality to our back-end tools (converted to Java + JS + React), which made it accessible company-wide. Instead of waiting for my team (Integrations) to run the audits, people could now run whatever audit they needed at any time via front-end clicks and drop-downs.

This was one of my favorite personal projects, and one that made a lot of difference for multiple teams.
