-
-
Save tonyroberts/a539885e12892dc43935860f75cdfe7e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Import the modules we're going to use. | |
# You may need to 'pip install aiohttp' | |
from urllib.parse import urlencode | |
import asyncio | |
import aiohttp | |
import json | |
# Sample street addresses used to exercise the batch geocoder.
STREET_ADDRESSES = [
    "1600 Pennsylvania Avenue, Washington DC, USA",
    "11 Wall Street New York, NY",
    "350 Fifth Avenue New York, NY 10118",
    "221B Baker St, London, England",
    "Tour Eiffel Champ de Mars, Paris",
    "4059 Mt Lee Dr.Hollywood, CA 90068",
    "Buckingham Palace, London, England",
    "Statue of Liberty, Liberty Island New York, NY 10004",
    "Manger Square, Bethlehem, West Bank",
    "2 Macquarie Street, Sydney",
]

# Geoapify batch geocoding endpoint, and the API key that is sent as the
# 'apiKey' query parameter. Replace the placeholder with your own key.
GEOCODING_BATCH_API = "https://api.geoapify.com/v1/batch/geocode/search"
YOUR_API_KEY = "xxxx-xxxx-xxxx-xxxx"

# Shared batching state:
#   ADDRESSES_BATCH    - (address, future) pairs waiting to be geocoded
#   BATCH_LOOP_RUNNING - True once the background batch task has been started
ADDRESSES_BATCH = []
BATCH_LOOP_RUNNING = False
async def process_batches_loop():
    """Background task that geocodes queued addresses in batches.

    Every 0.1 seconds the (address, future) pairs accumulated in
    ADDRESSES_BATCH are taken off the queue, sent to get_locations() in a
    single request, and each future is resolved with the coordinates found
    for its address (or None if the address is missing from the response).

    If the batch request fails, the exception is set on every future in that
    batch so the waiting callers fail instead of hanging forever, and the
    loop carries on with the next batch.

    The loop runs while BATCH_LOOP_RUNNING is True. The flag is reset to
    False when the task exits for any reason (including cancellation) so
    that a later caller can start a fresh task.
    """
    global ADDRESSES_BATCH, BATCH_LOOP_RUNNING

    try:
        while BATCH_LOOP_RUNNING:
            # Give callers a moment to add more addresses to the batch.
            await asyncio.sleep(0.1)

            # Nothing queued yet: go back to waiting.
            if not ADDRESSES_BATCH:
                continue

            # Take ownership of the queued items and start a new empty batch
            # so addresses arriving during the request go into the next one.
            batch = ADDRESSES_BATCH
            ADDRESSES_BATCH = []

            addresses = [address for address, _ in batch]
            try:
                locations = await get_locations(addresses)
            except asyncio.CancelledError:
                # The task itself is being cancelled: release the waiters
                # and let the cancellation propagate.
                for _, future in batch:
                    future.cancel()
                raise
            except Exception as exc:
                # Report the failure to every caller waiting on this batch.
                for _, future in batch:
                    if not future.done():
                        future.set_exception(exc)
                continue

            # Resolve each waiting future. A future may already be done if
            # its caller was cancelled; set_result() would raise on those.
            for address, future in batch:
                if not future.done():
                    future.set_result(locations.get(address))
    finally:
        BATCH_LOOP_RUNNING = False
async def get_locations(addresses):
    """Return a dictionary of address -> (lat, lon) for a list of addresses.

    The addresses are POSTed as a JSON list to the Geoapify batch geocoding
    endpoint. While the response is a dict whose "status" is "pending", the
    job is polled every 0.1 seconds (using the "id" from the response) until
    the result list is returned.

    Result entries that carry no "lat"/"lon" (presumably addresses the
    service could not resolve) are left out of the returned dictionary
    rather than aborting the whole batch.

    Raises:
        aiohttp.ClientResponseError: if the API answers with an HTTP error
            status (for example an invalid API key).
    """
    # Construct the URL to do the batch request
    query_string = urlencode({"apiKey": YOUR_API_KEY})
    url = f"{GEOCODING_BATCH_API}?{query_string}"

    # The batch is sent as a JSON body, so declare the content type.
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    payload = json.dumps(addresses)

    # One session is shared by the initial POST and all polling GETs so the
    # underlying connection can be reused.
    async with aiohttp.ClientSession() as session:
        async with session.post(url, data=payload, headers=headers) as response:
            response.raise_for_status()
            response_data = json.loads(await response.read())

        # The API can return a dict with a pending status if it needs more
        # time to complete. Poll the API until the result is ready.
        while isinstance(response_data, dict) \
                and response_data.get("status") == "pending":
            await asyncio.sleep(0.1)
            request_id = response_data.get("id")
            async with session.get(f"{url}&id={request_id}") as response:
                response.raise_for_status()
                response_data = json.loads(await response.read())

    # Gather the results into a dictionary of address -> (lat, lon)
    locations = {}
    for result in response_data:
        lat = result.get("lat")
        lon = result.get("lon")
        if lat is None or lon is None:
            # No coordinates for this entry; callers see it as "not found".
            continue
        locations[result["query"]["text"]] = (lat, lon)
    return locations
async def get_location(address):
    """Return (latitude, longitude) for a street address.

    The address is queued in ADDRESSES_BATCH together with a future, and the
    background task process_batches_loop() is started if it is not already
    running. The coroutine then waits for that task to resolve the future.

    Returns None if the batch response contained no entry for the address.
    """
    global BATCH_LOOP_RUNNING, BATCH_LOOP_TASK

    # Create a Future that will be set with the location once the batch
    # containing this address has completed. We are inside a coroutine, so
    # the running loop is the right one to create it on.
    future = asyncio.get_running_loop().create_future()

    # Add the street address and its future to the current batch
    ADDRESSES_BATCH.append((address, future))

    # Start 'process_batches_loop' on the event loop if it's not already
    # running. The task is stored in a module-level name because the event
    # loop only keeps a weak reference to tasks; an unreferenced task may be
    # garbage collected before it finishes.
    if not BATCH_LOOP_RUNNING:
        BATCH_LOOP_RUNNING = True
        BATCH_LOOP_TASK = asyncio.create_task(process_batches_loop())

    # Wait for the batch our address is in and return its result
    return await future
async def main():
    """Geocode every address in STREET_ADDRESSES and print the results."""
    # Build one lookup coroutine per street address; they are batched
    # together behind the scenes by get_location().
    lookups = [get_location(street) for street in STREET_ADDRESSES]

    # Run all lookups concurrently; results come back in input order.
    results = await asyncio.gather(*lookups)

    # Report each address alongside the location found for it.
    for street, coords in zip(STREET_ADDRESSES, results):
        print(f"{street} -> {coords}")
if __name__ == "__main__":
    # main() is a coroutine, so it has to be driven by an asyncio event loop.
    # asyncio.run() creates the loop, runs main() to completion, cancels any
    # tasks still pending (such as the background batch loop) and closes the
    # loop, which the manual new_event_loop()/run_until_complete() pair did not.
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment