Skip to content

Instantly share code, notes, and snippets.

@yoheiMune
Created August 24, 2018 08:42
Show Gist options
  • Save yoheiMune/7baf29a0475c3a77b47667c72cfe210c to your computer and use it in GitHub Desktop.
Save yoheiMune/7baf29a0475c3a77b47667c72cfe210c to your computer and use it in GitHub Desktop.
google geo location api and place api.
"""
Google Geocoding API を用いて、位置情報などを扱う.
"""
import os
import json
import time
import re
import csv
from pprint import pprint
from collections import OrderedDict
import random
import requests
import Levenshtein
from credentials import PLACE_API_KEYS, GEOCODING_API_KEYS
from utils.log import logger
from utils.string import trim_space, zenkaku_to_hankaku, compare_string
from utils.shop import distance as calc_distance
def get_latlng(address):
    """
    Look up the latitude and longitude of an address with the Geocoding API.

    The address is cleaned before the request: the postal mark "〒" (with an
    optional ``NNN-NNNN`` code) is removed, and the result is passed through
    ``remove_building_name``.

    Args:
        address: Address string to geocode.

    Returns:
        ``(lat, lng)`` taken from the first geocoding result, or
        ``(None, None)`` when the response carries no results.
    """
    # Clean up the address. Raw strings keep the regex escapes (\d, \s) from
    # being treated as invalid string escape sequences.
    address = re.sub(r"〒\d{3}-\d{4}\s", "", address)
    address = re.sub(r"〒\s", "", address)
    address = re.sub("〒", "", address)
    address = remove_building_name(address)

    # Call the API.
    url = "https://maps.googleapis.com/maps/api/geocode/json"
    params = {
        "key": _get_apikey("geocoding"),
        "address": address,
        "language": 'ja'
    }
    r = requests.get(url, params=params)
    result = r.json()

    # A missing "results" key is treated the same as an empty result list.
    results = result.get("results")
    if not results:
        logger.warn("Geo情報取得に失敗しました. address={}, response={}".format(address, json.dumps(result, ensure_ascii=False)))
        return None, None

    # Only the first (best) result is used.
    location = results[0]["geometry"]["location"]
    return location["lat"], location["lng"]
def add_geocoding(shop):
    """
    Geocode ``shop.address`` and store the outcome on ``shop``.

    The building name is stripped from the address before the request. When
    the API returns results they are handed to ``_extract``, which fills in
    the shop's attributes; otherwise a warning is logged and ``shop`` is left
    untouched.

    Args:
        shop: Object exposing an ``address`` attribute.
    """
    endpoint = "https://maps.googleapis.com/maps/api/geocode/json"
    # Strip the building name first; it is not sent to the API.
    stripped_address = remove_building_name(shop.address)
    query = {
        "key": _get_apikey("geocoding"),
        "address": stripped_address,
        "language": 'ja',
    }
    payload = requests.get(endpoint, params=query).json()

    if payload["results"]:
        _extract(payload, shop)
        return

    # The log reports the original (unstripped) address.
    logger.warn("Geo情報取得に失敗しました. address={}, response={}".format(shop.address, json.dumps(payload, ensure_ascii=False)))
def _extract(result, shop):
# 取得したいデータ
post_code = ""
prefecture = ""
city = ""
latitude = ""
longitude = ""
place_id = ""
for c in result["results"][0]["address_components"]:
types = c["types"]
name = c["long_name"]
if "postal_code" in types:
post_code = name
elif "administrative_area_level_1" in types:
prefecture = name
elif "locality" in types:
city = name
latitude = result["results"][0]["geometry"]["location"]["lat"]
longitude = result["results"][0]["geometry"]["location"]["lng"]
place_id = result["results"][0]["place_id"]
shop.post_code = post_code
shop.lat_address = latitude
shop.lng_address = longitude
shop.prefecture = prefecture
shop.city = city
shop.place_id = place_id
def reverse_geocoding(shop):
    """
    Reverse-geocode the shop's Google coordinates into prefecture and city.

    Reads ``shop.lat_google`` / ``shop.lng_google``. When the API returns
    results, ``shop.prefecture`` and ``shop.city`` are set from the first
    result (empty string for a missing component). With no results the shop
    is left unchanged.

    Args:
        shop: Object exposing ``lat_google`` and ``lng_google``.
    """
    endpoint = "https://maps.googleapis.com/maps/api/geocode/json"
    # NOTE(review): uses the default ("place") key pool, while the forward
    # geocoding functions use the "geocoding" pool - confirm this is intended.
    query = {
        "key": _get_apikey(),
        "latlng": "{},{}".format(shop.lat_google, shop.lng_google),
        "language": 'ja',
    }
    payload = requests.get(endpoint, params=query).json()

    hits = payload["results"]
    if not hits:
        return

    pref_name = ""
    city_name = ""
    for component in hits[0]["address_components"]:
        kinds = component["types"]
        label = component["long_name"]
        if "administrative_area_level_1" in kinds:
            pref_name = label
        elif "locality" in kinds:
            city_name = label

    shop.prefecture = pref_name
    shop.city = city_name
def reverse_geocoding2(latitude, longitude):
    """
    Reverse-geocode a coordinate pair into prefecture, city and address.

    Args:
        latitude: Latitude of the point.
        longitude: Longitude of the point.

    Returns:
        ``(prefecture, city, address)`` from the first result. ``address`` is
        the formatted address with the leading "日本、" removed. ``prefecture``
        and ``city`` are ``None`` when the matching component is absent.
        Returns ``(None, None, None)`` when the API returns no results, so
        that callers get "not found" instead of an IndexError.
    """
    url = "https://maps.googleapis.com/maps/api/geocode/json"
    # NOTE(review): uses the default ("place") key pool, while the forward
    # geocoding functions use the "geocoding" pool - confirm this is intended.
    params = {
        "key": _get_apikey(),
        "latlng": "{},{}".format(latitude, longitude),
        "language": 'ja'
    }
    r = requests.get(url, params=params)
    result = r.json()

    # Guard against an empty / missing result list (e.g. ZERO_RESULTS).
    results = result.get("results")
    if not results:
        return None, None, None

    top = results[0]
    prefecture = None
    city = None
    address = top["formatted_address"].replace("日本、", "")
    for c in top["address_components"]:
        types = c["types"]
        name = c["long_name"]
        if "administrative_area_level_1" in types:
            prefecture = name
        elif "locality" in types:
            city = name
    return prefecture, city, address
def get_place_detail(shop):
    """
    Fetch the Place Details record for ``shop.place_id``.
    https://developers.google.com/places/web-service/details?hl=ja

    Args:
        shop: Object exposing a ``place_id`` attribute.

    Returns:
        The parsed JSON response of the Place Details endpoint.
    """
    endpoint = "https://maps.googleapis.com/maps/api/place/details/json"
    query = {
        "key": _get_apikey(),
        "placeid": shop.place_id,
        "language": 'ja',
    }
    response = requests.get(endpoint, params=query)
    return response.json()
def _get_apikey(type_="place"):
if type_ == "place":
return random.choice(PLACE_API_KEYS)
elif type_ == "geocoding":
return random.choice(GEOCODING_API_KEYS)
else:
raise Exception("type_の値が不正です。type_={}".format(type_))
def add_place_detail(shop, keyword, radius=5000):
    """
    Find the shop with the Places Nearby Search API and store the match.
    https://developers.google.com/places/web-service/search?hl=ja

    Searches around ``shop.lat_google`` / ``shop.lng_google`` ranked by
    distance, following up to 10 result pages. The first place whose name
    contains ``shop.name`` wins; if none does, the first place returned at
    all is used as a fallback. On success ``shop.json``, ``shop.place_id``,
    ``shop.lat_google`` and ``shop.lng_google`` are overwritten.

    Args:
        shop: Object exposing ``name``, ``lat_google`` and ``lng_google``.
        keyword: Keyword sent to the API.
        radius: Unused, kept for backward compatibility. The request uses
            ``rankby=distance``, which the API does not allow to be combined
            with a radius.

    Returns:
        The parsed JSON of the last page requested. When no place was found
        at all, a warning is logged and ``shop`` is left unmodified.
    """
    url = "https://maps.googleapis.com/maps/api/place/nearbysearch/json"
    matched = None
    fallback = None
    next_token = None
    result = None

    for i in range(10):
        if next_token:
            params = {
                "key": _get_apikey(),
                "pagetoken": next_token
            }
            logger.info("Next page {}, token={}".format(i, next_token))
            # A freshly issued page token needs a moment before it is valid.
            time.sleep(2)
        else:
            params = {
                "key": _get_apikey(),
                "keyword": keyword,
                "location": "{},{}".format(shop.lat_google, shop.lng_google),
                "rankby": "distance",
                "language": 'ja'
            }

        result = requests.get(url, params=params).json()
        if result["status"] != "OK":
            logger.info(result)

        places = result.get("results") or []

        # Remember the very first place as a fallback *before* deciding to
        # stop, so a single-page search without a name match still has one.
        if fallback is None and places:
            fallback = places[0]

        for place in places:
            logger.info("searching... {}".format(place["name"]))
            if place["name"].find(shop.name) != -1:
                logger.info("FOUND. name=" + shop.name)
                matched = place
                break

        # Stop on a match or when there is no further page.
        next_token = result.get("next_page_token")
        if matched or not next_token:
            break

    if not matched:
        logger.info("NOT FOUND. name=" + shop.name)
        matched = fallback

    if matched is None:
        # Nothing came back at all; do not touch the shop.
        logger.warn("Placeが見つかりません。 name={}".format(shop.name))
        return result

    shop.json = json.dumps(matched, ensure_ascii=False)
    shop.place_id = matched["place_id"]
    shop.lat_google = matched["geometry"]["location"]["lat"]
    shop.lng_google = matched["geometry"]["location"]["lng"]
    return result
def place_text_search(params):
    """
    Run a Places Text Search and merge all result pages into one response.

    Args:
        params: Query parameters for the first request (without ``key``).
            The dict is copied; the caller's dict is not modified.

    Returns:
        The parsed JSON of the first page, with the ``results`` of up to 10
        follow-up pages appended to its ``results`` list.

    Raises:
        Exception: If any page reports status ``OVER_QUERY_LIMIT``; the
            message is the JSON of the failing response.
    """
    url = "https://maps.googleapis.com/maps/api/place/textsearch/json"

    # Work on a copy so the API key does not leak into the caller's dict.
    first_params = dict(params)
    first_params["key"] = _get_apikey()
    result = requests.get(url, params=first_params).json()

    # Error check.
    if result.get("status") == "OVER_QUERY_LIMIT":
        raise Exception(json.dumps(result, ensure_ascii=False))

    # Paging (if any), at most 10 extra pages.
    pagetoken = result.get("next_page_token")
    for i in range(10):
        if not pagetoken:
            break
        logger.info("ページング:{}".format(i+2))
        # NOTE(review): a page token may not be valid immediately after it is
        # issued; the original had a sleep here that was commented out.
        page_params = {
            "key": _get_apikey(),
            "pagetoken": pagetoken
        }
        page = requests.get(url, params=page_params).json()

        # Error check on the page just fetched (not on the first page).
        if page.get("status") == "OVER_QUERY_LIMIT":
            raise Exception(json.dumps(page, ensure_ascii=False))

        # Append this page's results; an error page may carry none.
        result.setdefault("results", []).extend(page.get("results", []))

        # Next token, or None to stop.
        pagetoken = page.get("next_page_token")

    return result
def add_place_detail_2(shop, keyword, must_words=None, use_geo=True):
    """
    Find the shop with the Places Text Search API and store the match.
    https://developers.google.com/places/web-service/search?hl=ja#TextSearchRequests

    Args:
        shop: Object exposing ``name``, ``lat_address`` and ``lng_address``;
            receives ``json``, ``place_id``, ``lat_google``, ``lng_google``
            and, when several candidates remain, ``ambiguous``.
        keyword: Text query sent to the API.
        must_words: Optional list of words. When given, only places whose
            name contains at least one of these words or ``keyword`` are
            considered.
        use_geo: When true, bias the search to a 5000 m radius around the
            shop's address coordinates.

    Returns:
        The parsed (page-merged) API response.

    Raises:
        Exception: If the API reports ``OVER_QUERY_LIMIT``.
    """
    must_words = list(must_words) if must_words else []

    params = {
        "query": keyword,
        "language": 'ja'
    }
    if use_geo:
        params["location"] = "{},{}".format(shop.lat_address, shop.lng_address)
        params["radius"] = 5000
    result = place_text_search(params)

    # Error check.
    if result.get("status") == "OVER_QUERY_LIMIT":
        raise Exception(json.dumps(result, ensure_ascii=False))

    # Nothing returned: leave the shop untouched.
    places = result.get("results")
    if not places:
        return result

    # With must_words, keep only places containing a required word;
    # without, every place is a candidate.
    if must_words:
        required = must_words + [keyword]
        candidates = [p for p in places if contains_word(p["name"], required)]
    else:
        candidates = list(places)

    # Two or more candidates means the API itself was unsure; keep the raw
    # response for later inspection.
    if len(candidates) >= 2:
        shop.ambiguous = json.dumps(result, ensure_ascii=False)

    # Pick a single candidate.
    r = get_one_from_candidates2(shop, candidates)
    if not r:
        logger.warn("Placeが見つかりません。 name={}".format(shop.name))
        shop.json = None
        shop.place_id = None
        shop.lat_google = None
        shop.lng_google = None
        return result

    shop.json = json.dumps(r, ensure_ascii=False)
    shop.place_id = r["place_id"]
    shop.lat_google = r["geometry"]["location"]["lat"]
    shop.lng_google = r["geometry"]["location"]["lng"]

    # Flag suspicious matches: name mismatch or shop.error_meters >= 200.
    # NOTE(review): error_meters is defined on the shop object elsewhere;
    # presumably the distance between address and Google coordinates - verify.
    if not compare_strings(keyword, r["name"]) or shop.error_meters >= 200:
        logger.warn("違うかもしれません.\n\tshop.name={}\n\tr.name={}\n\tdistance={}".format(keyword, r["name"], shop.error_meters))

    return result
def contains_word(s, words):
    """
    Report whether ``s`` contains at least one of ``words`` as a substring.

    Args:
        s: String to search in.
        words: Iterable of candidate substrings.

    Returns:
        True if any word occurs in ``s``; False otherwise (including when
        ``words`` is empty).
    """
    return any(word in s for word in words)
def get_one_from_candidates2(shop, candidates):
    """
    Choose the most plausible place among the candidates.

    Filtering steps, in order:
      1. Drop candidates whose distance from the shop's address coordinates
         is unknown (``None``) or greater than 5000.
      2. Drop candidates whose name looks like a street address (ends in
         ``digits-digits`` after normalisation).
      3. Return the first remaining candidate whose name matches
         ``shop.name`` according to ``compare_strings``.
      4. Otherwise return the first remaining candidate.

    Args:
        shop: Object exposing ``name``, ``lat_address`` and ``lng_address``.
        candidates: List of place dicts from the Places API.

    Returns:
        The chosen place dict, or ``None`` when no candidate survives.
    """
    # Step 1: distance <= 5km.
    nearby = []
    for c in candidates:
        location = c["geometry"]["location"]
        distance = calc_distance(
            shop.lat_address,
            shop.lng_address,
            location["lat"],
            location["lng"]
        )
        if distance is not None and distance <= 5000:
            nearby.append(c)
        else:
            logger.info("距離>5kmのため除去。name={}, distance={}".format(c["name"], int(distance) if distance else -1))

    # Step 2: discard places that are really just addresses.
    # Example of a rejected name: 婦中町笹倉489−2
    named = []
    for c in nearby:
        name = zenkaku_to_hankaku(c["name"]).replace("ー", "-").replace("丁目", "-").replace("番地", "-")
        if re.search(r"\d+-\d+$", name):
            logger.info("住所プレイスのため除去。 name={}".format(c["name"]))
        else:
            named.append(c)

    # Step 3: prefer a candidate whose name matches.
    for c in named:
        if compare_strings(shop.name, c["name"]):
            return c

    # Step 4: fall back to the API's own top candidate.
    return named[0] if named else None
def get_one_from_candidates(lat, lng, results):
    """
    Return the candidate closest to the given coordinates.

    Candidates are sorted by ``calc_distance`` from ``(lat, lng)``; each one
    is printed with its distance (debug output) and the nearest is returned.

    Args:
        lat: Reference latitude.
        lng: Reference longitude.
        results: Non-empty list of place dicts.

    Returns:
        The place dict with the smallest distance.
    """
    def _distance_to(place):
        return calc_distance(lat, lng, place["geometry"]["location"]["lat"], place["geometry"]["location"]["lng"])

    ranked = sorted(results, key=_distance_to)

    # Debug output.
    for place in ranked:
        print("距離:", place["name"], _distance_to(place))

    return ranked[0]
def compare_strings(s1, s2, must_words=None):
    """
    Decide whether two names match after removing noise words.

    Every entry of ``must_words`` and the company-form markers (株式会社,
    有限会社, and the abbreviations 株 / 有 in half- or full-width parentheses)
    are removed from both strings. Each string is then passed through
    ``compare_string`` before the comparison.

    NOTE(review): the original docstring said "True if s1 is contained in
    s2", but the code tests whether the normalised *s2* is contained in the
    normalised *s1*. The code's behaviour is kept; confirm which is intended.

    Args:
        s1: First name.
        s2: Second name.
        must_words: Optional list of words removed from both names first.

    Returns:
        True if normalised ``s2`` is a substring of normalised ``s1``, or if
        their Levenshtein distance is at most 2.
    """
    # The source listed each parenthesised marker twice with identical
    # half-width text; the second of each pair is restored here as the
    # full-width parenthesis form (written with escapes so it survives
    # width normalisation of this file).
    noise = list(must_words or []) + [
        "株式会社",
        "(株)",
        "\uff08株\uff09",
        "有限会社",
        "(有)",
        "\uff08有\uff09",
    ]
    for token in noise:
        s1 = s1.replace(token, "")
        s2 = s2.replace(token, "")

    s1 = compare_string(s1)
    s2 = compare_string(s2)

    # Check 1: containment.
    if s2 in s1:
        return True

    # Check 2: edit distance.
    return Levenshtein.distance(s1, s2) <= 2
# Module-level cache filled on first use by get_address_list().
address_list = None


def get_address_list():
    """
    Return the prefecture/city pairs read from ``./dist/zenkoku2.csv``.

    The file is read once and cached in the module-level ``address_list``.
    The first CSV row is treated as a header and skipped; column 7 is used
    as the prefecture and column 9 as the city.

    Returns:
        A list of ``{"pref": ..., "city": ...}`` dicts, one per data row.
    """
    global address_list
    if address_list:
        return address_list

    address_list = []
    with open("./dist/zenkoku2.csv", encoding="utf-8", newline="") as f:
        rows = csv.reader(f, delimiter=',', quotechar='"')
        # Skip the header row.
        next(rows, None)
        address_list.extend({"pref": row[7], "city": row[9]} for row in rows)
    return address_list
def add_prefecture_and_city(shop):
    """
    Fill in ``shop.prefecture`` and ``shop.city``.

    First tries to recognise a known prefecture/city pair inside
    ``shop.address`` (leading postal code and all whitespace removed). If no
    pair from ``get_address_list`` matches, falls back to reverse geocoding
    ``shop.lat_google`` / ``shop.lng_google``; in that case ``shop.address``
    is also replaced with the reverse-geocoded address.

    Args:
        shop: Object exposing ``address``, ``name``, ``lat_google`` and
            ``lng_google``.
    """
    # Drop a leading "〒NNN-NNNN", then strip every whitespace character.
    # The source repeated the same half-width-space replace three times;
    # \s also covers the full-width space and other Unicode whitespace.
    address = re.sub(r"^〒\d{3}-\d{4}", "", shop.address)
    address = re.sub(r"\s+", "", address)

    # First approach: match against the known prefecture/city list.
    for a in get_address_list():
        if a["pref"] in address and a["city"] in address:
            shop.prefecture = a["pref"]
            shop.city = a["city"]
            return

    # Second approach: reverse geocoding.
    print("RevGeo:", shop.name)
    pref, city, address = reverse_geocoding2(shop.lat_google, shop.lng_google)
    if not pref or not city:
        logger.warn("住所が特定できません. \naddress={}".format(address))
    shop.prefecture = pref
    shop.city = city
    shop.address = address
# Holds the postal-code data (filled by load_post_code_data()).
post_code_map = None


def get_post_code(address):
    """
    Look up the postal code for an address.

    A few renamed municipalities are rewritten to their current names, then
    the address is matched by prefix against the keys of ``post_code_map``
    (loaded on first use). The first key, in map order, whose concatenated
    parts are a prefix of the address wins. A handful of addresses that are
    not in the data are handled by a hard-coded table.

    Args:
        address: Address string starting with the prefecture.

    Returns:
        ``(post_code, prefecture, city)``, or ``(None, None, None)`` when
        the address cannot be resolved.
    """
    global post_code_map

    # Rewrite municipalities that have been renamed.
    renamed = (
        ("山武郡大網白里町", "大網白里市"),
        ("黒川郡富谷町", "富谷市"),
        ("南埼玉郡白岡町", "白岡市"),
    )
    for old_name, new_name in renamed:
        address = address.replace(old_name, new_name)

    if not post_code_map:
        load_post_code_data()

    # Prefix match; the map's ordering is what makes this a longest match.
    for parts, code in post_code_map.items():
        if address.startswith("".join(parts)):
            return code, parts[0], parts[1]

    # Special cases that are not covered by the data.
    # NOTE(review): the first entry returns the street part as the city;
    # kept as in the original - confirm whether that is intended.
    exact_matches = {
        "長野県長野都市計画事業水沢上庭土地区画整理事業34街区1":
            ("388-8019", "長野県", "長野都市計画事業水沢上庭土地区画整理事業34街区1"),
        "千葉県流山市鰭ヶ崎地区一体型特定土地区画整理事業23番街区1":
            ("270-0156", "千葉県", "流山市"),
        "茨城県上菅谷駅前地区土地区画整理事業18街区7":
            ("311-0105", "茨城県", "那珂市"),
        "青森県八戸市都市計画事業土地区画整理事業保留地61街区5号":
            ("031-0011", "青森県", "八戸市"),
        "宮城県女川町被災市街地復興土地区画整理事業A0-15-1":
            ("986-2200", "宮城県", "女川町"),
    }
    if address in exact_matches:
        return exact_matches[address]
    if address.startswith("東京都三宅村"):
        return "100-1212", "東京都", "三宅村"

    # Not found.
    return None, None, None
def load_post_code_data():
    """
    Load the postal-code CSV into the module-level ``post_code_map``.

    Reads ``dist/zenkoku2.csv`` as UTF-8 (the same file and encoding that
    ``get_address_list`` uses), skips rows flagged as business offices, and
    builds an ``OrderedDict`` mapping
    ``(prefecture, city, town area, block)`` tuples to the postal code.

    Entries are ordered by the concatenated address in reverse
    lexicographic order, so an address that extends another one comes
    before it; iterating the map and taking the first prefix match
    therefore yields the longest match.
    """
    global post_code_map
    logger.info("郵便番号データを読み込み中..")

    address_fields = ("都道府県", "市区町村", "町域", "字丁目")

    with open("dist/zenkoku2.csv", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f, delimiter=",")
        # Business-office rows are excluded. (Abolished rows are kept; the
        # original had that extra filter commented out.)
        rows = [row for row in reader if row["事業所フラグ"] != "1"]

    # Order for longest-prefix matching (see docstring).
    rows.sort(reverse=True, key=lambda d: "".join(d[name] for name in address_fields))

    # Build the postal-code map.
    post_code_map = OrderedDict(
        (tuple(row[name] for name in address_fields), row["郵便番号"])
        for row in rows
    )
# Module-level cache filled on first use by load_prefecture().
prefs = None


def load_prefecture():
    """
    Load the prefecture data from ``dist/prefs2.csv``.

    The file is read once and cached in the module-level ``prefs``. Each
    row becomes a dict keyed by the CSV header.

    Returns:
        The list of row dicts.
    """
    global prefs
    if prefs:
        return prefs

    with open("dist/prefs2.csv", newline="") as f:
        prefs = list(csv.DictReader(f, delimiter=","))
    return prefs
def normalize_address(address):
    """
    Prepend the prefecture to an address that does not start with one.

    Resolution order:
      1. The address already starts with a known prefecture name: returned
         unchanged.
      2. The address starts with a city found in ``post_code_map``: that
         entry's prefecture is prepended (first match in map order).
      3. A few hard-coded special cases.

    Args:
        address: Address string.

    Returns:
        The address with a leading prefecture.

    Raises:
        Exception: If no prefecture can be determined.
    """
    global post_code_map, prefs
    if not post_code_map:
        load_post_code_data()
    if not prefs:
        load_prefecture()

    # Already starts with a prefecture?
    if any(address.startswith(pref["name"]) for pref in prefs):
        return address

    # Starts with a city: look the prefecture up from the postal data.
    for key in post_code_map:
        if address.startswith(key[1]):
            return key[0] + address

    # Special cases.
    if address == "鰭ヶ崎地区一体型特定土地区画整理事業23番街区1":
        return "千葉県流山市" + address
    special_prefixes = (
        ("長野都市計画事業", "長野県"),
        ("女川町", "宮城県牡鹿郡"),
        ("南三陸町", "宮城県本吉郡"),
    )
    for head, lead in special_prefixes:
        if address.startswith(head):
            return lead + address

    # Nothing matched.
    raise Exception("都道府県を特定できません。address={}".format(address))
def remove_building_name(address):
    """
    Strip the building name (everything after the street number) from an
    address.

    The address is converted with ``zenkaku_to_hankaku`` and the first
    occurrence of each Japanese block notation is rewritten ("N丁目" -> "N-",
    "N番"/"N番地"/"N番地の" -> "N-", "N号" -> "N"). The text is then cut after the
    first match of, in order of preference, ``N-N-N``, ``N-N`` or a single
    number. For Sapporo addresses the "...N条" part is protected so its digit
    is not mistaken for the street number.

    Args:
        address: Address string, possibly followed by a building name.

    Returns:
        The address up to and including the street number. If the address
        contains no digits it is returned as converted.
    """
    # Convert with the project's width-normalisation helper.
    address = zenkaku_to_hankaku(address)

    # "N丁目" -> "N-"
    matches = re.search(r"^(.*?)(\d+)丁目(.*)$", address)
    if matches:
        address = matches[1] + matches[2] + '-' + matches[3]

    # "N番" / "N番地" / "N番地の" -> "N-"
    matches = re.search(r"^(.*?)(\d+)番地?の?(.*)$", address)
    if matches:
        address = matches[1] + matches[2] + '-' + matches[3]

    # "N号" -> "N"
    matches = re.search(r"^(.*?)(\d+)号(.*)$", address)
    if matches:
        address = matches[1] + matches[2] + matches[3]

    # Variant 1: cut after "111-222-333".
    matches = re.search(r"^(.*?)(\d+-\d+-\d+)(.*)$", address)
    if matches:
        return matches[1] + matches[2]

    # Variant 2: cut after "111-222".
    matches = re.search(r"^(.*?)(\d+-\d+)(.*)$", address)
    if matches:
        return matches[1] + matches[2]

    # Sapporo special case: set the "...N条" part aside so that the digit
    # in it is not taken as the street number.
    prefix = ""
    matches = re.search(r"^(北海道札幌市.*?区.*?\d+条)", address)
    if matches:
        prefix = matches[1]
        address = address.replace(prefix, "")

    # Variant 3: cut after a single number "111".
    matches = re.search(r"^(.*?)(\d+)(.*)$", address)
    if matches:
        return prefix + matches[1] + matches[2]

    # No street number at all: put the Sapporo prefix back instead of
    # silently dropping it.
    return prefix + address
def get_placelist():
    """
    Iterate over the place lists of all prefectures (generator).

    Yields:
        For each prefecture returned by ``load_prefecture``, the row
        generator produced by ``get_placelist_by_pref`` for that
        prefecture's name. The per-prefecture generators are yielded as-is,
        not flattened.
    """
    for record in load_prefecture():
        rows = get_placelist_by_pref(record["name"])
        yield rows
def get_placelist_by_pref(pref_name):
    """
    Iterate over the place list of one prefecture (generator).

    A trailing 都, 府 or 県 is removed from the prefecture name and the rows
    of ``dist/placelist_<name>.csv`` are yielded one by one.

    Args:
        pref_name: Prefecture name, with or without its suffix.

    Yields:
        Each CSV row as a list of strings.
    """
    short_name = re.sub("(都|府|県)$", "", pref_name)
    csv_path = "dist/placelist_{}.csv".format(short_name)
    with open(csv_path, newline="") as f:
        yield from csv.reader(f, delimiter=",", quotechar='"')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment