PySpark to extract OSM PBF and convert it into a shapefile
  1. Convert the .osm.pbf file into Parquet files with the osm-parquetizer library (https://github.com/adrianulbona/osm-parquetizer); it emits three files (node, way, relation):
java -jar target/osm-parquetizer-1.0.1-SNAPSHOT.jar ../test1/romania-latest.osm.pbf
  2. The PySpark code below reads the node and way Parquet files, subsets them to a bounding-box extent, and saves the result as a shapefile.
  3. The code runs inside the Docker container from https://hub.docker.com/r/airpollutionstudyindia/matplotlib; a sketch of the run command follows this list.
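
A minimal sketch of starting that container, assuming the image follows the Jupyter docker-stacks layout (hence the /home/jovyan/work paths in the code); the published port and the mount point are assumptions and may differ:

docker run -it -p 8888:8888 -v "$(pwd)":/home/jovyan/work airpollutionstudyindia/matplotlib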
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, explode, array_contains
import pandas as pd
import geopandas as gp
from geopandas import GeoDataFrame
from shapely.geometry import Point, LineString


# initialise a local SparkSession; spark.read below replaces the deprecated SQLContext
spark = SparkSession.builder \
    .master('local') \
    .appName('myAppName') \
    .config('spark.executor.memory', '5g') \
    .config('spark.cores.max', '6') \
    .getOrCreate()

# read the node and way Parquet files produced by osm-parquetizer
#nodeDF = spark.read.parquet('/home/jovyan/work/bhutan-latest.osm.pbf.node.parquet')
nodeDF = spark.read.parquet("/home/jovyan/work/poland-latest.osm.pbf.node.parquet")
wayDF = spark.read.parquet("/home/jovyan/work/poland-latest.osm.pbf.way.parquet")
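# a quick look at what osm-parquetizer produced; the column names used below
# (tags, nodes, latitude, longitude) come from these schemas
nodeDF.printSchema()
wayDF.printSchema()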

# keep only ways whose tag keys include 'highway' (roads); osm-parquetizer
# stores tag keys and values as binary, hence the bytearray literal
wayNodeDF = wayDF.filter(array_contains(wayDF.tags['key'], bytearray(b'highway')))

# explode the tags array once so each way id stays paired with its key/value;
# this replaces the fragile pattern of exploding keys and values separately
# and re-joining them on monotonically_increasing_id(), which is not
# guaranteed to produce matching row ids across two DataFrames
typeway = wayNodeDF.select(col("id").alias("typewayId"), explode(col("tags")).alias("tag")) \
    .select(col("typewayId"), col("tag.key").alias("tagskey"), col("tag.value").alias("tagsvalue"))
typeway.printSchema()

# cast the binary key/value to strings and keep only the highway tag
strtypeway = typeway.withColumn('tagskey_str', col('tagskey').cast('string')) \
                    .withColumn('tagsvalue_str', col('tagsvalue').cast('string')) \
                    .where(col('tagskey_str') == 'highway')

highwaytypes = strtypeway.select(col("typewayId"), col("tagskey_str"), col("tagsvalue_str"))
highwaytypes.printSchema()

    
# explode the nodes array once so wayId, node position, and nodeId stay on one row
expwayNodeDF = wayNodeDF.select(col("id").alias("wayId"), explode(col("nodes")).alias("node"))
exagkw = expwayNodeDF.select(col("wayId"),
                             col("node.index").alias("indexedNode"),
                             col("node.nodeId").alias("waynodeId"))


# subset the nodes to a bounding box around Krakow (lon 19.5-20.5, lat 49.2-50.5)
krakownodeDF = nodeDF.filter((F.col('longitude') > 19.5) & (F.col('longitude') < 20.5) &
                             (F.col('latitude') > 49.2) & (F.col('latitude') < 50.5))
knodeDF = krakownodeDF.select(col("id").alias("nodeId"), col('latitude'), col('longitude'))

# attach way membership to the nodes inside the extent
knodway = knodeDF.join(exagkw, col('nodeId') == col('waynodeId'))

# attach the highway type to each way, then keep only the columns needed downstream
typeknodway = knodway.join(highwaytypes, col('wayId') == col('typewayId'))
tp_nodeway = typeknodway.select(col("nodeId"), col("latitude"), col("longitude"),
                                col("wayId"), col("indexedNode"),
                                col("tagskey_str"), col("tagsvalue_str"))

# collect to the driver and persist; fine for a city-sized subset like this one
pd_knodway = tp_nodeway.toPandas()
pd_knodway.to_csv('/home/jovyan/work/pd_knodway2.csv')


# read the CSV back; this path is on the host, where the container's work
# directory is mounted
db = pd.read_csv('/home/sunbird/Documents/april_2019/ueinfo/osm-pbf/pd_knodway2.csv')
geometry = [Point(xy) for xy in zip(db.longitude, db.latitude)]
db1 = GeoDataFrame(db, geometry=geometry)
# order the nodes within each way so the LineString follows the road
db1.sort_values(['wayId', 'indexedNode'], ascending=[True, True], inplace=True)
# a LineString needs at least two points, so drop single-node ways
db2 = db1.groupby('wayId').filter(lambda s: s.geometry.count() >= 2)
db3 = db2.groupby(['wayId'])['geometry'].apply(lambda x: LineString(x.tolist()))
db5 = pd.DataFrame(db3).reset_index()

# re-attach the highway type; drop_duplicates avoids repeating each way once
# per node after the merge
typedb = db[['wayId', 'tagskey_str', 'tagsvalue_str']].drop_duplicates('wayId')
db6 = pd.merge(db5, typedb, on='wayId')
# OSM coordinates are WGS84
db7 = gp.GeoDataFrame(db6, geometry='geometry', crs='EPSG:4326')
db7.to_file('krakow_roads.shp', driver='ESRI Shapefile')
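# optional sanity check (a minimal sketch): read the shapefile back and plot it;
# plotting assumes matplotlib is available, which the container image provides
roads = gp.read_file('krakow_roads.shp')
print(roads.head())
roads.plot(column='tagsvalue_str', figsize=(10, 10))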