Skip to content

Instantly share code, notes, and snippets.

@jessedhillon
Created June 24, 2013 22:01
Show Gist options
  • Save jessedhillon/5854039 to your computer and use it in GitHub Desktop.
Save jessedhillon/5854039 to your computer and use it in GitHub Desktop.
Distance-decayed weighted average heatmaps
from cStringIO import StringIO
import Image
import logging
import math
import numpy as np
import geoalchemy.functions as functions
from geoalchemy import WKTSpatialElement
from project.lib.models import Listing
import project.lib.util as util
logger = logging.getLogger('project')
def make_gradient(values):
colors = (
(0, 43, 255, 255),
(0, 86, 255, 255),
(0, 127, 255, 255),
(0, 171, 255, 255),
(0, 214, 255, 255),
(0, 255, 255, 255),
(0, 255, 127, 255),
(0, 255, 0, 255),
(127, 155, 0, 255),
(255, 255, 0, 255),
(255 ,213, 0, 255),
(255, 171, 0, 255),
(255, 127, 0, 255),
(255, 86, 0, 255),
(255, 43, 0, 255),
(255, 0, 0, 255),
)
stops = zip(map(int, np.linspace(values[0], values[-1], len(colors))), colors)
def lerp(a, b, d):
l = []
for i in range(len(a)):
s = float(a[i]) + (float(b[i] - a[i]) * d)
l.append(int(round(s))) #int(round(s / 2.0)))
return tuple(l)
gradient = {}
# first floor is 0
floor = (0, (colors[0][0], colors[0][1], colors[0][2], 0))
for stop, color in stops:
for i in range(floor[0], stop):
d = float(i - floor[0]) / stop
gradient[i] = lerp(floor[1], color, d)
floor = stop, color
return gradient
def color(v, gradient):
if v is None:
return (255, 255, 255, 0)
try:
v = int(round(v))
if v in gradient:
return gradient[v]
else:
m = max(gradient.keys())
if v >= m:
return gradient[m]
return gradient[min(gradient.keys())]
except ValueError as e:
logger.warn("Value conversion failure: {0!r}, {1!s}".format(v, e))
return (255, 255, 255, 0)
def generate_heatmap(geo, listing_geos, listing_prices, threshold, gradient, size=(256, 256)):
n, e, w, s = geo
image = Image.new('RGBA', (256, 256))
width = abs(e-w)
height = abs(n-s)
pixels = np.array([(x, y) for y in range(size[0]) for x in range(size[1])])
lons = w + np.arange(size[0]) / (float(size[0]) - 1) * width
lats = n - np.arange(size[1]) / (float(size[1]) - 1) * height
geos = np.array([(lon, lat) for lat in lats for lon in lons])
distances = util.haversine(listing_geos, geos)
inverse_d = []
for i, d in enumerate(distances):
e = 1.0/d
np.putmask(e, d > threshold, [0.0])
inverse_d.append(e)
counts = np.repeat(0.0, size[0] * size[1])
denoms = np.repeat(0.0, size[0] * size[1])
values = np.repeat(0.0, size[0] * size[1])
for i, d in enumerate(inverse_d):
c = d.copy()
np.putmask(c, d == 0, [0])
np.putmask(c, d > 0, [1])
counts += c
denoms += d
values += (d * listing_prices[i])
values /= denoms
np.putmask(values, counts == 0, [0])
values.put(values == np.inf, [0])
pixels = image.load()
for y in range(size[1]):
for x in range(size[0]):
pixels[x, y] = color(values[(y*size[1])+x], gradient)
return image
def view(context, request):
threshold = 1000.0 # meters
distance = threshold / 111111.0 # 111.111 km ~ 1'
lon, lat = util.tile_centroid(**{k: int(v) for k, v in request.matchdict.items()})
s, w, n, e = util.tile_edges(**{k: int(v) for k, v in request.matchdict.items()})
# find all properties within threshold
delta = abs(n - s)
radius = delta + (math.sqrt(2.0) * distance)
geom = WKTSpatialElement("POINT({0} {1})".format((e + w)/2.0, (n + s)/2.0))
buff = functions.buffer(geom, radius)
listings = Listing.query.filter(Listing.location.within(buff)).filter(Listing.bedrooms > 0).filter(Listing.normal_price != None).all()
# compute the gradient
normal_prices = util.unique_values([int(l.normal_price) for l in Listing.query.filter(Listing.normal_price != None).order_by(Listing.normal_price.asc()).all()])
l = len(normal_prices)
gradient = make_gradient(normal_prices[int(l/10):int(-l/10)])
# generate the heatmap
locations = [l.location.coords for l in listings]
prices = [float(l.normal_price) for l in listings]
image = generate_heatmap((n, e, w, s), locations, prices, threshold, gradient)
# read out the image data
f = StringIO()
image.save(f, 'PNG')
f.seek(0)
request.response.content_type = 'image/png'
request.response.body = f.read()
return request.response
import math
import numpy as np
def num_tiles(z):
return math.pow(2, z)
def lat_edges(y, z):
n = num_tiles(z)
unit = 1 / n
relY1 = y * unit
relY2 = relY1 + unit
lat1 = mercator_to_lat(math.pi * (1 - 2 * relY1))
lat2 = mercator_to_lat(math.pi * (1 - 2 * relY2))
return (lat1, lat2)
def lon_edges(x, z):
n = num_tiles(z)
unit = 360 / n
lon1 = -180 + x * unit
lon2 = lon1 + unit
return (lon1, lon2)
def tile_edges(xtile, ytile, zoom):
lat1, lat2 = lat_edges(ytile, zoom)
lon1, lon2 = lon_edges(xtile, zoom)
return (lat2, lon1, lat1, lon2) # S,W,N,E
def tile_centroid(xtile, ytile, zoom):
s, w, n, e = tile_edges(xtile, ytile, zoom)
return (0.5 * (w + e), 0.5 * (n + s))
def mercator_to_lat(mercatorY):
return math.degrees(math.atan(math.sinh(mercatorY)))
def haversine(targets, sources):
distances = []
for t in targets:
dlon = np.radians(sources[:,0]) - np.radians(t[0])
dlat = np.radians(sources[:,1]) - np.radians(t[1])
a = np.square(np.sin(dlat/2.0)) + math.cos(math.radians(t[1])) * np.cos(np.radians(sources[:,1])) * np.square(np.sin(dlon/2.0))
c = 2 * np.arcsin(np.sqrt(a))
distances.append(6367000 * c)
return distances
def unique_values(seq, idfn=None):
# order preserving
if idfn is None:
def idfn(x): return x
seen = {}
result = []
for item in seq:
marker = idfn(item)
if marker not in seen:
seen.setdefault(marker, True)
result.append(item)
return result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment