Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@sharifulgeo
Forked from xydrolase/cyrides-utils.py
Last active August 29, 2015 14:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sharifulgeo/2b48eda5c7f5d4b3e5e0 to your computer and use it in GitHub Desktop.
Save sharifulgeo/2b48eda5c7f5d4b3e5e0 to your computer and use it in GitHub Desktop.
Sample code for retrieving all bus stops using Google's webservice. Theoretically, one could extract all venues within proximity of a given location.
#!/usr/bin/env python
import json
import re
import urllib
import random
import itertools
import time
import sys
class GMapsBusQuery:
def __init__(self):
self.bounded = False
self.boundbox = []
self.zoomlevel = 0
self.busstops = {}
def bound_by_tile(self, bb, zoom):
if not len(bb) == 4:
return None
self.boundbox = bb
self.zoomlevel = zoom
self.bounded = True
def generate_callback(self):
"""Generate a random callback hash (for simulating "real" request
purpose)"""
return ''.join([chr(97+x) if x < 26 else chr(48+x-26)
for x in [random.randint(0, 35) for i in range(9)]])
def geohash_to_tile(self, hash):
"""Extract the tile number (x, y) from the 4-radix "geospatial hash" (kind
of) representataion.
Each 4-base digit is encoded with characters 't', 'u', 'v' ,'w',
corresponding to 0, 1, 2, 3 respetively.
Consider each digit within the 4-radix number as two bits in binary. The
higher bit maps to x, while the lower bit maps to y.
"""
exponent = len(hash)-1
bits = map(lambda x: ord(x) - 116, list(hash))
x = y = 0
for idx in range(len(bits)):
shift = exponent-idx
x += ((bits[idx] & 2) >> 1) << shift
y += (bits[idx] & 1) << shift
return x, y
def tile_to_geohash(self, x, y, zoom=None):
if zoom == None:
zoom = self.zoomlevel
x_bits = map(lambda b: int(b), list(bin(x)[2:]))
y_bits = map(lambda b: int(b), list(bin(y)[2:]))
x_bits = [0] * (zoom - len(x_bits)) + x_bits
y_bits = [0] * (zoom - len(y_bits)) + y_bits
return ''.join([chr(((bx << 1) | by) + 116)
for bx, by in zip(x_bits, y_bits)])
def query_landmark(self, tx, ty, step):
# Find all possible combinations of tile (x, y), convert them into
# service compatible tile hashes.
tile_hash = [self.tile_to_geohash(x, y)
for x, y in itertools.product(
range(tx, tx+step if tx+step < self.boundbox[2] + 2 else
self.boundbox[2] + 1),
range(ty, ty+step if ty+step < self.boundbox[3] + 2 else
self.boundbox[3] + 1)
)]
url_params = urllib.urlencode(
{
'lyrs': 'm@140', # default layer
'las': ','.join(tile_hash), # tiles to query
'gl': 'us',
'hl': 'en',
'xc': 1,
'z': self.zoomlevel,
'opts': 'z',
'callback': '_xdc_._%s' % self.generate_callback()
})
# Randomly balance requests to different servers.
url = 'http://mt%(server)d.google.com/vt/ft?%(param)s' % \
({
'server': random.randint(0, 1),
'param': url_params
})
try:
f = urllib.urlopen(url)
assert f.getcode() == 200
return f.read()
except Exception:
return None
def extract_bus_stops(self, response):
"""Find all bus stops within response from API service:
http://mt0.google.com/vt/ft?lyrs=...&las=...
We could identify bus stop metadata by recognizing its unique (or not?)
bounding box [-7, -7, 6, 6]."""
# Seems to be response w/o useful data
if response.find('features') == -1:
return None
# Some cleaning before JSON library could start its job.
json_raw = re.sub(r'([{|,])([a-z_]+):', r'\1"\2":',
response[response.find('(') + 1:response.rfind(')')])
json_raw = re.sub(r'\"c\":\"\{1:\{(.+?)\}\}"', r'"c":{\1}', json_raw)
json_raw = json_raw.replace(r'\"', '"')
try:
# Find out all nodes with "features" list.
metadata = filter(lambda d: 'features' in d, json.loads(json_raw))
for node in metadata:
for feat in node['features']:
# Identify bus stop by its bounding box (might has problems?)
if feat['bb'] == [-7, -7, 6, 6] and \
feat['id'] not in self.busstops:
self.busstops.setdefault(feat['id'], {})
self.busstops[feat['id']]['caption'] = feat['c']['title']
except ValueError:
return False
def update_stops_detail(self, pause=0.5):
convert_chars = lambda match: chr(int(match.group(1), 16)) \
if match.group(1) else match.group(0)
for id in self.busstops.iterkeys():
print 'Update ', self.busstops[id]['caption']
# Construct URL request
url_params = {
'ftid': id,
'lyr' : 'm@140', # default layer
'iwp' : 'maps_app',
'callback' : '_xdc_._%s' % self.generate_callback
}
uri = "http://maps.google.com/maps/iw?%s" % \
urllib.urlencode(url_params)
# Convert all \x[a-z0-9]{2} representation into characters.
try:
f = urllib.urlopen(uri)
assert f.getcode() == 200
response = re.sub(
r'\\x(\w{2})',
convert_chars,
f.read())
self.update_stop(id, response)
time.sleep(pause)
except Exception:
continue
def update_stop(self, id, response):
bus_node = self.busstops[id]
meta = json.loads(response)
bus_node['latlng'] = (meta['latlng']['lat'], meta['latlng']['lng'])
bus_node['name'] = meta['name']
if 'infoWindow' in meta:
info = meta['infoWindow']
schedule = info['transitSchedules']['stationSchedules']
bus_node['agency'] = schedule['agencies'][0]['agency_name']
bus_node['lines'] = []
lines = schedule['line_groups'][0]['lines']
for line in lines:
line_node = {}
line_node['name'] = line['name']
line_node['color'] = line['backgroundColor']
bus_node['lines'].append(line_node)
def run(self, step=3, pause=0.5):
if not self.bounded:
return None
for tilex in range(self.boundbox[0], self.boundbox[2] + 1, step):
for tiley in range(self.boundbox[1], self.boundbox[3] + 1, step):
response = self.query_landmark(tilex, tiley, step)
if response:
self.extract_bus_stops(response)
time.sleep(pause) # stop for a short while
self.update_stops_detail(pause)
return self.busstops
def main():
busq = GMapsBusQuery()
#busq.bound_by_tile([31451, 48646, 31454, 48667], 17)
fd = open('cyride_stops.json', 'r')
busq.busstops = json.load(fd)
busq.update_stops_detail()
fd.close()
fd = open("cyride_full_stops.json", 'w')
json.dump(busq.busstops, fd)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment