Minimal working example of Elasticsearch scrolling using the Python client
# Initialize the scroll
page = es.search(
    index='yourIndex',
    doc_type='yourType',
    scroll='2m',
    search_type='scan',
    size=1000,
    body={
        # Your query's body
    })
sid = page['_scroll_id']
scroll_size = page['hits']['total']

# Start scrolling
while scroll_size > 0:
    print("Scrolling...")
    page = es.scroll(scroll_id=sid, scroll='2m')
    # Update the scroll ID
    sid = page['_scroll_id']
    # Get the number of results that we returned in the last scroll
    scroll_size = len(page['hits']['hits'])
    print("scroll size: " + str(scroll_size))
    # Do something with the obtained page
@henrikno commented Sep 27, 2017

You should also clear the scroll when done, to free memory in Elasticsearch; otherwise it keeps the scroll context in memory until the scroll timeout expires.
E.g. es.clear_scroll(body={'scroll_id': [sid]}, ignore=(404, ))
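For instance, wrapping the loop in try/finally keeps the cleanup even on errors; a minimal sketch, assuming the es client and sid from the example above:

try:
    while scroll_size > 0:
        page = es.scroll(scroll_id=sid, scroll='2m')
        sid = page['_scroll_id']
        scroll_size = len(page['hits']['hits'])
finally:
    # Free the scroll context server-side; a 404 just means it already expired
    es.clear_scroll(body={'scroll_id': [sid]}, ignore=(404,))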

@NinaSalimi commented Oct 21, 2017

Very helpful! Thanks!

@alanwds commented Jan 5, 2018

+1 Works like a charm. Thank you!

@jjjbushjjj commented Jan 18, 2018

+100 Thank you!

@hardikgw commented Feb 2, 2018

In 6.x there is no need to call es.clear_scroll; the default is clear_scroll=True.
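If this refers to elasticsearch.helpers.scan, a minimal sketch (the index name and query are placeholders):

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch('http://localhost:9200')

# helpers.scan clears the scroll context when the iteration finishes;
# clear_scroll=True is the default, spelled out here only for emphasis
for hit in helpers.scan(es, index='yourIndex',
                        query={'query': {'match_all': {}}},
                        scroll='2m', clear_scroll=True):
    print(hit['_id'])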

@krishthotem commented Feb 20, 2018

I have something like this in my Python program:

response = requests.get('https://example.com/_search?q=@version:2&scroll=2m')
data = json.loads(response.text)

sid = data['_scroll_id']
scroll_size = data['hits']['total']

# Until this line it works as expected

while (scroll_size > 0):
    data = {"scroll": "2m", "scroll_id": sid}
    response = requests.post('https://example.com/_search/scroll', data=data)
    data = json.loads(response.text)
    # do something
    sid = data['_scroll_id']

# But on the requests.post() line I get the error: {'message': 'Not Found', 'code': 404}

Any thoughts on what I am doing wrong and what should be changed? Thanks!

@evrycollin commented Apr 4, 2018

Working with a Python generator (works with ES >= 5).

1. Utility generator method:

import math

def scroll(index, doc_type, query_body, page_size=100, debug=False, scroll='2m'):
    page = es.search(index=index, doc_type=doc_type, scroll=scroll, size=page_size, body=query_body)
    sid = page['_scroll_id']
    scroll_size = page['hits']['total']
    total_pages = math.ceil(scroll_size / page_size)
    page_counter = 0
    if debug:
        print('Total items : {}'.format(scroll_size))
        print('Total pages : {}'.format(total_pages))
    # Start scrolling
    while scroll_size > 0:
        # Get the number of results returned by the last scroll
        scroll_size = len(page['hits']['hits'])
        if scroll_size > 0:
            if debug:
                print('> Scrolling page {} : {} items'.format(page_counter, scroll_size))
            yield total_pages, page_counter, scroll_size, page
        # Get the next page (reuse the scroll TTL passed as a parameter)
        page = es.scroll(scroll_id=sid, scroll=scroll)
        page_counter += 1
        # Update the scroll ID
        sid = page['_scroll_id']

2. Usage:

index = 'cases_*'
doc_type = 'detail'
query = { "query": { "match_all": {} }, "_source": ['caseId'] }
page_size = 1000

for total_pages, page_counter, page_items, page_data in scroll(index, doc_type, query, page_size=page_size):
    print('total_pages={}, page_counter={}, page_items={}'.format(total_pages, page_counter, page_items))
    # do what you need with page_data

@mikej165 commented Apr 6, 2018

+1 Saved me a lot of time. Thanks!

@abdulwahid24 commented Apr 13, 2018

Thank you @evrycollin +1

@AnalystNidhi commented Apr 24, 2018

For my project I need to fetch more than 10k documents, so I used the Elasticsearch scroll API with Python. Here is my sample code:

url = 'http://hostname:portname/_search/scroll'
scroll_url = 'http://hostname:portname/_search?scroll=2m'
query = {"query": {"bool": {"must": [{"match_all": {}}, {"range": {"@timestamp": {"gt": "now-24h", "lt": "now-1h", "time_zone": "-06:00"}}}], "must_not": [], "should": []}}, "from": 0, "size": 10, "sort": [], "aggs": {}}

response = requests.post(scroll_url, json=query).json()
sid = response['_scroll_id']
hits = response['hits']
total = hits['total']
while total > 0:
    scroll_query = json.dumps({"scroll": "2m", "scroll_id": sid})
    response1 = requests.post(url, data=scroll_query).json()
    sid = response1['_scroll_id']
    hits = response1['hits']
    total = len(response1['hits']['hits'])
    for each in hits['hits']:
        # process each hit
        ...

The scroll worked exactly the way I wanted, but later I was informed that because of this scroll the Elasticsearch schema got corrupted and the indexes were recreated.

Is it true that scroll modifies the ES structure, or is something wrong with my code? Please let me know.

@rain1024 commented Jun 22, 2018

+1 helpful

@NguyenHauHN commented Jul 12, 2018

Thank you so much!

@tade0726 commented Jul 19, 2018

@muelli Thanks, you are brilliant. I hope more people check out this API!

@chamalis commented Aug 8, 2018

ES 6.3. This example makes my Elasticsearch service crash while scrolling 110k documents with size=10000, somewhere between the 5th and 7th iteration.

systemctl status elasticsearch

 elasticsearch.service - Elasticsearch
   Loaded: loaded (/usr/lib/systemd/system/elasticsearch.service; disabled; vendor preset: enabled)
   Active: failed (Result: exit-code) since Wed 2018-08-08 20:58:10 EEST; 21s ago
     Docs: http://www.elastic.co
  Process: 5860 ExecStart=/usr/share/elasticsearch/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid --quiet (code=exited, status=127)
 Main PID: 5860 (code=exited, status=127)

Aug 08 20:57:18 myhost elasticsearch[5860]:         at org.elasticsearch.bootstrap.Elasticsearch.execute(Elasticsearch.java:112)
Aug 08 20:57:18 myhost elasticsearch[5860]:         at org.elasticsearch.cli.EnvironmentAwareCommand.execute(EnvironmentAwareCommand.java:86)
Aug 08 20:57:18 myhost elasticsearch[5860]:         at org.elasticsearch.cli.Command.mainWithoutErrorHandling(Command.java:124)
Aug 08 20:57:18 myhost elasticsearch[5860]:         at org.elasticsearch.cli.Command.main(Command.java:90)
Aug 08 20:57:18 myhost elasticsearch[5860]:         at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:92)
Aug 08 20:57:18 myhost elasticsearch[5860]:         at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:85)
Aug 08 20:57:18 myhost elasticsearch[5860]: 2018-08-08 20:57:18,490 main ERROR Null object returned for RollingFile in Appenders.
Aug 08 20:57:18 myhost elasticsearch[5860]: 2018-08-08 20:57:18,491 main ERROR Unable to locate appender "rolling" for logger config "root"
Aug 08 20:58:10 myhost systemd[1]: elasticsearch.service: Main process exited, code=exited, status=127/n/a
Aug 08 20:58:10 myhost systemd[1]: elasticsearch.service: Failed with result 'exit-code'.

No logs in /var/log/elasticsearch/elasticsearch.log

@venkz commented Sep 28, 2018

Thanks for making a simple example, very useful.

For others who use this example, keep in mind that the initial es.search not only returns the first scroll_id that you'll use for scrolling, but also contains hits that you'll want to process before initiating your first scroll. For most people this is probably obvious, but for the 'challenged' (like me), be sure to do something like:


page = es.search(
    .....
    )

sid = page['_scroll_id']
scroll_size = page['hits']['total']

# Before you scroll, process your current batch of hits
for hit in page['hits']['hits']:
    do_stuff

# Start scrolling
while (scroll_size > 0):
    ...

Excellent! Important point to keep in mind 👍

@sibblegp commented Oct 15, 2018

This is extremely slow for me. I used elasticsearch.helpers.scan instead and not only did it not crash my server, but it was much faster.

@tomaszhlawiczka commented Oct 22, 2018

@sibblegp please see: https://www.elastic.co/guide/en/elasticsearch/reference/5.1/breaking_50_search_changes.html#_literal_search_type_scan_literal_removed

Scroll requests sorted by _doc have been optimized to more efficiently resume from where the previous request stopped, so this will have the same performance characteristics as the former scan search type.
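So to keep the old scan performance, sort by _doc in the initial search; a minimal sketch (index name and query body are placeholders):

page = es.search(
    index='yourIndex',
    scroll='2m',
    size=1000,
    body={
        'sort': ['_doc'],  # skip scoring, the efficient replacement for search_type='scan'
        'query': {'match_all': {}}
    })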

@vadirajjahagirdar commented Oct 23, 2018

Thanks a lot!!

@dtwh commented Nov 20, 2018

nice

@croepke commented Dec 8, 2018

Many thanks! Very handy!

@akras-apixio commented Dec 21, 2018

Warning: this code has a bug. It throws away the first search result (i.e. the first 1000 items). A co-worker of mine copy-pasted it, causing us to waste a few hours.

@mybluedog24 commented Mar 8, 2019

This code doesn't work anymore in ES 6.4. I found another solution here: https://stackoverflow.com/questions/28537547/how-to-correctly-check-for-scroll-end

response = es.search(
    index='index_name',
    body=<your query here>,
    scroll='10m'
)
scroll_id = response['_scroll_id']

while len(response['hits']['hits']):
    # process results
    print([item["_id"] for item in response["hits"]["hits"]])
    response = es.scroll(scroll_id=scroll_id, scroll='10m')

Process the results right at the beginning of the while loop to avoid missing the first batch of search results.

@feydan commented Apr 9, 2019

The scroll id can change: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html

The initial search request and each subsequent scroll request each return a _scroll_id. While the _scroll_id may change between requests, it doesn’t always change — in any case, only the most recently received _scroll_id should be used.

Here is a simplified version that will work if the scroll id changes:

response = es.search(
    index='index_name',
    body=<your query here>,
    scroll='10m'
)

while len(response['hits']['hits']):
    # process results
    print([item["_id"] for item in response["hits"]["hits"]])
    response = es.scroll(scroll_id=response['_scroll_id'], scroll='10m')

@Bernardoow commented Dec 6, 2019

Thanks @feydan !

@flavienbwk commented Mar 5, 2020

Great, thanks @feydan!

@manohar0079 commented Apr 17, 2020

Is there any such method in Ruby?

@omarcsejust commented Aug 6, 2020

Thanks

@eavilesmejia commented Feb 17, 2021

For a huge query you can use:

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch('http://localhost:9200')

# returns a generator
response = helpers.scan(es,
                        index='yourIndex',
                        scroll='10m',
                        size=1000,
                        query={
                            # query body
                        })

# iterate over documents one by one
for row in response:
    print(row['_source'])

Tested on Elasticsearch 7.8 and Python 3.9 with a query hitting ~500k documents.

@derekangziying commented Feb 22, 2021


Hi @eavilesmejia,

I've used your code but I'm getting empty results when writing with Python's open().

I'm basically trying to extract a portion of _source and write it to a text file.

The paths and configuration of the log file are all set up; when I use es.search as opposed to helpers.scan, the code works and writes to my text file fine. But because of the 10k limit issue, I'm looking at helpers.scan.

g = open(LOG, 'a+')

for row in response:
    g.write('$')
    g.write(row['_source']['messagetype'])
    g.write('$')
    g.write('\n')
g.close()

The code snippet above writes nothing to my text file.

Do you mind testing writing to a logfile on your setup and sharing your code?

@eavilesmejia commented Feb 22, 2021

Hi @derekangziying

I have tested it with code very similar to this:

import csv
from elasticsearch import Elasticsearch, helpers

def main():
    with open("/tmp/yesterday-all-events.csv", "w") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=[], extrasaction='ignore')
        for i, row in enumerate(get_scrolled_query()):
            if i == 0:
                writer.fieldnames = list(filter(lambda x: x.startswith('ca.'), row['_source'].keys()))
                writer.writeheader()
            writer.writerow(row["_source"])


def get_scrolled_query():
    es = Elasticsearch('http://localhost:9200')
    return helpers.scan(es,
                        index='my-index', scroll='40m',
                        size=8000,
                        query={
                            "query": {
                                "range": {
                                    "@timestamp": {
                                        "gte": "now-1d/d",
                                        "lt": "now/d"
                                    }
                                }
                            }
                        })


if __name__ == '__main__':
    main()

In my case I am getting all of yesterday's events and writing the result into a CSV file using the DictWriter class. I wanted the fields that start with ca. to be used as the CSV header, for example ca.version, ca.date_time and others that are in my index.

The 10k limit is handled by helpers.scan by making scroll requests of the given size (in this case 8000) until there's no more data to return; the scroll is then cleared by default at the end of the scan process (that's the reason I don't mind using a '40m' TTL).
