Skip to content

Instantly share code, notes, and snippets.

@tommyod
Last active November 14, 2021 12:30
Show Gist options
  • Save tommyod/5156c4bcae9e51f8dd9bfef5d957b285 to your computer and use it in GitHub Desktop.
Save tommyod/5156c4bcae9e51f8dd9bfef5d957b285 to your computer and use it in GitHub Desktop.
Python 3.7 implementation of the Wagelmans algorithm for the dynamic lot size problem
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
The linear-time Wagelmans algorithm for the dynamic lot size problem
--------------------------------------------------------------------
@author: tommyod (https://github.com/tommyod)
An implementation of the Wagelmans O(n) algorithm for the dynamic lot
size problem, also called the single-item uncapacitated lot-sizing problem.
Due to the overhead of the data structures used, this algorithm is actually
slightly (~20%) slower than the O(n^2) Wagner-Whitin algorithm on a collection
of randomly generated problems. The HW algorithm scales linearily in most cases
when the Planning Horizon Theorem is implemented correctly.
Speed
-----
Periods(n) Time
10 => 69.3 µs
100 => 599 µs
1,000 => 6.04 ms
10,000 => 59.7 ms
100,000 => 631 ms
1,000,000 => 6.48 s
Correctness
-----------
The algorithm has been tested against the test case in the Wagner-Whitin paper,
and against more naive (and slow) implementations. It gave the same answer as
a naive implementation on more than 10 million inputs.
For a description of the problem, see Wikipedia:
- https://en.wikipedia.org/wiki/Dynamic_lot-size_model
The references are:
- Harvey M. Wagner and Thomson M. Whitin,
"Dynamic version of the economic lot size model,"
Management Science, Vol. 5, pp. 89–96, 1958
- Albert Wagelmans, Stan Van Hoesel and Antoon Kolen ,
"Economic Lot Sizing: An O(n log n) Algorithm That Runs
in Linear Time in the Wagner-Whitin Case,"
Operations Research, Vol. 40, Supplement 1: Optimization (1992)
- Chapter 7 (Single-Item Uncapacitated Lot-Sizing) in the book
"Production Planning by Mixed Integer Programming"
by Yves Pochet and Laurence A. Wolsey
See also: https://tommyodland.com/articles/2021/the-dynamic-lot-size-problem
MIT License (https://choosealicense.com/licenses/mit/)
"""
# =============================================================================
# MAIN PROGRAM
# =============================================================================
from collections import deque
from itertools import accumulate, product
class CumSumList:
"""Fast sums from index i to j, by storing cumulative sums."""
def __init__(self, elements):
self.cumsums = [0] * len(elements)
partial_sum = 0
for i, element in enumerate(elements):
partial_sum += element
self.cumsums[i] = partial_sum
def sum_between(self, i, j):
"""Compute sum: elements[i] + elements[i+1] + ... + elements[j]"""
if i > j:
return 0
top = self.cumsums[j] if j < len(self.cumsums) else self.cumsums[-1]
low = self.cumsums[i - 1] if i >= 1 else 0
return top - low
class ConvexEnvelope:
"""A class for storing a lower convex envelope of points. It's assume that
the x-values of the added points (x, y) are give in non-increasing order.
When new points are added, previous points might be removed if they are no
long in the lower convex envelope."""
def __init__(self, points=None):
self.points = deque() # Use queue for O(1) pop()/append() on both ends
self.indices = deque()
# Keep track of an index associated with each point
self.index_counter = 0
if points:
for point in points:
self.add(*point)
def add(self, x, y):
if self.points:
x_n, y_n = self.points[-1]
assert x <= x_n, "x-values can never increase"
# Go through previous points and remove them if necessary
for i in reversed(range(1, len(self.points))):
x_a, y_a = self.points[-1]
x_b, y_b = self.points[-2]
# Do not raise ZeroDivisionError when denominator is 0
epsilon = 1e-9
slope_P_A = (y - y_a) / (x - x_a + epsilon)
slope_P_B = (y - y_b) / (x - x_b + epsilon)
# Remove point A if slope condition, or if P is directly below A
if (slope_P_B <= slope_P_A) or ((x_a == x) and (y <= y_a)):
self.points.pop()
self.indices.pop()
else:
break
self.points.append((x, y))
self.indices.append(self.index_counter)
self.index_counter += 1
def optimal_index(effpoints, slope):
"""Get the optimal index j for the Wagermans algorithm. The more general
version of this algorithm (not Wagner-Whitin cost structure) would use a
binary search."""
# Base case - the optimal index must be the only one available
if len(effpoints.indices) == 1:
return effpoints.indices[0]
# Prepare variables for looping through points
min_objective_value = float("inf") # Best (minimum) objective value seen
prev_index = effpoints.indices[0] # Best known index so far
to_remove = 0 # Number of points to remove after looping
# The objective value can decrease, then increase, in this loop
for (x, y), index in zip((effpoints.points), (effpoints.indices)):
objective_value = y + slope * x
# The objective value did not increase, keep looping
if objective_value <= min_objective_value:
min_objective_value = objective_value # Update minimum value seen
prev_index = index # Keep track of previous index
to_remove += 1 # Remove one more non-optimal point after loop
else:
# The objective value did increase
to_remove -= 1 # Do not improve previous value, it was the minimum
break
# We remove points that were found before the minimum,
# but always keep at least one point
for _ in range(min(to_remove, len(effpoints.points) - 1)):
effpoints.indices.popleft()
effpoints.points.popleft()
return prev_index
def wagelmans(demands, order_costs, holding_costs):
"""Compute the optimal cost and program for the dynamic lot size problem.
Parameters
----------
demands : list
A list of non-negative demands. The final demand should be non-zero,
since if it's zero then this last element can be removed from the
problem.
order_costs : list
A list of non-negative order costs. The order cost is independent of
the number of products ordered. If products are ordered at all, then
the order cost must be paid. It does not matter if we order 1 or 99.
holding_costs : list
A list of non-negative order costs. The holding cost at index t
is the cost (per product) of holding from period t to period t+1.
Returns
-------
dict
Dictionary with keys 'cost' (minimal cost) and 'solution'
(optimal solution). One problem can have several optimal solutions,
but each optimal solution must achieve the minimum cost.
"""
_validate_inputs(demands, order_costs, holding_costs)
d, o, h = demands, order_costs, holding_costs
assert d[-1] > 0, "Final demand should be positive"
d_cumsum = CumSumList(d) # Use this data structure to compute d_{i, j}
p = list(reversed(list(accumulate(reversed(h))))) # Reverse cumulative sum
T = len(demands) # Problem size T (number of periods)
G = {T: 0} # Intial value for recursion. G[t] = optimal for [t, t+1, ...]
effpoints = ConvexEnvelope([(d_cumsum.sum_between(0, T - 1), 0)])
# Used to construct the solution
cover_by = {} # cover_by[t] = j => cover period t to j by ordering at t
for t in reversed(range(T)):
# Get the optimal index (as added to the ConvexEnvelope datastructure)
index_effpoints = optimal_index(effpoints, slope=p[t])
j = T - 1 - index_effpoints # Convert index in datastructure to j
# Update minimal cost
G[t] = o[t] + p[t] * d_cumsum.sum_between(t, j) + G[j + 1]
cover_by[t] = j
# If d[t] == 0, an option is to not order at time t
# If this has a lower cost than ordering (as computed above):
if (d[t] == 0) and G[t] > G[t + 1]:
G[t] = G[t + 1]
cover_by[t] = t
# Add the new point (x_t, y_t) = (d_{0, t-1}, G[t])
effpoints.add(d_cumsum.sum_between(0, t - 1), G[t])
# Construct the optimal solution. Order d[t:j+1] at time t to cover
# demands in period from t to j (inclusive)
t = 0
solution = [0] * T
while True:
j = cover_by[t] # Get order time in period from t to j
solution[t] = sum(d[t : j + 1])
t = j + 1 # Set t (period start) to (period end) plus one
if t == T:
break # Break out if we're at the end
cost = G[0] - sum(h[t] * d_cumsum.sum_between(0, t) for t in range(T))
return {"cost": cost, "solution": solution}
# =============================================================================
# TEST CASES
# =============================================================================
def test_ConvexEnvelope():
"""Test the ConvexEnvelope datastruture."""
# As points are added, everything except [(9, 2), (0, 0)] falls above the
# lower convex envelope. These points above are removed.
effpoints = ConvexEnvelope([(9, 2), (7, 3), (6, 2), (4, 1), (2, 1), (0, 0)])
assert list(effpoints.points) == [(9, 2), (0, 0)]
assert list(effpoints.indices) == [0, 5] # Original indices of points
# Edge case - the point (1, 1) is "between" the two other points
effpoints = ConvexEnvelope([(2, 2), (1, 1), (0, 0)])
assert list(effpoints.points) == [(2, 2), (0, 0)] # (1, 1) is removed
assert list(effpoints.indices) == [0, 2]
# When adding a point with the same x value, but lower y-value, the
# previous point is removed from the convex envelope
effpoints = ConvexEnvelope([(5, 0), (4, 9), (4, 6)])
assert list(effpoints.points) == [(5, 0), (4, 6)]
assert list(effpoints.indices) == [0, 2]
def test_against_example_in_paper():
"""Test against the problem in the Wagner-Whitin paper."""
demands = [69, 29, 36, 61, 61, 26, 34, 67, 45, 67, 79, 56]
order_costs = [85, 102, 102, 101, 98, 114, 105, 86, 119, 110, 98, 114]
holding_costs = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
# Costs and programs copied by hand from the original paper
costs = [85, 114, 186, 277, 348, 400, 469, 555, 600, 710, 789, 864]
programs = [
[69],
[69 + 29, 0],
[69 + 29 + 36, 0, 0],
[69 + 29, 0, 36 + 61, 0],
[69 + 29 + 36, 0, 0, 61 + 61, 0],
[69 + 29 + 36, 0, 0, 61 + 61 + 26, 0, 0],
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0],
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0, 67],
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0, 67 + 45, 0],
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0, 67 + 45, 0, 67],
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0, 67 + 45, 0, 67 + 79, 0],
[69 + 29, 0, 36 + 61, 0, 61 + 26 + 34, 0, 0, 67 + 45, 0, 67, 79 + 56, 0],
]
for t in range(1, len(demands)):
result = wagelmans(demands[:t], order_costs[:t], holding_costs[:t])
assert costs[t - 1] == result["cost"]
assert programs[t - 1] == result["solution"]
def test_against_handcrafted_example1():
demands = [10, 5, 10, 20]
order_costs = [3, 1, 5, 5]
holding_costs = [1, 1, 1, 1]
result = wagelmans(demands, order_costs, holding_costs)
assert result["cost"] == 14
assert result["solution"] == [10, 5, 10, 20]
def test_against_handcrafted_example2():
demands = [0, 10]
order_costs = [10, 100]
holding_costs = [1, 0]
result = wagelmans(demands, order_costs, holding_costs)
assert result["cost"] == 20
assert result["solution"] == [10, 0]
def test_against_handcrafted_example3():
demands = [0, 0, 7]
order_costs = [60, 90, 90]
holding_costs = [9, 8, 5]
result = wagelmans(demands, order_costs, holding_costs)
assert result["cost"] == 90
assert result["solution"] == [0, 0, 7]
def test_against_handcrafted_example4():
demands = [0, 5, 0, 10, 10]
order_costs = [0, 10, 15, 5, 20]
holding_costs = [1, 1, 3, 1, 2]
result = wagelmans(demands, order_costs, holding_costs)
assert result["cost"] == 20
assert result["solution"] == [5, 0, 0, 20, 0]
# =============================================================================
# UTILITY FUNCTIONS (might be nice to have)
# =============================================================================
def _validate_inputs(demands: list, order_costs: list, holding_costs: list):
assert isinstance(demands, list)
assert isinstance(order_costs, list)
assert isinstance(holding_costs, list)
assert len(demands) == len(order_costs)
assert len(demands) - 1 <= len(holding_costs) <= len(demands)
assert demands[-1] > 0, "Last period must have positive demand"
assert all(d >= 0 for d in demands), "All demands must be non-negative"
assert all(o >= 0 for o in order_costs), "All order costs must be non-negative"
def evaluate(demands, order_costs, holding_costs, solution):
assert sum(demands) == sum(solution)
assert len(demands) == len(solution) == len(order_costs) == len(holding_costs)
cost = 0
in_stock = 0
for t in range(len(demands)):
# Update stock and the cost of this order
in_stock += solution[t] - demands[t]
cost += order_costs[t] if solution[t] > 0 else 0
# Sell products, pay the price of bringing stock to next period
cost += in_stock * holding_costs[t]
assert in_stock >= 0
return cost
def solve_naively(demands, order_costs, holding_costs):
"""Compute the best solution in O(n * n^2) time."""
best_solution = None
min_cost = float("inf")
T = len(demands)
for order_at in product([0, 1], repeat=T): # Count in binary
# Construct a solution
solution, accumulated_demand = T * [0], 0
for t in reversed(range(T)):
accumulated_demand += demands[t]
if order_at[t] > 0:
solution[t] = accumulated_demand
accumulated_demand = 0
# If demands at not met, the solutions is not feasible
if not sum(demands) == sum(solution):
continue
# Update best found solution
cost = evaluate(demands, order_costs, holding_costs, solution)
if cost < min_cost:
min_cost = cost
best_solution = solution
return {"cost": min_cost, "solution": best_solution}
# =============================================================================
# RUN TEST CASES (this algo has also been tested against naive implementations)
# =============================================================================
if __name__ == "__main__":
test_ConvexEnvelope()
test_against_example_in_paper()
test_against_handcrafted_example1()
test_against_handcrafted_example2()
test_against_handcrafted_example3()
test_against_handcrafted_example4()
# Generate random instances and test naive computation
import random
N = 10 ** 4
for test in range(N):
# Generate a random problem
length = random.randint(1, 8)
demands = [random.randint(0, 5) for _ in range(length)]
order_costs = [random.randint(0, 9) for _ in range(length)]
holding_costs = [random.randint(0, 5) for _ in range(length)]
demands[-1] = random.randint(1, 5)
# Get results from both implementations
results_naive = solve_naively(demands, order_costs, holding_costs)
results = wagelmans(demands, order_costs, holding_costs)
# The solution need not be unique, test that both have min cost
assert results["cost"] == results_naive["cost"]
cost_wagner = evaluate(demands, order_costs, holding_costs, results["solution"])
cost_naive = evaluate(demands, order_costs, holding_costs, results_naive["solution"])
assert cost_wagner == cost_naive
print(f"Success: tested 'wagelmans' against naive algorithm on {N} inputs.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment