@pjanzen
Created February 9, 2017 20:44
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import sys
from datetime import datetime, timedelta
import elasticsearch
from elasticsearch import helpers
import logging


class App:
    def __init__(self):
        self.logger = logging.getLogger('mergeAccountDetails')
        self.logger.setLevel(logging.INFO)
        # create file handler which logs even debug messages
        fh = logging.FileHandler('/var/log/mergeAccountDetails.log')
        fh.setLevel(logging.DEBUG)
        # create console handler and set level to debug
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        # create formatter
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        # add formatter to ch and fh
        ch.setFormatter(formatter)
        fh.setFormatter(formatter)
        # add both handlers to the logger
        self.logger.addHandler(ch)
        self.logger.addHandler(fh)

        # Set up the correct index: today's date (UTC) in the index name,
        # and a query covering the last full day of documents.
        utc_index = datetime.utcnow() - timedelta(days=0)
        utc_index = utc_index.strftime("%Y.%m.%d")
        self.query1 = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "query_string": {
                                "analyze_wildcard": "true",
                                "query": "*"
                            }
                        },
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": "now-1d/d",
                                    "lte": "now/d"
                                }
                            }
                        }
                    ],
                    "must_not": []
                }
            }
        }
        self.index = "account_details-{}".format(utc_index)
        self.search_size = 1000

    def collect_data(self):
        es = elasticsearch.Elasticsearch(['esearchr1.xxx.net',
                                          'esearchr2.xxx.net',
                                          'esearchr3.xxx.net'],
                                         http_auth=('elastic', 'changeme'), timeout=60)
        try:
            es.ping()
        except Exception as e:
            print(e)

        # Open a scroll over today's index; the cursor is kept alive for 2 minutes per batch.
        page = es.search(index=self.index, scroll='2m', size=self.search_size, body=self.query1)
        sid = page['_scroll_id']
        scroll_size = page['hits']['total']
        batches = scroll_size // self.search_size
        self.logger.info("{} getting {} documents, processing them in {} batches.".format(self.index, scroll_size, batches))

        while scroll_size > 0:
            try:
                page = es.scroll(scroll_id=sid, scroll='2m')
                sid = page['_scroll_id']
                scroll_size = len(page['hits']['hits'])
                tmp = []  # update/upsert actions for rep_acct_details
                dd = []   # delete actions for the source index
                for hit in page['hits']['hits']:
                    d_doc_id = hit['_id']
                    ddata = {'_op_type': 'delete', '_id': str(d_doc_id), '_index': self.index, '_type': 'reporting_clone'}
                    dd.append(ddata)
                    data = {'doc': {'timestamp': str(datetime.utcnow().isoformat("T") + "Z")}, 'doc_as_upsert': True,
                            '_type': 'reporting', '_index': 'rep_acct_details', '_op_type': 'update'}
                    try:
                        # The email address becomes the document id; skip hits without one.
                        data['_id'] = str(hit['_source']['email'])
                    except KeyError:
                        continue
                    for field in hit['_source']:
                        # Drop metadata fields; copy everything else into the upsert body.
                        if field.startswith('@') or field == 'tags' or field == 'program' or field == 'type':
                            continue
                        else:
                            data['doc'][str(field)] = str(hit['_source'][field])
                    tmp.append(data)
                try:
                    helpers.bulk(es, dd)
                    es.indices.refresh(index=self.index)
                except Exception as e:
                    self.logger.exception(e)
                es.indices.refresh(index='rep_acct_details')
                try:
                    for success, info in helpers.parallel_bulk(es, tmp, thread_count=4):
                        if not success:
                            self.logger.error("Doc failed: {}".format(info))
                except elasticsearch.helpers.BulkIndexError as e:
                    # self.logger.exception(e)
                    continue
            except elasticsearch.NotFoundError as e:
                self.logger.exception(e)
                sys.exit()
            except Exception as e:
                self.logger.exception(e)
                sys.exit()

        self.logger.info("Refreshing indexes rep_acct_details and {}".format(self.index))
        es.indices.refresh(index='rep_acct_details')
        es.indices.refresh(index=self.index)
        self.logger.info("All done.")


if __name__ == '__main__':
    run = App()
    run.collect_data()
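
A quick way to sanity-check a run is to compare document counts in the source and target indexes afterwards. A minimal sketch, assuming the same cluster, placeholder hostnames, and credentials used in the script above (this check is not part of the original gist):

# Hypothetical post-run check: after a successful pass the day's
# account_details index should be close to empty, while rep_acct_details
# should hold one merged document per unique email address.
import elasticsearch
from datetime import datetime

es = elasticsearch.Elasticsearch(['esearchr1.xxx.net'],
                                 http_auth=('elastic', 'changeme'), timeout=60)
source_index = "account_details-{}".format(datetime.utcnow().strftime("%Y.%m.%d"))

print("source remaining: {}".format(es.count(index=source_index)['count']))
print("merged total: {}".format(es.count(index='rep_acct_details')['count']))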