# coding:utf-8
from elasticsearch import Elasticsearch
import json

# Define config
host = "127.0.0.1"
port = 9200
timeout = 1000
index = "index"
doc_type = "type"
size = 1000
body = {}

# Init Elasticsearch instance
es = Elasticsearch(
    [
        {
            'host': host,
            'port': port
        }
    ],
    timeout=timeout
)

# Process hits here
def process_hits(hits):
    for item in hits:
        print(json.dumps(item, indent=2))

# Check index exists
if not es.indices.exists(index=index):
    print("Index " + index + " does not exist")
    exit()

# Init scroll by search
data = es.search(
    index=index,
    doc_type=doc_type,
    scroll='2m',
    size=size,
    body=body
)

# Get the scroll ID
sid = data['_scroll_id']
scroll_size = len(data['hits']['hits'])

while scroll_size > 0:
    print("Scrolling...")

    # Before scroll, process current batch of hits
    process_hits(data['hits']['hits'])

    data = es.scroll(scroll_id=sid, scroll='2m')

    # Update the scroll ID
    sid = data['_scroll_id']

    # Get the number of results returned in the last scroll
    scroll_size = len(data['hits']['hits'])

# Clear the scroll context when done to free server-side resources
es.clear_scroll(scroll_id=sid)
Thanks
Thanks! Thank you!
Thanks
It does not work for me, I'm getting this error:
Connected to pydev debugger (build 183.5429.31)
GET http://aaelk:9200/_search/scroll?scroll=2m [status:503 request:0.047s]
GET http://aaelk:9200/_search/scroll?scroll=2m [status:503 request:0.031s]
GET http://aaelk:9200/_search/scroll?scroll=2m [status:503 request:0.031s]
GET http://aaelk:9200/_search/scroll?scroll=2m [status:503 request:0.001s]
Traceback (most recent call last):
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.3.4\helpers\pydev\pydevd.py", line 1741, in
main()
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.3.4\helpers\pydev\pydevd.py", line 1735, in main
globals = debugger.run(setup['file'], None, None, is_module)
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.3.4\helpers\pydev\pydevd.py", line 1135, in run
pydev_imports.execfile(file, globals, locals) # execute the script
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.3.4\helpers\pydev_pydev_imps_pydev_execfile.py", line 18, in execfile
exec(compile(contents+"\n", file, 'exec'), glob, loc)
File "C:/Users/yossih/PycharmProjects/zap/elastic.py", line 60, in
data = es.scroll(scroll_id=sid, scroll='2m')
File "C:\Users\yossih\AppData\Local\Continuum\anaconda3\lib\site-packages\elasticsearch\client\utils.py", line 76, in wrapped
return func(*args, params=params, **kwargs)
File "C:\Users\yossih\AppData\Local\Continuum\anaconda3\lib\site-packages\elasticsearch\client_init.py", line 1016, in scroll
params=params, body=body)
File "C:\Users\yossih\AppData\Local\Continuum\anaconda3\lib\site-packages\elasticsearch\transport.py", line 318, in perform_request
status, headers_response, data = connection.perform_request(method, url, params, body, headers=headers, ignore=ignore, timeout=timeout)
File "C:\Users\yossih\AppData\Local\Continuum\anaconda3\lib\site-packages\elasticsearch\connection\http_urllib3.py", line 186, in perform_request
self._raise_error(response.status, raw_data)
File "C:\Users\yossih\AppData\Local\Continuum\anaconda3\lib\site-packages\elasticsearch\connection\base.py", line 125, in _raise_error
raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
elasticsearch.exceptions.TransportError: TransportError(503, '{"_scroll_id":"DnF1ZXJ5VGhlbkZldGNoCgAAAAAAZBRSFmtmNjJjcGctVFJTbVBYZXd6VDlDRUEAAAAAAGQUURZrZjYyY3BnLVRSU21QWGV3elQ5Q0VBAAAAAABkFFkWa2Y2MmNwZy1UUlNtUFhld3pUOUNFQQAAAAAAZBRVFmtmNjJjcGctVFJTbVBYZXd6VDlDRUEAAAAAAGQUWhZrZjYyY3BnLVRSU21QWGV3elQ5Q0VBAAAAAABkFFYWa2Y2MmNwZy1UUlNtUFhld3pUOUNFQQAAAAAAZBRXFmtmNjJjcGctVFJTbVBYZXd6VDlDRUEAAAAAAGQUUxZrZjYyY3BnLVRSU21QWGV3elQ5Q0VBAAAAAABkFFgWa2Y2MmNwZy1UUlNtUFhld3pUOUNFQQAAAAAAZBRUFmtmNjJjcGctVFJTbVBYZXd6VDlDRUE=","took":1,"timed_out":false,"_shards":{"total":10,"successful":0,"failed":0},"hits":{"total":0,"max_score":0.0,"hits":[]}}')
You can remove the 1st call to process_hits if you put the 2nd call to process_hits before es.scroll
This helped me out a lot! I needed to change port to 80 and doc_type is deprecated, but otherwise it's golden.
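For reference, a sketch of the same initial search without doc_type, for newer client and server versions where mapping types are deprecated; everything else in the gist stays the same:

# Same initial search, minus the deprecated doc_type parameter
data = es.search(
    index=index,
    scroll='2m',
    size=size,
    body=body
)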
You can remove the 1st call to process_hits if you put the 2nd call to process_hits before es.scroll
Good idea!
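A short sketch of what that suggestion means, assuming an earlier revision of the gist that called process_hits once before the loop and once after es.scroll; the single-call form is what the gist above now shows:

# Two-call form (earlier revision): process the first page before the loop,
# then process each following page after es.scroll
process_hits(data['hits']['hits'])
while scroll_size > 0:
    data = es.scroll(scroll_id=sid, scroll='2m')
    sid = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])
    process_hits(data['hits']['hits'])

# Single-call form (the suggestion): process the current page, then scroll
while scroll_size > 0:
    process_hits(data['hits']['hits'])
    data = es.scroll(scroll_id=sid, scroll='2m')
    sid = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])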
TransportError
Checking if the index is already present or not
Thank you! :)
thanks. insightful.
Thanks, it's really helpful!
Thank you @hmldd. Inspired by your example, I created a Java version here: https://mincong.io/2020/01/19/elasticsearch-scroll-api/
Thanks for sharing, really useful.
It doesn't stop, really, it doesn't.
It costs so much time when the search returns a large number of results.
It costs so much time when the search returns a large number of results.
Scroll is not meant for searching; maybe you need a query instead, see: https://www.elastic.co/guide/en/elasticsearch/reference/current/full-text-queries.html
Doesn't work for me, I'm getting the error "Unexpected keyword argument 'scroll' in method call" on this line:
data = es.search(
    index=index,
    doc_type=doc_type,
    scroll='2m',
    size=size,
    body=body
)
The elasticsearch package for Python is already installed.
Thanks man.
thx.
amazing!
you saved my life, thanks
nice! thx
I have a requirement. In "process_hits", instead of using print, what "containers" can be used to store the data (if I use a list or dict to store 10,000+ pieces of data, is there a big performance overhead)? I need all of the data because I need to compare and aggregate it.
I have a requirement. In "process_hits", instead of using print, what "containers" can be used to store the data (if I use a list or dict to store 10,000+ pieces of data, is there a big performance overhead)? I need all of the data because I need to compare and aggregate it.
It depends on the size of a single document; 10,000+ pieces of data is a piece of cake for a modern computer.
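For example, a minimal sketch, assuming you only need the _source of each hit and that the field name "status" is just a placeholder for whatever you aggregate on: collect into a plain list inside process_hits and aggregate afterwards.

from collections import Counter

# Collect hits into a list instead of printing them
all_docs = []

def process_hits(hits):
    for item in hits:
        all_docs.append(item['_source'])

# ...run the scroll loop from the gist unchanged...

# Afterwards, compare/aggregate in memory, e.g. count documents per "status"
counts = Counter(doc.get('status') for doc in all_docs)
print(counts)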
Very helpful. Thank you!
This is a nice example, but it's missing this at the very end after the while loop finishes...
es.clear_scroll(scroll_id=sid)
If you don't do that, it leaves the scroll ID open; these can build up over time until they reach the global server-side limit, and then other queries which rely on scroll can start failing across the board until the open scroll IDs eventually time out.
Thank you for your feedback, I have updated the gist.
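A small variation on that (not part of the gist, just a sketch reusing its variables): wrap the loop in try/finally so the scroll context is cleared even if processing fails partway through.

try:
    while scroll_size > 0:
        process_hits(data['hits']['hits'])
        data = es.scroll(scroll_id=sid, scroll='2m')
        sid = data['_scroll_id']
        scroll_size = len(data['hits']['hits'])
finally:
    # Always release the server-side scroll context, even on errors
    es.clear_scroll(scroll_id=sid)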
Thanks for the gist. I also found an answer on SOF talking about the helper function scan that abstracts away the scroll logic.
Does scan just start wherever the server says the scroll_id is? My concern is that scan just yields an arbitrary-length generator full of hits, so if your script terminates unexpectedly you could gracefully dump to JSON, but you might have no idea where exactly you were in the scroll when you wanted to resume.
Thanks for sharing.