Skip to content

Instantly share code, notes, and snippets.

@ALenfant
Last active April 12, 2022 13:48
Show Gist options
  • Save ALenfant/6766560 to your computer and use it in GitHub Desktop.
Save ALenfant/6766560 to your computer and use it in GitHub Desktop.
An implementation of the KSPA algorithm (for K-Shortest Paths based on A* found at http://en.cnki.com.cn/Article_en/CJFDTOTAL-JSJY200804041.htm), using the igraph library, in Python 3. This algorithm has the particularity of being able to find the k shortest paths in a multigraph (a graph, directed or not, with parallel edges). Implemented accor…
def heuristic_cost_estimate(graph, source, target):
#Possible implementation here https://gist.github.com/delijati/1629405
return 0 #Heuristic = 0 : same as Dijkstra
def reconstruct_path(came_from, current_node):
if current_node in came_from:
p = reconstruct_path(came_from, came_from[current_node])
return p + [current_node]
else:
return [current_node]
def Astar(graph, source, target, weights=None):
closedset = set() #The set of nodes already evaluated.
openset = set([source]) #The set of tentative nodes to be evaluated, initially containing the start node
came_from = {} #The map of navigated nodes
g_score = {} #Cost from start along the best known path
f_score = {} # Estimated total cost from start to goal through y
g_score[source] = 0
f_score[source] = g_score[source] + heuristic_cost_estimate(graph, source, target)
while len(openset) > 0: #while openset is not empty
current = min(openset, key=f_score.get) #the node in openset having the lowest f_score[] value
if current == target:
return reconstruct_path(came_from, target)
openset.remove(current)
closedset.add(current)
for neighbor_edge in graph.es.select(_source=current): #graph.neighborhood(current, mode=igraph.OUT):
neighbor = neighbor_edge.target
if weights == None:
tentative_g_score = g_score[current] + 1
else:
tentative_g_score = g_score[current] + neighbor_edge[weights] #dist_between(current,neighbor)
if neighbor in closedset and tentative_g_score >= g_score[neighbor]:
continue
if neighbor not in openset or tentative_g_score < g_score[neighbor]:
came_from[neighbor] = current
g_score[neighbor] = tentative_g_score
f_score[neighbor] = g_score[neighbor] + heuristic_cost_estimate(graph, neighbor, target)
if neighbor not in openset:
openset.add(neighbor) #add neighbor to openset
return None #Failure to find a path
#Tranforms a vertex sequence to the corresponding sequence of edges' "ids"
#
def VertexListToEdgeList(graph, nodelist, id_attribute, weights=None):
edge_list = []
for i in range(len(nodelist)-1):
if weights == None:
#No weights
edge = graph.es.find(_source=nodelist[i], _target=nodelist[i+1])
else:
possible_edges = graph.es.select(_source=nodelist[i], _target=nodelist[i+1])
edge = possible_edges[0]
if len(possible_edges) > 1:
for possible_edge in possible_edges:
if possible_edge[weights] < edge[weights]:
edge = possible_edge
edge_list.append(edge[id_attribute])
return edge_list
def contains(small, big):
for i in range(len(big)-len(small)+1):
for j in range(len(small)):
if big[i+j] != small[j]:
break
else:
return i, i+len(small)
return False
def EdgeListCost(graph, edgelist, weights):
if weights == None:
#No weights, each edge is worth 1
potential_current_path_cost = len(edgelist)
else:
potential_current_path_cost = sum([graph.es[edge][weights] for edge in edgelist])
return potential_current_path_cost
def KSPA(graph, source, target, num_paths, weights=None):
#Initialization:
# Copy the graph because we will modify it
import copy
initial_graph = graph #Kept for cost calculations since sometimes edges are deleted
graph = copy.deepcopy(graph)
# Give another set of ids to the edges that will not change
for edge in graph.es:
edge["abs_id"] = edge.index
# Copy the graph because we will modify it
import copy
graph = copy.deepcopy(graph)
##Step 1
import queue
optional_path = set() #Possible next paths to consider
optional_path_queue = queue.PriorityQueue() #Possible next paths to consider, with cost as priority (a get() will return the lowest-cost path inside)
optional_path_vertices = {} #Dict with each optional_path as keys containing paths in therms of vertices
current_path_vertices = Astar(graph, source, target, weights=weights) #Shortest path
if current_path_vertices == []:
return []
current_path = VertexListToEdgeList(graph, current_path_vertices, "abs_id", weights=weights) #to a list of edge "ids"
current_path = tuple(current_path) #To a tuple (to add in a set if necessary) #Current path to consider
result_path = [current_path] #Contains the results as paths of edges (identified by their id in the initial graph) by order of priority
result_path_vertices = [current_path_vertices] #Contains the results as path of vertices (identified by their id) (same order)
result_path_costs = [] #Contains the cost of each result
k = 1 #Number of shortest paths already computed
#Main loop
while True:
edges_to_delete = [] #Contains the edges to delete before trying to find the next shortest path
deleted_edges = [] #Contains a list of tuples (source, target, attributes) of the temporarily deleted edges
##Step 2
#If we have no more possible path or enough paths
if current_path == None or k >= num_paths:
break #Leave the main loop
##Step 3
#loop for every node until the end node
for i in range(len(current_path)):
edge_id = current_path[i]
node_j = current_path_vertices[i] #graph.es.find(abs_id = edge_id).source
path_before_node_j = current_path[:i] #Path until node j
path_before_node_j_vertices = current_path_vertices[:i]
##Step 4
#For each reachable node...
next_edges = [] #Will contain the list of outgoing edges that respect condition R
for next_edge in graph.es.select(_source=node_j):
#Condition R checks
if next_edge.target != node_j and next_edge.target not in path_before_node_j_vertices:
next_edges.append(next_edge["abs_id"]) #Condition R respected, we keep this edg
else:
edge_to_delete = next_edge
for next_edge in next_edges:
#In order to avoid current_path from adding to optional_path again:
checked_path = path_before_node_j + (next_edge,) #New path tuple
set_of_paths_to_check = set()
set_of_paths_to_check = set_of_paths_to_check.union(result_path)
set_of_paths_to_check = set_of_paths_to_check.union(optional_path)
for path_to_check in set_of_paths_to_check:
if contains(checked_path, path_to_check) != False:
edges_to_delete.append(next_edge)
edge_to_delete = graph.es.find(abs_id=next_edge)
#We'll delete later to avoid id conflicts/edge shuffling
break
##Step 5
#We will look for the next shortest path, let's delete the edges to delete
edges_to_delete = graph.es.select(abs_id_in=edges_to_delete)
for edge_to_delete in edges_to_delete:
if edge_to_delete.source != node_j:
raise Exception("Wrong edge to delete!") #References to edges shifted
deleted_edges.append((edge_to_delete.source, edge_to_delete.target, edge_to_delete.attributes()))
#edge_to_delete.delete()
edges_to_delete.delete()
edges_to_delete = [] #TODO placer correctement dans loop
if len(graph.es.select(abs_id_in=edges_to_delete)) > 0:
raise Exception("Wrong edge deleted!") #References to edges shifted
#Then look for the next shortest path
new_shortest_path_vertices = Astar(graph, node_j, target, weights=weights) #Shortest path
#If there is one
if new_shortest_path_vertices != None:
#Find the used edge's "ids"
new_shortest_path = VertexListToEdgeList(graph, new_shortest_path_vertices, "abs_id", weights=weights) #to a list of edge "ids"
new_shortest_path = tuple(new_shortest_path) #To a tuple (to add in a set if necessary)
#We concatenate the shortest path between the node j and the end vertex to the left part of the path until j
new_optional_path = path_before_node_j + new_shortest_path
new_optional_path_vertices = path_before_node_j_vertices + new_shortest_path_vertices
new_optional_path_cost = EdgeListCost(initial_graph, new_optional_path, weights)
if graph.es.find(abs_id=new_optional_path[-1]).target != target:
raise Exception("Wrong selected path") #Safeguard
#We add the path to the list of possible ones
optional_path.add(new_optional_path)
optional_path_vertices[new_optional_path] = new_optional_path_vertices #[graph.es.find(abs_id = edge_id).source for edge_id in new_optional_path] #we convert it to a list of vertex ids
optional_path_queue.put_nowait((new_optional_path_cost, new_optional_path))
#Delete all following edges
edges_to_delete = graph.es.select(abs_id_in=next_edges)
for edge_to_delete in edges_to_delete:
if edge_to_delete.source != node_j:
raise Exception("Wrong edge to delete!") #References to edges shifted
deleted_edges.append((edge_to_delete.source, edge_to_delete.target, edge_to_delete.attributes()))
edges_to_delete.delete()
edges_to_delete = []
#Next iteration (next node j)
#We tried all edges so we are at the end node
##Step 7
#We select the path with the minimal cost and consider it as current_path
if optional_path == None:
raise Exception("Error") #This should NOT happen, optional_path can be empty though
break #this will exit the loop since current_path == None
try:
current_path_cost, current_path = optional_path_queue.get_nowait()
except queue.Empty:
print("No more possible paths")
break
if current_path == None:
raise Exception("error to manage")
current_path_vertices = optional_path_vertices[current_path] #We set the path containing vertices ids for the next iteration
#We have the path with the least cost
result_path.append(current_path)
result_path_vertices.append(current_path_vertices)
result_path_costs.append(current_path_cost)
#We remove it from optional_path since it has been selected
optional_path.remove(current_path)
#We've got a new path!
k = k + 1
##Step 8?
#TODO:Is it needed to add all edges back????
for deleted_edge in deleted_edges:
path_source, path_target, path_attributes = deleted_edge
graph.add_edge(path_source, path_target)
graph.es[len(graph.es) - 1].update_attributes(path_attributes)
deleted_edges = []
return result_path, result_path_vertices, result_path_costs
@dr-neeraj-saxena
Copy link

Hi ALenfant,
I need to determine k-shortest paths in a multigraph, and find your above code to be the right fit to solve my problem.
I am new to complex Python programming, and find it hard as to how to make the code work.

Would you be able to share any input files and high-level instructions on how to execute this code.

I'll really appreciate your help on this.

Best
Neeraj

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment