Skip to content

Instantly share code, notes, and snippets.

@hagberg
Forked from jtorrents/gist:4586874
Last active December 11, 2015 11:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save hagberg/4591198 to your computer and use it in GitHub Desktop.
Save hagberg/4591198 to your computer and use it in GitHub Desktop.
import time
import networkx as nx
from contextlib import contextmanager
@contextmanager
def hidden_nodes(G, nodes=None):
import types
if nodes is None:
nodes = []
def successors_iter(G,n):
try:
return iter(s for s in G.adj[n] if s not in nodes)
except KeyError:
raise NetworkXError("The node %s is not in the digraph."%(n,))
def predecessors_iter(G,n):
try:
return iter(s for s in G.pred[n] if s not in nodes)
except KeyError:
raise NetworkXError("The node %s is not in the digraph."%(n,))
try:
if G.is_directed():
si = G.successors_iter
G.successors_iter = types.MethodType(successors_iter,G)
pi = G.predecessors_iter
G.predecessors_iter = types.MethodType(predecessors_iter,G)
else:
si = G.neighbors_iter
G.neighbors_iter = types.MethodType(successors_iter,G)
yield G
finally:
if G.is_directed():
G.successors_iter = si
G.predecessors_iter = pi
else:
G.neighbors_iter = si
def bidirectional_shortest_path(G, source, target, ignore_nodes=None, ignore_edges=None):
"""Return a list of nodes in a shortest path between source and target.
Parameters
----------
G : NetworkX graph
source : node label
starting node for path
target : node label
ending node for path
ignore_nodes : list of nodes
nodes to ignore, optional
ignore_edges : list of edges
edges to ignore, optional
Returns
-------
path: list
List of nodes in a path from source to target.
Raises
------
NetworkXNoPath
If no path exists between source and target.
See Also
--------
shortest_path
Notes
-----
This algorithm is used by shortest_path(G,source,target).
"""
# call helper to do the real work
results=_bidirectional_pred_succ(G,source,target,ignore_nodes,ignore_edges)
pred,succ,w=results
# build path from pred+w+succ
path=[]
# from w to target
while w is not None:
path.append(w)
w=succ[w]
# from source to w
w=pred[path[0]]
while w is not None:
path.insert(0,w)
w=pred[w]
return path
def _bidirectional_pred_succ(G, source, target, ignore_nodes=None, ignore_edges=None):
"""Bidirectional shortest path helper.
Returns (pred,succ,w) where
pred is a dictionary of predecessors from w to the source, and
succ is a dictionary of successors from w to the target.
"""
# does BFS from both source and target and meets in the middle
if target == source:
return ({target:None},{source:None},source)
# handle either directed or undirected
if G.is_directed():
Gpred=G.predecessors_iter
Gsucc=G.successors_iter
else:
Gpred=G.neighbors_iter
Gsucc=G.neighbors_iter
# support optional nodes filter
if ignore_nodes:
def filter_iter(nodes_iter):
def iterate(v):
for w in nodes_iter(v):
if w not in ignore_nodes:
yield w
return iterate
Gpred=filter_iter(Gpred)
Gsucc=filter_iter(Gsucc)
# support optional edges filter
if ignore_edges:
if G.is_directed():
def filter_pred_iter(pred_iter):
def iterate(v):
for w in pred_iter(v):
if (w, v) not in ignore_edges:
yield w
return iterate
def filter_succ_iter(succ_iter):
def iterate(v):
for w in succ_iter(v):
if (v, w) not in ignore_edges:
yield w
return iterate
Gpred=filter_pred_iter(Gpred)
Gsucc=filter_succ_iter(Gsucc)
else:
def filter_iter(nodes_iter):
def iterate(v):
for w in nodes_iter(v):
if (v, w) not in ignore_edges \
and (w, v) not in ignore_edges:
yield w
return iterate
Gpred=filter_iter(Gpred)
Gsucc=filter_iter(Gsucc)
# predecesssor and successors in search
pred={source:None}
succ={target:None}
# initialize fringes, start with forward
forward_fringe=[source]
reverse_fringe=[target]
while forward_fringe and reverse_fringe:
if len(forward_fringe) <= len(reverse_fringe):
this_level=forward_fringe
forward_fringe=[]
for v in this_level:
for w in Gsucc(v):
if w not in pred:
forward_fringe.append(w)
pred[w]=v
if w in succ:
# found path
return pred,succ,w
else:
this_level=reverse_fringe
reverse_fringe=[]
for v in this_level:
for w in Gpred(v):
if w not in succ:
succ[w]=v
reverse_fringe.append(w)
if w in pred:
# found path
return pred,succ,w
raise nx.NetworkXNoPath("No path between %s and %s." % (source, target))
def node_connectivity_filter(G, source, target,strict=False,max_paths=None):
# Maximum possible node independent paths
if G.is_directed():
possible = min(G.out_degree(source), G.in_degree(target))
else:
possible = min(G.degree(source), G.degree(target))
if max_paths is None:
max_paths = float('Inf')
K = 0
if target == source:
return None
elif possible == 0:
return 0
elif strict and target in G[source]:
return float('nan')
exclude = set()
for i in range(min(possible, max_paths)):
try:
path = bidirectional_shortest_path(G,source,target,ignore_nodes=exclude)
exclude.update(set(path)-set([source, target]))
K += 1
except nx.NetworkXNoPath:
break
return K
def node_connectivity_hide(G, source, target,strict=False,max_paths=None):
# Maximum possible node independent paths
if G.is_directed():
possible = min(G.out_degree(source), G.in_degree(target))
else:
possible = min(G.degree(source), G.degree(target))
if max_paths is None:
max_paths = float('Inf')
K = 0
if target == source:
return None
elif possible == 0:
return 0
elif strict and target in G[source]:
return float('nan')
exclude = set()
for i in range(min(possible, max_paths)):
try:
with hidden_nodes(G, nodes=exclude) as H:
path = nx.shortest_path(H,source,target)
exclude.update(set(path)-set([source, target]))
K += 1
except nx.NetworkXNoPath:
break
return K
if __name__ == '__main__':
for p in [0.05, 0.1, 0.15, 0.2]:
G = nx.gnp_random_graph(1001,p)
print("Testing filters with a G_np random graph of order {0} and size {1}".format(G.order(), G.size()))
start = time.time()
k = node_connectivity_hide(G, 1, 1000)
print("\tContext class: found {0} node independent paths in {1:.4f} seconds".format(k, time.time()-start))
start = time.time()
k = node_connectivity_filter(G, 1, 1000)
print("\tFilter function: found {0} node independent paths in {1:.4f} seconds".format(k, time.time()-start))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment