Created
August 24, 2018 08:42
-
-
Save yoheiMune/7baf29a0475c3a77b47667c72cfe210c to your computer and use it in GitHub Desktop.
google geo location api and place api.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Google Geocoding API を用いて、位置情報などを扱う. | |
""" | |
import os | |
import json | |
import time | |
import re | |
import csv | |
from pprint import pprint | |
from collections import OrderedDict | |
import random | |
import requests | |
import Levenshtein | |
from credentials import PLACE_API_KEYS, GEOCODING_API_KEYS | |
from utils.log import logger | |
from utils.string import trim_space, zenkaku_to_hankaku, compare_string | |
from utils.shop import distance as calc_distance | |
def get_latlng(address):
    """
    Return the latitude and longitude of an address via the Google Geocoding API.

    Before the request, every postal mark "〒" (optionally followed by a
    "NNN-NNNN" code and one whitespace character) is removed and the address
    is passed through remove_building_name().

    Args:
        address: Address string to geocode.

    Returns:
        A ``(lat, lng)`` tuple taken from the first API result, or
        ``(None, None)`` when the response contains no results (a warning is
        logged in that case).
    """
    # Clean up the address. The three substitutions run in this order so the
    # most specific form of the postal mark is consumed first.
    address = re.sub(r"〒\d{3}-\d{4}\s", "", address)
    address = re.sub(r"〒\s", "", address)
    address = address.replace("〒", "")
    address = remove_building_name(address)

    # Call the API.
    url = "https://maps.googleapis.com/maps/api/geocode/json"
    params = {
        "key": _get_apikey("geocoding"),
        "address": address,
        "language": "ja",
    }
    result = requests.get(url, params=params).json()

    # .get() so that a response without a "results" key is reported as a
    # failed lookup instead of raising KeyError.
    results = result.get("results")
    if not results:
        logger.warning(
            "Geo情報取得に失敗しました. address={}, response={}".format(
                address, json.dumps(result, ensure_ascii=False)
            )
        )
        return None, None

    location = results[0]["geometry"]["location"]
    return location["lat"], location["lng"]
def add_geocoding(shop):
    """
    Geocode ``shop.address`` and store the result on ``shop``.

    The address is passed through remove_building_name() before it is sent to
    the Google Geocoding API. When the API returns results, _extract() copies
    the fields of the first result onto ``shop``; otherwise a warning is
    logged and ``shop`` is left unchanged.

    Args:
        shop: Object with an ``address`` attribute; it is mutated in place.
    """
    # Strip the building name before querying.
    address = remove_building_name(shop.address)

    url = "https://maps.googleapis.com/maps/api/geocode/json"
    params = {
        "key": _get_apikey("geocoding"),
        "address": address,
        "language": "ja",
    }
    result = requests.get(url, params=params).json()

    # .get() so that a response without a "results" key is reported as a
    # failed lookup instead of raising KeyError.
    if not result.get("results"):
        logger.warning(
            "Geo情報取得に失敗しました. address={}, response={}".format(
                shop.address, json.dumps(result, ensure_ascii=False)
            )
        )
        return

    _extract(result, shop)
def _extract(result, shop): | |
# 取得したいデータ | |
post_code = "" | |
prefecture = "" | |
city = "" | |
latitude = "" | |
longitude = "" | |
place_id = "" | |
for c in result["results"][0]["address_components"]: | |
types = c["types"] | |
name = c["long_name"] | |
if "postal_code" in types: | |
post_code = name | |
elif "administrative_area_level_1" in types: | |
prefecture = name | |
elif "locality" in types: | |
city = name | |
latitude = result["results"][0]["geometry"]["location"]["lat"] | |
longitude = result["results"][0]["geometry"]["location"]["lng"] | |
place_id = result["results"][0]["place_id"] | |
shop.post_code = post_code | |
shop.lat_address = latitude | |
shop.lng_address = longitude | |
shop.prefecture = prefecture | |
shop.city = city | |
shop.place_id = place_id | |
def reverse_geocoding(shop):
    """
    Reverse-geocode ``shop.lat_google`` / ``shop.lng_google`` and store the
    prefecture and city on ``shop``.

    When the API returns no results ``shop`` is left unchanged. When it does,
    ``shop.prefecture`` and ``shop.city`` are overwritten, with an empty
    string for whichever component is missing from the first result.

    Args:
        shop: Object providing ``lat_google`` and ``lng_google``; mutated in
            place.
    """
    endpoint = "https://maps.googleapis.com/maps/api/geocode/json"
    # NOTE(review): _get_apikey() defaults to the "place" key pool although
    # this is the Geocoding endpoint - confirm that is intended.
    query = {
        "key": _get_apikey(),
        "latlng": "{},{}".format(shop.lat_google, shop.lng_google),
        "language": "ja",
    }
    payload = requests.get(endpoint, params=query).json()

    if not payload["results"]:
        return

    found = {"administrative_area_level_1": "", "locality": ""}
    for component in payload["results"][0]["address_components"]:
        kinds = component["types"]
        label = component["long_name"]
        if "administrative_area_level_1" in kinds:
            found["administrative_area_level_1"] = label
        elif "locality" in kinds:
            found["locality"] = label

    shop.prefecture = found["administrative_area_level_1"]
    shop.city = found["locality"]
def reverse_geocoding2(latitude, longitude):
    """
    Reverse-geocode a coordinate pair with the Google Geocoding API.

    Args:
        latitude: Latitude of the point.
        longitude: Longitude of the point.

    Returns:
        A ``(prefecture, city, address)`` tuple built from the first result.
        ``address`` is the formatted address with the "日本、" country prefix
        removed. ``prefecture`` / ``city`` are ``None`` when the matching
        address component is missing. When the API returns no results at all,
        ``(None, None, None)`` is returned instead of raising IndexError.
    """
    url = "https://maps.googleapis.com/maps/api/geocode/json"
    # NOTE(review): _get_apikey() defaults to the "place" key pool although
    # this is the Geocoding endpoint - confirm that is intended.
    params = {
        "key": _get_apikey(),
        "latlng": "{},{}".format(latitude, longitude),
        "language": "ja",
    }
    result = requests.get(url, params=params).json()

    results = result.get("results")
    if not results:
        # Nothing to read from; report "not found" through the return value.
        return None, None, None

    top = results[0]
    address = top["formatted_address"].replace("日本、", "")

    prefecture = None
    city = None
    for component in top["address_components"]:
        types = component["types"]
        name = component["long_name"]
        if "administrative_area_level_1" in types:
            prefecture = name
        elif "locality" in types:
            city = name

    return prefecture, city, address
def get_place_detail(shop):
    """
    Fetch the details of a place by its Place ID.

    https://developers.google.com/places/web-service/details?hl=ja

    Args:
        shop: Object whose ``place_id`` identifies the place.

    Returns:
        The decoded JSON body of the Place Details response.
    """
    query = dict(
        key=_get_apikey(),
        placeid=shop.place_id,
        language="ja",
    )
    response = requests.get(
        "https://maps.googleapis.com/maps/api/place/details/json",
        params=query,
    )
    return response.json()
def _get_apikey(type_="place"): | |
if type_ == "place": | |
return random.choice(PLACE_API_KEYS) | |
elif type_ == "geocoding": | |
return random.choice(GEOCODING_API_KEYS) | |
else: | |
raise Exception("type_の値が不正です。type_={}".format(type_)) | |
def add_place_detail(shop, keyword, radius=5000):
    """
    Find a place near the shop with the Nearby Search API and store it on ``shop``.

    https://developers.google.com/places/web-service/search?hl=ja

    Up to 10 result pages are scanned (ranked by distance from
    ``shop.lat_google`` / ``shop.lng_google``) for a place whose name contains
    ``shop.name``. If none matches, the very first place returned is used as
    a fallback. The chosen place is written to ``shop.json``,
    ``shop.place_id``, ``shop.lat_google`` and ``shop.lng_google``.

    Args:
        shop: Object providing ``name``, ``lat_google`` and ``lng_google``;
            mutated in place.
        keyword: Search keyword sent to the API.
        radius: Accepted for backward compatibility; it is not sent to the
            API (the request uses ``rankby=distance``).

    Returns:
        The decoded JSON of the last page that was fetched. When no place was
        returned at all, a warning is logged and ``shop`` is left unchanged.
    """
    url = "https://maps.googleapis.com/maps/api/place/nearbysearch/json"
    matched = None
    fallback = None
    next_token = None
    result = None

    for page in range(10):
        if next_token:
            params = {
                "key": _get_apikey(),
                "pagetoken": next_token,
            }
            logger.info("Next page {}, token={}".format(page, next_token))
            # Wait before using the page token.
            time.sleep(2)
        else:
            params = {
                "key": _get_apikey(),
                "keyword": keyword,
                "location": "{},{}".format(shop.lat_google, shop.lng_google),
                "rankby": "distance",
                "language": "ja",
            }

        result = requests.get(url, params=params).json()
        if result.get("status") != "OK":
            logger.info(result)

        places = result.get("results") or []

        # Remember the first place ever returned BEFORE deciding whether to
        # stop, so the fallback also exists for single-page searches.
        if fallback is None and places:
            fallback = places[0]

        for place in places:
            logger.info("searching... {}".format(place["name"]))
            if shop.name in place["name"]:
                logger.info("FOUND. name=" + shop.name)
                matched = place
                break

        # Stop on a match or when there is no further page.
        next_token = result.get("next_page_token")
        if matched or not next_token:
            break

    if not matched:
        logger.info("NOT FOUND. name=" + shop.name)
        matched = fallback

    if matched is None:
        # The API returned no place at all; there is nothing to store.
        logger.warning("Placeが見つかりません。 name={}".format(shop.name))
        return result

    shop.json = json.dumps(matched, ensure_ascii=False)
    shop.place_id = matched["place_id"]
    shop.lat_google = matched["geometry"]["location"]["lat"]
    shop.lng_google = matched["geometry"]["location"]["lng"]
    return result
def place_text_search(params):
    """
    Run a Places Text Search and merge all result pages into one response.

    Args:
        params: Query parameters for the first request. The dict is copied;
            an API key is added to the copy, not to the caller's dict.

    Returns:
        The decoded JSON of the first page, with the "results" of up to 10
        follow-up pages appended to its "results" list.

    Raises:
        Exception: If any page reports status "OVER_QUERY_LIMIT"; the message
            is the JSON of the offending response.
    """
    url = "https://maps.googleapis.com/maps/api/place/textsearch/json"
    first_params = dict(params)
    first_params["key"] = _get_apikey()
    result = requests.get(url, params=first_params).json()

    # Error check.
    if result.get("status") == "OVER_QUERY_LIMIT":
        raise Exception(json.dumps(result, ensure_ascii=False))

    # Paging (if any), at most 10 extra pages.
    pagetoken = result.get("next_page_token")
    for i in range(10):
        if not pagetoken:
            break
        logger.info("ページング:{}".format(i + 2))
        # NOTE(review): no delay is applied before using the page token (a
        # sleep was commented out in the original) - confirm the token is
        # usable immediately, otherwise paging silently yields nothing.
        page = requests.get(
            url,
            params={"key": _get_apikey(), "pagetoken": pagetoken},
        ).json()

        # Error check on the page that was just fetched.
        if page.get("status") == "OVER_QUERY_LIMIT":
            raise Exception(json.dumps(page, ensure_ascii=False))

        # Append this page's results; tolerate a page without "results".
        result.setdefault("results", []).extend(page.get("results", []))

        pagetoken = page.get("next_page_token")

    return result
def add_place_detail_2(shop, keyword, must_words=None, use_geo=True):
    """
    Find a place with the Places Text Search API and store it on ``shop``.

    https://developers.google.com/places/web-service/search?hl=ja#TextSearchRequests

    Args:
        shop: Object providing ``name``, ``lat_address``, ``lng_address`` and
            ``error_meters``; mutated in place.
        keyword: Text query.
        must_words: Optional list of words. When given, only places whose name
            contains at least one of these words or ``keyword`` are
            considered. When empty or omitted, every place is considered.
        use_geo: When true, the search is biased to a 5000 m radius around
            ``shop.lat_address`` / ``shop.lng_address``.

    Returns:
        The decoded (page-merged) search response. Side effects on ``shop``:
        - no results in the response: ``shop`` is left unchanged;
        - no acceptable candidate: ``json``, ``place_id``, ``lat_google`` and
          ``lng_google`` are set to ``None``;
        - otherwise those four attributes describe the chosen place, and
          ``ambiguous`` holds the full response when 2+ candidates existed.

    Raises:
        Exception: If the response status is "OVER_QUERY_LIMIT".
    """
    params = {
        "query": keyword,
        "language": "ja",
    }
    if use_geo:
        params["location"] = "{},{}".format(shop.lat_address, shop.lng_address)
        params["radius"] = 5000
    result = place_text_search(params)

    # Error check.
    if result.get("status") == "OVER_QUERY_LIMIT":
        raise Exception(json.dumps(result, ensure_ascii=False))

    places = result.get("results")
    if not places:
        return result

    # With must_words, keep only places containing one of the words (or the
    # keyword); without must_words, keep everything.
    if must_words:
        accepted_words = list(must_words) + [keyword]
        candidates = [p for p in places if contains_word(p["name"], accepted_words)]
    else:
        candidates = list(places)

    # Two or more candidates mean the API itself was unsure; keep the raw
    # response for later inspection.
    if len(candidates) >= 2:
        shop.ambiguous = json.dumps(result, ensure_ascii=False)

    # Pick a single place.
    chosen = get_one_from_candidates2(shop, candidates)
    if not chosen:
        logger.warning("Placeが見つかりません。 name={}".format(shop.name))
        shop.json = None
        shop.place_id = None
        shop.lat_google = None
        shop.lng_google = None
        return result

    shop.json = json.dumps(chosen, ensure_ascii=False)
    shop.place_id = chosen["place_id"]
    shop.lat_google = chosen["geometry"]["location"]["lat"]
    shop.lng_google = chosen["geometry"]["location"]["lng"]

    # Flag doubtful matches: the name does not match, or the place is 200 m
    # or more away according to shop.error_meters.
    if not compare_strings(keyword, chosen["name"]) or shop.error_meters >= 200:
        logger.warning(
            "違うかもしれません.\n\tshop.name={}\n\tr.name={}\n\tdistance={}".format(
                keyword, chosen["name"], shop.error_meters
            )
        )

    return result
def contains_word(s, words):
    """
    Tell whether ``s`` contains at least one of ``words`` as a substring.

    Args:
        s: String to search in.
        words: Iterable of candidate substrings.

    Returns:
        True if any word occurs in ``s``; False otherwise, including when
        ``words`` is empty.
    """
    return any(word in s for word in words)
def get_one_from_candidates2(shop, candidates):
    """
    Choose the most plausible place among ``candidates`` for ``shop``.

    Filtering steps, in order:
      1. Drop candidates whose distance from ``shop.lat_address`` /
         ``shop.lng_address`` is unknown (``None``) or above 5000.
      2. Drop candidates whose name looks like a street address, i.e. ends
         with "<digits>-<digits>" after normalisation.
      3. Return the first remaining candidate whose name matches
         ``shop.name`` according to compare_strings().
      4. Otherwise return the first remaining candidate (the API's own top
         choice), or ``None`` when nothing is left.

    Args:
        shop: Object providing ``name``, ``lat_address`` and ``lng_address``.
        candidates: List of place dicts from the Places API.

    Returns:
        The chosen place dict, or ``None``.
    """
    # Step 1: distance <= 5km.
    nearby = []
    for c in candidates:
        distance = calc_distance(
            shop.lat_address,
            shop.lng_address,
            c["geometry"]["location"]["lat"],
            c["geometry"]["location"]["lng"],
        )
        if distance is not None and distance <= 5000:
            nearby.append(c)
        else:
            logger.info(
                "距離>5kmのため除去。name={}, distance={}".format(
                    c["name"], int(distance) if distance else -1
                )
            )

    # Step 2: discard names that look like addresses (e.g. 婦中町笹倉489−2).
    address_tail = re.compile(r"\d+-\d+$")
    named = []
    for c in nearby:
        name = (
            zenkaku_to_hankaku(c["name"])
            .replace("ー", "-")
            .replace("丁目", "-")
            .replace("番地", "-")
        )
        if address_tail.search(name):
            logger.info("住所プレイスのため除去。 name={}".format(c["name"]))
        else:
            named.append(c)

    # Step 3: prefer a candidate whose name matches the shop name.
    for c in named:
        if compare_strings(shop.name, c["name"]):
            return c

    # Step 4: fall back to the API's first remaining candidate.
    return named[0] if named else None
def get_one_from_candidates(lat, lng, results):
    """
    Return the result closest to ``(lat, lng)``.

    The results are ordered by calc_distance() from the given point, each
    name and distance is printed for debugging, and the nearest one is
    returned.

    Args:
        lat: Latitude of the reference point.
        lng: Longitude of the reference point.
        results: Iterable of place dicts with "name" and
            "geometry" -> "location" -> "lat"/"lng".

    Returns:
        The place dict with the smallest distance.

    Raises:
        IndexError: If ``results`` is empty.
    """
    def meters_from_origin(place):
        spot = place["geometry"]["location"]
        return calc_distance(lat, lng, spot["lat"], spot["lng"])

    ranked = list(results)
    ranked.sort(key=meters_from_origin)

    # debug
    for place in ranked:
        print("距離:", place["name"], meters_from_origin(place))

    return ranked[0]
def compare_strings(s1, s2, must_words=None):
    """
    Loosely compare two names.

    Both strings have every entry of ``must_words`` and the common company
    markers (株式会社, (株), 有限会社, (有)) removed, then go through
    compare_string() before being compared.

    Args:
        s1: First name.
        s2: Second name.
        must_words: Optional iterable of words stripped from both names
            before comparison.

    Returns:
        True when the normalised ``s2`` is a substring of the normalised
        ``s1``, or when their Levenshtein distance is 2 or less.
    """
    # Company markers to ignore. The full-width parenthesis variants are
    # written as escapes so they survive width normalisation of this file.
    # NOTE(review): the original listed "(株)" and "(有)" twice each, which
    # looks like a lost full-width variant - confirm.
    company_markers = (
        "株式会社",
        "(株)",
        "\uff08株\uff09",
        "有限会社",
        "(有)",
        "\uff08有\uff09",
    )

    for word in list(must_words or ()) + list(company_markers):
        s1 = s1.replace(word, "")
        s2 = s2.replace(word, "")

    s1 = compare_string(s1)
    s2 = compare_string(s2)

    # Check 1: containment (s2 inside s1).
    if s2 in s1:
        return True

    # Check 2: edit distance.
    return Levenshtein.distance(s1, s2) <= 2
# Cache filled by get_address_list() on first use.
address_list = None


def get_address_list():
    """
    Return the prefecture/city pairs read from ./dist/zenkoku2.csv.

    The file is read once; later calls return the cached list as long as it
    is non-empty. The first CSV row is treated as a header and skipped.

    Returns:
        A list of ``{"pref": row[7], "city": row[9]}`` dicts, one per data
        row of the CSV.
    """
    global address_list
    if address_list:
        return address_list

    address_list = []
    with open("./dist/zenkoku2.csv", encoding="utf-8", newline="") as source:
        rows = csv.reader(source, delimiter=",", quotechar='"')
        # Skip the header row.
        next(rows, None)
        address_list.extend({"pref": row[7], "city": row[9]} for row in rows)
    return address_list
def add_prefecture_and_city(shop):
    """
    Fill in ``shop.prefecture`` and ``shop.city``.

    First, ``shop.address`` (leading "〒NNN-NNNN" removed, all whitespace
    removed) is searched for each prefecture/city pair from
    get_address_list(); the first pair where both names occur wins.

    If no pair matches, reverse_geocoding2() is called with
    ``shop.lat_google`` / ``shop.lng_google`` and its prefecture, city and
    address are stored on ``shop``. A warning is logged when the prefecture
    or city is still unknown.

    Args:
        shop: Object providing ``address``, ``name``, ``lat_google`` and
            ``lng_google``; mutated in place.
    """
    address = re.sub(r"^〒\d{3}-\d{4}", "", shop.address)
    # Remove every kind of whitespace (ASCII and full-width spaces alike).
    # NOTE(review): the original chained three identical " " replacements,
    # which looks like lost full-width variants - confirm.
    address = re.sub(r"\s+", "", address)

    for entry in get_address_list():
        if entry["pref"] in address and entry["city"] in address:
            shop.prefecture = entry["pref"]
            shop.city = entry["city"]
            return

    # Second approach: reverse geocoding.
    print("RevGeo:", shop.name)
    pref, city, address = reverse_geocoding2(shop.lat_google, shop.lng_google)
    if not pref or not city:
        logger.warning("住所が特定できません. \naddress={}".format(address))
    shop.prefecture = pref
    shop.city = city
    # Never replace the existing address with "nothing".
    if address is not None:
        shop.address = address
# Holds the postal-code data once load_post_code_data() has run.
post_code_map = None


def get_post_code(address):
    """
    Look up the postal code for an address.

    A few renamed municipalities are rewritten to their current names first.
    The address is then compared against the keys of ``post_code_map`` in
    iteration order; the first key whose joined parts form a prefix of the
    address wins. A handful of hard-coded addresses are handled afterwards.

    Args:
        address: Address string, expected to start with the prefecture.

    Returns:
        ``(post_code, key[0], key[1])`` for a map hit, a hard-coded triple
        for the special cases, or ``(None, None, None)`` when nothing matches.
    """
    # Normalise municipalities that have been renamed.
    renamed = (
        ("山武郡大網白里町", "大網白里市"),
        ("黒川郡富谷町", "富谷市"),
        ("南埼玉郡白岡町", "白岡市"),
    )
    for old_name, new_name in renamed:
        address = address.replace(old_name, new_name)

    global post_code_map
    if not post_code_map:
        load_post_code_data()

    # Prefix match; the map's ordering provides the longest-match behaviour.
    for parts, code in post_code_map.items():
        if address.startswith("".join(parts)):
            return code, parts[0], parts[1]

    # Special handling for addresses the data set does not cover.
    exact_matches = {
        "長野県長野都市計画事業水沢上庭土地区画整理事業34街区1": (
            "388-8019", "長野県", "長野都市計画事業水沢上庭土地区画整理事業34街区1"
        ),
        "千葉県流山市鰭ヶ崎地区一体型特定土地区画整理事業23番街区1": (
            "270-0156", "千葉県", "流山市"
        ),
        "茨城県上菅谷駅前地区土地区画整理事業18街区7": (
            "311-0105", "茨城県", "那珂市"
        ),
        "青森県八戸市都市計画事業土地区画整理事業保留地61街区5号": (
            "031-0011", "青森県", "八戸市"
        ),
        "宮城県女川町被災市街地復興土地区画整理事業A0-15-1": (
            "986-2200", "宮城県", "女川町"
        ),
    }
    if address in exact_matches:
        return exact_matches[address]
    if address.startswith("東京都三宅村"):
        return "100-1212", "東京都", "三宅村"

    # Not found.
    return None, None, None
def load_post_code_data():
    """
    Load the postal-code data from dist/zenkoku2.csv into ``post_code_map``.

    Rows whose "事業所フラグ" column is "1" are skipped. The remaining rows
    become an OrderedDict mapping
    ``(都道府県, 市区町村, 町域, 字丁目)`` to ``郵便番号``.

    The entries are ordered by the concatenated key in descending string
    order. In that order a longer address always precedes any of its own
    prefixes, so a first-hit prefix scan over the map yields the longest
    match.
    """
    logger.info("郵便番号データを読み込み中..")
    global post_code_map

    # Explicit UTF-8: the same file is read as UTF-8 by get_address_list(),
    # and the platform default encoding is not UTF-8 everywhere.
    with open("dist/zenkoku2.csv", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f, delimiter=",")
        # Business-office rows are out of scope.
        data = [row for row in reader if row["事業所フラグ"] != "1"]

    # Sort so that longer addresses come first (longest-match lookup).
    data.sort(
        reverse=True,
        key=lambda d: d["都道府県"] + d["市区町村"] + d["町域"] + d["字丁目"],
    )

    # Build the postal-code map.
    post_code_map = OrderedDict(
        ((d["都道府県"], d["市区町村"], d["町域"], d["字丁目"]), d["郵便番号"])
        for d in data
    )
# Cache filled by load_prefecture() on first use.
prefs = None


def load_prefecture():
    """
    Load the prefecture data from dist/prefs2.csv.

    The file is read once and cached in the module-level ``prefs``; later
    calls return the cached list as long as it is non-empty.

    Returns:
        A list with one mapping per CSV data row, keyed by the header row.
    """
    global prefs
    if not prefs:
        # Explicit UTF-8 so the Japanese content does not depend on the
        # platform default encoding.
        # NOTE(review): assumes prefs2.csv is UTF-8 like zenkoku2.csv.
        with open("dist/prefs2.csv", encoding="utf-8", newline="") as f:
            prefs = list(csv.DictReader(f, delimiter=","))
    return prefs
def normalize_address(address):
    """
    Prepend the prefecture to an address that does not start with one.

    Args:
        address: Address string.

    Returns:
        ``address`` unchanged when it already starts with a prefecture name;
        otherwise the address prefixed with the prefecture of the first
        ``post_code_map`` key whose city part is a prefix of the address, or
        with a hard-coded prefix for a few special cases.

    Raises:
        Exception: If no prefecture can be determined.
    """
    global post_code_map, prefs
    if not post_code_map:
        load_post_code_data()
    if not prefs:
        load_prefecture()

    # Already starts with a prefecture: nothing to do.
    if any(address.startswith(pref["name"]) for pref in prefs):
        return address

    # Starts with a municipality: look the prefecture up from the postal data.
    completed = next(
        (key[0] + address for key in post_code_map if address.startswith(key[1])),
        None,
    )
    if completed is not None:
        return completed

    # Special handling.
    if address.startswith("長野都市計画事業"):
        return "長野県" + address
    if address == "鰭ヶ崎地区一体型特定土地区画整理事業23番街区1":
        return "千葉県流山市" + address
    if address.startswith("女川町"):
        return "宮城県牡鹿郡" + address
    if address.startswith("南三陸町"):
        return "宮城県本吉郡" + address

    # Nothing matched.
    raise Exception("都道府県を特定できません。address={}".format(address))
def remove_building_name(address):
    """
    Remove the building name (everything after the street number) from an
    address.

    Steps:
      1. Normalise with zenkaku_to_hankaku().
      2. Rewrite the first "N丁目" as "N-", the first "N番" / "N番地" (with an
         optional following "の") as "N-", and drop the first "号" that
         follows a number.
      3. Cut the address after the first "N-N-N" group; failing that, after
         the first "N-N" group; failing that, after the first number.
         For Sapporo addresses ("北海道札幌市…区…N条") the "N条" part is kept
         as a prefix and the number search starts after it.

    Args:
        address: Address string.

    Returns:
        The address truncated after its street number. If no number is found
        the normalised address is returned whole.
    """
    # Unify full-width characters to half-width.
    address = zenkaku_to_hankaku(address)

    # Convert "x丁目".
    m = re.search(r"^(.*?)(\d+)丁目(.*)$", address)
    if m:
        address = m[1] + m[2] + "-" + m[3]

    # Convert "x番" / "x番地" / "x番地の".
    m = re.search(r"^(.*?)(\d+)番地?の?(.*)$", address)
    if m:
        address = m[1] + m[2] + "-" + m[3]

    # Convert "x号".
    m = re.search(r"^(.*?)(\d+)号(.*)$", address)
    if m:
        address = m[1] + m[2] + m[3]

    # Cut after "111-222-333".
    m = re.search(r"^(.*?)(\d+-\d+-\d+)(.*)$", address)
    if m:
        return m[1] + m[2]

    # Cut after "111-222".
    m = re.search(r"^(.*?)(\d+-\d+)(.*)$", address)
    if m:
        return m[1] + m[2]

    # Sapporo special case: keep "…N条" aside so its digit is not mistaken
    # for the street number.
    prefix = ""
    m = re.search(r"^(北海道札幌市.*?区.*?\d+条)", address)
    if m:
        prefix = m[1]
        address = address[len(prefix):]

    # Cut after "111".
    m = re.search(r"^(.*?)(\d+)(.*)$", address)
    if m:
        return prefix + m[1] + m[2]

    # No number left: give the prefix back instead of silently dropping it.
    return prefix + address
def get_placelist():
    """
    Yield one place-list generator per prefecture.

    For every row returned by load_prefecture(), the generator produced by
    get_placelist_by_pref() for that row's "name" is yielded.

    NOTE(review): each yielded item is itself a generator of rows, not a
    row - confirm callers expect the nested form.
    """
    yield from (
        get_placelist_by_pref(entry["name"]) for entry in load_prefecture()
    )
def get_placelist_by_pref(pref_name):
    """
    Yield the rows of the place list for one prefecture.

    A trailing "都", "府" or "県" is removed from ``pref_name`` and the file
    ``dist/placelist_<name>.csv`` is read.

    Args:
        pref_name: Prefecture name, e.g. "東京都".

    Yields:
        Each CSV row as a list of strings.
    """
    pref_name = re.sub("(都|府|県)$", "", pref_name)
    # Explicit UTF-8 so the Japanese content does not depend on the platform
    # default encoding.
    # NOTE(review): assumes the placelist files are UTF-8 like zenkoku2.csv.
    path = "dist/placelist_{}.csv".format(pref_name)
    with open(path, encoding="utf-8", newline="") as f:
        yield from csv.reader(f, delimiter=",", quotechar='"')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment