@drorata
Last active February 27, 2024 10:15
Minimal Working example of Elasticsearch scrolling using Python client
# Initialize the scroll
page = es.search(
    index='yourIndex',
    doc_type='yourType',
    scroll='2m',
    search_type='scan',
    size=1000,
    body={
        # Your query's body
    })
sid = page['_scroll_id']
scroll_size = page['hits']['total']

# Start scrolling
while scroll_size > 0:
    print "Scrolling..."
    page = es.scroll(scroll_id=sid, scroll='2m')
    # Update the scroll ID
    sid = page['_scroll_id']
    # Get the number of results that we returned in the last scroll
    scroll_size = len(page['hits']['hits'])
    print "scroll size: " + str(scroll_size)
    # Do something with the obtained page
@vadirajjahagirdar

Thanks a lot!!

@david0593112

nice

@croepke

croepke commented Dec 8, 2018

Many thanks! Very handy!

@akras-apixio

akras-apixio commented Dec 21, 2018

Warning: this code has a bug. It throws away the first page of search results (i.e., the first 1000 items). A co-worker of mine copy-pasted it, which cost us a few hours.
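
A minimal sketch of one way to avoid losing that first page, assuming a client/version where the initial search already returns hits (the index name and empty query body are placeholders): consume page['hits']['hits'] from the initial search before the first es.scroll call.

# Sketch only: handle the initial page before scrolling.
# 'yourIndex' and the empty body are placeholders.
page = es.search(index='yourIndex', scroll='2m', size=1000, body={})
sid = page['_scroll_id']
hits = page['hits']['hits']

while hits:
    for hit in hits:
        pass  # do something with each hit, including the very first batch
    page = es.scroll(scroll_id=sid, scroll='2m')
    sid = page['_scroll_id']  # always use the most recently returned scroll id
    hits = page['hits']['hits']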

@mybluedog24

This code doesn't work anymore in ES 6.4. I found another solution here: https://stackoverflow.com/questions/28537547/how-to-correctly-check-for-scroll-end

response = es.search(
    index='index_name',
    body=<your query here>,
    scroll='10m'
)
scroll_id = response['_scroll_id']

while len(response['hits']['hits']):
    # process results
    print([item["_id"] for item in response["hits"]["hits"]])
    response = es.scroll(scroll_id=scroll_id, scroll='10m')

Process the result right at the beginning of the while loop to avoid missing the first search result.

@feydan

feydan commented Apr 9, 2019

The scroll id can change: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html

The initial search request and each subsequent scroll request each return a _scroll_id. While the _scroll_id may change between requests, it doesn’t always change — in any case, only the most recently received _scroll_id should be used.

Here is a simplified version that will work if the scroll id changes

response = es.search(
    index='index_name',
    body=<your query here>,
    scroll='10m'
)

while len(response['hits']['hits']):
    # process results
    print([item["_id"] for item in response["hits"]["hits"]])
    response = es.scroll(scroll_id=response['_scroll_id'], scroll='10m')

@Bernardoow

Thanks @feydan !

@flavienbwk

Great thanks @feydan !

@manohar0079

Any such method in Ruby?

@omarcsejust

Thanks

@eavilesmejia

eavilesmejia commented Feb 17, 2021

For a huge query you can use:

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch('http://localhost:9200')

# return a generator
response = helpers.scan(
    es,
    index='yourIndex',
    scroll='10m',
    size=1000,
    query={
        # query body
    })

# iterate documents one by one
for row in response:
    print(row['_source'])

Tested on Elasticsearch 7.8 and Python 3.9 with a query hitting ~500k documents.

@derekangziying

derekangziying commented Feb 22, 2021

Hi eavilesmejia,

I've used your code, but I'm getting empty results when writing the output with Python's open().

I'm basically trying to extract a portion of _source and write it to a text file.

The paths and configuration of the log file are all set up. When I use es.search instead of helpers.scan, the code works and writes to my text file fine, but because of the 10k limit I'm looking at helpers.scan.

g = open (LOG, 'a+')

for row in response:
     g.write ('$')
     g.write(row['_source']['messagetype'])
     g.write ('$')
     g.write ('\n')
g.close

The code snippet above writes nothing to my text file.

Would you mind testing a write to a logfile on your setup and sharing your code?
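
One likely cause (an assumption on my part, not something confirmed in the thread): g.close is referenced but never called (no parentheses), so buffered writes may never be flushed to the file. A minimal sketch using a with block, where LOG, response and the messagetype field are placeholders taken from the snippet above:

# Sketch: a with block closes and flushes the file even if an exception occurs.
# LOG, response and 'messagetype' come from the snippet above.
with open(LOG, 'a+') as g:
    for row in response:
        g.write('$')
        g.write(row['_source']['messagetype'])
        g.write('$\n')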

@eavilesmejia

Hi @derekangziying

I have tested it with code very similar to this:

import csv
from elasticsearch import Elasticsearch, helpers

def main():
    with open("/tmp/yesterday-all-events.csv", "w") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=[], extrasaction='ignore')
        for i, row in enumerate(get_scrolled_query()):
            if i == 0:
                writer.fieldnames = list(filter(lambda x: x.startswith('ca.'), row['_source'].keys()))
                writer.writeheader()
            writer.writerow(row["_source"])


def get_scrolled_query():
    es = Elasticsearch('http://localhost:9200')
    return helpers.scan(es,
                        index='my-index', scroll='40m',
                        size=8000,
                        query={
                            "query": {
                                "range": {
                                    "@timestamp": {
                                        "gte": "now-1d/d",
                                        "lt": "now/d"
                                    }
                                }
                            }
                        })


if __name__ == '__main__':
    main()

In my case I fetch all of yesterday's events and write the result to a CSV file using the DictWriter class. I wanted to use only the fields that start with ca. as the CSV header, for example ca.version, ca.date_time and others that exist in my index.

The 10k limit is handled by helpers.scan, which issues scroll requests of the given size (here 8000) until there is no more data to return; the scroll is cleared by default at the end of the scan process, which is why I don't mind using a '40m' TTL.
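
For reference, a brief sketch making that default explicit, assuming the elasticsearch-py helpers API (the index name and match_all query are placeholders):

# Sketch: the same helpers.scan call with its scroll-handling default spelled out.
# 'my-index' and the match_all query are placeholders.
response = helpers.scan(
    es,
    index='my-index',
    scroll='40m',
    size=8000,
    query={"query": {"match_all": {}}},
    clear_scroll=True,  # default: release the scroll context once iteration finishes
)
for row in response:
    pass  # process row['_source']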
