@0187773933
Created July 8, 2024 23:50
Google Places Search
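# Searches the Google Places API (New) text search for a term across a list of cities,
# de-duplicates the combined results by place id, sorts them by rating and review count,
# and writes the output to JSON, CSV, and an interactive folium map (HTML).
# Requires the third-party packages `requests` and `folium` (e.g. pip install requests folium).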
import requests
from pprint import pprint
import json
import csv
import folium
import time
from folium.plugins import MarkerCluster
API_KEY = 'asdf'
def write_json( file_path , python_object ):
    with open( file_path , 'w', encoding='utf-8' ) as f:
        json.dump( python_object , f , ensure_ascii=False , indent=4 )
def read_json( file_path ):
    with open( file_path ) as f:
        return json.load( f )
def write_csv( filename , results ):
    # Human-readable column headers; the row keys below must match them exactly,
    # otherwise csv.DictWriter raises a ValueError for unexpected fields
    fieldnames = [ 'Name' , 'Phone Number' , 'Address' , 'Rating' , 'Total Reviews' , 'Website' ]
    with open( filename , mode='w', newline='', encoding='utf-8' ) as csvfile:
        writer = csv.DictWriter( csvfile , fieldnames=fieldnames )
        writer.writeheader()
        for result in results:
            row = {
                'Name': result['displayName']['text'] ,
                'Phone Number': result.get('nationalPhoneNumber') ,
                'Address': result.get('formattedAddress') ,
                'Rating': result.get('rating') ,
                'Total Reviews': result.get('userRatingCount') ,
                'Website': result.get('websiteUri') ,
            }
            writer.writerow( row )
def create_map( map_filename , data ):
    # Filter out entries without location data
    valid_data = [place for place in data if 'location' in place and 'latitude' in place['location'] and 'longitude' in place['location']]
    if not valid_data:
        print("No valid location data found.")
        return
    # Initialize the map centered around the first valid result
    first_location = valid_data[0]['location']
    folium_map = folium.Map(location=[first_location['latitude'], first_location['longitude']], zoom_start=13)
    # Add marker cluster to the map
    marker_cluster = MarkerCluster().add_to(folium_map)
    # Add each place to the map with a marker and popup (excluding review text)
    for place in valid_data:
        location = place['location']
        name = place.get('displayName', {}).get('text', 'No name')
        address = place.get('formattedAddress', 'No address')
        rating = place.get('rating', 'No rating')
        user_ratings_count = place.get('userRatingCount', 'No ratings count')
        popup_content = f"""
        <strong>{name}</strong><br>
        Address: {address}<br>
        Rating: {rating}<br>
        User Ratings Count: {user_ratings_count}
        """
        folium.Marker(
            location=[location['latitude'], location['longitude']],
            popup=folium.Popup(popup_content, max_width=300),
            icon=folium.Icon(icon='info-sign')
        ).add_to(marker_cluster)
    # Save the map to an HTML file
    folium_map.save(map_filename)
def nearby_search(location, radius, max_results, included_types):
    url = "https://places.googleapis.com/v1/places:searchNearby"
    # Places API (New) expects a nested JSON body, not flat dotted parameter names
    body = {
        "includedTypes": included_types,
        "maxResultCount": max_results,
        "locationRestriction": {
            "circle": {
                "center": {
                    "latitude": location['latitude'],
                    "longitude": location['longitude']
                },
                "radius": radius
            }
        }
    }
    headers = {
        "Content-Type": "application/json",
        "X-Goog-Api-Key": API_KEY,
        "X-Goog-FieldMask": "places.displayName"
    }
    response = requests.post(url, json=body, headers=headers)
    response.raise_for_status()
    return response.json()
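# Example call for nearby_search (not exercised in __main__ below); the type list and
# radius here are illustrative assumptions, see the supported-types link further down:
# nearby_search( { 'latitude': 39.967962 , 'longitude': -83.004995 } , 1500.0 , 10 , [ "dentist" ] )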
def place_details_search(place_id):
    url = f"https://places.googleapis.com/v1/places/{place_id}"
    headers = {
        "Content-Type": "application/json",
        "X-Goog-Api-Key": API_KEY,
        "X-Goog-FieldMask": "id,displayName"
    }
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()
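# place_details_search is a helper left unused below; the 'id' field returned by
# text_search_new can be fed straight back in, e.g. place_details_search( results[0]['id'] )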
def text_search_new(text_query, location, radius=50000.0):
    url = "https://places.googleapis.com/v1/places:searchText"
    headers = {
        "Content-Type": "application/json",
        "X-Goog-Api-Key": API_KEY,
        "X-Goog-FieldMask": "nextPageToken,places.reviews,places.id,places.rating,places.userRatingCount,places.nationalPhoneNumber,places.internationalPhoneNumber,places.websiteUri,places.displayName,places.formattedAddress,places.priceLevel,places.location"
    }
    data = {
        "textQuery": text_query,
        "pageSize": 20,
        "rankPreference": "RELEVANCE",
        "locationBias": {
            "circle": {
                "center": {
                    "latitude": location['latitude'],
                    "longitude": location['longitude']
                },
                "radius": radius
            }
        }
    }
    all_places = []
    page_token = None
    while True:
        if page_token:
            data["pageToken"] = page_token
        response = requests.post(url, json=data, headers=headers)
        response.raise_for_status()
        result = response.json()
        places = result.get('places', [])
        all_places.extend(places)
        # Stop if the page came back empty
        if len( places ) == 0:
            break
        print(f"Retrieved {len(places)} places, total so far: {len(all_places)}")
        next_page_token = result.get('nextPageToken')
        # Stop when there is no new token; a repeated token would otherwise loop forever
        if not next_page_token or next_page_token == page_token:
            print("No more pages.")
            break
        page_token = next_page_token
        print(f"Next page token: {next_page_token}")
        time.sleep( 1 )  # brief pause before requesting the next page
    return all_places
# https://developers.google.com/maps/documentation/places/web-service/supported_types
def text_search(text_query, location, radius=5000, rankby="prominence"):
    # Legacy Nearby Search endpoint; it returns at most 20 results per page and 60 in total
    url = "https://maps.googleapis.com/maps/api/place/nearbysearch/json"
    query = {
        "keyword": text_query,
        "location": f"{location['latitude']},{location['longitude']}",
        "radius": radius,
        "rankby": rankby,
        "key": API_KEY,
    }
    headers = {
        "Content-Type": "application/json",
    }
    all_results = []
    while True:
        response = requests.get(url, params=query, headers=headers)
        response.raise_for_status()
        results = response.json()
        all_results.extend(results.get('results', []))
        print(f"Total results so far: {len(all_results)}")
        next_page_token = results.get('next_page_token')
        if not next_page_token or len(all_results) >= 60:
            break
        print(f"Next page token: {next_page_token}")
        query['pagetoken'] = next_page_token
        print("Waiting for next page token to become valid...")
        time.sleep(2)  # Required delay to let the next_page_token become valid
    return all_results
def filter_unique_ids( results ):
    seen_ids = set()
    unique = []
    for result in results:
        if result["id"] not in seen_ids:
            unique.append( result )
            seen_ids.add( result[ "id" ] )
    return unique
if __name__ == "__main__":
    search_term = "endodontist general anesthesia"
    save_file_name = "test-3"
    cities = [
        { "name": "Huber Heights, Ohio" , "location": { 'latitude': 39.890783 , 'longitude': -84.095164 } } ,
        { "name": "Cincinnati, Ohio" , "location": { 'latitude': 39.140772 , 'longitude': -84.504651 } } ,
        { "name": "Columbus, Ohio" , "location": { 'latitude': 39.967962 , 'longitude': -83.004995 } } ,
        { "name": "Cleveland, Ohio" , "location": { 'latitude': 41.485341 , 'longitude': -81.683451 } } ,
        { "name": "Toledo, Ohio" , "location": { 'latitude': 41.647074 , 'longitude': -83.522599 } } ,
    ]
    results = []
    for city in cities:
        results.extend( text_search_new( f"{search_term} in {city['name']}" , city['location'] , radius=50000.0 ) )
    results = filter_unique_ids( results )
    sorted_results = sorted( results , key=lambda x: ( -x.get( 'rating' , 0 ) , -x.get( 'userRatingCount' , 0 ) ) )
    pprint( sorted_results )
    print( len( sorted_results ) )
    write_json( f"{save_file_name}.json" , sorted_results )
    write_csv( f"{save_file_name}.csv" , sorted_results )
    create_map( f"{save_file_name}.html" , sorted_results )
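Running the script as-is writes test-3.json, test-3.csv, and an interactive test-3.html map to the working directory; adjust search_term, save_file_name, and the cities list to cover a different query or region. Note that API_KEY at the top is a placeholder and must be replaced with a real Google Places API key.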