Skip to content

Instantly share code, notes, and snippets.

@prithwi
Last active February 23, 2016 00:42
Show Gist options
  • Save prithwi/3c8bed6b9e2a3920c230 to your computer and use it in GitHub Desktop.
Save prithwi/3c8bed6b9e2a3920c230 to your computer and use it in GitHub Desktop.
Get all lat/lon from shapefile that are within a shape
#!/usr/bin/env python
# encoding: utf-8
# Author: Prithwish Chakrabory <prithwi@vt.edu>
# LICENSE: MIT
# date: 2015-09-01
"""
Code to extract lat lon from a shapefile that fills a grid.
Shapefile used: http://thematicmapping.org/downloads/TM_WORLD_BORDERS_SIMPL-0.3.zip
Can use any shapefile as long as geometry defined by lon, lat
"""
import geopandas
import shapely
import numpy as np
import pandas as pd
import argparse
import logging
import sys
log = logging.getLogger()
# GLOBALS
ENCODING = 'utf-8'
SHAPEFILE = "../data/TM_World_Borders/TM_WORLD_BORDERS_SIMPL-0.3.shp"
SHAPE_IDX = 'ISO2'
DEFAULT_OUTFILE = "./grid_shapes.h5"
def init_logs(arg, log):
if arg and vars(arg).get('verbose', False):
l = logging.DEBUG
else:
l = logging.INFO
# printing to stdout
ch = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
log.addHandler(ch)
log.setLevel(l)
return
def cartesian(x, y):
"""Cartesian product of x and y
ref: http://stackoverflow.com/a/11144716/2423379
"""
return np.transpose([np.tile(x, len(y)),
np.repeat(y, len(x))])
class CoordGridder(object):
"""Class to extract grid between shapes
Usage:
------
>>> countries = ['United States', 'United Kingdom']
>>> cc_codes = ['US', 'GB']
>>> geo = CoordGridder(SHAPEFILE, index_col=SHAPE_IDX)
>>> output = geo.get_grids(cc_codes, names=countries)
>>> output.to_csv(DEFAULT_OUTFILE)
"""
def __init__(self, shapefile, index_col=None):
"""Initialized the class with the shapefile
:shapefile: shapefile location
:index_col: index column
"""
self._shapefile = shapefile
self._index_col = index_col
self._geo_df = self.read_shapefile(shapefile, index_col)
return
@property
def geo_df(self):
return self._geo_df
@staticmethod
def read_shapefile(shapefile, index_col=None):
"""reads the shapefile
Function intentionally left as staticmethod
:returns: geopandas dataframe
"""
geo_df = geopandas.GeoDataFrame.from_file(shapefile, encoding=ENCODING)
if index_col:
geo_df.set_index(index_col, inplace=True)
return geo_df
def get_coords(self, shape, interp_val=1., buf_d=1.):
"""Get all lat lons within a shape
:shape: shape of the location requried
:interp_val: grid interval
:buf_d: buffer area to use
"""
buff_shape = shape.buffer(buf_d)
min_lon, min_lat, max_lon, max_lat = buff_shape.bounds
lon_range = np.arange(np.floor(min_lon), np.ceil(max_lon),
interp_val)
lat_range = np.arange(np.floor(min_lat), np.ceil(max_lat),
interp_val)
lat_lon = pd.DataFrame(cartesian(lat_range, lon_range))
lat_lon.columns = ['lat', 'lon']
isContained = lambda x: buff_shape.contains(shapely.geometry.Point(x['lon'], x['lat']))
return lat_lon[lat_lon.apply(isContained, axis=1)]
def get_grids(self, idxs=None, names=None,
interp_val=1.0, buf_d=1.0):
"""Grids from Multiple locations
:idxs: unique identifiers in the shapefile
:names: name to identify with. if not given use idx
:interp_val: interpolation distance
:buf_d: buffer area. to accomdoate very small shapes.
:return: stacked dataframe of all countries.
"""
if idxs is None:
idxs = list(self._geo_df.index)
if names is None:
names = self._geo_df.ix[idxs, 'NAME'].tolist()
output = pd.DataFrame()
for name, idx in zip(names, idxs):
try:
shape = self.geo_df.ix[idx, 'geometry']
coords = self.get_coords(shape, interp_val, buf_d)
coords['Name'] = name
coords['CC'] = idx
output = pd.concat((output, coords))
log.info(u"Done for {}".format(name))
except Exception as e:
log.info(u"Error {} for {}".format(e, name))
else:
output = output[['lat', 'lon', 'Name', 'CC']]
output['place'] = (output[['lat', 'lon', 'CC']]
.apply(lambda xx: u"_".join([str(x) for x in xx]),
axis=1))
return output
def parse_args():
'''
Assigns a lat, lon pair to country
'''
ap = argparse.ArgumentParser('program')
# Main options
ap.add_argument("--cc_codes", metavar='CC_CODES', nargs='*',
required=False, type=str, default=None,
help="list of countries. Default=All")
ap.add_argument("--outfile", metavar='OUTFILE',
required=False, type=str, default=DEFAULT_OUTFILE,
help="Output File.")
ap.add_argument("--buff_area", metavar='BUFF_AREA', required=False,
type=float, default=0.,
help="Buffer area. Default=0")
# Log options
ap.add_argument('-v', '--verbose', action="store_true",
help="Log option: Write debug messages.")
arg = ap.parse_args()
return arg
def main():
try:
arg = parse_args()
init_logs(arg, log)
cc_codes = arg.cc_codes
outfile = arg.outfile
buff_area = arg.buff_area
geo = CoordGridder(SHAPEFILE, index_col=SHAPE_IDX)
output = geo.get_grids(cc_codes, names=None, buf_d=buff_area)
log.info(u'Dumping to {}'.format(outfile))
if outfile.endswith('.csv'):
output.to_csv(outfile, index=False, header=False, encoding=ENCODING)
elif outfile.endswith('.h5'):
output.Name = output.Name.str.normalize('NFKD')
output.Name = output.Name.astype(str)
output.CC = output.CC.astype(str)
output.place = output.place.astype(str)
output.to_hdf(outfile, 'grid', format='t')
return 0
except Exception as e:
log.exception(e)
return 1
if __name__ == "__main__":
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment