Skip to content

Instantly share code, notes, and snippets.

@bribroder
Last active October 1, 2018 13:14
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bribroder/0bb0611a7fef04c378c7d0570f5f1940 to your computer and use it in GitHub Desktop.
Save bribroder/0bb0611a7fef04c378c7d0570f5f1940 to your computer and use it in GitHub Desktop.
import time

import requests
class GoogleSearchError(Exception):
    """Raised when a Google Places search request is denied, fails, or runs out of retries."""
def search_places(
    location=None,
    keywords=None,
    place_type=None,
    page_token=None,
    api_key=None,
    radius=50000,
    tries=10,
    timeout=10,
):
    """Run a Google Places "nearby search" and return the results of every page.

    Requests using a page token may fail with status "INVALID_REQUEST"
    if dispatched too quickly; tokens take a few seconds to become valid.
    Requests may also semi-randomly fail with status "UNKNOWN_ERROR",
    but often these work on the next try.
    This function retries after a short sleep until it either hits the
    tries limit, succeeds (status "OK"), or gets a bad status like "DENIED".

    Read more about the API fields here:
    https://developers.google.com/places/web-service/search#PlaceSearchRequests

    Args:
        location: Coordinate pair like (latitude, longitude), or None.
        keywords: Value sent as the ``keyword`` field, or None.
        place_type: Value sent as the ``type`` field, or None.
        page_token: Token of a follow-up result page, or None for a new search.
        api_key: Google API key, sent as the ``key`` field.
        radius: Value sent as the ``radius`` field.
        tries: Maximum number of attempts per page before giving up.
        timeout: Seconds to wait for each HTTP response before ``requests``
            raises a timeout error.

    Returns:
        The list found under ``results`` in the response, extended with the
        results of all following pages. An empty list when the status is
        "ZERO_RESULTS".

    Raises:
        GoogleSearchError: If the request is denied, fails with a
            non-retryable status, or every attempt was used up (including
            when ``tries`` is zero or negative).
    """
    place_url = 'https://maps.googleapis.com/maps/api/place/nearbysearch/json'

    api_fields = (
        ('key', api_key),
        # location is a coordinate tuple like (latitude, longitude)
        (
            'location',
            None if location is None
            else ','.join(str(coordinate) for coordinate in location),
        ),
        ('keyword', keywords),
        ('radius', radius),
        ('type', place_type),
        ('pagetoken', page_token),
    )
    # Unset fields are left out entirely. Passing the rest through ``params``
    # lets requests percent-encode them, so keywords containing spaces,
    # '&' or '#' cannot corrupt the query string.
    params = {
        field: value for field, value in api_fields if value is not None
    }

    attempts_left = tries
    while attempts_left > 0:
        response = requests.get(
            place_url, params=params, timeout=timeout
        ).json()
        status = response.get('status', '')

        if status == 'OK':
            break
        if status == 'ZERO_RESULTS':
            # sometimes pages are empty...
            return []
        if status == 'UNKNOWN_ERROR' or (
            status == 'INVALID_REQUEST' and page_token is not None
        ):
            attempts_left -= 1
            # We build in some sleep time when using a page token,
            # but sometimes it may take another try.
            print('[INFO] Request failed, will retry {} times.'.format(
                attempts_left
            ))
            if attempts_left > 0:
                # No point sleeping when there is no attempt left to wait for.
                time.sleep(2)
            continue
        if 'DENIED' in status:
            raise GoogleSearchError(
                '[ERROR] API request DENIED; response: {}'.format(response)
            )
        raise GoogleSearchError(
            '[ERROR] API request failed; response: {}'.format(response)
        )
    else:
        # Reached only when the loop ends without a successful ``break``.
        # Inspecting the last payload is unreliable here: a failed response
        # that still contains a 'results' key would be returned as if the
        # search had succeeded.
        raise GoogleSearchError(
            '[ERROR] API request failed and ran out of retries.'
        )

    results = response.get('results', [])
    next_page_token = response.get('next_page_token')
    if next_page_token is not None:
        # There is a short delay between when a next_page_token
        # is issued and when it will become valid.
        time.sleep(2)
        # The follow-up request must carry the API key again; the caller's
        # retry budget and timeout are forwarded so every page behaves alike.
        # NOTE(review): the Places documentation says the other search fields
        # are ignored once a pagetoken is set, so they are not repeated here.
        results.extend(search_places(
            page_token=next_page_token,
            api_key=api_key,
            tries=tries,
            timeout=timeout,
        ))
    return results
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment