# elasticsearch toolkit #elasticsearch
# Originally published as a GitHub gist by @ywzhang909
# (gist c3c17b47eec8a6c8b00077b3add176ca, last active August 28, 2022 15:20).
import logging
from urllib.parse import urlparse

from elasticsearch import Elasticsearch
from elasticsearch import helpers
class ESmanager:
    """Convenience wrapper around an ``elasticsearch.Elasticsearch`` client.

    The document/index helpers first check that the target index or document
    exists. They return ``None`` when it does not.

    The scroll, by-query and bulk helpers are best-effort. On failure they log
    a warning and return an empty result (``{}``, ``0`` or ``[0, []]``) instead
    of raising.

    ``typename``/``doc_type`` arguments are forwarded as the client's
    ``doc_type`` keyword. Passing ``None`` omits the keyword entirely.
    NOTE(review): mapping types only exist in older Elasticsearch releases;
    confirm which client version this runs against.
    """

    # Well-known default HTTP port of an Elasticsearch node.
    _DEFAULT_PORT = 9200
    _logger = logging.getLogger(__name__)

    def __init__(self, host, port):
        """Create a client for ``host:port``.

        :param host: hostname or IP address of the node.
        :param port: HTTP port as ``int`` or numeric ``str`` (e.g. ``"9200"``);
            ``None`` falls back to 9200.
        """
        self._host = host
        self._port = self._DEFAULT_PORT if port is None else int(port)
        self._connectES()

    def _connectES(self):
        """Build the underlying client from ``_host``/``_port`` and return it."""
        self.__elasticsearch = Elasticsearch([{'host': self._host, 'port': self._port}])
        return self.__elasticsearch

    def getConnector(self):
        """Return the underlying ``Elasticsearch`` client."""
        return self.__elasticsearch

    @classmethod
    def fromURL(cls, url):
        """Alternate constructor from a URL such as ``http://host:9200``."""
        host, port = cls._parse_url(url)
        return cls(host, port)

    @classmethod
    def _parse_url(cls, url):
        """Split ``url`` into ``(hostname, port)``; a missing port becomes 9200."""
        # urlparse only populates hostname/port when a "//" authority is present,
        # so a bare "host:port" gets an http:// prefix first.
        if '//' not in url:
            url = 'http://' + url
        parsed = urlparse(url)
        return parsed.hostname, parsed.port or cls._DEFAULT_PORT

    @staticmethod
    def _doc_kwargs(typename, **kwargs):
        """Return ``kwargs`` plus ``doc_type=typename`` unless typename is None."""
        if typename is not None:
            kwargs['doc_type'] = typename
        return kwargs

    def _exists(self, indexname, typename, idvalue):
        """Return True if document ``idvalue`` exists in ``indexname``."""
        # Keyword arguments: the positional order of exists() differs between
        # client major versions (index, doc_type, id) vs (index, id, doc_type).
        return bool(self.__elasticsearch.exists(
            **self._doc_kwargs(typename, index=indexname, id=idvalue)))

    def createIndex(self, indexname, indexbody):
        """Create ``indexname`` with settings/mappings ``indexbody`` if absent.

        :return: the create response, or ``None`` if the index already existed.
        """
        # Truthiness rather than `is not True`: some clients return a truthy
        # response object instead of a bool.
        if not self.__elasticsearch.indices.exists(index=indexname):
            return self.__elasticsearch.indices.create(index=indexname, body=indexbody)
        return None

    def insertDocment(self, indexname, typename, docbody):
        """Index ``docbody`` into an existing index with a server-generated id.

        :return: the index response, or ``None`` if ``indexname`` does not exist.
        """
        if not self.__elasticsearch.indices.exists(index=indexname):
            return None
        return self.__elasticsearch.index(
            **self._doc_kwargs(typename, index=indexname, body=docbody, id=None))

    def deleteDocment(self, indexname, typename, idvalue):
        """Delete document ``idvalue`` if it exists.

        :return: the delete response, or ``None`` if the document was absent.
        """
        if not self._exists(indexname, typename, idvalue):
            return None
        return self.__elasticsearch.delete(
            **self._doc_kwargs(typename, index=indexname, id=idvalue))

    def getDocment(self, indexname, typename, idvalue):
        """Return the ``_source`` of document ``idvalue``, or ``None`` if absent."""
        if not self._exists(indexname, typename, idvalue):
            return None
        return self.__elasticsearch.get_source(
            **self._doc_kwargs(typename, index=indexname, id=idvalue))

    def updateDocment(self, indexname, typename, idvalue, updatebody):
        """Apply ``updatebody`` to document ``idvalue`` if it exists.

        :return: the update response, or ``None`` if the document was absent.
        """
        if not self._exists(indexname, typename, idvalue):
            return None
        return self.__elasticsearch.update(
            **self._doc_kwargs(typename, index=indexname, id=idvalue, body=updatebody))

    def searchDocument(self, indexname, typename, body):
        """Run search ``body`` against an existing index.

        :return: the search response, or ``None`` if ``indexname`` does not exist.
        """
        if not self.__elasticsearch.indices.exists(index=indexname):
            return None
        return self.__elasticsearch.search(
            **self._doc_kwargs(typename, index=indexname, body=body))

    def search_scroll(self, index, doc_type, query, scroll="1m"):
        """Start a scrolled search whose context is kept alive for ``scroll``.

        :return: the first page (including ``_scroll_id``), or ``{}`` on failure.
        """
        try:
            return self.__elasticsearch.search(
                **self._doc_kwargs(doc_type, index=index, body=query,
                                   search_type="query_then_fetch", scroll=scroll))
        except Exception as e:
            self._logger.warning("search_scroll failed: %s", e)
            return {}

    def scroll_scan(self, query):
        """Fetch the next scroll page; ``query`` carries ``scroll`` and ``scroll_id``.

        :return: the next page, or ``{}`` on failure (same as ``search_scroll``)
            so callers can always use ``.get`` on the result.
        """
        try:
            return self.__elasticsearch.scroll(body=query)
        except Exception as e:
            self._logger.warning("scroll_scan failed: %s", e)
            return {}

    def delete_by_query(self, index, doc_type, query):
        """Delete every document matching ``query``.

        :return: the response's ``deleted`` count, or ``0`` on failure.
        """
        try:
            resJson = self.__elasticsearch.delete_by_query(
                **self._doc_kwargs(doc_type, index=index, body=query))
            return resJson.get('deleted')
        except Exception as e:
            self._logger.warning("delete fail: %s", e)
            return 0

    def update_by_query(self, index, doc_type, query):
        """Update every document matching ``query``.

        :return: the response's ``updated`` count, or ``0`` on failure.
        """
        try:
            resJson = self.__elasticsearch.update_by_query(
                **self._doc_kwargs(doc_type, index=index, body=query))
            return resJson.get('updated')
        except Exception as e:
            self._logger.warning("update_by_query failed: %s", e)
            return 0

    def insert_bulk(self, data_lst, request_timeout=60):
        """Send ``data_lst`` bulk actions through ``elasticsearch.helpers.bulk``.

        :param request_timeout: per-request timeout forwarded to the client.
        :return: what ``helpers.bulk`` returns, or ``[0, []]`` on failure.
        """
        try:
            return helpers.bulk(self.__elasticsearch, data_lst, request_timeout=request_timeout)
        except Exception as e:
            self._logger.warning("insert_bulk failed: %s", e)
            return [0, []]
if __name__ == '__main__':
    # Demo: page through every document of the 'es_news' index with the scroll API.
    es = ESmanager("192.168.10.31", "9200")
    query = {
        "query": {
            "match_all": {}
        }
    }
    res = es.search_scroll('es_news', '_doc', query)
    scroll_id = res.get('_scroll_id')
    hits = (res.get('hits') or {}).get('hits') or []
    # Stop on the first empty page. hits.total is the overall match count and
    # is the same on every page, so it cannot terminate the loop.
    while hits:
        for hit in hits:
            print(hit)
        if not scroll_id:
            break
        # `or {}` guards against a failed scroll call returning a non-dict.
        res = es.scroll_scan({'scroll': '1m', 'scroll_id': scroll_id}) or {}
        scroll_id = res.get('_scroll_id', scroll_id)
        hits = (res.get('hits') or {}).get('hits') or []
    if scroll_id:
        # Release the server-side scroll context instead of waiting for the keep-alive to expire.
        try:
            es.getConnector().clear_scroll(scroll_id=scroll_id)
        except Exception as e:
            print(str(e))
# Comments on this snippet are collected on the original GitHub gist.