Skip to content

Instantly share code, notes, and snippets.

@HTenkanen
Last active July 7, 2023 06:24
Show Gist options
  • Save HTenkanen/6ed582a97a83e9530bbdcca84bca8d0c to your computer and use it in GitHub Desktop.
Save HTenkanen/6ed582a97a83e9530bbdcca84bca8d0c to your computer and use it in GitHub Desktop.
Digiroad to Graph
import networkx as nx
from osgeo import ogr
import os
def load_graph_from_Digiroad_shape(filepath, direction='AJOSUUNTA', both_ways=2, against=3, along=4, strict=True):
"""Generates nx.MultiDiGraph() object from Digiroad 2.0 Shapefile.
Parameters
----------
filepath : str
Filepath to Digiroad 2.0 Shapefile.
direction : str
Name for column that contains information about the allowed driving directions
both_ways : int
Value specifying that the road is drivable to both directions.
against : int
Value specifying that the road is drivable against the digitizing direction.
along : int
Value specifying that the road is drivable along the digitizing direction.
strict : bool
Raise error if error is being caught when building the graph.
"""
graph = nx.MultiDiGraph()
shp = ogr.Open(filepath)
if shp is None:
raise RuntimeError("Unable to open {}".format(filepath))
print("Loading graph from file ..")
for lyr in shp:
fields = [x.GetName() for x in lyr.schema]
for f in lyr:
g = f.geometry()
if g is None:
if strict:
raise nx.NetworkXError("Bad data: feature missing geometry")
else:
continue
flddata = [f.GetField(f.GetFieldIndex(x)) for x in fields]
attributes = dict(zip(fields, flddata))
attributes["ShpName"] = lyr.GetName()
# Digiroad 2.0 specific GeometryType is somekind of number always
if isinstance(g.GetGeometryType(), int):
coords = g.GetPoint_2D(0)
x, y = coords[0], coords[1]
# Add x and y to attributes
attributes['x'] = x
attributes['y'] = y
attributes['nodeid'] = coords
# Add node to the graph
graph.add_node(coords, **attributes)
# Get edges
for edge in edges_from_line(g, attributes):
e1, e2, attr = edge
# If road is bi-directional add it also in reversed order
if attr[direction] == both_ways:
graph.add_edge(e1, e2, **attr)
graph.add_edge(e2, e1, **attr)
# If road is drivable against the digitization direction (reverse)
elif attr[direction] == against:
graph.add_edge(e2, e1, **attr)
# Otherwise add the edge "normally"
elif attr[direction] == along:
graph.add_edge(e1, e2, **attr)
else:
if strict:
raise nx.NetworkXError("GeometryType {} not supported".
format(g.GetGeometryType()))
# Add name
graph.graph['name'] = os.path.basename(filepath)[:-4]
# Add CRS
try:
crs = lyr.GetSpatialRef().ExportToProj4()
except:
crs = None
graph.graph['crs'] = crs
return graph
def edges_from_line(geom, attrs):
"""
Generate edges for each line in geom
Written as a helper for read_shp
Parameters
----------
geom: ogr line geometry
To be converted into an edge or edges
attrs: dict
Attributes to be associated with all geoms
Returns
-------
edges: generator of edges
each edge is a tuple of form
(node1_coord, node2_coord, attribute_dict)
suitable for expanding into a networkx Graph add_edge call
"""
if geom.GetGeometryName().startswith('MULTI'):
print("Multilinestring")
edge_attrs = attrs.copy()
last = geom.GetPointCount() - 1
edge_attrs["Wkt"] = geom.ExportToWkt()
yield (geom.GetPoint_2D(0), geom.GetPoint_2D(last), edge_attrs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment