Skip to content

Instantly share code, notes, and snippets.

@Westlife1002
Created May 15, 2021 04:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Westlife1002/03f5f2a1b03841a81221623bce63951e to your computer and use it in GitHub Desktop.
Save Westlife1002/03f5f2a1b03841a81221623bce63951e to your computer and use it in GitHub Desktop.
VRP_10.0
"""Capacitated Vehicle Routing Problem"""
from __future__ import print_function
from six.moves import xrange
from ortools.constraint_solver import pywrapcp
from ortools.constraint_solver import routing_enums_pb2
import networkx as nx
import matplotlib.pyplot as plt
import csv
# from Haversine_Algorithm import distance
import matplotlib
import random
import math
import pandas as pd
import tkinter as tk
# Problem Data Definition #
class Vehicle():
def __init__(self):
self._capacity = 9000
# self.id = id
self.routes = []
self.time_min = []
self.time_max = []
self.load = []
self._speed = 0.31 #mile/min
@property
def capacity(self):
"""Gets vehicle capacity"""
return self._capacity
def routes(self):
return self.routes
def speed(self):
return self._speed
class DataProblem():
"""Stores the data for the problem"""
def __init__(self):
"""Initializes the data for the problem"""
self._vehicle = Vehicle()
self._num_vehicles = num_vehicles()
self._locations = []
self._demands = []
self._time_windows = []
df = pd.read_csv("VRP_input.csv", encoding="cp1252")
self.dataframe = df
# print(df)
self._demands = df["Chargeable Weight"].tolist()
lat = df["Lat"].tolist()
lon = df["Long"].tolist()
self._locations = list(zip(lat,lon))
# print(self._locations )
earliest = df['earliest'].tolist()
latest = df['latest'].tolist()
self._time_windows = list(zip(earliest,latest))
# print(self._time_windows)
self._depot = 0
@property
def getvehicle(self):
"""Gets a vehicle"""
return self._vehicle
@property
def num_vehicles(self):
"""Gets number of vehicles"""
return self._num_vehicles
@property
def locations(self):
"""Gets locations"""
return self._locations
@property
def num_locations(self):
"""Gets number of locations"""
return len(self.locations)
@property
def depot(self):
"""Gets depot location index"""
return self._depot
@property
def demands(self):
"""Gets demands at each location"""
return self._demands
@property
def time_per_demand_unit(self):
"""Gets the time (in min) to load a demand"""
return 20 # average 20 minutes
@property
def time_windows(self):
"""Gets (start time, end time) for each locations"""
return self._time_windows
def distance(lat1, long1, lat2, long2):
# Note: The formula used in this function is not exact, as it assumes
# the Earth is a perfect sphere.
# Mean radius of Earth in miles
radius_earth = 3959
# Convert latitude and longitude to
# spherical coordinates in radians.
degrees_to_radians = math.pi/180.0
phi1 = lat1 * degrees_to_radians
phi2 = lat2 * degrees_to_radians
lambda1 = long1 * degrees_to_radians
lambda2 = long2 * degrees_to_radians
dphi = phi2 - phi1
dlambda = lambda2 - lambda1
a = haversine(dphi) + math.cos(phi1) * math.cos(phi2) * haversine(dlambda)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
d = radius_earth * c
return d
def haversine(angle):
h = math.sin(angle / 2) ** 2
return h
# print(DataProblem().locations)
# print(DataProblem().demands)
#######################
# Problem Constraints #
#######################
class CreateDistanceEvaluator(object): # pylint: disable=too-few-public-methods
"""Creates callback to return distance between points."""
def __init__(self, data, manager):
"""Initializes the distance matrix."""
self._distances = {}
self.manager = manager
self.dist_matrix = []
# @property
# def manager(self):
# """Gets routing model"""
# return self._manager
# precompute distance between location to have distance callback in O(1)
for from_node in xrange(data.num_locations):
self._distances[from_node] = {}
temp = []
for to_node in xrange(data.num_locations):
if from_node == to_node:
self._distances[from_node][to_node] = 0
temp.append(0)
else:
x1 = data.locations[from_node][0] # Ycor of end1
# print (x1)
y1 = data.locations[from_node][1] # Xcor of end1
# print(y1)
x2 = data.locations[to_node][0] # Ycor of end2
y2 = data.locations[to_node][1] # Xcor of end2
self._distances[from_node][to_node] = int(
distance(x1, y1, x2, y2))
temp.append(int(distance(x1, y1, x2, y2)))
self.dist_matrix.append(temp)
# def distance_evaluator(self, from_index, to_index):
# """Returns the Harversine Distance between the two nodes"""
# # Convert from routing variable Index to time matrix NodeIndex.
# # print("from_index: ", from_index)
# # print("self.dist_matrix: ", self.dist_matrix)
# # print("type(from_index)", from_index)
# from_node = self.manager.IndexToNode(from_index)
# # print("type(from_node)", from_node)
# # print("")
# # print("from_node: ", from_node)
# to_node = self.manager.IndexToNode(to_index)
# # return self._distances[from_node][to_node]
# return self.dist_matrix[from_node][to_node]
# def distance_evaluator(self, from_index, to_index):
# """Returns the Harversine Distance between the two nodes"""
# return self._distances[from_index][to_index]
class CreateDemandEvaluator(object):
"""Creates callback to get demands at each location."""
def __init__(self, data, manager):
"""Initializes the demand array."""
self._demands = data.demands
self._manager = manager
def demand_evaluator(self, from_index):
"""Returns the demand of the current node"""
from_node = self._manager.IndexToNode(from_index)
return self._demands[from_node]
class CreateTimeEvaluator(object):
"""Creates callback to get total times between locations."""
@staticmethod
def service_time(data, node):
"""Gets the service time for the specified location."""
# return data.demands[node] * data.time_per_demand_unit #function of volume at this node
return data.time_per_demand_unit #constant service time for all nodes
@staticmethod
def travel_time(data, from_node, to_node):
"""Gets the travel times between two locations."""
if from_node == to_node:
travel_time = 0
else:
x1=data.locations[from_node][0]
y1=data.locations[from_node][1]
x2=data.locations[to_node][0]
y2=data.locations[to_node][1]
travel_time = distance(
x1,y1,x2,y2) / data.getvehicle.speed()
return travel_time
def __init__(self, data, manager):
"""Initializes the total time matrix."""
self._total_time = {}
self._manager = manager
# precompute total time to have time callback in O(1)
for from_node in xrange(data.num_locations):
self._total_time[from_node] = {} #under total_time {}, add key = this from_node, value ={}
for to_node in xrange(data.num_locations):
if from_node == to_node:
self._total_time[from_node][to_node] = 0 #under this from_node {}, add key = this to_node, value = 0
else:
self._total_time[from_node][to_node] = int( #under this from_node {}, add key = this to_node, value = service_time + travel_time
self.service_time(data, from_node) +
self.travel_time(data, from_node, to_node))
# print(self._total_time)
# print(self._total_time)
def time_evaluator(self, from_index, to_index):
"""Returns the total time between the two nodes"""
from_node = self._manager.IndexToNode(from_index)
to_node = self._manager.IndexToNode(to_index)
return self._total_time[from_node][to_node]
def add_distance_dimension(routing, transit_callback_index):
"""Add Global Span constraint"""
distance = "Distance"
maximum_distance = 360
routing.AddDimension(
transit_callback_index,
0, # null slack
maximum_distance, # maximum distance per vehicle
True, # start cumul to zero
distance)
distance_dimension = routing.GetDimensionOrDie(distance)
# Try to minimize the max distance among vehicles.
# /!\ It doesn't mean the standard deviation is minimized
# distance_dimension.SetGlobalSpanCostCoefficient(100)
# def add_capacity_constraints(routing, data, demand_evaluator):
# """Adds capacity constraint"""
# capacity = "Capacity"
# routing.AddDimension(
# demand_evaluator,
# 0, # null capacity slack
# data.getvehicle.capacity, # vehicle maximum capacity
# True, # start cumul to zero
# capacity)
def add_capacity_constraints(routing, data, demand_callback_index):
"""Adds capacity constraint"""
capacity = "Capacity"
routing.AddDimensionWithVehicleCapacity(
demand_callback_index,
0, # null capacity slack
capacity_vector(), # vector vehicle_capacity
True, # start cumul to zero
capacity)
def add_time_window_constraints(routing, data, time_callback_index, manager):
"""Add Global Span constraint"""
time = "Time"
horizon = 2000
routing.AddDimension(
time_callback_index,
horizon, # allow waiting time
horizon, # maximum time per vehicle
False, # don't force start cumul to zero since we are giving TW to start nodes
time)
time_dimension = routing.GetDimensionOrDie(time)
for location_idx, time_window in enumerate(data.time_windows):
if location_idx == 0:
continue
index = manager.NodeToIndex(location_idx)
# print(index)
time_dimension.CumulVar(index).SetRange(time_window[0], time_window[1])
# print(time_dimension.CumulVar(index))
routing.AddToAssignment(time_dimension.SlackVar(index))
for vehicle_id in xrange(data.num_vehicles):
index = routing.Start(vehicle_id)
# set a constraint on the start or the end node for each vehicle, which is not included in above
time_dimension.CumulVar(index).SetRange(data.time_windows[0][0], data.time_windows[0][1])
routing.AddToAssignment(time_dimension.SlackVar(index))
###########
# Printer #
###########
vlist=[]
load_list=[]
time_list_min=[]
time_list_max=[]
class ConsolePrinter():
"""Print solution to console"""
def __init__(self, data, manager, routing, assignment):
"""Initializes the printer"""
self._data = data
self._routing = routing
self._assignment = assignment
self._manager = manager
@property
def data(self):
"""Gets problem data"""
return self._data
@property
def routing(self):
"""Gets routing model"""
return self._routing
@property
def assignment(self):
"""Gets routing model"""
return self._assignment
@property
def manager(self):
"""Gets routing model"""
return self._manager
def print(self):
"""Prints assignment on console"""
# Inspect solution.
capacity_dimension = self.routing.GetDimensionOrDie('Capacity')
time_dimension = self.routing.GetDimensionOrDie('Time')
total_dist = 0
total_time = 0
global vlist
global load_list
global time_list_min
global time_list_max
for vehicle_id in xrange(self.data.num_vehicles):
index = self.routing.Start(vehicle_id)
# print(index)
plan_output = 'Route for vehicle {0}:\n'.format(vehicle_id)
route_dist = 0
this_vehicle = Vehicle()
while not self.routing.IsEnd(index):
node_index = self.manager.IndexToNode(index)
# print(node_index)
this_vehicle.routes.append(self.manager.IndexToNode(index))
# print(this_vehicle.routes)
# print(self.data.vehicle.routes)
# print(self.data.vehicle.routes.append(temp_id))
next_node_index = self.manager.IndexToNode(
self.assignment.Value(self.routing.NextVar(index)))
route_dist += distance(
self.data.locations[node_index][0],
self.data.locations[node_index][1],
self.data.locations[next_node_index][0],
self.data.locations[next_node_index][1])
load_var = capacity_dimension.CumulVar(index)
# print("load_var: ", load_var)
route_load = self.assignment.Value(load_var)
# route_load += self.data.demands[node_index]
time_var = time_dimension.CumulVar(index)
time_min = self.assignment.Min(time_var)
time_max = self.assignment.Max(time_var)
this_vehicle.load.append(route_load)
this_vehicle.time_min.append(time_min)
this_vehicle.time_max.append(time_max)
slack_var = time_dimension.SlackVar(index)
slack_min = self.assignment.Min(slack_var)
slack_max = self.assignment.Max(slack_var)
plan_output += ' {0} Load({1}) Time({2},{3}) Slack({4},{5}) ->'.format(
node_index,
route_load,
time_min, time_max,
slack_min, slack_max)
index = self.assignment.Value(self.routing.NextVar(index))
node_index = self.manager.IndexToNode(index)
load_var = capacity_dimension.CumulVar(index)
# print("truck type: ", str(load_var).split("..")[1][:-1])
route_load = self.assignment.Value(load_var)
time_var = time_dimension.CumulVar(index)
route_time = self.assignment.Value(time_var)
time_min = self.assignment.Min(time_var)
time_max = self.assignment.Max(time_var)
total_dist += route_dist
total_time += route_time
plan_output += ' {0} Load({1}) Time({2},{3})\n'.format(node_index, route_load, time_min, time_max)
plan_output += 'Distance of the route: {0} km\n'.format(route_dist)
plan_output += 'Load of the route: {0}\n'.format(route_load)
plan_output += 'Time of the route: {0} min\n'.format(route_time)
plan_output += 'Truck Type: {0} kg\n'.format(str(load_var).split("..")[1][:-1])
print(plan_output)
this_vehicle.routes.append(0)
this_vehicle.load.append(route_load)
this_vehicle.time_min.append(time_min)
this_vehicle.time_max.append(time_max)
vlist.append(this_vehicle.routes)
load_list.append(this_vehicle.load)
time_list_min.append(this_vehicle.time_min)
time_list_max.append(this_vehicle.time_max)
# print(vlist)
print('Total Distance of all routes: {0} km'.format(total_dist))
print('Total Time of all routes: {0} min'.format(total_time))
def PickupColor(count):
default_cl=["red", "blue","green","orange","yellow","orchid","black"]
if count <= len(default_cl) - 1:
cl=default_cl[count]
else:
temp=random.choice(list(matplotlib.colors.cnames.items()))[0]
while "white" in temp or "gray" in temp and temp in default_cl:
temp = random.choice(list(matplotlib.colors.cnames.items()))[0]
continue
cl=temp
return cl
def DrawNetwork():
G = nx.DiGraph()
locations = DataProblem()._locations
# print(locations)
x = 0
print(vlist)
for vehicle_id in vlist:
n = 0
e = []
node = []
cl=PickupColor(x)
# print(cl)
# print(data.num_vehicles)
# print(this_vehicle.id)
# print(this_vehicle.routes)
for i in vehicle_id:
G.add_node(i, pos=(locations[i][0], locations[i][1]))
# a= [locations[i][0], locations[i][1]]
# print(a)
if n > 0:
# print(n)
# print(vehicle_id.routes[n])
# print (vehicle_id.routes[n-1])
u = (vehicle_id[n - 1], vehicle_id[n])
e.append(u)
node.append(i)
G.add_edge(vehicle_id[n - 1], vehicle_id[n])
nx.draw(G, nx.get_node_attributes(G, 'pos'), nodelist=node, edgelist=e, with_labels=True,
node_color=cl, width=2, edge_color=cl,
style='dashed', font_color='w', font_size=12, font_family='sans-serif')
n += 1
x += 1
# let's color the node 0 in black
nx.draw_networkx_nodes(G, locations, nodelist=[0], node_color='k')
plt.axis('on')
# plt.show()
def Reporting(data):
df = data.dataframe
df['Vehicle_ID'] = 0
df['Pos_in_route'] = 0
df['arr_min'] = 0
df['arr_max'] = 0
df['total_load'] = 0
df['prior_lat'] = 0
df['prior_lon'] = 0
veh = vlist
global prior_list
prior_list=[]
lst_GPS = df[['Lat','Long']].to_dict()
df1 = pd.DataFrame(columns=df.columns) #Create an empty dataframe
for i in range(len(veh)):
veh_id_dict = dict.fromkeys(veh[i][:-1], i)
veh_pos_dict = dict(zip(veh[i][:-1], range(len(veh[i]) - 1)))
veh_arrmin = dict(zip(veh[i][:-1], time_list_min[i][:-1]))
veh_arrmax = dict(zip(veh[i][:-1], time_list_max[i][:-1]))
veh_load = dict.fromkeys(veh[i][:-1], load_list[i][-1])
# print(veh[i])
veh_prior = veh[i].copy()
veh_prior.insert(1,0)
veh_prior = veh_prior[:-1]
# print(veh_prior)
lat = list(zip(*list(zip(veh_prior, [lst_GPS['Lat'][i] for i in veh_prior]))))[1]
lon = list(zip(*list(zip(veh_prior, [lst_GPS['Long'][i] for i in veh_prior]))))[1]
# print(lat)
prior_list.append([lat,lon])
veh_prior_lat = dict(zip(veh[i][:-1], lat))
veh_prior_lon = dict(zip(veh[i][:-1], lon))
# print(veh_prior_lat)
df.loc[veh[i][:-1], 'Vehicle_ID'] = df.loc[veh[i][:-1]].ID.map(veh_id_dict)
df.loc[veh[i][:-1], 'Pos_in_route'] = df.loc[veh[i][:-1]].ID.map(veh_pos_dict)
df.loc[veh[i][:-1], 'arr_min'] = df.loc[veh[i][:-1]].ID.map(veh_arrmin)
df.loc[veh[i][:-1], 'arr_max'] = df.loc[veh[i][:-1]].ID.map(veh_arrmax)
df.loc[veh[i][:-1], 'total_load'] = df.loc[veh[i][:-1]].ID.map(veh_load)
df.loc[veh[i][:-1], 'prior_lat'] = df.loc[veh[i][:-1]].ID.map(veh_prior_lat)
df.loc[veh[i][:-1], 'prior_lon'] = df.loc[veh[i][:-1]].ID.map(veh_prior_lon)
# print(df.loc[veh[i][:-1], ['Vehicle_ID', 'Pos_in_route','prior_lat']])
df1 = df1.append(df.loc[veh[i][:-1]])
df1.sort_values(['ID', 'Vehicle_ID'], inplace=True)
for i in range(len(veh)):
df.loc[veh[i][-1], 'Vehicle_ID'] = i
df.loc[veh[i][-1], 'Pos_in_route'] = len(veh[i])-1
df.loc[veh[i][-1], 'arr_min'] = time_list_min[i][-1]
df.loc[veh[i][-1], 'arr_max'] = time_list_max[i][-1]
df.loc[veh[i][-1], 'total_load'] = load_list[i][-1]
df.loc[veh[i][:-1], 'prior_lat'] = prior_list[i][0][-1]
df.loc[veh[i][:-1], 'prior_lon'] = prior_list[i][1][-1]
df1 = df1.append(df.loc[veh[i][-1]])
df1['Utilization_Rate'] = df1['total_load'] / data.getvehicle.capacity
# print(df1)
# Create a Pandas Excel writer using XlsxWriter as the engine.
writer = pd.ExcelWriter('VRP_result.xlsx', engine='xlsxwriter')
# Convert the dataframe to an XlsxWriter Excel object.
df1.to_excel(writer, sheet_name='Sheet1')
writer.save()
########
# Main #
########
def main():
"""Entry point of the program"""
# Instantiate the data problem.
data = DataProblem()
# Create Routing Model
manager = pywrapcp.RoutingIndexManager(data.num_locations, data.num_vehicles, data.depot)
routing = pywrapcp.RoutingModel(manager)
# Define weight of each edge
dist_evaluator = CreateDistanceEvaluator(data,manager)
# print(dist_evaluator.dist_matrix)
def distance_evaluator(from_index, to_index):
"""Returns the Harversine Distance between the two nodes"""
from_node = manager.IndexToNode(from_index)
to_node = manager.IndexToNode(to_index)
# print("from_index: ", from_index)
# print("to_index: ", to_index)
# print("from_node: ", from_node)
# print("to_node: ", to_node)
# print("dist: ", dist_evaluator.dist_matrix[from_node][to_node])
return dist_evaluator.dist_matrix[from_node][to_node]
transit_callback_index = routing.RegisterTransitCallback(distance_evaluator)
routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)
# Add Distance constraint
add_distance_dimension(routing, transit_callback_index)
# Add Capacity constraint
demand_evaluator = CreateDemandEvaluator(data, manager).demand_evaluator
demand_callback_index = routing.RegisterUnaryTransitCallback(demand_evaluator)
add_capacity_constraints(routing, data, demand_callback_index)
# Add Time Window constraint
time_evaluator = CreateTimeEvaluator(data, manager).time_evaluator
time_callback_index = routing.RegisterTransitCallback(time_evaluator)
add_time_window_constraints(routing, data, time_callback_index, manager)
# Setting first solution heuristic (cheapest addition).
search_parameters = pywrapcp.DefaultRoutingSearchParameters()
# 1).First Solution Heuristic
search_parameters.first_solution_strategy = (
routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC)
# 2).Guided Local Search Heuristics
# search_parameters.local_search_metaheuristic = (
# routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH)
# search_parameters.log_search = True
# search_parameters.time_limit.FromSeconds(10)
# search_parameters.time_limit_ms = 3000
# 3).Simulated Annealing Heuristics
# search_parameters.local_search_metaheuristic = (
# routing_enums_pb2.LocalSearchMetaheuristic.SIMULATED_ANNEALING)
# search_parameters.time_limit_ms = 1800000
# 4).TABU_SEARCH Heuristics
# search_parameters.local_search_metaheuristic = (
# routing_enums_pb2.LocalSearchMetaheuristic.TABU_SEARCH)
# search_parameters.time_limit_ms = 600000
# Solve the problem.
# assignment = routing.SolveWithParameters(search_parameters)
solution = routing.SolveWithParameters(search_parameters)
if solution:
printer = ConsolePrinter(data, manager, routing, solution)
# print_solution(data, manager, routing, solution)
printer.print()
DrawNetwork()
Reporting(data)
plt.show()
else:
print("No solution found !")
# if __name__ == '__main__':
# main()
window = tk.Tk()
window.title('VRP 2.0')
window.geometry('500x500')
l1 = tk.Label(window, bg='yellow', width=50, text='empty')
l1.pack()
l2 = tk.Label(window, bg='yellow', width=50, text='empty')
l2.pack()
def print_selection1(v):
l1.config(text='You have selected total of ' + str(num_vehicles()) +' trucks')
def print_selection2(v):
l2.config(text='Truck Capacity is: ' + v + ' kg')
def num_vehicles():
global t1
global t2
global t3
global t4
global t5
t1=s1.get()
t2=s2.get()
t3=s3.get()
t4= s4.get()
t5= s5.get()
return t1+t2+t3+t4+t5
def capacity_vector():
return [1660]*t1+[2324]*t2+[4980]*t3+[7470]*t4+[14940]*t5
s1 = tk.Scale(window, label='Sprinter', from_=0, to=30, orient=tk.HORIZONTAL,
length=400, showvalue=1, tickinterval=5, resolution=1, command=print_selection1)
s1.pack()
s2 = tk.Scale(window, label='Luton Van', from_=0, to=30, orient=tk.HORIZONTAL,
length=400, showvalue=1, tickinterval=5, resolution=1, command=print_selection1)
s2.pack()
s3 = tk.Scale(window, label='7.5 Tonner', from_=0, to=30, orient=tk.HORIZONTAL,
length=400, showvalue=1, tickinterval=5, resolution=1, command=print_selection1)
s3.pack()
s4 = tk.Scale(window, label='17.5 Tonner', from_=0, to=30, orient=tk.HORIZONTAL,
length=400, showvalue=1, tickinterval=5, resolution=1, command=print_selection1)
s4.pack()
s5 = tk.Scale(window, label='Tralier', from_=0, to=30, orient=tk.HORIZONTAL,
length=400, showvalue=1, tickinterval=5, resolution=1, command=print_selection1)
s5.pack()
b = tk.Button(window, text='Run', width=15,
height=2, command=main)
b.pack()
window.mainloop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment