@timothyrenner · Last active March 13, 2018 12:40
Example of a coroutine that uses the Elasticsearch scroll API to page through query results asynchronously.
import aiohttp
import asyncio
import json
from toolz import get_in, assoc
url = "http://localhost:9200"
index = "stocks"
doc_type = "stock"
query = {
    "query": {
        "bool": {
            "filter": [
                {"term": {
                    "symbol.keyword": "EXPE"
                }},
                {"range": {
                    "open": {
                        "gt": 75.0,
                        "lt": 100.0
                    }
                }}
            ]
        }
    }
}

async def async_scan(query, scroll_size):
    # all_hits accumulates the result.
    all_hits = []
    async with aiohttp.ClientSession() as session:
        # Create the primary scroll request.
        scroll_id_request = assoc(query, "size", scroll_size)
        async with session.post(
            "{}/{}/{}/_search".format(url, index, doc_type),
            params={"scroll": "1m"},
            json=scroll_id_request
        ) as response:
            read_response = await response.json()
            scroll_id = read_response["_scroll_id"]
            hits = get_in(["hits", "hits"], read_response, [])
            all_hits.extend(hits)
        # Loop until there are no more hits returned.
        while hits:
            async with session.post(
                "{}/_search/scroll".format(url),
                json={
                    "scroll": "1m",
                    "scroll_id": scroll_id
                }
            ) as response:
                read_response = await response.json()
                hits = get_in(["hits", "hits"], read_response, [])
                all_hits.extend(hits)
    return all_hits

event_loop = asyncio.get_event_loop()
tasks = asyncio.ensure_future(async_scan(query, 10))
# Returns a list.
results = event_loop.run_until_complete(tasks)
print("Number of results: {}.".format(len(results)))
print("First result: {}.".format(json.dumps(results[0])))
event_loop.close()
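
The scroll context opened above is simply left to expire after its one-minute keep-alive. Elasticsearch also exposes a clear-scroll endpoint (DELETE /_search/scroll) for releasing it as soon as the scan finishes; below is a minimal sketch of that cleanup, assuming the same url as above and that it would be awaited inside async_scan's session block once hits comes back empty. The helper name clear_scroll is just for illustration.

async def clear_scroll(session, scroll_id):
    # Explicitly release the scroll context rather than waiting for the
    # "1m" keep-alive to lapse. Assumes `url` is the Elasticsearch base URL
    # defined at the top of the script.
    async with session.delete(
        "{}/_search/scroll".format(url),
        json={"scroll_id": [scroll_id]}
    ) as response:
        return await response.json()

On Python 3.7+ the event loop boilerplate at the bottom can also be replaced with a single results = asyncio.run(async_scan(query, 10)).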