Skip to content

Instantly share code, notes, and snippets.

@rushirajnenuji
Created October 29, 2018 18:28
Show Gist options
  • Save rushirajnenuji/4f413332d2f39a218f6059e868e6e43c to your computer and use it in GitHub Desktop.
Save rushirajnenuji/4f413332d2f39a218f6059e868e6e43c to your computer and use it in GitHub Desktop.
Analyzes the top N most-accessed datasets for a given time frame from Elasticsearch (ES).
"""
Metrics Analyzer module
Implemented as a falcon web application, https://falcon.readthedocs.io/en/stable/
"""
import json
import falcon
from urllib.parse import urlparse
from urllib.parse import unquote
from urllib.parse import quote_plus
import requests
from d1_metrics.metricselasticsearch import MetricsElasticSearch
from d1_metrics.metricsdatabase import MetricsDatabase
from datetime import datetime
import logging
import time
from d1_metrics_service import pid_resolution
# Default report configuration; solr_query_url points at the DataONE
# coordinating node's Solr query endpoint.
DEFAULT_REPORT_CONFIGURATION = {
    "solr_query_url": "https://cn.dataone.org/cn/v2/query/solr/?"
}

# List of characters that should be escaped in solr query terms
SOLR_RESERVED_CHAR_LIST = [
    '+', '-', '&', '|', '!', '(', ')', '{', '}', '[', ']', '^', '"', '~', '*', '?', ':'
]


class MetricsAnalyzer:
    """
    Performs analysis on the Metrics events stored in Elasticsearch (ES):
    finds the most-read PIDs, resolves each PID to its resource map
    (dataset) via Solr, and writes per-dataset view/download totals to
    ``analyzer.json``.
    """

    def __init__(self):
        self._config = DEFAULT_REPORT_CONFIGURATION
        self.request = {}
        self.response = {}
        self.logger = logging.getLogger('metrics_service.' + __name__)

    def get_top_n_datasets(self, n, date_start=None, date_end=None):
        """
        Query ES for "read" events with a session on DATA/METADATA objects,
        aggregated by PID, then roll the counts up to datasets.

        :param n: TOP N most popular datasets to report
        :param date_start: datetime lower bound for events; defaults to the
            original hard-coded window start (09/25/2018)
        :param date_end: datetime upper bound for events; defaults to the
            original hard-coded window end (10/25/2018)
        :return: None; results are written to ``analyzer.json`` by
            :meth:`get_resource_map_stats`
        """
        # Defaults preserve the previously hard-coded reporting window.
        if date_start is None:
            date_start = datetime.strptime("09/25/2018", '%m/%d/%Y')
        if date_end is None:
            date_end = datetime.strptime("10/25/2018", '%m/%d/%Y')
        metrics_elastic_search = MetricsElasticSearch()
        metrics_elastic_search.connect()
        # Filter: only "read" events that carry a sessionId and target
        # DATA or METADATA objects.
        search_body = [
            {
                "term": {"event.key": "read"}
            },
            {
                "exists": {
                    "field": "sessionId"
                }
            },
            {
                "terms": {
                    "formatType": [
                        "DATA",
                        "METADATA"
                    ]
                }
            }
        ]
        # Bucket events by PID, most-read first.
        aggregation_body = {
            "pid_list": {
                "terms": {
                    "size": 1000,
                    "field": "pid.key",
                    "order": {"_count": "desc"}
                }
            }
        }
        data = metrics_elastic_search.get_aggregations(
            query=search_body,
            aggQuery=aggregation_body,
            date_start=date_start,
            date_end=date_end)
        self.get_resource_map_stats(data, n)

    def get_resource_map_stats(self, data, n):
        """
        Roll per-PID counts up to their resource maps (datasets) and write
        the first ``n`` datasets' view/download totals to ``analyzer.json``.

        :param data: ES aggregation response containing
            ``aggregations.pid_list.buckets``
        :param n: stop once this many distinct resource maps are collected
        :return: None; writes ``analyzer.json`` (and ``error.text`` on
            per-PID failures)
        """
        # Renamed from `dict` — the original shadowed the builtin.
        dataset_stats = {}
        error_count = 0
        for bucket in data["aggregations"]["pid_list"]["buckets"]:
            # query_solr returns [] on failure, so the truthiness check is safe.
            solr_docs = self.query_solr(bucket["key"])
            try:
                if solr_docs:
                    doc = solr_docs[0]
                    metric = "downloads" if doc["formatType"] == "DATA" else "views"
                    for resource_map in doc["resourceMap"]:
                        if resource_map not in dataset_stats:
                            dataset_stats[resource_map] = {"downloads": 0, "views": 0}
                        dataset_stats[resource_map][metric] += bucket["doc_count"]
                        # we only want a single resource map per dataset
                        break
            except KeyError as e:
                error_count += 1
                self._log_error("Key", e, error_count)
            except IndexError as e:
                error_count += 1
                self._log_error("Index", e, error_count)
            if len(dataset_stats) >= n:
                break
        with open('analyzer.json', 'w') as outfile:
            outfile.write(json.dumps(dataset_stats, indent=2))

    def _log_error(self, label, exc, count):
        """Append a brief record of a per-PID failure to ``error.text``."""
        # `with` closes the file; the original's explicit close() inside the
        # with-block was redundant.
        with open('error.text', 'a') as err:
            err.write(str(count))
            err.write("\n" + label + " - error : \t")
            err.write(str(exc))

    def query_solr(self, pid):
        """
        Look up a PID in Solr, selecting id, formatType and resourceMap.

        :param pid: identifier to look up (escaped/quoted before use)
        :return: list of matching Solr docs; an EMPTY list when the request
            fails or the response is malformed. (The original returned None
            in those cases, which raised TypeError in the caller's length
            check instead of being handled.)
        """
        session = requests.Session()
        params = {
            'wt': 'json',
            'fl': 'id,formatType,resourceMap',
            'q.op': 'OR',
            'q': "id:" + self.quoteTerm(pid),
        }
        response = session.get(self._config["solr_query_url"], params=params)
        if response.status_code == 200:
            try:
                # ValueError covers a non-JSON body from response.json().
                return response.json()["response"]["docs"]
            except (KeyError, ValueError) as e:
                self.logger.error("Malformed Solr response for pid %s: %s", pid, e)
        return []

    def quoteTerm(self, term):
        '''
        Return a quoted, escaped Solr query term

        Args:
            term: (string) term to be escaped and quoted

        Returns: (string) quoted, escaped term
        '''
        return '"' + self.escapeSolrQueryTerm(term) + '"'

    def escapeSolrQueryTerm(self, term):
        '''
        Escape a solr query term for solr reserved characters

        Args:
            term: query term to be escaped

        Returns: string, the escaped query term
        '''
        # Backslash must be escaped first, so the backslashes added for the
        # other reserved characters are not themselves re-escaped.
        term = term.replace('\\', '\\\\')
        for c in SOLR_RESERVED_CHAR_LIST:
            # Explicit '\\' + c replaces the original "'\{}'.format(c)",
            # which relied on an invalid string escape ('\{').
            term = term.replace(c, '\\' + c)
        return term
if __name__ == "__main__":
    # Ad-hoc run: report the five most popular datasets for the default window.
    analyzer = MetricsAnalyzer()
    analyzer.get_top_n_datasets(5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment