-
-
Save hmldd/44d12d3a61a8d8077a3091c4ff7b9307 to your computer and use it in GitHub Desktop.
# coding:utf-8
import json
import sys

from elasticsearch import Elasticsearch
# Define config -- edit these values for your own cluster and index.
host = "127.0.0.1"  # address of the Elasticsearch node
port = 9200  # port of the Elasticsearch node
timeout = 1000  # forwarded to the client as its `timeout` argument
index = "index"  # index that is checked for existence and then searched
doc_type = "type"  # forwarded as `doc_type` to the initial search
size = 1000  # forwarded as `size` to the initial search
body = {}  # search request body; empty here

# Init Elasticsearch instance
es = Elasticsearch(
    [
        {
            'host': host,
            'port': port,
        }
    ],
    timeout=timeout,
)
# Process hits here
def process_hits(hits):
    """Print each hit as indented JSON, one document after another.

    This is the per-batch hook of the script: replace the body to store or
    aggregate the hits instead of printing them.

    Args:
        hits: Iterable of JSON-serialisable objects (the script passes the
            ``data['hits']['hits']`` list of a search/scroll response).

    Raises:
        TypeError: If an item cannot be serialised by ``json.dumps``.
    """
    for item in hits:
        print(json.dumps(item, indent=2))
# Check index exists
if not es.indices.exists(index=index):
    print("Index " + index + " not exists")
    # sys.exit() instead of the bare exit() helper: exit() is injected by the
    # `site` module for interactive use and is not guaranteed to exist.
    sys.exit()

# Init scroll by search; the response carries the first batch of hits
# together with the scroll ID used to fetch the following batches.
data = es.search(
    index=index,
    doc_type=doc_type,
    scroll='2m',
    size=size,
    body=body,
)

# Get the scroll ID
sid = data['_scroll_id']
scroll_size = len(data['hits']['hits'])

try:
    # Scrolling... keep going until a batch comes back empty.
    while scroll_size > 0:
        # Before scroll, process current batch of hits
        process_hits(data['hits']['hits'])

        data = es.scroll(scroll_id=sid, scroll='2m')

        # Update the scroll ID
        sid = data['_scroll_id']

        # Get the number of results that returned in the last scroll
        scroll_size = len(data['hits']['hits'])
finally:
    # Release the scroll even when processing or a scroll request fails;
    # otherwise it stays open on the server until its '2m' keep-alive
    # expires, and open scrolls can pile up against the server-side limit.
    es.clear_scroll(scroll_id=sid)
you saved my life, thanks
nice! thx
I have a requirement. In "process_hits", instead of using print, what "containers" can be used to store the data (if I use a list or dict to store 10000+ pieces of data, is there a big performance overhead)? I need to get all the data because I need to compare and aggregate it.
I have a requirement. In "process_hits", instead of using print, what "containers" can be used to store the data (if I use a list or dict to store 10000+ pieces of data, is there a big performance overhead)? I need to get all the data because I need to compare and aggregate it.
It depends on the size of each record; 10000+ pieces of data is a piece of cake for a modern computer.
Very helpful. Thank you!
This is a nice example, but it's missing this at the very end after the while loop finishes...
es.clear_scroll(scroll_id=sid)
If you don't do that it leaves the scroll id open and these can build up over time until it reaches the global server side limit and then other queries which rely on scroll can start failing across the board until the open scroll ids eventually timeout.
Thank you for your feedback, I have updated the gist.
Thanks for the gist.
I also found an answer on SOF talking about the helper function scan
that abstracts away the scroll logic.
Thanks for the gist. I also found an answer on SOF talking about the helper function `scan` that abstracts away the scroll logic.
Does `scan` just start wherever the server says the `scroll_id` is? My concern is that `scan` just yields an arbitrary-length generator full of hits, so if your script terminates unexpectedly you could gracefully dump to JSON but you might have no idea where exactly you were in the scroll when you wanted to resume.
amazing!