Last active
March 13, 2018 12:40
-
-
Save timothyrenner/2710e3a6798f15ece5769b2259f94a16 to your computer and use it in GitHub Desktop.
Example of a coroutine that uses the scroll API to allow async scrolling for Elasticsearch queries.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import aiohttp | |
import asyncio | |
import json | |
from toolz import get_in, assoc, concat | |
# Base URL of the Elasticsearch node the example talks to.
url = "http://localhost:9200"
# Index and document type that make up the search endpoint path.
index = "stocks"
doc_type = "stock"

# Search body: filter-only bool query (no scoring) selecting documents whose
# "symbol.keyword" is "EXPE" and whose "open" lies strictly between
# 75.0 and 100.0.
query = {
    "query": {
        "bool": {
            "filter": [
                {"term": {
                    "symbol.keyword": "EXPE"
                }},
                {"range": {
                    "open": {
                        "gt": 75.0,
                        "lt": 100.0
                    }
                }}
            ]
        }
    }
}
async def async_scan(query, scroll_size):
    """Collect every hit for ``query`` using the Elasticsearch scroll API.

    The first request opens a scroll context (kept alive for one minute
    per request) against the module-level ``url``/``index``/``doc_type``;
    subsequent requests page through it until a page comes back empty.

    Args:
        query: Search request body (a dict). It is copied with a ``size``
            key added via ``assoc``; the caller's dict is left as-is.
        scroll_size: Number of hits to request per scroll page.

    Returns:
        A list with the hits of all pages, in the order they were returned.

    Raises:
        aiohttp.ClientResponseError: If a search or scroll request returns
            an HTTP error status.
        KeyError: If the initial response carries no ``_scroll_id``.
    """
    # all_hits accumulates the result.
    all_hits = []
    scroll_id = None

    async with aiohttp.ClientSession() as session:
        try:
            # Create the primary scroll request.
            async with session.post(
                "{}/{}/{}/_search".format(url, index, doc_type),
                params={"scroll": "1m"},
                json=assoc(query, "size", scroll_size),
            ) as response:
                # Fail loudly instead of mis-reading an error payload.
                response.raise_for_status()
                page = await response.json()

            scroll_id = page["_scroll_id"]
            hits = get_in(["hits", "hits"], page, [])
            all_hits.extend(hits)

            # Loop until there are no more hits returned.
            while hits:
                async with session.post(
                    "{}/_search/scroll".format(url),
                    json={"scroll": "1m", "scroll_id": scroll_id},
                ) as response:
                    # An error here would otherwise look like "no more
                    # hits" and silently truncate the result.
                    response.raise_for_status()
                    page = await response.json()

                # The scroll id may change between pages; always continue
                # with the most recently received one.
                scroll_id = page.get("_scroll_id", scroll_id)
                hits = get_in(["hits", "hits"], page, [])
                all_hits.extend(hits)
        finally:
            # Release the server-side scroll context instead of letting it
            # linger until the keep-alive expires.
            if scroll_id is not None:
                try:
                    async with session.delete(
                        "{}/_search/scroll".format(url),
                        json={"scroll_id": scroll_id},
                    ):
                        pass
                except aiohttp.ClientError:
                    # Best-effort cleanup: the context expires on its own,
                    # and a cleanup failure must not mask the real outcome.
                    pass

    return all_hits
# Run the scan to completion on a dedicated event loop. The loop is created
# explicitly because relying on an implicit current loop is deprecated.
event_loop = asyncio.new_event_loop()
try:
    tasks = event_loop.create_task(async_scan(query, 10))
    # Returns a list.
    results = event_loop.run_until_complete(tasks)
    print("Number of results: {}.".format(len(results)))
    # Only show a sample when the query actually matched something.
    if results:
        print("First result: {}.".format(json.dumps(results[0])))
finally:
    # Close the loop even when the scan raised.
    event_loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment