Last active
November 14, 2021 12:30
-
-
Save tommyod/5156c4bcae9e51f8dd9bfef5d957b285 to your computer and use it in GitHub Desktop.
Python 3.7 implementation of the Wagelmans algorithm for the dynamic lot size problem
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
""" | |
The linear-time Wagelmans algorithm for the dynamic lot size problem | |
-------------------------------------------------------------------- | |
@author: tommyod (https://github.com/tommyod) | |
An implementation of the Wagelmans O(n) algorithm for the dynamic lot | |
size problem, also called the single-item uncapacitated lot-sizing problem. | |
Due to the overhead of the data structures used, this algorithm is actually | |
slightly (~20%) slower than the O(n^2) Wagner-Whitin algorithm on a collection | |
of randomly generated problems. The HW algorithm scales linearily in most cases | |
when the Planning Horizon Theorem is implemented correctly. | |
Speed | |
----- | |
Periods(n) Time | |
10 => 69.3 µs | |
100 => 599 µs | |
1,000 => 6.04 ms | |
10,000 => 59.7 ms | |
100,000 => 631 ms | |
1,000,000 => 6.48 s | |
Correctness | |
----------- | |
The algorithm has been tested against the test case in the Wagner-Whitin paper, | |
and against more naive (and slow) implementations. It gave the same answer as | |
a naive implementation on more than 10 million inputs. | |
For a description of the problem, see Wikipedia: | |
- https://en.wikipedia.org/wiki/Dynamic_lot-size_model | |
The references are: | |
- Harvey M. Wagner and Thomson M. Whitin, | |
"Dynamic version of the economic lot size model," | |
Management Science, Vol. 5, pp. 89–96, 1958 | |
- Albert Wagelmans, Stan Van Hoesel and Antoon Kolen , | |
"Economic Lot Sizing: An O(n log n) Algorithm That Runs | |
in Linear Time in the Wagner-Whitin Case," | |
Operations Research, Vol. 40, Supplement 1: Optimization (1992) | |
- Chapter 7 (Single-Item Uncapacitated Lot-Sizing) in the book | |
"Production Planning by Mixed Integer Programming" | |
by Yves Pochet and Laurence A. Wolsey | |
See also: https://tommyodland.com/articles/2021/the-dynamic-lot-size-problem | |
MIT License (https://choosealicense.com/licenses/mit/) | |
""" | |
# ============================================================================= | |
# MAIN PROGRAM | |
# ============================================================================= | |
from collections import deque | |
from itertools import accumulate, product | |
class CumSumList: | |
"""Fast sums from index i to j, by storing cumulative sums.""" | |
def __init__(self, elements): | |
self.cumsums = [0] * len(elements) | |
partial_sum = 0 | |
for i, element in enumerate(elements): | |
partial_sum += element | |
self.cumsums[i] = partial_sum | |
def sum_between(self, i, j): | |
"""Compute sum: elements[i] + elements[i+1] + ... + elements[j]""" | |
if i > j: | |
return 0 | |
top = self.cumsums[j] if j < len(self.cumsums) else self.cumsums[-1] | |
low = self.cumsums[i - 1] if i >= 1 else 0 | |
return top - low | |
class ConvexEnvelope: | |
"""A class for storing a lower convex envelope of points. It's assume that | |
the x-values of the added points (x, y) are give in non-increasing order. | |
When new points are added, previous points might be removed if they are no | |
long in the lower convex envelope.""" | |
def __init__(self, points=None): | |
self.points = deque() # Use queue for O(1) pop()/append() on both ends | |
self.indices = deque() | |
# Keep track of an index associated with each point | |
self.index_counter = 0 | |
if points: | |
for point in points: | |
self.add(*point) | |
def add(self, x, y): | |
if self.points: | |
x_n, y_n = self.points[-1] | |
assert x <= x_n, "x-values can never increase" | |
# Go through previous points and remove them if necessary | |
for i in reversed(range(1, len(self.points))): | |
x_a, y_a = self.points[-1] | |
x_b, y_b = self.points[-2] | |
# Do not raise ZeroDivisionError when denominator is 0 | |
epsilon = 1e-9 | |
slope_P_A = (y - y_a) / (x - x_a + epsilon) | |
slope_P_B = (y - y_b) / (x - x_b + epsilon) | |
# Remove point A if slope condition, or if P is directly below A | |
if (slope_P_B <= slope_P_A) or ((x_a == x) and (y <= y_a)): | |
self.points.pop() | |
self.indices.pop() | |
else: | |
break | |
self.points.append((x, y)) | |
self.indices.append(self.index_counter) | |
self.index_counter += 1 | |
def optimal_index(effpoints, slope): | |
"""Get the optimal index j for the Wagermans algorithm. The more general | |
version of this algorithm (not Wagner-Whitin cost structure) would use a | |
binary search.""" | |
# Base case - the optimal index must be the only one available | |
if len(effpoints.indices) == 1: | |
return effpoints.indices[0] | |
# Prepare variables for looping through points | |
min_objective_value = float("inf") # Best (minimum) objective value seen | |
prev_index = effpoints.indices[0] # Best known index so far | |
to_remove = 0 # Number of points to remove after looping | |
# The objective value can decrease, then increase, in this loop | |
for (x, y), index in zip((effpoints.points), (effpoints.indices)): | |
objective_value = y + slope * x | |
# The objective value did not increase, keep looping | |
if objective_value <= min_objective_value: | |
min_objective_value = objective_value # Update minimum value seen | |
prev_index = index # Keep track of previous index | |
to_remove += 1 # Remove one more non-optimal point after loop | |
else: | |
# The objective value did increase | |
to_remove -= 1 # Do not improve previous value, it was the minimum | |
break | |
# We remove points that were found before the minimum, | |
# but always keep at least one point | |
for _ in range(min(to_remove, len(effpoints.points) - 1)): | |
effpoints.indices.popleft() | |
effpoints.points.popleft() | |
return prev_index | |
def wagelmans(demands, order_costs, holding_costs): | |
"""Compute the optimal cost and program for the dynamic lot size problem. | |
Parameters | |
---------- | |
demands : list | |
A list of non-negative demands. The final demand should be non-zero, | |
since if it's zero then this last element can be removed from the | |
problem. | |
order_costs : list | |
A list of non-negative order costs. The order cost is independent of | |
the number of products ordered. If products are ordered at all, then | |
the order cost must be paid. It does not matter if we order 1 or 99. | |
holding_costs : list | |
A list of non-negative order costs. The holding cost at index t | |
is the cost (per product) of holding from period t to period t+1. | |
Returns | |
------- | |
dict | |
Dictionary with keys 'cost' (minimal cost) and 'solution' | |
(optimal solution). One problem can have several optimal solutions, | |
but each optimal solution must achieve the minimum cost. | |
""" | |
_validate_inputs(demands, order_costs, holding_costs) | |
d, o, h = demands, order_costs, holding_costs | |
assert d[-1] > 0, "Final demand should be positive" | |
d_cumsum = CumSumList(d) # Use this data structure to compute d_{i, j} | |
p = list(reversed(list(accumulate(reversed(h))))) # Reverse cumulative sum | |
T = len(demands) # Problem size T (number of periods) | |
G = {T: 0} # Intial value for recursion. G[t] = optimal for [t, t+1, ...] | |
effpoints = ConvexEnvelope([(d_cumsum.sum_between(0, T - 1), 0)]) | |
# Used to construct the solution | |
cover_by = {} # cover_by[t] = j => cover period t to j by ordering at t | |
for t in reversed(range(T)): | |
# Get the optimal index (as added to the ConvexEnvelope datastructure) | |
index_effpoints = optimal_index(effpoints, slope=p[t]) | |
j = T - 1 - index_effpoints # Convert index in datastructure to j | |
# Update minimal cost | |
G[t] = o[t] + p[t] * d_cumsum.sum_between(t, j) + G[j + 1] | |
cover_by[t] = j | |
# If d[t] == 0, an option is to not order at time t | |
# If this has a lower cost than ordering (as computed above): | |
if (d[t] == 0) and G[t] > G[t + 1]: | |
G[t] = G[t + 1] | |
cover_by[t] = t | |
# Add the new point (x_t, y_t) = (d_{0, t-1}, G[t]) | |
effpoints.add(d_cumsum.sum_between(0, t - 1), G[t]) | |
# Construct the optimal solution. Order d[t:j+1] at time t to cover | |
# demands in period from t to j (inclusive) | |
t = 0 | |
solution = [0] * T | |
while True: | |
j = cover_by[t] # Get order time in period from t to j | |
solution[t] = sum(d[t : j + 1]) | |
t = j + 1 # Set t (period start) to (period end) plus one | |
if t == T: | |
break # Break out if we're at the end | |
cost = G[0] - sum(h[t] * d_cumsum.sum_between(0, t) for t in range(T)) | |
return {"cost": cost, "solution": solution} | |
# ============================================================================= | |
# TEST CASES | |
# ============================================================================= | |
def test_ConvexEnvelope(): | |
"""Test the ConvexEnvelope datastruture.""" | |
# As points are added, everything except [(9, 2), (0, 0)] falls above the | |
# lower convex envelope. These points above are removed. | |
effpoints = ConvexEnvelope([(9, 2), (7, 3), (6, 2), (4, 1), (2, 1), (0, 0)]) | |
assert list(effpoints.points) == [(9, 2), (0, 0)] | |
assert list(effpoints.indices) == [0, 5] # Original indices of points | |
# Edge case - the point (1, 1) is "between" the two other points | |
effpoints = ConvexEnvelope([(2, 2), (1, 1), (0, 0)]) | |
assert list(effpoints.points) == [(2, 2), (0, 0)] # (1, 1) is removed | |
assert list(effpoints.indices) == [0, 2] | |
# When adding a point with the same x value, but lower y-value, the | |
# previous point is removed from the convex envelope | |
effpoints = ConvexEnvelope([(5, 0), (4, 9), (4, 6)]) | |
assert list(effpoints.points) == [(5, 0), (4, 6)] | |
assert list(effpoints.indices) == [0, 2] | |
def test_against_example_in_paper(): | |
"""Test against the problem in the Wagner-Whitin paper.""" | |
demands = [69, 29, 36, 61, 61, 26, 34, 67, 45, 67, 79, 56] | |
order_costs = [85, 102, 102, 101, 98, 114, 105, 86, 119, 110, 98, 114] | |
holding_costs = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1] | |
# Costs and programs copied by hand from the original paper | |
costs = [85, 114, 186, 277, 348, 400, 469, 555, 600, 710, 789, 864] | |
programs = [ | |
[69], | |
[69 + 29, 0], | |
[69 + 29 + 36, 0, 0], | |
[69 + 29, 0, 36 + 61, 0], | |
[69 + 29 + 36, 0, 0, 61 + 61, 0], | |
[69 + 29 + 36, 0, 0, 61 + 61 + 26, 0, 0], | |
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0], | |
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0, 67], | |
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0, 67 + 45, 0], | |
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0, 67 + 45, 0, 67], | |
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0, 67 + 45, 0, 67 + 79, 0], | |
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0, 67 + 45, 0, 67, 79 + 56, 0], | |
] | |
for t in range(1, len(demands)): | |
result = wagelmans(demands[:t], order_costs[:t], holding_costs[:t]) | |
assert costs[t - 1] == result["cost"] | |
assert programs[t - 1] == result["solution"] | |
def test_against_handcrafted_example1(): | |
demands = [10, 5, 10, 20] | |
order_costs = [3, 1, 5, 5] | |
holding_costs = [1, 1, 1, 1] | |
result = wagelmans(demands, order_costs, holding_costs) | |
assert result["cost"] == 14 | |
assert result["solution"] == [10, 5, 10, 20] | |
def test_against_handcrafted_example2(): | |
demands = [0, 10] | |
order_costs = [10, 100] | |
holding_costs = [1, 0] | |
result = wagelmans(demands, order_costs, holding_costs) | |
assert result["cost"] == 20 | |
assert result["solution"] == [10, 0] | |
def test_against_handcrafted_example3(): | |
demands = [0, 0, 7] | |
order_costs = [60, 90, 90] | |
holding_costs = [9, 8, 5] | |
result = wagelmans(demands, order_costs, holding_costs) | |
assert result["cost"] == 90 | |
assert result["solution"] == [0, 0, 7] | |
def test_against_handcrafted_example4(): | |
demands = [0, 5, 0, 10, 10] | |
order_costs = [0, 10, 15, 5, 20] | |
holding_costs = [1, 1, 3, 1, 2] | |
result = wagelmans(demands, order_costs, holding_costs) | |
assert result["cost"] == 20 | |
assert result["solution"] == [5, 0, 0, 20, 0] | |
# ============================================================================= | |
# UTILITY FUNCTIONS (might be nice to have) | |
# ============================================================================= | |
def _validate_inputs(demands: list, order_costs: list, holding_costs: list): | |
assert isinstance(demands, list) | |
assert isinstance(order_costs, list) | |
assert isinstance(holding_costs, list) | |
assert len(demands) == len(order_costs) | |
assert len(demands) - 1 <= len(holding_costs) <= len(demands) | |
assert demands[-1] > 0, "Last period must have positive demand" | |
assert all(d >= 0 for d in demands), "All demands must be non-negative" | |
assert all(o >= 0 for o in order_costs), "All order costs must be non-negative" | |
def evaluate(demands, order_costs, holding_costs, solution): | |
assert sum(demands) == sum(solution) | |
assert len(demands) == len(solution) == len(order_costs) == len(holding_costs) | |
cost = 0 | |
in_stock = 0 | |
for t in range(len(demands)): | |
# Update stock and the cost of this order | |
in_stock += solution[t] - demands[t] | |
cost += order_costs[t] if solution[t] > 0 else 0 | |
# Sell products, pay the price of bringing stock to next period | |
cost += in_stock * holding_costs[t] | |
assert in_stock >= 0 | |
return cost | |
def solve_naively(demands, order_costs, holding_costs): | |
"""Compute the best solution in O(n * n^2) time.""" | |
best_solution = None | |
min_cost = float("inf") | |
T = len(demands) | |
for order_at in product([0, 1], repeat=T): # Count in binary | |
# Construct a solution | |
solution, accumulated_demand = T * [0], 0 | |
for t in reversed(range(T)): | |
accumulated_demand += demands[t] | |
if order_at[t] > 0: | |
solution[t] = accumulated_demand | |
accumulated_demand = 0 | |
# If demands at not met, the solutions is not feasible | |
if not sum(demands) == sum(solution): | |
continue | |
# Update best found solution | |
cost = evaluate(demands, order_costs, holding_costs, solution) | |
if cost < min_cost: | |
min_cost = cost | |
best_solution = solution | |
return {"cost": min_cost, "solution": best_solution} | |
# ============================================================================= | |
# RUN TEST CASES (this algo has also been tested against naive implementations) | |
# ============================================================================= | |
if __name__ == "__main__": | |
test_ConvexEnvelope() | |
test_against_example_in_paper() | |
test_against_handcrafted_example1() | |
test_against_handcrafted_example2() | |
test_against_handcrafted_example3() | |
test_against_handcrafted_example4() | |
# Generate random instances and test naive computation | |
import random | |
N = 10 ** 4 | |
for test in range(N): | |
# Generate a random problem | |
length = random.randint(1, 8) | |
demands = [random.randint(0, 5) for _ in range(length)] | |
order_costs = [random.randint(0, 9) for _ in range(length)] | |
holding_costs = [random.randint(0, 5) for _ in range(length)] | |
demands[-1] = random.randint(1, 5) | |
# Get results from both implementations | |
results_naive = solve_naively(demands, order_costs, holding_costs) | |
results = wagelmans(demands, order_costs, holding_costs) | |
# The solution need not be unique, test that both have min cost | |
assert results["cost"] == results_naive["cost"] | |
cost_wagner = evaluate(demands, order_costs, holding_costs, results["solution"]) | |
cost_naive = evaluate(demands, order_costs, holding_costs, results_naive["solution"]) | |
assert cost_wagner == cost_naive | |
print(f"Success: tested 'wagelmans' against naive algorithm on {N} inputs.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment