Last active
March 1, 2022 12:27
-
-
Save tommyod/7d3ee88b7c08fadab6de1ea1e615a925 to your computer and use it in GitHub Desktop.
Python 3.7 implementation of the Wagner-Whitin algorithm for the dynamic lot size problem
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
""" | |
The Wagner-Whitin algorithm for the dynamic lot size problem | |
-------------------------------------------------------------- | |
@author: tommyod (https://github.com/tommyod) | |
An implementation of the Wagner-Whitin O(n^2) algorithm for the dynamic lot | |
size problem, also called the single-item uncapacitated lot-sizing problem. | |
Although the algorithm is O(n^2) in theory, it scales linearily with input | |
size in practise, as seen in the table below. The linear scaling is due to | |
using the Planning Horizon Theorem in the implementation. | |
Speed | |
----- | |
Periods(n) Time | |
10 => 45.4 µs | |
100 => 440 µs | |
1,000 => 4.53 ms | |
10,000 => 47.5 ms | |
100,000 => 495 ms | |
1,000,000 => 4.74 s | |
Correctness | |
----------- | |
The algorithm has been tested against the test case in the paper, and against | |
more naive (and slow) implementations. It gave the same answer as a naive | |
implementation on more than 10 million inputs. | |
For a description of the problem, see Wikipedia: | |
- https://en.wikipedia.org/wiki/Dynamic_lot-size_model | |
The reference is: | |
- Harvey M. Wagner and Thomson M. Whitin, | |
"Dynamic version of the economic lot size model," | |
Management Science, Vol. 5, pp. 89–96, 1958 | |
See also: https://tommyodland.com/articles/2021/the-dynamic-lot-size-problem | |
MIT License (https://choosealicense.com/licenses/mit/) | |
""" | |
# ============================================================================= | |
# MAIN PROGRAM | |
# ============================================================================= | |
import itertools | |
class CumSumList: | |
"""Fast sums from index i to j, by storing cumulative sums.""" | |
def __init__(self, elements): | |
self.cumsums = [0] * len(elements) | |
partial_sum = 0 | |
for i, element in enumerate(elements): | |
partial_sum += element | |
self.cumsums[i] = partial_sum | |
def sum_between(self, i, j): | |
"""Compute sum: elements[i] + elements[i+1] + ... + elements[j]""" | |
if i > j: | |
return 0 | |
top = self.cumsums[j] if j < len(self.cumsums) else self.cumsums[-1] | |
low = self.cumsums[i - 1] if i >= 1 else 0 | |
return top - low | |
def wagner_whitin(demands, order_costs, holding_costs): | |
"""Compute the optimal cost and program for the dynamic lot size problem. | |
Parameters | |
---------- | |
demands : list | |
A list of non-negative demands. The final demand should be non-zero, | |
since if it's zero then this last element can be removed from the | |
problem. | |
order_costs : list | |
A list of non-negative order costs. The order cost is independent of | |
the number of products ordered. If products are ordered at all, then | |
the order cost must be paid. It does not matter if we order 1 or 99. | |
holding_costs : list | |
A list of non-negative order costs. The holding cost at index t | |
is the cost (per product) of holding from period t to period t+1. | |
Returns | |
------- | |
dict | |
Dictionary with keys 'cost' (minimal cost) and 'solution' | |
(optimal solution). One problem can have several optimal solutions, | |
but each optimal solution must achieve the minimum cost. | |
""" | |
d, o, h = demands, order_costs, holding_costs | |
_validate_inputs(demands, order_costs, holding_costs) | |
d_cumsum = CumSumList(d) | |
assert d[-1] > 0, "Final demand should be positive" | |
T = len(d) # Problem size | |
F = {-1: 0} # Base case for recursion. F[t] = min cost from period 0 to t | |
t_star_star = 0 # Highest period where minimum was achieved | |
# Used to construct the solution | |
cover_by = {} # cover_by[t] = j => cover period j to t by ordering at j | |
# Main forward recursion loop. At time t, choose a period j to order from | |
# to cover from period j to period t (inclusive on both ends) | |
for t in range(len(demands)): | |
# If there is no demand in period t | |
if d[t] == 0: | |
F[t] = F[t - 1] | |
cover_by[t] = t | |
continue | |
# There is demand in period t | |
assert d[t] > 0 | |
S_t = 0 # Partial sum S_t(j) := h_j * d_{j+1, t} + S_t(j+1) | |
min_args = [] # List of arguments for the min() function | |
for j in reversed(range(t_star_star, t + 1)): | |
S_t += h[j] * d_cumsum.sum_between(j + 1, t) | |
min_args.append(o[j] + S_t + F[j - 1]) | |
# Index of minimum value in list `min_args` | |
argmin = min_args.index(min(min_args)) | |
# Update the t** variable - using the Planning Horizon Theorem | |
t_star_star = max(t_star_star, t - argmin) | |
# Update the variables for cost and program | |
F[t] = min_args[argmin] | |
cover_by[t] = t - argmin | |
# Construct the optimal solution. Order d[j:t+1] at time j to cover | |
# demands in period from j to t (inclusive) | |
t = T - 1 | |
solution = [0] * T | |
while True: | |
j = cover_by[t] # Get order time in period from j to t | |
solution[j] = sum(d[j : t + 1]) | |
t = j - 1 # Set t (period end) to (period start) minus one | |
if j == 0: | |
break # Break out if this period started at 0 | |
return {"cost": F[len(demands) - 1], "solution": solution} | |
# ============================================================================= | |
# TEST CASES | |
# ============================================================================= | |
def test_against_example_in_paper(): | |
"""Test against the problem in the Wagner-Whitin paper.""" | |
demands = [69, 29, 36, 61, 61, 26, 34, 67, 45, 67, 79, 56] | |
order_costs = [85, 102, 102, 101, 98, 114, 105, 86, 119, 110, 98, 114] | |
holding_costs = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1] | |
# Costs and programs copied by hand from the original paper | |
costs = [85, 114, 186, 277, 348, 400, 469, 555, 600, 710, 789, 864] | |
programs = [ | |
[69], | |
[69 + 29, 0], | |
[69 + 29 + 36, 0, 0], | |
[69 + 29, 0, 36 + 61, 0], | |
[69 + 29 + 36, 0, 0, 61 + 61, 0], | |
[69 + 29 + 36, 0, 0, 61 + 61 + 26, 0, 0], | |
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0], | |
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0, 67], | |
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0, 67 + 45, 0], | |
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0, 67 + 45, 0, 67], | |
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0, 67 + 45, 0, 67 + 79, 0], | |
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0, 67 + 45, 0, 67, 79 + 56, 0], | |
] | |
for t in range(1, len(demands)): | |
result = wagner_whitin(demands[:t], order_costs[:t], holding_costs[:t]) | |
assert costs[t - 1] == result["cost"] | |
assert programs[t - 1] == result["solution"] | |
def test_against_handcrafted_example1(): | |
demands = [10, 5, 10, 20] | |
order_costs = [3, 1, 5, 5] | |
holding_costs = [1, 1, 1, 1] | |
result = wagner_whitin(demands, order_costs, holding_costs) | |
assert result["cost"] == 14 | |
assert result["solution"] == [10, 5, 10, 20] | |
def test_against_handcrafted_example2(): | |
demands = [0, 10] | |
order_costs = [10, 100] | |
holding_costs = [1, 0] | |
result = wagner_whitin(demands, order_costs, holding_costs) | |
assert result["cost"] == 20 | |
assert result["solution"] == [10, 0] | |
def test_against_handcrafted_example3(): | |
demands = [0, 0, 7] | |
order_costs = [60, 90, 90] | |
holding_costs = [9, 8, 5] | |
result = wagner_whitin(demands, order_costs, holding_costs) | |
assert result["cost"] == 90 | |
assert result["solution"] == [0, 0, 7] | |
def test_against_handcrafted_example4(): | |
demands = [0, 5, 0, 10, 10] | |
order_costs = [0, 10, 15, 5, 20] | |
holding_costs = [1, 1, 3, 1, 2] | |
result = wagner_whitin(demands, order_costs, holding_costs) | |
assert result["cost"] == 20 | |
assert result["solution"] == [5, 0, 0, 20, 0] | |
# ============================================================================= | |
# UTILITY FUNCTIONS (might be nice to have) | |
# ============================================================================= | |
def _validate_inputs(demands: list, order_costs: list, holding_costs: list): | |
assert isinstance(demands, list) | |
assert isinstance(order_costs, list) | |
assert isinstance(holding_costs, list) | |
assert len(demands) == len(order_costs) | |
assert len(demands) - 1 <= len(holding_costs) <= len(demands) | |
assert demands[-1] > 0, "Last period must have positive demand" | |
assert all(d >= 0 for d in demands), "All demands must be non-negative" | |
assert all(o >= 0 for o in order_costs), "All order costs must be non-negative" | |
def evaluate(demands, order_costs, holding_costs, solution): | |
assert sum(demands) == sum(solution) | |
assert len(demands) == len(solution) == len(order_costs) == len(holding_costs) | |
cost = 0 | |
in_stock = 0 | |
for t in range(len(demands)): | |
# Update stock and the cost of this order | |
in_stock += solution[t] - demands[t] | |
cost += order_costs[t] if solution[t] > 0 else 0 | |
# Sell products, pay the price of bringing stock to next period | |
cost += in_stock * holding_costs[t] | |
assert in_stock >= 0 | |
return cost | |
def solve_naively(demands, order_costs, holding_costs): | |
"""Compute the best solution in O(n * n^2) time.""" | |
best_solution = None | |
min_cost = float("inf") | |
T = len(demands) | |
for order_at in itertools.product([0, 1], repeat=T): # Count in binary | |
# Construct a solution | |
solution, accumulated_demand = T * [0], 0 | |
for t in reversed(range(T)): | |
accumulated_demand += demands[t] | |
if order_at[t] > 0: | |
solution[t] = accumulated_demand | |
accumulated_demand = 0 | |
# If demands at not met, the solutions is not feasible | |
if not sum(demands) == sum(solution): | |
continue | |
# Update best found solution | |
cost = evaluate(demands, order_costs, holding_costs, solution) | |
if cost < min_cost: | |
min_cost = cost | |
best_solution = solution | |
return {"cost": min_cost, "solution": best_solution} | |
# ============================================================================= | |
# RUN TEST CASES (this algo has also been tested against naive implementations) | |
# ============================================================================= | |
if __name__ == "__main__": | |
test_against_example_in_paper() | |
test_against_handcrafted_example1() | |
test_against_handcrafted_example2() | |
test_against_handcrafted_example3() | |
test_against_handcrafted_example4() | |
# Generate random instances and test naive computation | |
import random | |
N = 10 ** 4 | |
for test in range(N): | |
# Generate a random problem | |
length = random.randint(1, 8) | |
demands = [random.randint(0, 5) for _ in range(length)] | |
order_costs = [random.randint(0, 9) for _ in range(length)] | |
holding_costs = [random.randint(0, 5) for _ in range(length)] | |
demands[-1] = random.randint(1, 5) | |
# Get results from both implementations | |
results_naive = solve_naively(demands, order_costs, holding_costs) | |
results = wagner_whitin(demands, order_costs, holding_costs) | |
# The solution need not be unique, test that both have min cost | |
assert results["cost"] == results_naive["cost"] | |
cost_wagner = evaluate(demands, order_costs, holding_costs, results["solution"]) | |
cost_naive = evaluate(demands, order_costs, holding_costs, results_naive["solution"]) | |
assert cost_wagner == cost_naive | |
print(f"Success: tested 'wagner_whitin' against naive algorithm on {N} inputs.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment