Created
October 29, 2018 18:28
-
-
Save rushirajnenuji/4f413332d2f39a218f6059e868e6e43c to your computer and use it in GitHub Desktop.
Analyzes the top N most popular datasets for a given time frame, using event data from Elasticsearch.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Metrics Analyzer module | |
Implemented as a falcon web application, https://falcon.readthedocs.io/en/stable/ | |
""" | |
import json | |
import falcon | |
from urllib.parse import urlparse | |
from urllib.parse import unquote | |
from urllib.parse import quote_plus | |
import requests | |
from d1_metrics.metricselasticsearch import MetricsElasticSearch | |
from d1_metrics.metricsdatabase import MetricsDatabase | |
from datetime import datetime | |
import logging | |
import time | |
from d1_metrics_service import pid_resolution | |
DEFAULT_REPORT_CONFIGURATION = {
    # DataONE coordinating-node Solr query endpoint used by query_solr().
    "solr_query_url": "https://cn.dataone.org/cn/v2/query/solr/?"
}

# List of characters that should be escaped in solr query terms
SOLR_RESERVED_CHAR_LIST = [
    '+', '-', '&', '|', '!', '(', ')', '{', '}', '[', ']', '^', '"', '~', '*', '?', ':'
]


class MetricsAnalyzer:
    """
    Analyzes read-event metrics from Elasticsearch: aggregates events per
    PID, resolves each PID's resource map via Solr, and writes per-dataset
    view/download totals to ``analyzer.json``.
    """

    def __init__(self):
        self._config = DEFAULT_REPORT_CONFIGURATION
        self.request = {}
        self.response = {}
        self.logger = logging.getLogger('metrics_service.' + __name__)

    def get_top_n_datasets(self, n, date_start=None, date_end=None):
        """
        Query Elasticsearch for "read" events in the given window,
        aggregate them by PID, and accumulate per-resource-map stats for
        the top ``n`` datasets.

        :param n: number of most popular datasets to report
        :param date_start: datetime lower bound; defaults to 2018-09-25
            (the original hard-coded window) when omitted
        :param date_end: datetime upper bound; defaults to 2018-10-25
        :return: None; results are written to disk by get_resource_map_stats
        """
        # Preserve the original hard-coded window as backward-compatible defaults.
        if date_start is None:
            date_start = datetime.strptime("09/25/2018", '%m/%d/%Y')
        if date_end is None:
            date_end = datetime.strptime("10/25/2018", '%m/%d/%Y')
        metrics_elastic_search = MetricsElasticSearch()
        metrics_elastic_search.connect()
        # Only count "read" events that carry a session and refer to
        # DATA or METADATA objects.
        search_body = [
            {"term": {"event.key": "read"}},
            {"exists": {"field": "sessionId"}},
            {"terms": {"formatType": ["DATA", "METADATA"]}},
        ]
        # Bucket matching events by PID, most-read first.
        aggregation_body = {
            "pid_list": {
                "terms": {
                    "size": 1000,
                    "field": "pid.key",
                    "order": {"_count": "desc"},
                }
            }
        }
        data = metrics_elastic_search.get_aggregations(
            query=search_body,
            aggQuery=aggregation_body,
            date_start=date_start,
            date_end=date_end)
        self.get_resource_map_stats(data, n)

    def get_resource_map_stats(self, data, n):
        """
        Accumulate download/view counts per resource map from the ES
        aggregation buckets and write the result to ``analyzer.json``.

        :param data: ES response containing
            ``data["aggregations"]["pid_list"]["buckets"]``
        :param n: stop once this many resource maps have been collected
        :return: None; writes ``analyzer.json`` (and appends failures to
            ``error.text``)
        """
        # resource-map PID -> {"downloads": int, "views": int}
        dataset_stats = {}
        error_count = 0
        for bucket in data["aggregations"]["pid_list"]["buckets"]:
            solr_results = self.query_solr(bucket["key"])
            try:
                # query_solr returns [] on failure, so a falsy check is safe.
                if solr_results:
                    # DATA objects count as downloads, METADATA as views.
                    metric = "downloads" if solr_results[0]["formatType"] == "DATA" else "views"
                    for resource_map in solr_results[0]["resourceMap"]:
                        if resource_map in dataset_stats:
                            dataset_stats[resource_map][metric] += bucket["doc_count"]
                        else:
                            dataset_stats[resource_map] = {"downloads": 0, "views": 0}
                            dataset_stats[resource_map][metric] = bucket["doc_count"]
                        # we only want single resource map per dataset
                        break
            except KeyError as e:
                error_count += 1
                with open('error.text', 'a') as err:
                    err.write(str(error_count))
                    err.write("\nKey - error : \t")
                    err.write(str(e))
            except IndexError as e:
                error_count += 1
                with open('error.text', 'a') as err:
                    err.write(str(error_count))
                    err.write("\nIndex - error : \t")
                    err.write(str(e))
            if len(dataset_stats) >= n:
                break
        with open('analyzer.json', 'w') as outfile:
            outfile.write(json.dumps(dataset_stats, indent=2))

    def query_solr(self, pid):
        """
        Look up a PID in Solr, returning its id/formatType/resourceMap docs.

        :param pid: identifier to query (escaped and quoted before use)
        :return: list of Solr document dicts; empty list on HTTP failure
            or a malformed response (the original returned None here,
            which crashed the caller with a TypeError)
        """
        session = requests.Session()
        params = {
            'wt': 'json',
            'fl': 'id,formatType,resourceMap',
            'q.op': 'OR',
            'q': "id:" + self.quoteTerm(pid),
        }
        response = session.get(self._config["solr_query_url"], params=params)
        if response.status_code == 200:
            try:
                return response.json()["response"]["docs"]
            except (KeyError, ValueError) as e:
                # ValueError covers a body that is not valid JSON.
                self.logger.error("Malformed Solr response for pid %s: %s", pid, e)
        return []

    def quoteTerm(self, term):
        '''
        Return a quoted, escaped Solr query term
        Args:
            term: (string) term to be escaped and quoted
        Returns: (string) quoted, escaped term
        '''
        return '"' + self.escapeSolrQueryTerm(term) + '"'

    def escapeSolrQueryTerm(self, term):
        '''
        Escape a solr query term for solr reserved characters
        Args:
            term: query term to be escaped
        Returns: string, the escaped query term
        '''
        # Escape backslashes first so subsequent escapes are not doubled.
        term = term.replace('\\', '\\\\')
        for reserved in SOLR_RESERVED_CHAR_LIST:
            # '\\' + reserved replaces the original '\{}'.format(c), which
            # relied on the invalid "\{" escape sequence (warns on 3.6+).
            term = term.replace(reserved, '\\' + reserved)
        return term
if __name__ == "__main__":
    # Ad-hoc entry point: report the five most popular datasets.
    analyzer = MetricsAnalyzer()
    analyzer.get_top_n_datasets(5)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment