Skip to content

Instantly share code, notes, and snippets.

@jackbergus
Last active July 25, 2022 13:44
Show Gist options
  • Save jackbergus/96693910481163e0e0bde91e448f655a to your computer and use it in GitHub Desktop.
Save jackbergus/96693910481163e0e0bde91e448f655a to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2022 Giacomo Bergami
#
# This file is part of MaxCDijkstra
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import copy
import pandas
# Cost assigned to a path when no feasible route was found (positive infinity).
INF = float("inf")
class MyGraph(object):
    """Directed, weighted graph whose nodes may carry a charging power.

    Edges are stored as an adjacency map plus a ``(src, dst) -> weight``
    table. A node is considered a charging station when its registered
    charging power is strictly positive.
    """

    def __init__(self):
        # node -> set of direct successors
        self.outgoing_edges = dict()
        # (src, dst) -> weight of that edge
        self.edge_weight = dict()
        # node -> charge gained per time unit spent at that node
        self.charge_power = dict()

    def __repr__(self):
        return (f"{type(self).__name__}("
                f"sources={len(self.outgoing_edges)}, "
                f"edges={len(self.edge_weight)}, "
                f"charge_entries={len(self.charge_power)})")

    def add_edge(self, src, dst, weight):
        """Insert the edge ``src -> dst``; re-adding an edge overwrites its weight."""
        self.outgoing_edges.setdefault(src, set()).add(dst)
        self.edge_weight[(src, dst)] = weight

    def out_edges(self, src):
        """Return the set of successors of ``src`` (empty set if it has none)."""
        return self.outgoing_edges.get(src, set())

    def weight(self, src, dst):
        """Return the weight of ``src -> dst``, or 0 when that edge is unknown."""
        return self.edge_weight.get((src, dst), 0)

    def load_edges_csv(self, filename):
        """Add one edge per row of a header-less CSV with columns src, dst, weight.

        ``filename`` is handed to ``pandas.read_csv`` unchanged.
        """
        frame = pandas.read_csv(filename, names=["src", "dst", "weight"])
        # itertuples keeps each column's own dtype; iterrows would build one
        # Series per row and upcast integer node ids to float whenever the
        # weight column is floating point.
        for row in frame.itertuples(index=False):
            self.add_edge(row.src, row.dst, row.weight)

    def load_charging_power(self, filename):
        """Register the charging power per row of a header-less CSV with columns node, power.

        ``filename`` is handed to ``pandas.read_csv`` unchanged.
        """
        frame = pandas.read_csv(filename, names=["node", "power"])
        # Same dtype-preservation reasoning as in load_edges_csv.
        for row in frame.itertuples(index=False):
            self.charge_power[row.node] = row.power

    def get_charge_per_time_unit(self, node):
        """Return the charging power registered for ``node`` (0.0 if none)."""
        return self.charge_power.get(node, 0.0)

    def is_charging_station(self, node):
        """Tell whether ``node`` has a strictly positive charging power."""
        return self.charge_power.get(node, 0.0) > 0.0

    def initNRecharges(self):
        """Return a fresh ``node -> 0`` counter for every node with a power entry."""
        return {node: 0 for node in self.charge_power}

    def countTotalPowerUsedInRecharge(self, nRecharges):
        """Return the sum of ``count * charging power`` over the ``nRecharges`` counter."""
        # Plain left-to-right accumulation is kept on purpose: the result is
        # compared for equality/ordering by callers, so the floating-point
        # summation order must not change.
        total = 0.0
        for node, count in nRecharges.items():
            total = total + self.get_charge_per_time_unit(node) * count
        return total
# Build the module-level graph: first the weighted edges, then the per-node
# charging power.
# NOTE(review): both paths are absolute and machine-specific; they must be
# adjusted before running this script elsewhere.
g = MyGraph()
g.load_edges_csv("/home/giacomo/PycharmProjects/pythonProject1/graph.txt")
g.load_charging_power("/home/giacomo/PycharmProjects/pythonProject1/charging_power.csv")
# Print the graph object itself (i.e. whatever str(g) yields).
print(g)
class pathWithCost(object):
    """A candidate path together with its cost and recharge counters.

    Attributes are stored exactly as passed in (no copies are made):
    ``path`` is a list of nodes, ``cost`` the cost associated with it, and
    ``nRecharges`` a ``node -> count`` mapping of recharge steps.
    """

    def __init__(self, path, cost, nRecharges):
        self.path = path
        self.cost = cost
        self.nRecharges = nRecharges

    def __repr__(self):
        return (f"{type(self).__name__}(path={self.path!r}, "
                f"cost={self.cost!r}, nRecharges={self.nRecharges!r})")

    def __str__(self):
        # !s forces str() on every field, matching plain string concatenation.
        return (f"path: {self.path!s} with cost:{self.cost!s}"
                f" and greedy recharges: {self.nRecharges!s}")

    def extendWith(self, x):
        """Append ``x``'s path to this one and adopt ``x``'s cost and counters.

        The cost is replaced, not added: ``x`` is expected to already carry
        the cost accumulated for the whole route.
        """
        self.path.extend(x.getPath())
        self.cost = x.getCost()
        self.nRecharges = x.getNRecharges()

    def getNRecharges(self):
        return self.nRecharges

    def setNRecharges(self, x):
        self.nRecharges = x

    def getPath(self):
        return self.path

    def getCost(self):
        return self.cost

    def setPath(self, x):
        self.path = x

    def appendToPath(self, x):
        self.path.append(x)

    def setCost(self, c):
        self.cost = c

    def addToCost(self, c):
        self.cost = self.cost + c

    def print(self, g):
        """Return (not print) a description including the total recharge power.

        ``g`` must provide ``countTotalPowerUsedInRecharge(nRecharges)``.
        """
        total = g.countTotalPowerUsedInRecharge(self.nRecharges)
        return (f"path: {self.path!s} with cost:{self.cost!s}"
                f" and greedy recharges: {total!s} from {self.nRecharges!s}")
def _best_continuation(graph, start, end, nRecharges, pathCost, availableCapacity,
                       vehicleMaxCapacity, currentPath, shortest):
    """Try every successor of ``start`` and return the best continuation.

    ``shortest`` is the incumbent; it is returned unchanged when no successor
    yields a strictly better result. ``availableCapacity`` is the charge the
    vehicle has when leaving ``start``.
    """
    for node in graph.out_edges(start):
        # NOTE(review): currentPath only ever holds ``start`` here, so this
        # skips self-loops; cycles through other nodes are not detected.
        if node in currentPath:
            continue
        cost = graph.weight(start, node)
        # Each branch gets its own copy of the counters so that sibling
        # branches do not see each other's recharges.
        newpath = find_shortest_path(graph,
                                     node,
                                     end,
                                     copy.deepcopy(nRecharges),
                                     pathCost + cost,
                                     availableCapacity - cost,
                                     vehicleMaxCapacity)
        npC = newpath.getCost()
        sC = shortest.getCost()
        nT = graph.countTotalPowerUsedInRecharge(newpath.getNRecharges())
        sT = graph.countTotalPowerUsedInRecharge(shortest.getNRecharges())
        # Prefer the lower cost; on equal cost prefer the lower total recharge.
        if ((npC < sC) or (npC == sC and nT < sT)) and newpath.getPath():
            shortest = newpath
    return shortest


def find_shortest_path(graph, start, end, nRecharges, pathCost, vehicleChargeCapacity, vehicleMaxCapacity):
    """Search the cheapest route from ``start`` to ``end`` for a battery vehicle.

    The search is an exhaustive recursion over the out-edges of each node.
    Traversing an edge costs its weight both in path cost and in charge. At a
    charging station the recharge is increased one unit at a time until a
    route is found or the battery is full.

    Parameters:
        graph: object offering ``out_edges``, ``weight``,
            ``is_charging_station``, ``get_charge_per_time_unit`` and
            ``countTotalPowerUsedInRecharge``.
        start: node the vehicle is currently at.
        end: destination node.
        nRecharges: ``node -> count`` of recharge units taken so far. This
            dict is mutated when ``start`` is a charging station.
        pathCost: cost accumulated before reaching ``start``.
        vehicleChargeCapacity: charge left on arrival at ``start``.
        vehicleMaxCapacity: upper bound for the charge after recharging.

    Returns:
        A ``pathWithCost``. On success its path runs from ``start`` to ``end``
        and its cost is the total accumulated cost. On failure its cost is
        ``INF`` (the path may then be empty or hold only ``start``), so
        callers should test the cost rather than the path.

    NOTE(review): because only self-loops are skipped, termination on cyclic
    graphs relies on the charge dropping below zero; a cycle containing a
    charging station may recurse without bound -- confirm inputs are acyclic.
    """
    # Default result if no solution is found
    cp = pathWithCost(list(), INF, nRecharges)
    if (start == end) and (vehicleChargeCapacity > 0):
        # Destination reached with some charge left
        cp.appendToPath(start)
        cp.setCost(pathCost)
        return cp
    if not graph.out_edges(start) or vehicleChargeCapacity < 0:
        # Dead end, or the battery ran out on the way here
        return cp

    cp.appendToPath(start)
    # Incumbent: nothing found yet, hence infinite cost
    shortest = pathWithCost(list(), INF, nRecharges)

    if not graph.is_charging_station(start):
        # No recharge possible: continue with the charge we arrived with
        shortest = _best_continuation(graph, start, end, nRecharges, pathCost,
                                      vehicleChargeCapacity, vehicleMaxCapacity,
                                      cp.getPath(), shortest)
        cp.extendWith(shortest)
        return cp

    # Charging station: look for the smallest number of recharge units that
    # makes some continuation feasible.
    chargePerUnit = graph.get_charge_per_time_unit(start)
    while True:
        # A station missing from the counter has simply not been used yet
        maxRecharge = nRecharges.get(start, 0) * chargePerUnit
        capacityAfterRecharge = min(vehicleMaxCapacity, vehicleChargeCapacity + maxRecharge)
        shortest = _best_continuation(graph, start, end, nRecharges, pathCost,
                                      capacityAfterRecharge, vehicleMaxCapacity,
                                      cp.getPath(), shortest)
        if (shortest.getCost() < INF) and shortest.getPath():
            # A feasible continuation exists with the current recharge
            cp.extendWith(shortest)
            return cp
        if capacityAfterRecharge < vehicleMaxCapacity:
            # Battery not full yet: take one more recharge unit and retry
            nRecharges[start] = nRecharges.get(start, 0) + 1
            shortest.setPath(list())
            shortest.setCost(INF)
        else:
            # Battery is full and still no viable route: give up
            return pathWithCost(list(), INF, nRecharges)
# Printing the solution: search a route from node 1 to node 4 with zero initial
# path cost, for a vehicle arriving with 5 units of charge and a maximum
# capacity of 5, starting from all-zero recharge counters.
print(find_shortest_path(g, 1, 4, g.initNRecharges(), 0, 5, 5).print(g))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment