Skip to content

Instantly share code, notes, and snippets.

@goodhamgupta
Created December 8, 2022 00:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save goodhamgupta/45e97642ae3680e5ca167303879c50fb to your computer and use it in GitHub Desktop.
Save goodhamgupta/45e97642ae3680e5ca167303879c50fb to your computer and use it in GitHub Desktop.
CVRPTW with Mixed Pickup and Delivery
from functools import partial
from ortools.constraint_solver import routing_enums_pb2, pywrapcp
def create_data_model():
# Dummy data
int_time_matrix = [[0, 15, 34, 29], [15, 0, 20, 14], [34, 20, 0, 7], [29, 14, 7, 0]] # In minutes
cur_distance_matrix = [[0, 11, 24, 20], [11, 0, 14, 10], [24, 14, 0, 5], [20, 10, 5, 0]] # In Kilometres
data['distance_matrix'] = cur_dist_matrix
data['time_matrix'] = int_time_matrix
data['time_windows'] = [(0,720)] * len(cur_dist_matrix) # In minutes. 1 AM to 1 PM
data['demands_small'] = [20, 0, 0, 0] # Small package type
data['demands_large'] = [0, 40, 0, 0] # Large package type
data['demands_deliveries'] = [0, 0, -60, 0] # Delivery
data['demands_total'] = [20, 40, -60, 0]
data['time_per_demand_vehicle'] = 30
data['num_vehicles'] = 2
data['vehicle_capacity'] = 40
data['pickups_deliveries'] = [
[0, 2],
[1, 2],
]
data['depot'] = 3
return data
data = create_data_model()
def distance_evaluator(from_node, to_node):
from_ = manager.IndexToNode(from_node)
to_ = manager.IndexToNode(to_node)
return int(data['distance_matrix'][from_][to_])
def demand_evaluator_small(index):
return int(data['demands_small'][manager.IndexToNode(index)])
def demand_evaluator_large(index):
return int(data['demands_large'][manager.IndexToNode(index)])
def demand_evaluator_deliveries(index):
return int(data['demands_deliveries'][manager.IndexToNode(index)])
def demand_evaluator_total(index):
return int(data['demands_total'][manager.IndexToNode(index)])
def add_distance_constraints(routing, data, distance_evaluator_index):
distance = 'Distance'
routing.AddDimension(
evaluator_index=distance_evaluator_index,
slack_max=0,
capacity=100,
fix_start_cumul_to_zero=True,
name=distance
)
def add_capacity_constraints(
routing,
data,
demand_evaluator_small,
demand_evaluator_large,
demand_evaluator_deliveries,
demand_evaluator_total
):
routing.AddDimension(
demand_evaluator_small,
0,
data['vehicle_capacity'],
True,
"capacity_small"
)
routing.AddDimension(
demand_evaluator_large,
0,
data['vehicle_capacity'],
True,
"capacity_large"
)
routing.AddDimension(
demand_evaluator_deliveries,
0,
data['vehicle_capacity'],
False,
"capacity_deliveries"
)
routing.AddDimension(
demand_evaluator_total,
0,
data['vehicle_capacity'],
False,
"capacity_total"
)
def create_time_evaluator(data):
total_time_matrix = []
for row in data['time_matrix']:
cur_row = []
for col in row:
# Add constant service time per location
col += 30
cur_row.append(col)
total_time_matrix.append(cur_row)
def time_evaluator(manager, from_node, to_node):
from_ = manager.IndexToNode(from_node)
to_ = manager.IndexToNode(to_node)
return int(total_time_matrix[from_][to_])
return time_evaluator
def add_time_window_constraints(routing, manager, data, time_evaluator_index):
time = 'Time'
horizon = 600
routing.AddDimension(
time_evaluator_index,
horizon, # allow waiting time
horizon, # maximum time per vehicle
False, # don't force start cumul to zero since we are giving TW to start nodes. Note that this is not mandatory for this example, since each node has a TW starting from 0.
time
)
time_dimension = routing.GetDimensionOrDie(time)
for location_idx, time_window in enumerate(data['time_windows']):
if location_idx == 3: # Skip depot
continue
index = manager.NodeToIndex(location_idx)
time_dimension.CumulVar(index).SetRange(
time_window[0],
time_window[1]
)
routing.AddToAssignment(time_dimension.SlackVar(index))
for vehicle_id in range(data['num_vehicles']):
index = routing.Start(vehicle_id)
time_dimension.CumulVar(index).SetRange(
data['time_windows'][0][0],
data['time_windows'][0][1]
)
routing.AddToAssignment(time_dimension.SlackVar(index))
def print_solution(manager, routing, assignment): # pylint:disable=too-many-locals
"""Prints assignment on console"""
print(f'Objective: {assignment.ObjectiveValue()}')
time_dimension = routing.GetDimensionOrDie('Time')
cap_small_dim = routing.GetDimensionOrDie('capacity_small')
cap_large_dim = routing.GetDimensionOrDie('capacity_large')
cap_total_dim = routing.GetDimensionOrDie('capacity_total')
total_distance = 0
total_load = 0
total_time = 0
for vehicle_id in range(manager.GetNumberOfVehicles()):
index = routing.Start(vehicle_id)
plan_output = 'Route for vehicle {}:\n'.format(vehicle_id)
distance = 0
while not routing.IsEnd(index):
small_load_var = cap_small_dim.CumulVar(index)
large_load_var = cap_large_dim.CumulVar(index)
total_load_var = cap_total_dim.CumulVar(index)
time_var = time_dimension.CumulVar(index)
slack_var = time_dimension.SlackVar(index)
plan_output += ' {0} Small Load({1}) Large Load({2}) Total Load({3}) Time({4},{5}) Slack({6},{7}) ->\n'.format(
manager.IndexToNode(index),
assignment.Value(small_load_var),
assignment.Value(large_load_var),
assignment.Value(total_load_var),
assignment.Min(time_var),
assignment.Max(time_var),
assignment.Min(slack_var),
assignment.Max(slack_var)
)
previous_index = index
index = assignment.Value(routing.NextVar(index))
distance += routing.GetArcCostForVehicle(previous_index, index,
vehicle_id)
total_load_var = cap_total_dim.CumulVar(index)
time_var = time_dimension.CumulVar(index)
slack_var = time_dimension.SlackVar(index)
plan_output += ' {0} Small Load({1}) Large Load({2}) Total Load({3}) Time({4},{5})\n\n'.format(
manager.IndexToNode(index),
assignment.Value(small_load_var),
assignment.Value(large_load_var),
assignment.Value(total_load_var),
assignment.Min(time_var),
assignment.Max(time_var)
)
plan_output += 'Distance of the route: {0}km\n'.format(distance)
plan_output += 'Load of the route: {}\n'.format(
assignment.Value(total_load_var))
plan_output += 'Time of the route: {} minutes\n'.format(
assignment.Value(time_var))
print(plan_output)
total_distance += distance
total_load += assignment.Value(total_load_var)
total_time += assignment.Value(time_var)
print('Total Distance of all routes: {0}km'.format(total_distance))
print('Total Load of all routes: {}'.format(total_load))
print('Total Time of all routes: {0}min'.format(total_time))
def create_pickup_and_delivery_constraints(manager):
for request in data['pickups_deliveries']:
pickup_index = manager.NodeToIndex(request[0])
delivery_index = manager.NodeToIndex(request[1])
routing.AddPickupAndDelivery(pickup_index, delivery_index)
routing.solver().Add(
routing.VehicleVar(pickup_index) == routing.VehicleVar(
delivery_index))
routing.solver().Add(
distance_dimension.CumulVar(pickup_index) <=
distance_dimension.CumulVar(delivery_index))
data = create_data_model()
manager = pywrapcp.RoutingIndexManager(
data['num_locations'],
data['num_vehicles'],
data['depot']
)
routing = pywrapcp.RoutingModel(manager)
distance_evaluator_index = routing.RegisterTransitCallback(
distance_evaluator
)
add_distance_constraints(routing, data, distance_evaluator_index)
routing.SetArcCostEvaluatorOfAllVehicles(distance_evaluator_index)
# Add Capacity constraint.
# [START capacity_constraint]
# Register demand for different packages sizes
demand_evaluator_index_small = routing.RegisterUnaryTransitCallback(
demand_evaluator_small
)
demand_evaluator_index_large = routing.RegisterUnaryTransitCallback(
demand_evaluator_large
)
demand_evaluator_index_deliveries = routing.RegisterUnaryTransitCallback(
demand_evaluator_deliveries
)
demand_evaluator_index_total = routing.RegisterUnaryTransitCallback(
demand_evaluator_total
)
add_capacity_constraints(
routing,
data,
demand_evaluator_index_small,
demand_evaluator_index_large,
demand_evaluator_index_deliveries,
demand_evaluator_index_total
)
# [END capacity_constraint]
# Add penalties and allow dropping locations with 0 demand
# Not used for now.
# penalty = 1000
# for idx, demand in enumerate(data['demands_total']):
# if idx == 1:
# continue
# else:
# if demand == 0:
# routing.AddDisjunction([manager.NodeToIndex(idx)], 0)
# else:
# routing.AddDisjunction([manager.NodeToIndex(idx)], penalty * demand)
# Add Time Window constraint.
# [START time_constraint]
time_evaluator_index = routing.RegisterTransitCallback(
partial(create_time_evaluator(data), manager))
add_time_window_constraints(routing, manager, data, time_evaluator_index)
# [END time_constraint]
# Add pickups and deliveries constraint.
for request in data['pickups_deliveries']:
distance_dimension = routing.GetDimensionOrDie('Distance')
pickup_index = manager.NodeToIndex(request[0])
delivery_index = manager.NodeToIndex(request[1])
routing.AddPickupAndDelivery(pickup_index, delivery_index)
routing.solver().Add(
routing.VehicleVar(pickup_index) == routing.VehicleVar(
delivery_index))
routing.solver().Add(
distance_dimension.CumulVar(pickup_index) <= distance_dimension.CumulVar(delivery_index))
# Setting first solution heuristic (cheapest addition).
# [START parameters]
search_parameters = pywrapcp.DefaultRoutingSearchParameters()
search_parameters.first_solution_strategy = (
routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC)
search_parameters.local_search_metaheuristic = (
routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH)
search_parameters.time_limit.FromSeconds(600)
search_parameters.solution_limit = 100
search_parameters.log_search = True
# [END parameters]
# Solve the problem.
# [START solve]
solution = routing.SolveWithParameters(search_parameters)
# [END solve]
routing.status()
print_solution(manager, routing, solution)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment