Skip to content

Instantly share code, notes, and snippets.

@giohappy
Created June 16, 2017 16:44
Show Gist options
  • Save giohappy/9586e201dcd46d67b527d4452aeb5251 to your computer and use it in GitHub Desktop.
Save giohappy/9586e201dcd46d67b527d4452aeb5251 to your computer and use it in GitHub Desktop.
mapblender.py
#!/usr/bin/env python
import os
import sys
from optparse import OptionParser
from collections import OrderedDict
from osgeo import gdal
import numpy as np
from PIL import Image
from matplotlib.colors import LinearSegmentedColormap
RED = "\033[1;31m"
GREEN = "\033[0;32m"
RESET = "\033[0;0m"
def rgb_to_hsv(rgb):
rgb = rgb.astype('float')
hsv = np.zeros_like(rgb)
hsv[..., 3:] = rgb[..., 3:]
r, g, b = rgb[..., 0], rgb[..., 1], rgb[..., 2]
maxc = np.max(rgb[..., :3], axis=-1)
minc = np.min(rgb[..., :3], axis=-1)
hsv[..., 2] = maxc
mask = maxc != minc
hsv[mask, 1] = (maxc - minc)[mask] / maxc[mask]
rc = np.zeros_like(r)
gc = np.zeros_like(g)
bc = np.zeros_like(b)
rc[mask] = (maxc - r)[mask] / (maxc - minc)[mask]
gc[mask] = (maxc - g)[mask] / (maxc - minc)[mask]
bc[mask] = (maxc - b)[mask] / (maxc - minc)[mask]
hsv[..., 0] = np.select(
[r == maxc, g == maxc], [bc - gc, 2.0 + rc - bc], default=4.0 + gc - rc)
hsv[..., 0] = (hsv[..., 0] / 6.0) % 1.0
return hsv
def hsv_to_rgb(hsv):
rgb = np.empty_like(hsv)
rgb[..., 3:] = hsv[..., 3:]
h, s, v = hsv[..., 0], hsv[..., 1], hsv[..., 2]
i = (h * 6.0).astype('uint8')
f = (h * 6.0) - i
p = v * (1.0 - s)
q = v * (1.0 - s * f)
t = v * (1.0 - s * (1.0 - f))
i = i % 6
conditions = [s == 0.0, i == 1, i == 2, i == 3, i == 4, i == 5]
rgb[..., 0] = np.select(conditions, [v, q, p, p, t, v], default=v)
rgb[..., 1] = np.select(conditions, [v, v, v, q, p, p], default=t)
rgb[..., 2] = np.select(conditions, [v, p, t, v, v, q], default=p)
return rgb.astype('uint8')
stretch = False
stretch_suf = "_stretch" if stretch else ""
invert = False
satclasses1 = OrderedDict([(0.05, 1.0), (0.20, 0.75), (0.50, 0.5), (1.0, 0.25)])
satclasses2 = OrderedDict([(0.05, 1.0), (0.20, 0.70), (0.50, 0.40), (1.0, 0.10)])
satclasses3 = OrderedDict([(0.05, 1.0), (0.20, 0.75), (0.50, 0.5), (1.0, 0.35)])
satclasses = satclasses3
def run(options):
raster_max = options.raster_max
raster_median = options.raster_median
output = options.output
debug = options.debug
def valuescalculator(minval, maxval, stretch=True, rmin=0, rmax=1, invert=False, classes=None):
if classes is not None:
maxclass = classes.keys()[-1]
if stretch:
def normalize(x):
return ((x - minval) / (maxval - minval)) * (rmax - rmin) + rmin
else:
def normalize(x):
return ((rmax - rmin) * x) + rmin
if invert:
def classify(out):
return (rmax - out) + rmin
elif classes:
def classify(out):
try:
if out < classes.keys()[-1]:
return next(classes.values()[x[0]] for x in enumerate(classes.keys()) if x[1] > out)
else:
return classes.values()[-1]
except StopIteration, e:
return 1.0
else:
def classify(out):
return out
def fnc(x):
x = x if x > 0 else 0.0
out = normalize(x)
return classify(out)
return fnc
ds_max = gdal.Open(raster_max)
maxval_arr = np.array(ds_max.GetRasterBand(1).ReadAsArray())
geotransform = ds_max.GetGeoTransform()
proj = ds_max.GetProjection()
del ds_max
maxval_arr_nonan = np.nan_to_num(maxval_arr)
maxval_arr_nonan[maxval_arr_nonan < 0] = 0
ds_50p = gdal.Open(raster_median)
medianval_arr = np.array(ds_50p.GetRasterBand(1).ReadAsArray())
del ds_50p
medianval_arr_nonan = np.nan_to_num(medianval_arr)
medianval_arr_nonan[medianval_arr_nonan < 0] = 0
modulation_array = maxval_arr_nonan - medianval_arr_nonan
modulation_array = np.ma.array(modulation_array, mask=np.isnan(modulation_array))
modulation_minval = np.nanmin(modulation_array)
modulation_maxval = np.nanmax(modulation_array)
max_arr_masked = np.ma.array(maxval_arr, mask=np.isnan(maxval_arr))
max_maxval = np.nanmax(max_arr_masked)
max_minval = np.nanmin(max_arr_masked)
normmaxfnc = np.vectorize(valuescalculator(max_minval, max_maxval))
max_arr_normalized = normmaxfnc(max_arr_masked)
cdict = {'red': ((0.0, 1.0, 1.0),
(1.0, 1.0, 1.0)),
'green': ((0.0, 1.0, 1.0),
(1.0, 0.0, 0.0)),
'blue': ((0.0, 0.0, 0.0),
(1.0, 0.0, 0.0))
}
cmap = LinearSegmentedColormap('YellowRed', cdict)
cmap.set_bad('black')
max_img = cmap(max_arr_normalized, bytes=True)
if debug:
Image.fromarray(max_img, 'RGBA').save(os.path.join("./max.png"))
max_hsv = rgb_to_hsv(max_img)
rgbshape = (maxval_arr.shape[0], maxval_arr.shape[1], 3)
modulation_hsv = np.zeros(rgbshape, dtype="float32")
modulation_hsv[..., 0] = 180 / 360.
nomrfnc = np.vectorize(valuescalculator(modulation_minval, modulation_maxval, stretch, classes=satclasses))
valuesarray = nomrfnc(modulation_array)
modulation_hsv[..., 1] = valuesarray
modulation_hsv[..., 2] = 255.
max_hsv[..., 1] = valuesarray
modulated_rgb = hsv_to_rgb(max_hsv)
nx = maxval_arr.shape[0]
ny = maxval_arr.shape[1]
if os.path.exists(output):
os.remove(output)
try:
dst_ds = gdal.GetDriverByName('GTiff').Create(output, ny, nx, 3, gdal.GDT_Byte)
dst_ds.SetGeoTransform(geotransform)
dst_ds.SetProjection(proj)
dst_ds.GetRasterBand(1).WriteArray(modulated_rgb[..., 0])
dst_ds.GetRasterBand(2).WriteArray(modulated_rgb[..., 1])
dst_ds.GetRasterBand(3).WriteArray(modulated_rgb[..., 2])
dst_ds.FlushCache()
except:
sys.stdout.write(RED)
print "ERROR: Error writing output image {}".format(output)
if dst_ds:
dst_ds = None
exit(1)
if debug:
modulated_img = Image.fromarray(modulated_rgb)
modulated_img.save(output+'.png')
sys.stdout.write(GREEN)
print "Process completed. Output image written to {}".format(output)
fileoptions = ['raster_max','raster_median']
parser = OptionParser()
parser.add_option("-M", "--max", dest="raster_max",
help="path to max raster FILE", metavar="FILE")
parser.add_option("-m", "--median", dest="raster_median",
help="path to median raster FILE", metavar="FILE")
parser.add_option("-o", "--output", dest="output",
help="path to output raster FILE (Geotiff)", metavar="FILE", default="./output.tiff")
parser.add_option("-d", "--debug", dest="debug", action="store_true",
help="write PNG images for visual inspection", metavar="FILE", default=False)
if __name__ == '__main__':
(options, args) = parser.parse_args()
error = False
errors = []
if not options.raster_max:
errors.append("MAX raster file required (-M)")
error = True
if not options.raster_median:
errors.append("MEDIAN raster file required (-m)")
error = True
if not error:
options_dict = vars(options)
for fileoption in fileoptions:
if not os.path.exists(options_dict[fileoption]):
errors.append("{} file doesn't exist".format(options_dict[fileoption]))
error = True
if error:
sys.stdout.write(RED)
print "ERROR:"
for e in errors:
print e
print
sys.stdout.write(RESET)
print parser.print_help()
exit(1)
run(options)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment