Skip to content

Instantly share code, notes, and snippets.

@hobu
Last active November 30, 2018 14:04
Show Gist options
  • Save hobu/ee888fcd76da5c3801db89cfec98cbcb to your computer and use it in GitHub Desktop.
Save hobu/ee888fcd76da5c3801db89cfec98cbcb to your computer and use it in GitHub Desktop.
import sys
import pdal
import gdal, osr, ogr
gdal.UseExceptions()
import argparse
import logging
import subprocess
def createGeoPackage(args):
args.gpkg = 'address.gpkg'
args.dtm = 'dtm.tif'
command = "del %s" % args.gpkg
subprocess.call(command,shell=True)
driver = ogr.GetDriverByName('GPKG')
ds = driver.CreateDataSource(args.gpkg)
web_mercator = osr.SpatialReference()
web_mercator.ImportFromEPSG(3857)
layer = ds.CreateLayer('address', web_mercator, ogr.wkbPoint, ['OVERWRITE=YES'])
fld = ogr.FieldDefn("address", ogr.OFTString)
fld.SetWidth(200)
layer.CreateField(fld)
fld = ogr.FieldDefn("id", ogr.OFTInteger)
layer.CreateField(fld)
defn = layer.GetLayerDefn()
pt = ogr.Geometry(ogr.wkbPoint)
import random
args.join_id = random.randint(0, 100)
p = pt.AddPoint(args.x, args.y)
feature = ogr.Feature(defn)
feature.SetGeometry(pt)
feature.SetField("address", "%.5f, %.5f" % (args.longitude, args.latitude))
feature.SetField("id", args.join_id)
layer.CreateFeature(feature)
feature = None
layer2 = ds.CreateLayer('boundary', web_mercator, ogr.wkbPolygon)
fld = ogr.FieldDefn("address", ogr.OFTString)
fld.SetWidth(200)
layer2.CreateField(fld)
fld = ogr.FieldDefn("id", ogr.OFTInteger)
layer2.CreateField(fld)
defn = layer2.GetLayerDefn()
ply = ogr.Geometry(ogr.wkbPolygon)
feature = ogr.Feature(defn)
feature.SetGeometry(args.boundary)
feature.SetField("address", "%.5f, %.5f" % (args.longitude, args.latitude))
feature.SetField("id", args.join_id)
layer2.CreateFeature(feature)
feature = None
ds = None
def hillshade(args):
subs = {}
subs['description'] = 'Ground-only DTM'
subs['table'] = 'dtm'
subs['gpkg'] = args.gpkg
subs['dtm'] = args.dtm
command = """gdal_translate %(dtm)s %(gpkg)s -ot Float32 -of GPKG -co TILE_FORMAT=PNG -co RASTER_TABLE=%(table)s -co APPEND_SUBDATASET=YES -co RASTER_IDENTIFIER="%(description)s\"""" % subs
if args.verbose:
logging.debug(command)
subprocess.call(command,shell=True)
subs = {}
subs['description'] = 'Hillshade for ground-only DTM'
subs['table'] = 'hillshade'
subs['gpkg'] = args.gpkg
subs['dtm'] = args.dtm
command = """gdaldem hillshade -az 45 -z 1.3 %(dtm)s %(gpkg)s -of GPKG -co TILE_FORMAT=PNG -co RASTER_TABLE=%(table)s -co APPEND_SUBDATASET=YES -co RASTER_IDENTIFIER="%(description)s\"""" % subs
if args.verbose:
logging.debug(command)
subprocess.call(command,shell=True)
def slope(args):
subs = {}
subs['description'] = 'Slope for ground-only DTM'
subs['table'] = 'slope'
subs['gpkg'] = args.gpkg
subs['dtm'] = args.dtm
command = """gdaldem slope %(dtm)s %(gpkg)s -of GPKG -co APPEND_SUBDATASET=YES -co TILE_FORMAT=PNG -co RASTER_TABLE=%(table)s -co RASTER_IDENTIFIER="%(description)s\"""" % subs
if args.verbose:
logging.debug(command)
subprocess.call(command,shell=True)
def stats(args):
ds = gdal.Open('GPKG:'+args.gpkg+':dtm')
band = ds.GetRasterBand(1)
stats = band.GetStatistics(True, True)
min, max, mean, stddev = stats
first = mean - 2*stddev
second = mean
third = mean + 2*stddev
ramp="""%d 110 220 110
%d 240 250 160
%d 230 220 170
%d 220 220 220
%d 250 250 250""" % ( min, first, second, third, max)
args.ramp = ramp
def relief(args):
f = open('color-relief.txt','wb')
f.write(args.ramp.encode('utf-8'))
f.close()
shade="""0 255 255 255
90 0 0 0"""
f = open('color-slope.txt','wb')
f.write(shade.encode('utf-8'))
f.close()
subs = {}
subs['description'] = 'Slope shade for ground-only DTM'
subs['table'] = 'slope_shade'
subs['gpkg'] = args.gpkg
subs['dtm'] = args.dtm
command = """gdaldem color-relief GPKG:%(gpkg)s:slope color-slope.txt %(gpkg)s -of GPKG -co APPEND_SUBDATASET=YES -co TILE_FORMAT=PNG -co RASTER_TABLE=%(table)s -co RASTER_IDENTIFIER="%(description)s\"""" % subs
if args.verbose:
logging.debug(command)
subprocess.call(command,shell=True)
subs['description'] = 'Shaded relief'
subs['table'] = 'relief'
command = """gdaldem color-relief %(dtm)s color-relief.txt %(gpkg)s -of GPKG -co APPEND_SUBDATASET=YES -co TILE_FORMAT=PNG -co RASTER_TABLE=%(table)s -co RASTER_IDENTIFIER="%(description)s\"""" % subs
if args.verbose:
logging.debug(command)
subprocess.call(command,shell=True)
def contour(args):
import pint
registry = pint.UnitRegistry()
increment = args.increment * registry.feet
increment = increment.to(registry.meters)
subs = {}
subs['increment'] = increment.magnitude
subs['dtm'] = args.dtm
subs['table'] = 'contours'
subs['description'] = '%.2f contours' % increment.magnitude
subs['gpkg'] = args.gpkg
command = """gdal_contour -a ELEVATION -inodata -i %(increment).2f -f "SQLite" %(dtm)s contours.sqlite""" % subs
if args.verbose:
logging.debug(command)
subprocess.call(command,shell=True)
command = """ogr2ogr -f GPKG %(gpkg)s contours.sqlite contour -lco DESCRIPTION="%(description)s" -lco IDENTIFIER="%(table)s" -lco OVERWRITE=YES -update """ % subs
if args.verbose:
logging.debug(command)
subprocess.call(command,shell=True)
def dtm(args):
template = """
{
"pipeline":[
%(ept)s,
{
"type":"filters.assign",
"assignment":"Classification[0:53]=0"
},
{
"type":"filters.smrf",
"slope":"0.05",
"window":"33"
},
{
"type":"filters.range",
"limits":"Classification[2:2]"
},
{
"type":"writers.gdal",
"resolution":"2.0",
"output_type":"idw",
"window_size":"10",
"filename":"dtm.tif"
}
]
}
"""
box = {}
box['minx'] = args.box[0]
box['maxx'] = args.box[1]
box['miny'] = args.box[2]
box['maxy'] = args.box[3]
box['ept'] = args.ept
args.pipeline = template % box
if args.verbose:
logging.debug('%s' % args.pipeline)
pipe = pdal.Pipeline(args.pipeline)
pipe.validate()
pipe.execute()
def reproject(args):
in_srs = osr.SpatialReference()
in_srs.ImportFromEPSG(4326)
out_srs = osr.SpatialReference()
out_srs.ImportFromEPSG(3857)
point = args.position
wkt = 'POINT(%.6f %.6f)' % (point[0], point[1])
g = ogr.CreateGeometryFromWkt(wkt, reference=in_srs)
g.TransformTo(out_srs)
args.x = g.GetX(0)
args.y = g.GetY(0)
args.boundary = g.Buffer(args.buffer)
box = args.boundary.GetEnvelope()
args.box = box
if args.verbose:
logging.debug("Fetched box '%s'" % str(args.box))
#def geocode(args):
# import mapbox
# MAPBOX_TOKEN=''
# geocoder = mapbox.Geocoder(access_token=MAPBOX_TOKEN)
# response = geocoder.forward(args.address)
# args.position = [0, 0]
# features = response.geojson()['features']
# # mapbox sorts by relevance, grab the first one
# # and call it good
# if len(features):
# args.position = features[0]['center']
# else:
# raise Exception("no geocode location found for %s" % args.address)
def classification(args):
ramp = """
# QGIS Generated Color Map Export File
0,255,0,0,255
1,234,234,234,255
2,222,126,44,255
3,30,62,15,255
4,45,255,6,255
5,209,230,201,255
6,249,12,6,255
7,146,197,222,255
8,133,5,2,255
9,0,0,255,255
10,255,255,255,255
"""
template = """
{
"pipeline":[
%(ept)s,
{
"type":"writers.gdal",
"resolution":"5.0",
"output_type":"mean",
"dimension":"Classification",
"data_type":"uint16_t",
"filename":"classified.tif"
}
]
}
"""
subs = {}
subs['ept'] = args.ept
args.pipeline = template % subs
pipe = pdal.Pipeline(args.pipeline)
pipe.validate()
pipe.execute()
f = open('asprs.txt','wb')
f.write(ramp.encode('utf-8'))
f.close()
subs = {}
subs['description'] = 'Classification (%.2fm)' % (args.resolution * 5)
subs['table'] = 'classification'
subs['gpkg'] = args.gpkg
subs['dtm'] = args.dtm
command = """gdaldem color-relief GPKG:%(gpkg)s:slope asprs.txt %(gpkg)s -of GPKG -co APPEND_SUBDATASET=YES -co TILE_FORMAT=PNG -co RASTER_TABLE=%(table)s -co RASTER_IDENTIFIER=\"%(description)s\"""" % subs
if args.verbose:
logging.debug(command)
subprocess.call(command,shell=True)
def vegetation(args):
pipeline="""{
"pipeline":[
{
"type":"filters.assign",
"assignment":"Classification[:]=0"
},
{
"type":"filters.smrf",
"cell":3.0
},
{
"type":"filters.range",
"limits":"Classification![2:2]"
},
{
"type":"filters.returns",
"groups":"first,intermediate,last"
},
{
"type":"filters.merge"
},
{
"type":"filters.estimaterank"
},
{
"type":"filters.range",
"limits":"Rank[3:3]"
}
]
}"""
def makeept(args):
stage ="""
{
"type":"readers.ept",
"filename":"%(ept)s",
"bounds": "([%(minx).3f, %(maxx).3f], [%(miny).3f, %(maxy).3f],[0.0, 10000])"
}"""
subs = {}
subs['minx'] = args.box[0]
subs['maxx'] = args.box[1]
subs['miny'] = args.box[2]
subs['maxy'] = args.box[3]
subs['ept'] = args.ept.strip("'")
args.ept = (stage % subs)
def main(args):
if args.verbose:
logging.debug('Fetching to %.3f' % args.buffer)
if args.verbose:
logging.debug('%s' % args)
reproject(args)
makeept(args)
createGeoPackage(args)
dtm(args)
contour(args)
hillshade(args)
stats(args)
slope(args)
relief(args)
classification(args)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description = 'Fetch GDAL DTM for address')
parser.add_argument('longitude', type=float, help='Address to query (quoted)')
parser.add_argument('latitude', type=float, help='Address to query (quoted)')
parser.add_argument('--ept', help='EPT Resource')
parser.add_argument('--buffer', type=float,help='Buffer')
parser.add_argument('--resolution',type=float, help='Data resolution')
parser.add_argument('--increment',type=float, help='Contour interval (ft)')
parser.add_argument('-v', '--verbose', help='Verbose', action="store_true")
parser.set_defaults(buffer=200.0, type=float)
parser.set_defaults(ept='http://na.entwine.io/nyc', type=str)
parser.set_defaults(increment=2.0, type=float)
parser.set_defaults(resolution=1.0, type=float)
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
args.position = (args.longitude, args.latitude)
main(args)
# conda env create -f environment.yml
# python package.py -93.110001 44.946674 --ept 'http://na-c.entwine.io/mn/' --buffer=100
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment