Last active
October 13, 2016 01:43
-
-
Save TaylorMutch/fd2b0b67d0cec474fbd989546de5d3d9 to your computer and use it in GitHub Desktop.
Converts a 2D shapefile to a 3D shapefile
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Converts a 2D shapefile to a 3D shapefile """ | |
import sys | |
import os | |
import struct | |
import numpy as np | |
import fiona | |
from fiona.transform import transform_geom, transform | |
from fiona.crs import from_epsg | |
from PIL import Image | |
import logging | |
class XTR(object): | |
""" Mike Bailey's extract (.xtr) format, read in to a numpy array. """ | |
def __init__(self, path): | |
#self.crs = from_epsg(4269) # NAD83 Datum / projection | |
self.crs = from_epsg(4326) # wgs84 | |
# parse .xtr file | |
with open(path, 'rb') as f_in: | |
lngs = f_in.readline() | |
lats = f_in.readline() | |
form_feed = b'\x0c' | |
f_in.seek(0) | |
raw_data = f_in.read() | |
index = 0 | |
while raw_data[index:index + 1] != form_feed: | |
index += 1 | |
index += 2 # skip form feed and comments | |
raw_data = raw_data[index:] | |
minlng, maxlng, numlngs = lngs.strip().decode('ascii').split() | |
minlat, maxlat, numlats = lats.strip().decode('ascii').split() | |
self.numlngs = int(numlngs) | |
self.numlats = int(numlats) | |
self.minlng = float(minlng) | |
self.maxlng = float(maxlng) | |
self.minlat = float(minlat) | |
self.maxlat = float(maxlat) | |
self.data = np.asarray(struct.unpack('f' * self.numlngs * self.numlats, raw_data), | |
dtype='float32').reshape((self.numlats, self.numlngs)) | |
def nn_index(self, lng, lat): | |
tx = round(((lng - self.minlng) / (self.maxlng - self.minlng)) * self.numlngs) | |
ty = round(((lat - self.minlat) / (self.maxlat - self.minlat)) * self.numlats) | |
return tx, ty | |
def nn_elevation(self, pair, orig_pair): | |
""" | |
Nearest neighbor | |
:param pair: Projected coordinates that match xtr projection | |
:param orig_pair: Un-projected coordinates of source | |
:return: Tuple with un-projected coordinates with elevation | |
""" | |
lng, lat = pair | |
# compute texel coordinates | |
''' | |
tx = ((lng - self.minlng) / (self.maxlng - self.minlng)) * self.numlngs | |
ty = ((lat - self.minlat) / (self.maxlat - self.minlat)) * self.numlats | |
# adjust to nearest neighbor | |
tx = int(tx) if tx % 1 < 0.5 else int(tx + 1) | |
ty = int(ty) if ty % 1 < 0.5 else int(ty + 1) | |
''' | |
tx, ty = self.nn_index(lng, lat) | |
z = self.data[ty][tx] # height at row ty, column tx | |
orig_x, orig_y = orig_pair # un-projected lng, lat coordinates | |
return tuple((orig_x, orig_y, z)) | |
def nn_elevation_value(self, lng, lat): | |
value_container = self.nn_elevation((lng, lat), (0,0)) | |
return value_container[2] | |
# TODO - add bicubic interpolation? | |
def squeeze_lngs(self): | |
""" sqeeze number of longitude points to match number of latitude points """ | |
square = np.array([], dtype='float32') | |
for i in range(self.numlats): | |
row = list() | |
for j in range(self.numlats): | |
idx = round((j * self.numlngs) / self.numlats) | |
row.append(self.data[i][idx]) | |
square = np.append(square, row) | |
square = np.reshape(square, (self.numlats, self.numlats)) | |
return square | |
def output_heightmap(self, path): | |
image = Image.new('L', (self.numlngs, self.numlats)) | |
minimum = self.data.min() | |
maximum = self.data.max() | |
image_data = [round((x - minimum) / (maximum - minimum) * 255) for x in self.data.ravel()] | |
image.putdata(image_data) | |
image.save(path) | |
def convert_shapefile(xtr, shp_in_path, shp_out_path): | |
""" Converts an existing 2D shapefile to a 3D shapefile adding elevation from an XTR """ | |
with fiona.open(shp_in_path, 'r') as shp_in: | |
shp_crs = shp_in.crs | |
bounds = shp_in.bounds | |
meta = shp_in.meta | |
meta['schema']['geometry'] = '3D Polygon' | |
i = 0 | |
total = len(shp_in) | |
with fiona.open(shp_out_path, 'w', **meta) as shp_out: | |
for feature in shp_in: | |
try: | |
geometry = feature['geometry'] | |
t_geometry = transform_geom(shp_crs, xtr.crs, geometry) | |
new_coordinates = list() | |
if geometry['type'] == 'Polygon': | |
for i in range(0, len(geometry['coordinates'])): | |
ring = t_geometry['coordinates'][i] | |
new_ring = list() | |
for j in range(0, len(ring)): | |
new_ring.append(xtr.nn_elevation(ring[j], geometry['coordinates'][i][j])) | |
new_coordinates.append(new_ring) | |
elif geometry['type'] == 'MultiPolygon': | |
# do each polygon in the list | |
for i in range(0, len(geometry['coordinates'])): | |
polygon = t_geometry['coordinates'][i] | |
new_polygon = list() | |
for j in range(0, len(polygon)): | |
ring = polygon[j] | |
new_ring = list() | |
for k in range(0, len(ring)): | |
new_ring.append(xtr.nn_elevation(ring[k], geometry['coordinates'][i][j][k])) | |
new_polygon.append(new_ring) | |
new_coordinates.append(new_polygon) | |
else: | |
print('Skipping feature {id}'.format(id=feature['id'])) | |
print(feature) | |
continue # skip for now - TODO - handle other geometry types | |
geometry['coordinates'] = new_coordinates | |
feature['geometry'] = geometry | |
shp_out.write(feature) | |
except Exception: | |
logging.exception("Error processing feature %s:", feature['id']) | |
return bounds, shp_crs | |
def normalize_v3(arr): | |
''' Normalize a numpy array of 3 component vectors shape=(n,3) ''' | |
lens = np.sqrt( arr[0]**2 + arr[1]**2 + arr[2]**2 ) | |
arr[0] /= lens | |
arr[1] /= lens | |
arr[2] /= lens | |
return arr | |
def create_normalmap(xtr, output_path, bounds, crs): | |
""" Create a normalmap from XTR """ | |
normal_image = Image.new('RGB', (xtr.numlats, xtr.numlats)) | |
normals = np.zeros((xtr.numlats, xtr.numlats, 3), dtype='float32') | |
left, bot, right, top = bounds | |
xs, ys = transform(crs, xtr.crs, [left,right], [bot,top]) # project to WGS 84 | |
left, right = xs | |
bot, top = ys | |
dx = (right - left) / xtr.numlats | |
dy = (top - bot) / xtr.numlats | |
total = xtr.numlats**2 | |
for i in range(xtr.numlats): | |
for j in range(xtr.numlats): | |
# unprojected center | |
center_y = top - dy * i | |
center_x = left + dx * j | |
n = center_y + dy | |
s = center_y - dy | |
e = center_x + dx | |
w = center_x - dx | |
h = [2.0, 0.0, xtr.nn_elevation_value(e, center_y) - xtr.nn_elevation_value(w, center_y)] | |
v = [0.0, 2.0, xtr.nn_elevation_value(center_x, n) - xtr.nn_elevation_value(center_x, s)] | |
normals[i][j] = (normalize_v3(np.cross(h,v)) * 0.5 + 0.5) * 255 # compute cross and convert to normal map format | |
normals = normals.astype(int) | |
image_data = list() | |
for i in range(xtr.numlats): | |
for j in range(xtr.numlats): | |
image_data.append(tuple(normals[i][j])) | |
normal_image.putdata(image_data) | |
normal_image.save(output_path) | |
if __name__ == '__main__': | |
if len(sys.argv) == 4: | |
dem_path = sys.argv[1] | |
shp_in_path = sys.argv[2] | |
if os.path.exists(dem_path) and os.path.exists(shp_in_path): | |
shp_out_path = sys.argv[3] | |
if os.path.exists(shp_out_path): | |
os.remove(shp_out_path) | |
xtr = XTR(dem_path) | |
print('Converting shapefile...') | |
shp_bounds, shp_crs = convert_shapefile(xtr, shp_in_path, shp_out_path) | |
print('Shapefile converted!') | |
extension = 'Normals.bmp' | |
bmp_path = "".join([shp_out_path.split('.')[0], extension]) | |
if os.path.exists(bmp_path): | |
os.remove(bmp_path) | |
print('Creating normal map...') | |
create_normalmap(xtr, bmp_path, shp_bounds, shp_crs) | |
print('Normal map created!') | |
else: | |
print('Invalid parameters - check input and output paths') | |
else: | |
print('usage: xtr_path "path/to/file.shp" "output/dir/file.shp"') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment