Created
October 25, 2016 18:31
-
-
Save selahlynch/fe290359e8a3b663f2c2d9609f33b2f7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import sqlalchemy | |
from PERMA.code.mysql_scripts import mysql_iter_funcs | |
from osgeo import ogr | |
if len(sys.argv) == 6: | |
(schema, table, out_column, shapefile, shape_field) = sys.argv[1:] | |
else: | |
print "Usages map_coordinates.py <schema> <table> <out_column> <shapefile> <shape_field>" | |
print "Using hardcoded values..." | |
schema = "philadelphia" | |
table = "msg_geo_r1M" | |
out_column = "block_group_fips" | |
shapefile = "/home/selah/Data/Census_Block_Groups_2010/Census_Block_Groups_2010.shp" | |
shape_field = "STFID" | |
(latitude_column, longitude_column) = ("latitude", "longitude") | |
#engine_url = "mysql://127.0.0.1:3307/{}?charset=utf8mb4&read_default_file=~/.my.cnf".format(schema) #on laptop | |
#engine_url = "mysql://venti:3306/{}?charset=utf8mb4&read_default_file=~/.my.cnf".format(schema) #on work desktop | |
engine_url = "mysql://localhost:3306/{}?charset=utf8mb4&read_default_file=~/.my.cnf".format(schema) #on server | |
db_eng = sqlalchemy.create_engine(engine_url) | |
print "Extending table if necessary..." | |
mysql_iter_funcs.extend_table(db_eng, table, {out_column: 'BIGINT(22)'}) | |
res = db_eng.execute("SELECT id, {0}, {1} FROM {2} WHERE {3} IS NULL".format(latitude_column, longitude_column, table, out_column)) | |
inp_iter = iter(res) | |
#convert result set to iter of dicts... dicts can be updated, whereas resultset rows cannot | |
def dictify(an_iter): | |
for item in an_iter: | |
yield dict(item) | |
inp_iter_dict = dictify(inp_iter) | |
driver = ogr.GetDriverByName("ESRI Shapefile") | |
dataSource = driver.Open(shapefile, 0) | |
feats = list(dataSource.GetLayer()) | |
def getShapeFeatureField(lon, lat): | |
point = ogr.Geometry(ogr.wkbPoint) | |
point.AddPoint(lon, lat) | |
for feature in feats: | |
geom = feature.GetGeometryRef() | |
if point.Within(geom) == True: | |
return int(feature.GetField(shape_field)) | |
return 0 | |
def coords_to_census_tract(coords_iter): | |
for item in coords_iter: | |
lon = item[longitude_column] | |
lat = item[latitude_column] | |
item[out_column] = getShapeFeatureField(lon, lat) | |
yield item | |
out_iter = coords_to_census_tract(inp_iter_dict) | |
mysql_iter_funcs.mysql_update(db_eng, table, out_iter, log_every=1000) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment