Skip to content

Instantly share code, notes, and snippets.

@zhehaowang
Created December 26, 2020 15:25
Show Gist options
  • Save zhehaowang/d94ec94cc184b90a4601ab0d890ae34e to your computer and use it in GitHub Desktop.
Save zhehaowang/d94ec94cc184b90a4601ab0d890ae34e to your computer and use it in GitHub Desktop.
annotate communes with poi
#!/usr/bin/env python3
import argparse
import glob
import json
import requests
import os
from json.decoder import JSONDecodeError
# import hack
import sys
from pathlib import Path
# Import hack: put the directory two levels above this script on sys.path
# (presumably so the `lianjia` package import below resolves) and take the
# script's own directory off it.
file = Path(__file__).resolve()
parent = file.parent
root = file.parents[1]
sys.path.append(str(root))
# Only remove the script directory when it is actually listed; this mirrors
# "remove and ignore ValueError".
if str(parent) in sys.path:
    sys.path.remove(str(parent))
from lianjia.driver_util import populate_level_one_district_mapping
class BaiduQueryError(Exception):
    """Raised when a Baidu API response is rejected while being parsed."""

    def __init__(self, message):
        # Hand the message straight to Exception so str(err) returns it.
        Exception.__init__(self, message)
def parse_args():
    """Parse the command-line options of the commune annotator.

    Returns:
        argparse.Namespace with the attributes ``geocache``, ``meta``,
        ``only``, ``glob``, ``dryrun``, ``cred`` and ``join``.
    """
    parser = argparse.ArgumentParser(
        # The first positional parameter of ArgumentParser is ``prog``, so the
        # help text must be passed as ``description``; otherwise it replaces
        # the program name in the usage line.
        description=(
            "\n"
            "Given communities, find what POI (e.g. train stations are nearby).\n"
            "example usage:\n"
            './commune_annotator.py --dryrun --only 石景山 --glob "./test/communes/*.json" --join\n'
        ),
        # Keep the line breaks of the description when printing --help.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--geocache",
        help="when specified, load up a cache with geocode results",
        default="test/meta/geocache.json",
    )
    parser.add_argument(
        "--meta", help="districts metadata file", default="test/meta/districts.json"
    )
    parser.add_argument("--only", help="only districts matching")
    # --glob is mandatory, so a default could never take effect; the dead
    # default that used to be declared here was dropped.
    parser.add_argument(
        "--glob",
        help="the profile files (json) glob",
        required=True,
    )
    parser.add_argument(
        "--dryrun",
        help="print but do not request anything",
        default=False,
        action="store_true",
    )
    parser.add_argument(
        "--cred", help="credentials file", default="credentials/baidu.key"
    )
    parser.add_argument(
        "--join",
        help="join mode. given a cache and operate on the files",
        default=False,
        action="store_true",
    )
    return parser.parse_args()
class GeoFetcher:
    """Fetch Baidu geocodes and nearby POIs for communes, backed by a JSON cache.

    The cache maps a query text (the commune name) to its geocode result.
    POI results are stored inside that same entry under the key
    ``"{query}_{radius}"``.
    """

    # Seconds to wait for a single HTTP request before giving up, so one
    # stalled connection cannot hang the whole run.
    REQUEST_TIMEOUT = 10

    def __init__(self, geocache, baidu_key, dryrun):
        """Create a fetcher and load the cache file if it exists.

        Args:
            geocache: path of the JSON cache file; falsy disables persistence.
            baidu_key: API key interpolated into the request URLs.
            dryrun: when true, print request URLs instead of sending them.

        Raises:
            json.JSONDecodeError: if the cache file exists but is not valid JSON.
        """
        self.baidu_key = baidu_key
        self.dryrun = dryrun
        self.geocache = {}
        self.api_request_cnt = 0
        # Left unset until the cache has loaded: if loading raises, __del__
        # must not overwrite the unreadable cache file with an empty dict.
        self.geocache_file = None
        if geocache and os.path.exists(geocache):
            # The cache is written with ensure_ascii=False, so pin the
            # encoding instead of relying on the platform default.
            with open(geocache, "r", encoding="utf-8") as geocache_in:
                self.geocache = json.loads(geocache_in.read())
        self.geocache_file = geocache

    def save_cache(self):
        """Write the in-memory cache to the cache file, if one was configured."""
        if not self.geocache_file:
            return
        with open(self.geocache_file, "w", encoding="utf-8") as geocache_out:
            geocache_out.write(json.dumps(self.geocache, ensure_ascii=False))

    def __del__(self):
        # Persist the cache when the fetcher goes away. getattr guards against
        # instances whose __init__ did not run to completion.
        if getattr(self, "geocache_file", None):
            self.save_cache()
        print(f"total api request made: {getattr(self, 'api_request_cnt', 0)}")

    def _parse_baidu_response(self, response, error_key):
        """Return the ``results``/``result`` payload of a Baidu HTTP response.

        Args:
            response: object exposing ``status_code`` and ``text``.
            error_key: label prefixed to error messages (e.g. "geocode").

        Raises:
            BaiduQueryError: on a non-200 HTTP status, undecodable JSON, a
                missing or non-zero ``status`` field, or a missing payload.
        """
        if response.status_code != 200:
            raise BaiduQueryError(f"{error_key} error response {response.status_code}")
        try:
            res = json.loads(response.text)
        except JSONDecodeError as err:
            raise BaiduQueryError(
                f"{error_key} respones decode error {response.text}"
            ) from err
        # A body that is not an object, or that lacks "status", is reported as
        # a query error too, so callers' BaiduQueryError handling covers it.
        if not isinstance(res, dict) or res.get("status") != 0:
            raise BaiduQueryError(
                f"{error_key} response unexpected status {response.text}"
            )
        if "results" in res:
            return res["results"]
        if "result" in res:
            return res["result"]
        raise BaiduQueryError(f"{error_key} response no result {response.text}")

    def _query_baidu(self, url, error_key):
        """GET *url* and return its parsed payload; raise BaiduQueryError on failure."""
        try:
            response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException as err:
            # Network failures take the same print-and-continue path as bad
            # responses instead of aborting the whole run.
            raise BaiduQueryError(f"{error_key} request failed {err}") from err
        self.api_request_cnt += 1
        return self._parse_baidu_response(response, error_key)

    def get_from_cache(self, commune_name):
        """Return the cached entry for *commune_name*, or ``{}`` when absent."""
        return self.geocache.get(commune_name, {})

    def fetch_commune_geocode(self, commune_name):
        """Return the geocode result for *commune_name*, consulting the cache first.

        Returns ``{}`` in dry-run mode (after printing the URL) and when the
        request fails or yields no usable location; failures are not cached.
        """
        if commune_name in self.geocache:
            return self.geocache[commune_name]
        geocode_url = f"http://api.map.baidu.com/geocoding/v3/?address={commune_name}&output=json&ak={self.baidu_key}"
        if self.dryrun:
            print(geocode_url)
            return {}
        try:
            res = self._query_baidu(geocode_url, "geocode")
        except BaiduQueryError as e:
            print(str(e))
            return {}
        if (
            "location" not in res
            or "lat" not in res["location"]
            or "lng" not in res["location"]
        ):
            print(f"geocode response invalid {geocode_url}")
            # don't cache errors
            return {}
        self.geocache[commune_name] = res
        return res

    def fetch_commune_poi(self, commune_name, radius, query):
        """Return the POIs matching *query* within *radius* of the commune.

        The result is cached under ``"{query}_{radius}"`` inside the commune's
        cache entry. Empty results are cached, error responses are not.
        Returns ``{}`` in dry-run mode and whenever geocoding or the POI
        request fails.
        """
        cache_key = f"{query}_{radius}"
        cached_entry = self.geocache.get(commune_name, {})
        if cache_key in cached_entry:
            return cached_entry[cache_key]
        geocode = self.fetch_commune_geocode(commune_name)
        if not geocode:
            print("cannot get poi for failed geocode request")
            return {}
        # Cached entries may come from an older cache file; do not assume
        # they still carry coordinates.
        location = geocode.get("location") or {}
        if "lat" not in location or "lng" not in location:
            print("cannot get poi for geocode without location")
            return {}
        lat = location["lat"]
        lng = location["lng"]
        poi_url = f"http://api.map.baidu.com/place/v2/search?query={query}&location={lat},{lng}&output=json&ak={self.baidu_key}&radius={radius}"
        if self.dryrun:
            print(poi_url)
            return {}
        try:
            res = self._query_baidu(poi_url, "poi")
        except BaiduQueryError as e:
            print(e)
            return {}
        if len(res) == 0:
            # empty response should be cached, error responses should not be
            print(f"empty poi response {poi_url}")
        # A successful geocode lookup guarantees the commune has a cache entry.
        self.geocache[commune_name][cache_key] = res
        return res

    def purge_miscached(self):
        """Drop cached "地铁站_1000" values that are status-302 payloads.

        This version never caches a non-zero status, so such entries can only
        come from a cache file produced some other way.
        """
        for query_text, entry in self.geocache.items():
            poi = entry.get("地铁站_1000") if isinstance(entry, dict) else None
            if isinstance(poi, dict) and poi.get("status") == 302:
                print(f"miscached {query_text}")
                # Only the inner entry is mutated, so iterating the outer
                # dict stays safe.
                del entry["地铁站_1000"]
def get_key(credential_file):
    """Return the API key stored in *credential_file*.

    Surrounding whitespace is stripped because the key is interpolated
    verbatim into request URLs; a trailing newline in the file would
    otherwise end up inside the URL.
    """
    with open(credential_file, "r", encoding="utf-8") as rfile:
        return rfile.read().strip()
def main():
    """Annotate the commune profiles matched by --glob with nearby "地铁站" POIs.

    For each profile file: load the JSON, resolve its district through the
    districts metadata, build the query text
    ``北京市{district}区{commune_name}`` and fetch the POIs within radius 1000.
    In --join mode the cached geo info and the query text are written back
    into the profile file.
    """
    args = parse_args()
    commune_mapping = populate_level_one_district_mapping(args.meta)
    geo = GeoFetcher(args.geocache, get_key(args.cred), args.dryrun)
    files = glob.glob(args.glob)
    geo.purge_miscached()
    # The --only filter does not change between files; split it once.
    only_districts = args.only.split(",") if args.only else None

    for f in files:
        # Profiles hold Chinese text and are rewritten with
        # ensure_ascii=False, so read and write them as UTF-8 explicitly.
        with open(f, "r", encoding="utf-8") as infile:
            commune = json.loads(infile.read())
        if not ("commune_name" in commune and commune["commune_name"]):
            print(f"commune_name not present in {f}")
            continue
        if not ("district" in commune and commune["district"]):
            print(f"district not present in {f}")
            continue

        district_name = ""
        if commune["district"] in commune_mapping:
            district_name = commune_mapping[commune["district"]]
        else:
            # Processing continues with an empty district name, as before.
            print(f"district {commune['district']} does not have a commune meta")
        district_name_sanitized = district_name.strip("区")
        if only_districts is not None and district_name_sanitized not in only_districts:
            continue

        commune_name = f"北京市{district_name_sanitized}区{commune['commune_name']}"
        geo.fetch_commune_poi(commune_name, 1000, "地铁站")
        if not args.join:
            continue
        geo_info = geo.get_from_cache(commune_name)
        if geo_info:
            commune["geo"] = geo_info
            commune["query_text"] = commune_name
            with open(f, "w", encoding="utf-8") as wfile:
                wfile.write(json.dumps(commune, indent=4, ensure_ascii=False))


if __name__ == "__main__":
    main()
@zhehaowang
Copy link
Author

line 117: given a commune name, return its Baidu latitude and longitude.

line 151: given a Baidu latitude and longitude, return the "地铁站" (subway station) POIs within 1000 m.

line 90: parse the Baidu API response.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment