Created
June 24, 2013 22:01
-
-
Save jessedhillon/5854039 to your computer and use it in GitHub Desktop.
Distance-decayed weighted average heatmaps
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from cStringIO import StringIO | |
import Image | |
import logging | |
import math | |
import numpy as np | |
import geoalchemy.functions as functions | |
from geoalchemy import WKTSpatialElement | |
from project.lib.models import Listing | |
import project.lib.util as util | |
logger = logging.getLogger('project') | |
def make_gradient(values): | |
colors = ( | |
(0, 43, 255, 255), | |
(0, 86, 255, 255), | |
(0, 127, 255, 255), | |
(0, 171, 255, 255), | |
(0, 214, 255, 255), | |
(0, 255, 255, 255), | |
(0, 255, 127, 255), | |
(0, 255, 0, 255), | |
(127, 155, 0, 255), | |
(255, 255, 0, 255), | |
(255 ,213, 0, 255), | |
(255, 171, 0, 255), | |
(255, 127, 0, 255), | |
(255, 86, 0, 255), | |
(255, 43, 0, 255), | |
(255, 0, 0, 255), | |
) | |
stops = zip(map(int, np.linspace(values[0], values[-1], len(colors))), colors) | |
def lerp(a, b, d): | |
l = [] | |
for i in range(len(a)): | |
s = float(a[i]) + (float(b[i] - a[i]) * d) | |
l.append(int(round(s))) #int(round(s / 2.0))) | |
return tuple(l) | |
gradient = {} | |
# first floor is 0 | |
floor = (0, (colors[0][0], colors[0][1], colors[0][2], 0)) | |
for stop, color in stops: | |
for i in range(floor[0], stop): | |
d = float(i - floor[0]) / stop | |
gradient[i] = lerp(floor[1], color, d) | |
floor = stop, color | |
return gradient | |
def color(v, gradient): | |
if v is None: | |
return (255, 255, 255, 0) | |
try: | |
v = int(round(v)) | |
if v in gradient: | |
return gradient[v] | |
else: | |
m = max(gradient.keys()) | |
if v >= m: | |
return gradient[m] | |
return gradient[min(gradient.keys())] | |
except ValueError as e: | |
logger.warn("Value conversion failure: {0!r}, {1!s}".format(v, e)) | |
return (255, 255, 255, 0) | |
def generate_heatmap(geo, listing_geos, listing_prices, threshold, gradient, size=(256, 256)): | |
n, e, w, s = geo | |
image = Image.new('RGBA', (256, 256)) | |
width = abs(e-w) | |
height = abs(n-s) | |
pixels = np.array([(x, y) for y in range(size[0]) for x in range(size[1])]) | |
lons = w + np.arange(size[0]) / (float(size[0]) - 1) * width | |
lats = n - np.arange(size[1]) / (float(size[1]) - 1) * height | |
geos = np.array([(lon, lat) for lat in lats for lon in lons]) | |
distances = util.haversine(listing_geos, geos) | |
inverse_d = [] | |
for i, d in enumerate(distances): | |
e = 1.0/d | |
np.putmask(e, d > threshold, [0.0]) | |
inverse_d.append(e) | |
counts = np.repeat(0.0, size[0] * size[1]) | |
denoms = np.repeat(0.0, size[0] * size[1]) | |
values = np.repeat(0.0, size[0] * size[1]) | |
for i, d in enumerate(inverse_d): | |
c = d.copy() | |
np.putmask(c, d == 0, [0]) | |
np.putmask(c, d > 0, [1]) | |
counts += c | |
denoms += d | |
values += (d * listing_prices[i]) | |
values /= denoms | |
np.putmask(values, counts == 0, [0]) | |
values.put(values == np.inf, [0]) | |
pixels = image.load() | |
for y in range(size[1]): | |
for x in range(size[0]): | |
pixels[x, y] = color(values[(y*size[1])+x], gradient) | |
return image | |
def view(context, request): | |
threshold = 1000.0 # meters | |
distance = threshold / 111111.0 # 111.111 km ~ 1' | |
lon, lat = util.tile_centroid(**{k: int(v) for k, v in request.matchdict.items()}) | |
s, w, n, e = util.tile_edges(**{k: int(v) for k, v in request.matchdict.items()}) | |
# find all properties within threshold | |
delta = abs(n - s) | |
radius = delta + (math.sqrt(2.0) * distance) | |
geom = WKTSpatialElement("POINT({0} {1})".format((e + w)/2.0, (n + s)/2.0)) | |
buff = functions.buffer(geom, radius) | |
listings = Listing.query.filter(Listing.location.within(buff)).filter(Listing.bedrooms > 0).filter(Listing.normal_price != None).all() | |
# compute the gradient | |
normal_prices = util.unique_values([int(l.normal_price) for l in Listing.query.filter(Listing.normal_price != None).order_by(Listing.normal_price.asc()).all()]) | |
l = len(normal_prices) | |
gradient = make_gradient(normal_prices[int(l/10):int(-l/10)]) | |
# generate the heatmap | |
locations = [l.location.coords for l in listings] | |
prices = [float(l.normal_price) for l in listings] | |
image = generate_heatmap((n, e, w, s), locations, prices, threshold, gradient) | |
# read out the image data | |
f = StringIO() | |
image.save(f, 'PNG') | |
f.seek(0) | |
request.response.content_type = 'image/png' | |
request.response.body = f.read() | |
return request.response |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
import numpy as np | |
def num_tiles(z): | |
return math.pow(2, z) | |
def lat_edges(y, z): | |
n = num_tiles(z) | |
unit = 1 / n | |
relY1 = y * unit | |
relY2 = relY1 + unit | |
lat1 = mercator_to_lat(math.pi * (1 - 2 * relY1)) | |
lat2 = mercator_to_lat(math.pi * (1 - 2 * relY2)) | |
return (lat1, lat2) | |
def lon_edges(x, z): | |
n = num_tiles(z) | |
unit = 360 / n | |
lon1 = -180 + x * unit | |
lon2 = lon1 + unit | |
return (lon1, lon2) | |
def tile_edges(xtile, ytile, zoom): | |
lat1, lat2 = lat_edges(ytile, zoom) | |
lon1, lon2 = lon_edges(xtile, zoom) | |
return (lat2, lon1, lat1, lon2) # S,W,N,E | |
def tile_centroid(xtile, ytile, zoom): | |
s, w, n, e = tile_edges(xtile, ytile, zoom) | |
return (0.5 * (w + e), 0.5 * (n + s)) | |
def mercator_to_lat(mercatorY): | |
return math.degrees(math.atan(math.sinh(mercatorY))) | |
def haversine(targets, sources): | |
distances = [] | |
for t in targets: | |
dlon = np.radians(sources[:,0]) - np.radians(t[0]) | |
dlat = np.radians(sources[:,1]) - np.radians(t[1]) | |
a = np.square(np.sin(dlat/2.0)) + math.cos(math.radians(t[1])) * np.cos(np.radians(sources[:,1])) * np.square(np.sin(dlon/2.0)) | |
c = 2 * np.arcsin(np.sqrt(a)) | |
distances.append(6367000 * c) | |
return distances | |
def unique_values(seq, idfn=None): | |
# order preserving | |
if idfn is None: | |
def idfn(x): return x | |
seen = {} | |
result = [] | |
for item in seq: | |
marker = idfn(item) | |
if marker not in seen: | |
seen.setdefault(marker, True) | |
result.append(item) | |
return result |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment