#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
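"""Merge the previous day's account_details-YYYY.MM.DD documents into the
rep_acct_details index. For every hit, a delete action is queued against
the daily index and an update/upsert, keyed on the document's email field,
is queued against rep_acct_details."""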
import sys
import logging
from datetime import datetime, timedelta

import elasticsearch
from elasticsearch import helpers
class App:
    def __init__(self):
        self.logger = logging.getLogger('mergeAccountDetails')
        self.logger.setLevel(logging.INFO)
        # File handler; it allows DEBUG, but the logger's INFO level is
        # what actually filters records.
        fh = logging.FileHandler('/var/log/mergeAccountDetails.log')
        fh.setLevel(logging.DEBUG)
        # Console handler at the same level.
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        # One shared format for both handlers.
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        ch.setFormatter(formatter)
        fh.setFormatter(formatter)
        self.logger.addHandler(ch)
        self.logger.addHandler(fh)
        # Build the dated index name (UTC); timedelta(days=0) is a knob
        # for reprocessing an older day's index.
        utc_index = datetime.utcnow() - timedelta(days=0)
        utc_index = utc_index.strftime("%Y.%m.%d")
        # Match every document stamped within the previous day.
        self.query1 = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "query_string": {
                                "analyze_wildcard": True,
                                "query": "*"
                            }
                        },
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": "now-1d/d",
                                    "lte": "now/d"
                                }
                            }
                        }
                    ],
                    "must_not": []
                }
            }
        }
        self.index = "account_details-{}".format(utc_index)
        self.search_size = 1000
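
    # Pull yesterday's documents out of the daily index with the scroll API,
    # bulk-delete them from the daily index, and upsert their remaining
    # fields into rep_acct_details keyed by email.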
    def collect_data(self):
        es = elasticsearch.Elasticsearch(['esearchr1.xxx.net',
                                          'esearchr2.xxx.net',
                                          'esearchr3.xxx.net'],
                                         http_auth=('elastic', 'changeme'), timeout=60)
        try:
            es.ping()
        except Exception as e:
            print(e)
        page = es.search(index=self.index, scroll='2m', size=self.search_size, body=self.query1)
        sid = page['_scroll_id']
        scroll_size = page['hits']['total']
        batches = scroll_size // self.search_size
        self.logger.info("{} getting {} documents, processing them in {} batches.".format(self.index, scroll_size, batches))
        while scroll_size > 0:
            try:
                tmp = []
                dd = []
                for hit in page['hits']['hits']:
                    # Queue a delete for the processed clone in the daily index.
                    d_doc_id = hit['_id']
                    ddata = {'_op_type': 'delete', '_id': str(d_doc_id), '_index': self.index, '_type': 'reporting_clone'}
                    dd.append(ddata)
                    # Queue an upsert into rep_acct_details, keyed on the email field.
                    data = {'doc': {'timestamp': str(datetime.utcnow().isoformat("T") + "Z")}, 'doc_as_upsert': True,
                            '_type': 'reporting', '_index': 'rep_acct_details', '_op_type': 'update'}
                    try:
                        data['_id'] = str(hit['_source']['email'])
                    except KeyError:
                        # No email means no merge key: skip this document.
                        continue
                    for field in hit['_source']:
                        if field.startswith('@') or field in ('tags', 'program', 'type'):
                            continue
                        data['doc'][str(field)] = str(hit['_source'][field])
                    tmp.append(data)
                try:
                    helpers.bulk(es, dd)
                    es.indices.refresh(index=self.index)
                except Exception as e:
                    self.logger.exception(e)
                es.indices.refresh(index='rep_acct_details')
                try:
                    for success, info in helpers.parallel_bulk(es, tmp, thread_count=4):
                        if not success:
                            self.logger.error("Doc failed: {}".format(info))
                except elasticsearch.helpers.BulkIndexError:
                    # Some updates failed to index; move on to the next batch.
                    pass
                # The current page is processed before scrolling so the first
                # batch returned by search() is not skipped; now fetch the next page.
                page = es.scroll(scroll_id=sid, scroll='2m')
                sid = page['_scroll_id']
                scroll_size = len(page['hits']['hits'])
            except elasticsearch.NotFoundError as e:
                self.logger.exception(e)
                sys.exit()
            except Exception as e:
                self.logger.exception(e)
                sys.exit()
self.logger.info("Refreshing indexes rep_acct_details and {}".format(self.index)) | |
es.indices.refresh(index='rep_acct_details') | |
es.indices.refresh(index=self.index) | |
self.logger.info("All done.") | |
if __name__ == '__main__':
    run = App()
    run.collect_data()
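
# Typical usage (an assumption, not stated in the gist): run once a day from
# cron shortly after midnight UTC, so the now-1d/d..now/d window in query1
# covers the full previous day; the script path below is hypothetical:
#   5 0 * * * /usr/bin/env python3 /opt/scripts/mergeAccountDetails.py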