Skip to content

Instantly share code, notes, and snippets.

@tonyroberts
Last active Sep 27, 2022
Embed
What would you like to do?
# Import the modules we're going to use.
# You may need to 'pip install aiohttp'
from urllib.parse import urlencode
import asyncio
import aiohttp
import json
# Sample street addresses that the demo geocodes.
STREET_ADDRESSES = [
    "1600 Pennsylvania Avenue, Washington DC, USA",
    "11 Wall Street New York, NY",
    "350 Fifth Avenue New York, NY 10118",
    "221B Baker St, London, England",
    "Tour Eiffel Champ de Mars, Paris",
    "4059 Mt Lee Dr.Hollywood, CA 90068",
    "Buckingham Palace, London, England",
    "Statue of Liberty, Liberty Island New York, NY 10004",
    "Manger Square, Bethlehem, West Bank",
    "2 Macquarie Street, Sydney",
]

# Geoapify batch geocoding endpoint, and the API key that is sent with every
# request as the "apiKey" query parameter (replace with your own key).
GEOCODING_BATCH_API = "https://api.geoapify.com/v1/batch/geocode/search"
YOUR_API_KEY = "xxxx-xxxx-xxxx-xxxx"

# Shared batching state:
#   ADDRESSES_BATCH    - queued (address, future) pairs awaiting the next batch
#   BATCH_LOOP_RUNNING - True once the background batch loop has been started
ADDRESSES_BATCH = []
BATCH_LOOP_RUNNING = False
async def process_batches_loop():
    """Periodically drain ``ADDRESSES_BATCH`` and resolve the queued futures.

    Runs for as long as ``BATCH_LOOP_RUNNING`` is true.  Every 0.1 seconds
    the queued ``(address, future)`` pairs are swapped out of
    ``ADDRESSES_BATCH``, geocoded with a single ``get_locations`` call, and
    each future is resolved with the ``(lat, lon)`` found for its address,
    or ``None`` when the lookup returned nothing for that address.

    Failure handling:
      * If ``get_locations`` raises, the exception is set on every future
        of that batch so the waiting callers are woken up instead of
        hanging forever; the loop then carries on with later batches.
      * If this task is cancelled while a batch is in flight, the futures
        of that batch are cancelled as well.
      * ``BATCH_LOOP_RUNNING`` is reset whenever the loop exits, so that a
        later ``get_location`` call is able to start a fresh loop.
    """
    global ADDRESSES_BATCH, BATCH_LOOP_RUNNING

    try:
        while BATCH_LOOP_RUNNING:
            # Give callers a short window to add addresses to the batch.
            await asyncio.sleep(0.1)

            # Nothing queued during this window: wait again.
            if not ADDRESSES_BATCH:
                continue

            # Take ownership of the queued items and start a new, empty
            # batch so requests arriving during the lookup are not lost.
            batch = ADDRESSES_BATCH
            ADDRESSES_BATCH = []

            addresses = [address for address, _ in batch]
            try:
                locations = await get_locations(addresses)
            except asyncio.CancelledError:
                # The loop is being shut down: release the waiters too.
                for _, future in batch:
                    if not future.done():
                        future.cancel()
                raise
            except Exception as exc:
                # Hand the failure to every waiter of this batch rather
                # than letting the loop die with unresolved futures.
                for _, future in batch:
                    if not future.done():
                        future.set_exception(exc)
                continue

            for address, future in batch:
                # A waiter may have been cancelled in the meantime; calling
                # set_result() on a done future would raise.
                if not future.done():
                    future.set_result(locations.get(address))
    finally:
        # Allow the loop to be restarted after it stops for any reason.
        BATCH_LOOP_RUNNING = False
async def get_locations(addresses):
    """Return a dictionary of address -> (lat, lon) for a batch of addresses.

    Posts ``addresses`` as a JSON list to ``GEOCODING_BATCH_API`` and, while
    the API answers with a ``{"status": "pending", "id": ...}`` object, polls
    the same endpoint with that id every 0.1 seconds until the result list
    is returned.

    Args:
        addresses: list of address strings to geocode.

    Returns:
        dict mapping each result's ``query.text`` to a ``(lat, lon)`` tuple.
        Results that carry no coordinates are left out of the dictionary.

    Raises:
        aiohttp.ClientResponseError: if the API answers with an HTTP error
            status (for example an invalid API key).
        ValueError: if a response is not valid JSON, or the final payload
            is not a list of results.
    """
    base_query = {"apiKey": YOUR_API_KEY}
    submit_url = f"{GEOCODING_BATCH_API}?{urlencode(base_query)}"

    # The batch is sent as a JSON document.
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
    }

    # A single session is shared by the initial POST and all polling GETs.
    async with aiohttp.ClientSession() as session:
        async with session.post(
            submit_url, data=json.dumps(addresses), headers=headers
        ) as response:
            # Fail loudly on 4xx/5xx instead of trying to parse an error
            # payload as if it were a list of results.
            response.raise_for_status()
            response_data = json.loads(await response.read())

        # The API can return a dict with a pending status if it needs more
        # time to complete. Poll the API until the result is ready.
        while (
            isinstance(response_data, dict)
            and response_data.get("status") == "pending"
        ):
            await asyncio.sleep(0.1)

            # Build the polling URL with urlencode so the id is escaped.
            poll_query = urlencode({**base_query, "id": response_data.get("id")})
            poll_url = f"{GEOCODING_BATCH_API}?{poll_query}"
            async with session.get(poll_url) as response:
                response.raise_for_status()
                response_data = json.loads(await response.read())

    if not isinstance(response_data, list):
        raise ValueError(
            f"Unexpected batch geocoding response: {response_data!r}"
        )

    # Gather the results into a dictionary of address -> (lat, lon).
    locations = {}
    for result in response_data:
        lat = result.get("lat")
        lon = result.get("lon")
        # A result without coordinates (presumably an address that could
        # not be geocoded) must not fail the whole batch; skip it.
        if lat is None or lon is None:
            continue
        locations[result["query"]["text"]] = (lat, lon)
    return locations
async def get_location(address):
    """Return (latitude, longitude) for a street address.

    The address is queued in ``ADDRESSES_BATCH`` together with a future, and
    the background ``process_batches_loop`` task (started here on first use)
    resolves that future once the batch containing the address has been
    geocoded.

    Args:
        address: the street address to look up.

    Returns:
        The value the batch loop sets on the future: a ``(lat, lon)`` tuple,
        or ``None`` if the batch result had no entry for ``address``.
    """
    global BATCH_LOOP_RUNNING, _BATCH_LOOP_TASK

    # Create a Future that will be set with the location once the request
    # has completed.  get_running_loop() is the supported way to obtain the
    # loop from inside a coroutine.
    future = asyncio.get_running_loop().create_future()

    # Add the street address and its future to the current batch.
    ADDRESSES_BATCH.append((address, future))

    # Start 'process_batches_loop' on the event loop if it's not already
    # running.
    if not BATCH_LOOP_RUNNING:
        BATCH_LOOP_RUNNING = True
        # Keep a strong reference to the task: the event loop itself only
        # holds weak references to tasks.
        _BATCH_LOOP_TASK = asyncio.create_task(process_batches_loop())

    # Wait for the batch our address is in, then return its result.
    return await future
async def main():
    """Geocode every entry of ``STREET_ADDRESSES`` and print the results."""
    # One lookup coroutine per street address; they are all awaited together
    # so that the lookups can be grouped into a batch.
    lookups = [get_location(street) for street in STREET_ADDRESSES]

    # gather() returns the results in the same order as the addresses.
    locations = await asyncio.gather(*lookups)

    # Print each address next to the location found for it.
    for address, location in zip(STREET_ADDRESSES, locations):
        print(f"{address} -> {location}")
if __name__ == "__main__":
    # main() is a coroutine, so it has to be driven by an event loop.
    # asyncio.run() creates a fresh loop, runs main() to completion, cancels
    # any task that is still pending (such as the background batch loop) and
    # closes the loop, so nothing is left open when the script exits.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment