Last active
July 25, 2022 13:44
-
-
Save jackbergus/96693910481163e0e0bde91e448f655a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2022 Giacomo Bergami
#
# This file is part of MaxCDijkstra
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import copy | |
import pandas | |
# Sentinel cost meaning "no feasible route"; it compares greater than any
# finite cost, so any real route replaces it during the search.
INF = float("inf")
class MyGraph(object):
    """Directed weighted graph whose nodes may provide a charging power.

    The state is kept in three plain dictionaries:

    * ``outgoing_edges`` maps a source node to the set of its successors;
    * ``edge_weight`` maps a ``(src, dst)`` pair to the weight of that edge;
    * ``charge_power`` maps a node to the charge it provides per time unit.
    """

    def __init__(self):
        self.outgoing_edges = dict()
        self.edge_weight = dict()
        self.charge_power = dict()

    def add_edge(self, src, dst, weight):
        """Add (or overwrite) the directed edge ``src -> dst`` with ``weight``."""
        self.outgoing_edges.setdefault(src, set()).add(dst)
        self.edge_weight[(src, dst)] = weight

    def out_edges(self, src):
        """Return the set of successors of ``src`` (empty if it has none).

        For a known node this is the internal set itself, not a copy.
        """
        return self.outgoing_edges.get(src, set())

    def weight(self, src, dst):
        """Return the weight of ``src -> dst``, or ``0`` if no such edge exists."""
        return self.edge_weight.get((src, dst), 0)

    def load_edges_csv(self, filename):
        """Load edges from a header-less CSV with columns ``src,dst,weight``."""
        frame = pandas.read_csv(filename, names=["src", "dst", "weight"])
        # itertuples() keeps each column's own dtype.  iterrows() would upcast
        # the whole row to a single dtype, turning integer node ids into
        # floats as soon as the weight column contains a float.
        for row in frame.itertuples(index=False):
            self.add_edge(row.src, row.dst, row.weight)

    def load_charging_power(self, filename):
        """Load charging powers from a header-less CSV with columns ``node,power``."""
        frame = pandas.read_csv(filename, names=["node", "power"])
        # See load_edges_csv for why itertuples() is used instead of iterrows().
        for row in frame.itertuples(index=False):
            self.charge_power[row.node] = row.power

    def get_charge_per_time_unit(self, node):
        """Return the charging power of ``node``, or ``0.0`` if none is recorded."""
        return self.charge_power.get(node, 0.0)

    def is_charging_station(self, node):
        """Tell whether ``node`` has a strictly positive charging power."""
        return self.charge_power.get(node, 0.0) > 0.0

    def initNRecharges(self):
        """Return a fresh ``{node: 0}`` counter for every node in ``charge_power``."""
        return dict.fromkeys(self.charge_power, 0)

    def countTotalPowerUsedInRecharge(self, nRecharges):
        """Return the sum over nodes of charging power times recharge count.

        ``nRecharges`` maps a node to the number of recharge time units spent
        there; nodes without a recorded charging power contribute ``0.0``.
        """
        return sum(
            (self.get_charge_per_time_unit(node) * count
             for node, count in nRecharges.items()),
            0.0,
        )
# Build the graph used by the demo run below: one CSV holds the weighted
# edges, the other the per-node charging power.  Both locations are absolute
# paths on the author's machine.
edges_csv = "/home/giacomo/PycharmProjects/pythonProject1/graph.txt"
charging_csv = "/home/giacomo/PycharmProjects/pythonProject1/charging_power.csv"

g = MyGraph()
g.load_edges_csv(edges_csv)
g.load_charging_power(charging_csv)
print(g)
class pathWithCost(object):
    """Mutable record of a route, its cumulative cost and its recharge counters.

    ``path`` is the list of visited nodes, ``cost`` the cost accumulated so
    far, and ``nRecharges`` a mapping from node to the number of recharge
    units spent there.  None of the three is copied: the instance shares the
    objects it is given.
    """

    def __init__(self, path, cost, nRecharges):
        self.path = path
        self.cost = cost
        self.nRecharges = nRecharges

    def __repr__(self):
        return (f"{type(self).__name__}(path={self.path!r}, "
                f"cost={self.cost!r}, nRecharges={self.nRecharges!r})")

    def _describe(self, recharge_summary):
        # Single place for the text shared by __str__ and print().
        return (f"path: {self.path!s} with cost:{self.cost!s}"
                f" and greedy recharges: {recharge_summary!s}")

    def __str__(self):
        return self._describe(self.nRecharges)

    def extendWith(self, x):
        """Append ``x``'s path to this one and adopt ``x``'s cost and recharges.

        The cost is replaced, not added, and the recharge mapping is shared
        with ``x`` rather than copied.
        """
        self.path.extend(x.getPath())
        self.cost = x.getCost()
        self.nRecharges = x.getNRecharges()

    def getNRecharges(self):
        return self.nRecharges

    def setNRecharges(self, x):
        self.nRecharges = x

    def getPath(self):
        return self.path

    def getCost(self):
        return self.cost

    def setPath(self, x):
        self.path = x

    def appendToPath(self, x):
        self.path.append(x)

    def setCost(self, c):
        self.cost = c

    def addToCost(self, c):
        self.cost = self.cost + c

    def print(self, g):
        """Return (not print) a description including the total recharge power.

        ``g`` must provide ``countTotalPowerUsedInRecharge``, which turns the
        recharge counters into a single total.
        """
        total_power = g.countTotalPowerUsedInRecharge(self.nRecharges)
        return self._describe(total_power) + f" from {self.nRecharges!s}"
def find_shortest_path(graph, start, end, nRecharges, pathCost, vehicleChargeCapacity, vehicleMaxCapacity):
    """Recursively search for the cheapest route from ``start`` to ``end``.

    Every outgoing edge of ``start`` is explored.  Among the routes that reach
    ``end``, the one with the lowest cumulative cost wins; ties are broken by
    the lowest total recharge power.  At a charging station the number of
    recharge units is increased one at a time until some route becomes
    feasible or the battery is full.

    :param graph: object providing ``out_edges``, ``weight``,
        ``is_charging_station``, ``get_charge_per_time_unit`` and
        ``countTotalPowerUsedInRecharge``.
    :param start: node the search starts from.
    :param end: destination node.
    :param nRecharges: mapping node -> recharge units already spent there.
        It is updated in place when ``start`` is a charging station; recursive
        calls receive deep copies.
    :param pathCost: cost accumulated before reaching ``start``.
    :param vehicleChargeCapacity: charge left when arriving at ``start``.
    :param vehicleMaxCapacity: maximum charge the vehicle can hold.
    :return: a ``pathWithCost``.  Its cost is ``INF`` when no feasible route
        exists; in that case the path is either empty or just ``[start]``.
    """
    # Default path if no solution was found
    cp = pathWithCost(list(), INF, nRecharges)
    if start == end and vehicleChargeCapacity > 0:
        # Destination reached with some charge left: the route is this node.
        # NOTE(review): arriving with exactly zero charge is rejected here,
        # while zero is tolerated on intermediate nodes -- confirm intended.
        cp.appendToPath(start)
        cp.setCost(pathCost)
        return cp
    if len(graph.out_edges(start)) == 0 or vehicleChargeCapacity < 0:
        # Dead end, or the battery ran out on the way here: no viable path.
        return cp

    # From here on we try to build a route that goes through start.
    cp.appendToPath(start)
    # NOTE(review): cp holds only start at this point, so the membership test
    # in the helper filters self-loops only, not longer cycles.  Cycles end
    # only when the battery drains; confirm the input cannot contain a cycle
    # whose cost a charging station can fully recover.
    visited = cp.getPath()
    # No continuation known yet, hence the infinite cost.
    shortest = pathWithCost(list(), INF, nRecharges)

    if not graph.is_charging_station(start):
        # Plain node: leave with whatever charge is currently available.
        shortest = _cheapest_continuation(graph, start, end, nRecharges, pathCost,
                                          vehicleChargeCapacity, vehicleMaxCapacity,
                                          visited, shortest)
        # The result is start followed by the best continuation (if any).
        cp.extendWith(shortest)
        return cp

    # Charging station: try increasing amounts of recharge until a route
    # becomes feasible or the battery is full.
    while True:
        # Charge obtainable with the recharge units granted so far.
        maxRecharge = nRecharges[start] * graph.get_charge_per_time_unit(start)
        # The battery cannot hold more than its maximum capacity.
        capacityAfterRecharge = min(vehicleMaxCapacity, vehicleChargeCapacity + maxRecharge)
        shortest = _cheapest_continuation(graph, start, end, nRecharges, pathCost,
                                          capacityAfterRecharge, vehicleMaxCapacity,
                                          visited, shortest)
        if shortest.getCost() < INF and len(shortest.getPath()) > 0:
            # A finite cost with a non-empty path means a route was found.
            cp.extendWith(shortest)
            return cp
        if capacityAfterRecharge < vehicleMaxCapacity:
            # No route yet and the battery is not full: grant one more
            # recharge unit and start over from a blank candidate.
            nRecharges[start] = nRecharges[start] + 1
            shortest.setPath(list())
            shortest.setCost(INF)
        else:
            # Battery full and still no viable route: give up.
            return pathWithCost(list(), INF, nRecharges)


def _cheapest_continuation(graph, start, end, nRecharges, pathCost, departureCapacity,
                           vehicleMaxCapacity, visited, shortest):
    # Try every successor of start, leaving with departureCapacity, and return
    # the best route found, or shortest itself if none improves on it.
    for node in graph.out_edges(start):
        if node in visited:
            continue
        cost = graph.weight(start, node)
        candidate = find_shortest_path(graph,
                                       node,
                                       end,
                                       copy.deepcopy(nRecharges),
                                       pathCost + cost,
                                       departureCapacity - cost,
                                       vehicleMaxCapacity)
        candidateCost = candidate.getCost()
        bestCost = shortest.getCost()
        candidatePower = graph.countTotalPowerUsedInRecharge(candidate.getNRecharges())
        bestPower = graph.countTotalPowerUsedInRecharge(shortest.getNRecharges())
        # Prefer the candidate only when it is cheaper, or equally cheap with
        # less recharge power, and it actually contains a path.
        improves = candidateCost < bestCost or (candidateCost == bestCost and candidatePower < bestPower)
        if improves and len(candidate.getPath()) > 0:
            shortest = candidate
    return shortest
# Solve the demo instance: route from node 1 to node 4 with a battery holding
# 5 units out of a maximum of 5, then print the resulting description.
best_route = find_shortest_path(
    graph=g,
    start=1,
    end=4,
    nRecharges=g.initNRecharges(),
    pathCost=0,
    vehicleChargeCapacity=5,
    vehicleMaxCapacity=5,
)
print(best_route.print(g))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment