# Last active: August 28, 2022 15:20
# Save ywzhang909/c3c17b47eec8a6c8b00077b3add176ca to your computer and use it in GitHub Desktop.
# elasticsearch toolkit #elasticsearch
# This file contains bidirectional Unicode text that may be interpreted or
# compiled differently than what appears below. To review, open the file in an
# editor that reveals hidden Unicode characters.
# Learn more about bidirectional Unicode characters.
import logging
from urllib.parse import urlparse

from elasticsearch import Elasticsearch
from elasticsearch import helpers
class ESmanager:
    """Convenience wrapper around a single-node ``elasticsearch.Elasticsearch`` client.

    The index/document helpers (``createIndex``, ``insertDocment``,
    ``deleteDocment``, ``getDocment``, ``updateDocment``, ``searchDocument``)
    first check that the index/document exists and do nothing (returning
    ``None``) when it does not.

    The query/scroll/bulk helpers (``search_scroll``, ``scroll_scan``,
    ``delete_by_query``, ``update_by_query``, ``insert_bulk``) are best-effort:
    on any ``Exception`` they log it and return an empty sentinel instead of
    raising.

    NOTE(review): the ``doc_type`` arguments and the ``hits.total.value``
    response shape used by callers suggest an elasticsearch-py 7.x client
    (``doc_type`` was removed in 8.x) -- confirm the installed version.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, host, port):
        """Create a client for the node at ``host:port`` and probe it once.

        An unreachable node is logged as a warning, not raised.
        """
        self._host = host
        self._port = port
        self.__elasticsearch = Elasticsearch([{'host': self._host, 'port': self._port}])
        self._connectES()

    def _connectES(self):
        """Ping the cluster and warn if it cannot be reached.

        Returns:
            bool: the ``ping()`` result, or False if ``ping()`` itself raised.
        """
        try:
            reachable = bool(self.__elasticsearch.ping())
        except Exception:
            reachable = False
        if not reachable:
            self._log.warning("Elasticsearch at %s:%s is not reachable", self._host, self._port)
        return reachable

    def getConnector(self):
        """Return the underlying ``Elasticsearch`` client."""
        return self.__elasticsearch

    @classmethod
    def fromURL(cls, url, default_port=9200):
        """Build a manager from a URL such as ``http://host:9200``.

        ``urlparse(url).netloc`` would include the port (and any
        ``user:pw@`` credentials), so the bare ``hostname`` is used as the
        host. When the URL has no explicit port, *default_port* is used.
        """
        parts = urlparse(url)
        port = parts.port if parts.port is not None else default_port
        return cls(parts.hostname, port)

    def _docExists(self, indexname, typename, idvalue):
        """Return whether document *idvalue* exists in *indexname*.

        Keyword arguments are deliberate: the 7.x client declares
        ``exists(index, id, doc_type=None)``, so passing (index, type, id)
        positionally would look up ``id=typename``.
        """
        return bool(self.__elasticsearch.exists(index=indexname, doc_type=typename, id=idvalue))

    def createIndex(self, indexname, indexbody):
        """Create *indexname* with settings/mappings *indexbody* unless it already exists."""
        if not self.__elasticsearch.indices.exists(index=indexname):
            self.__elasticsearch.indices.create(index=indexname, body=indexbody)

    def insertDocment(self, indexname, typename, docbody):
        """Index *docbody* with a server-generated id, only if *indexname* exists.

        The index is not created implicitly.

        Returns:
            The client's ``index()`` response, or None if the index is missing.
        """
        if self.__elasticsearch.indices.exists(index=indexname):
            return self.__elasticsearch.index(index=indexname, doc_type=typename, body=docbody, id=None)
        return None

    def deleteDocment(self, indexname, typename, idvalue):
        """Delete document *idvalue* if it exists.

        Returns:
            The client's ``delete()`` response, or None if there was nothing to delete.
        """
        if self._docExists(indexname, typename, idvalue):
            return self.__elasticsearch.delete(index=indexname, doc_type=typename, id=idvalue)
        return None

    def getDocment(self, indexname, typename, idvalue):
        """Return the ``_source`` of document *idvalue*, or None if it does not exist."""
        if self._docExists(indexname, typename, idvalue):
            return self.__elasticsearch.get_source(index=indexname, doc_type=typename, id=idvalue)
        return None

    def updateDocment(self, indexname, typename, idvalue, updatebody):
        """Apply *updatebody* to document *idvalue* if it exists.

        *updatebody* is passed straight to the client's ``update()``.
        Presumably it is an Update-API body such as ``{"doc": {...}}`` --
        verify against callers.

        Returns:
            The ``update()`` response, or None if the document does not exist.
        """
        if self._docExists(indexname, typename, idvalue):
            return self.__elasticsearch.update(index=indexname, doc_type=typename, id=idvalue, body=updatebody)
        return None

    def searchDocument(self, indexname, typename, body):
        """Run search *body* against *indexname*.

        Returns:
            The search response, or None if the index does not exist.
        """
        if self.__elasticsearch.indices.exists(index=indexname):
            return self.__elasticsearch.search(index=indexname, doc_type=typename, body=body)
        return None

    def search_scroll(self, index, doc_type, query):
        """Start a scrolled search with a 1-minute keep-alive.

        Returns:
            The first page (including ``_scroll_id``), or ``{}`` on any error (logged).
        """
        try:
            return self.__elasticsearch.search(index=index, doc_type=doc_type, body=query,
                                               search_type="query_then_fetch", scroll="1m")
        except Exception:
            self._log.exception("search_scroll on %s failed", index)
            return {}

    def scroll_scan(self, query):
        """Fetch the next scroll page.

        Args:
            query: a scroll body such as ``{'scroll': '1m', 'scroll_id': ...}``.

        Returns:
            The next page, or ``{}`` on any error (logged). A dict is returned,
            matching ``search_scroll``, so callers can always use ``.get()``.
        """
        try:
            return self.__elasticsearch.scroll(body=query)
        except Exception:
            self._log.exception("scroll_scan failed")
            return {}

    def delete_by_query(self, index, doc_type, query):
        """Delete every document in *index* matching *query*.

        Returns:
            The ``deleted`` count from the response (None if that key is
            absent), or 0 on any error (logged).
        """
        try:
            resJson = self.__elasticsearch.delete_by_query(index=index, doc_type=doc_type, body=query)
        except Exception:
            self._log.exception("delete_by_query on %s failed", index)
            return 0
        return resJson.get('deleted')

    def update_by_query(self, index, doc_type, query):
        """Update every document in *index* matching *query*.

        Returns:
            The ``updated`` count from the response (None if that key is
            absent), or 0 on any error (logged).
        """
        try:
            resJson = self.__elasticsearch.update_by_query(index=index, doc_type=doc_type, body=query)
        except Exception:
            self._log.exception("update_by_query on %s failed", index)
            return 0
        return resJson.get('updated')

    def insert_bulk(self, data_lst):
        """Send *data_lst* (bulk actions) through ``helpers.bulk`` with a 60 s request timeout.

        Returns:
            Whatever ``helpers.bulk`` returns, or ``[0, []]`` if it raised
            (logged). With its default settings ``helpers.bulk`` raises on
            failed actions, so partial progress is not reported.
        """
        try:
            return helpers.bulk(self.__elasticsearch, data_lst, request_timeout=60)
        except Exception:
            self._log.exception("insert_bulk failed")
            return [0, []]

    # Correctly spelled aliases; the misspelled names stay for existing callers.
    insertDocument = insertDocment
    deleteDocument = deleteDocment
    getDocument = getDocment
    updateDocument = updateDocment
if __name__ == '__main__':
    # Demo: print every document of the 'es_news' index using the scroll API.
    es = ESmanager("192.168.10.31", 9200)
    query = {
        "query": {
            "match_all": {}
        }
    }
    res = es.search_scroll('es_news', '_doc', query) or {}
    scroll_id = res.get('_scroll_id')
    hits = (res.get('hits') or {}).get('hits') or []
    # Stop on the first empty page. hits.total.value is the total number of
    # matches and does not shrink from page to page, so it can never end the loop.
    while hits:
        for hit in hits:
            print(hit)
        if not scroll_id:
            break
        # `or {}` also covers helpers that signal failure with an empty list.
        res = es.scroll_scan({'scroll': '1m', 'scroll_id': scroll_id}) or {}
        scroll_id = res.get('_scroll_id') or scroll_id
        hits = (res.get('hits') or {}).get('hits') or []
    if scroll_id:
        # Free the server-side scroll context now rather than waiting for the
        # 1m keep-alive; best-effort, since it may already have expired.
        try:
            es.getConnector().clear_scroll(scroll_id=scroll_id)
        except Exception as err:
            print('clear_scroll failed: %s' % err)
# Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment.