Skip to content

Instantly share code, notes, and snippets.

@lossyrob
Created August 7, 2017 21:12
Show Gist options
  • Save lossyrob/cc7ab246d020f355fad016e35207032e to your computer and use it in GitHub Desktop.
Save lossyrob/cc7ab246d020f355fad016e35207032e to your computer and use it in GitHub Desktop.
Example landsat geopyspark ingest
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import rasterio\n",
"import rasterio.features\n",
"import rasterio.warp\n",
"import geopyspark as gps\n",
"import numpy as np\n",
"import csv, os\n",
"import matplotlib as mpl\n",
"import matplotlib.pyplot as plt\n",
"import math\n",
"from PIL import Image\n",
"\n",
"from datetime import datetime\n",
"from pyspark import SparkContext\n",
"from geonotebook.wrappers import TMSRasterData\n",
"from osgeo import osr\n",
"\n",
"%matplotlib inline\n",
"\n",
"curdir = os.path.abspath(os.path.curdir)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"#import pkg_resources\n",
"#pkg_resources.get_distribution(\"geopyspark\").version\n",
"#%autoreload?"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"gps.geotrellis.boom()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['/opt/jars']\n",
"possible\n",
"/home/hadoop/.local/lib/python3.4/site-packages/geopyspark-0.2.0rc5-py3.4.egg/geopyspark/jars/*.jar\n",
"/home/hadoop/notebooks/jars/*.jar\n",
"/home/hadoop/geopyspark/jars/*.jar\n",
"/opt/jars/*.jar\n",
"a\n",
"/home/hadoop/.local/lib/python3.4/site-packages/geopyspark-0.2.0rc5-py3.4.egg/geopyspark/jars/geotrellis-backend-assembly-0.2.0.jar\n",
"/opt/jars/gddp-assembly-0.2.0.jar\n",
"/home/hadoop/notebooks\n",
"/home/hadoop/.local/lib/python3.4/site-packages/geopyspark-0.2.0rc5-py3.4.egg/geopyspark\n",
"/home/hadoop/.local/lib/python3.4/site-packages/geopyspark-0.2.0rc5-py3.4.egg/geopyspark/jars\n"
]
}
],
"source": [
"conf = gps.geopyspark_conf(appName=\"Landsat\") \\\n",
" .setMaster(\"local[*]\") \\\n",
" .set(key='spark.ui.enabled', value='true') \\\n",
" .set(key='spark.local.dir', value=os.path.join(curdir, 'spark-tmp')) \\\n",
" .set(key=\"spark.driver.memory\", value=\"8G\")\n",
"sc = SparkContext(conf=conf)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"desired_bands = { \"Ultra Blue\" : 0,\n",
" \"Blue\": 1,\n",
" \"Green\": 2,\n",
" \"Red\": 3,\n",
" \"NIR\": 4,\n",
" \"SW1\": 5,\n",
" \"SW2\": 6,\n",
" \"QA\": 11 }\n",
"desired_band_nos = desired_bands.values()\n",
"csv_data = []\n",
"with open(\"l8-scenes.csv\") as csvfile:\n",
" csv_reader = csv.DictReader(csvfile)\n",
" for row in csv_reader:\n",
" if int(row['band']) in desired_band_nos: # Remove Pan band\n",
" csv_data.append(row)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"rdd0 = sc.parallelize(csv_data[16:24]).repartition(8)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def get_metadata(line):\n",
" with rasterio.open(line['uri']) as dataset:\n",
" bounds = dataset.bounds\n",
" height = dataset.height\n",
" width = dataset.width\n",
" crs = dataset.get_crs()\n",
" srs = osr.SpatialReference()\n",
" srs.ImportFromWkt(crs.wkt)\n",
" proj4 = srs.ExportToProj4()\n",
" tile_cols = math.floor((width - 1) / 512) * 512\n",
" tile_rows = math.floor((height - 1) / 512) * 512\n",
" ws = [((x, x + 512), (y, y + 512)) for x in range(0,tile_cols, 512) \\\n",
" for y in range(0, tile_rows, 512)]\n",
"\n",
" def windows(line, ws):\n",
" for w in ws:\n",
" ((row_start, row_stop), (col_start, col_stop)) = w\n",
"\n",
" left = bounds.left + (bounds.right - bounds.left)*(float(col_start)/width)\n",
" right = bounds.left + (bounds.right - bounds.left)*(float(col_stop)/ width)\n",
" bottom = bounds.top + (bounds.bottom - bounds.top)*(float(row_stop)/height)\n",
" top = bounds.top + (bounds.bottom - bounds.top)*(float(row_start)/height)\n",
" extent = gps.Extent(left,bottom,right,top)\n",
" instant = datetime.strptime(line['date'], '%Y-%m-%d')\n",
" \n",
" new_line = line.copy()\n",
" new_line.pop('date')\n",
" new_line['window'] = w\n",
" new_line['projected_extent'] = gps.TemporalProjectedExtent(extent=extent, instant=instant, proj4=proj4)\n",
" new_line['tile_key'] = (row_start, col_start)\n",
" yield new_line\n",
" \n",
" return [i for i in windows(line, ws)]\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'band': '0',\n",
" 'projected_extent': TemporalProjectedExtent(extent=Extent(xmin=362385.0, ymin=3294855.0, xmax=377745.0, ymax=3310215.0), instant=datetime.datetime(2017, 1, 1, 0, 0), epsg=None, proj4='+proj=utm +zone=17 +datum=WGS84 +units=m +no_defs '),\n",
" 'scene_id': 'LC80160402017001LGN00',\n",
" 'tile_key': (0, 0),\n",
" 'uri': 's3://landsat-pds/L8/016/040/LC80160402017001LGN00/LC80160402017001LGN00_B1.TIF',\n",
" 'window': ((0, 512), (0, 512))}"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"rdd1 = rdd0.flatMap(get_metadata)\n",
"rdd1.first()"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def get_data(line):\n",
" \n",
" new_line = line.copy()\n",
"\n",
" with rasterio.open(line['uri']) as dataset:\n",
" new_line['data'] = dataset.read(1, window=line['window'])\n",
" new_line.pop('window')\n",
" new_line.pop('uri')\n",
" \n",
" return new_line\n"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'band': '0', 'data': array([[0, 0, 0, ..., 0, 0, 0],\n",
" [0, 0, 0, ..., 0, 0, 0],\n",
" [0, 0, 0, ..., 0, 0, 0],\n",
" ..., \n",
" [0, 0, 0, ..., 0, 0, 0],\n",
" [0, 0, 0, ..., 0, 0, 0],\n",
" [0, 0, 0, ..., 0, 0, 0]], dtype=uint16), 'projected_extent': TemporalProjectedExtent(extent=Extent(xmin=362385.0, ymin=3294855.0, xmax=377745.0, ymax=3310215.0), instant=datetime.datetime(2017, 1, 1, 0, 0), epsg=None, proj4='+proj=utm +zone=17 +datum=WGS84 +units=m +no_defs '), 'scene_id': 'LC80160402017001LGN00', 'tile_key': (0,\n",
" 0)}"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"rdd2 = rdd1.repartition(5000).map(get_data)\n",
"rdd2.first()"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"(('LC80160402017001LGN00', (5120, 1024)),\n",
" <pyspark.resultiterable.ResultIterable at 0x7ff34b5e1ba8>)"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"rdd3 = rdd2.groupBy(lambda line: (line['scene_id'], line['tile_key']))\n",
"rdd3.first()"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def make_tiles(grouped):\n",
" lines = list(grouped[1])\n",
" projected_extent = lines[0]['projected_extent']\n",
" try:\n",
" array = np.array([l['data'] for l in sorted(lines, key=lambda l: int(l['band']))])\n",
" if array.dtype == 'object':\n",
" bandshapes = [str(l['data'].shape) for l in sorted(lines, key=lambda l: l['band'])]\n",
" raise Exception(\"{}\".format('\\n'.join(bandshapes)))\n",
" except:\n",
" bandshapes = [\"{} - {}\".format(l['band'], l['data'].shape) for l in sorted(lines, key=lambda l: l['band'])]\n",
" raise Exception(\"ER {}\".format('\\n'.join(bandshapes)))\n",
" tile = gps.Tile.from_numpy_array(array, no_data_value=0.0)\n",
" return (projected_extent, tile)\n",
"\n",
"def interesting_tile(line):\n",
" [tpe, tile] = line\n",
" return (np.sum(tile[0][0]) != 0)\n",
"\n",
"def square_tile(line):\n",
" [tpe, tile] = line\n",
" return tile[0][0].shape == (512,512)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"rdd4 = rdd3.map(make_tiles)\n",
"#data = rdd4.filter(interesting_tile).first()\n",
"#data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"plt.imshow(data[1][0][0])"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"raster_layer = gps.RasterLayer.from_numpy_rdd(gps.LayerType.SPACETIME, rdd4)"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"tiled_raster_layer = raster_layer.tile_to_layout(layout = gps.GlobalLayout(), target_crs=3857)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"#rdd4.first()\n",
"raster_layer.to_numpy_rdd().first()"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"pyramid = tiled_raster_layer.pyramid(resample_method=gps.ResampleMethod.BILINEAR)"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"for layer in sorted(pyramid.levels.values(), key=lambda l: -l.zoom_level):\n",
" gps.write(\"file://{}/catalog/\".format(curdir), \"landsat\", layer, time_unit=gps.TimeUnit.DAYS)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Display NDVI"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def safe_divide(a, b):\n",
" with np.errstate(divide='ignore', invalid='ignore'):\n",
" c = np.true_divide(a, b)\n",
" c[c == np.inf] = 0\n",
" c = np.nan_to_num(c)\n",
" return c\n",
"\n",
"\n",
"def compute_ndvi(tile):\n",
" cells = tile.cells.astype(float)\n",
" red = cells[3]\n",
" ir = cells[4]\n",
" ndvi = safe_divide((ir - red), (ir + red))\n",
" return gps.Tile.from_numpy_array(ndvi, no_data_value=tile.no_data_value)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"ndvi_rdd = tiled_raster_layer.map_tiles(compute_ndvi).repartition(100)\n",
"pyramid = ndvi_rdd.to_spatial_layer().pyramid(resample_method=gps.ResampleMethod.BILINEAR)\n",
"colormap = gps.ColorMap.build(breaks=ndvi_rdd.get_quantile_breaks(15), colors='plasma')\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import json\n",
"json.dumps(tiled_raster_layer.layer_metadata.to_dict())\n",
"#tiled_raster_layer.layer_metadata"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"tms_server = gps.TMS.build(pyramid, display=colormap)\n",
"M.add_layer(TMSRasterData(tms_server), name=\"landsat\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Display (Optional) ##"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"pyramid = tiled_raster_layer.to_spatial_layer().pyramid()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"colormap = gps.ColorMap.build(breaks=tiled_raster_layer.get_quantile_breaks(15), colors='plasma')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"tms_server = gps.TMS.build(pyramid, display=colormap)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"M.add_layer(TMSRasterData(tms_server), name=\"landsat\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"M.remove_layer(M.layers[0])"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"## Display from Ingested"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"ndvi_color_map = \\\n",
" gps.ColorMap.build(breaks=[0.05,0.1,0.2,0.3,0.4,0.5,0.6,0.7,1],\n",
" colors=[0xd9f0a3ff,\n",
" 0xaddd8eff,\n",
" 0x78c679ff,\n",
" 0x41ab5dff,\n",
" 0x238443ff,\n",
" 0x006837ff,\n",
" 0x004529ff])\n",
"\n",
"ndwi_color_map = \\\n",
" gps.ColorMap.build(breaks=[0,0.1,0.2,0.3,0.4,1],\n",
" colors=[0xaacdff44,\n",
" 0x70abffff,\n",
" 0x3086ffff,\n",
" 0x1269e2ff,\n",
" 0x094aa5ff,\n",
" 0x012c69ff])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"cmap = mpl.cm.get_cmap('viridis')\n",
"def render_ndvi(tile):\n",
" #raise Exception(\"asd\")\n",
" cells = tile.cells.astype(float)\n",
" red = cells[3]\n",
" ir = cells[4]\n",
" ndvi = safe_divide((ir - red), (ir + red))\n",
" im = cmap(ndvi)\n",
" im = np.uint8(im * 255)\n",
" im = Image.fromarray(im)\n",
" return Iim\n",
"tms_server = gps.TMS.build((\"file:///{}/catalog/\".format(curdir), \"landsat\"), display=render_ndvi)\n",
"M.add_layer(TMSRasterData(tms_server), name=\"landsat\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"curdir"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"M.remove_layer(M.layers[0])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Display from query"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"layer = gps.query(gps.LayerType.SPACETIME, \n",
" \"file:///{}/catalog/\".format(curdir), \n",
" \"landsat\", \n",
" layer_zoom=9,\n",
" num_partitions=100)"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"r = layer.bands(3).cache()\n",
"ir = layer.bands(4).cache()\n",
"\n",
"ndvi = (ir - r) / (ir + r)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"metadata = layer.layer_metadata\n",
"isinstance(metadata, gps.Metadata)\n",
"hasattr(metadata, '_metadata_dict')\n",
"del metadata._metadata_dict"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"{\n",
" 'bounds': metadata.bounds._asdict(),\n",
" 'crs': metadata.crs,\n",
" 'cellType': metadata.cell_type,\n",
" 'extent': metadata.extent._asdict(),\n",
" 'layoutDefinition': {\n",
" 'extent': metadata.layout_definition.extent._asdict(),\n",
" 'tileLayout': metadata.tile_layout._asdict()\n",
" }\n",
" }"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"layer = gps.query(gps.LayerType.SPACETIME, \n",
" \"file:///{}/catalog/\".format(curdir), \n",
" \"landsat\", \n",
" layer_zoom=9,\n",
" num_partitions=100).map_tiles(compute_ndvi)"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"pyramid = ndvi.to_spatial_layer().pyramid()"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"pyramid = pyramid.cache()"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"tms_server = gps.TMS.build(pyramid, display=ndvi_color_map)"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<promise.promise.Promise at 0x7ff34b5e7d68>"
]
},
"execution_count": 24,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"M.add_layer(TMSRasterData(tms_server), name=\"ndvi\")"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"56015"
]
},
"execution_count": 25,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"tms_server.server.port() \n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"rdd = layer.to_numpy_rdd()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"rdd.takeSample(num=2, withReplacement=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"rdd.count()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"layer.layer_metadata"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"raw_layer = gps.query(gps.LayerType.SPACETIME, \n",
" \"file:///{}/catalog/\".format(curdir), \n",
" \"landsat\", \n",
" layer_zoom=9,\n",
" num_partitions=100)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"raw_layer.layer_metadata\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "GeoNotebook + GeoPySpark",
"language": "python",
"name": "geonotebook3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.4.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment