Created
May 6, 2021 16:47
Set partitioning problem
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Integer program for set partitioning | |
""" | |
# %% | |
import cplex | |
import random | |
from typing import List, Tuple | |
import string | |
X = list(string.ascii_lowercase) | |
eps = 10e-6 | |
# Cost function | |
def cost(k: List) -> float: return 1.0 | |
# label for a set | |
def func(x) -> str: return "".join(x) | |
# Reduced cost calculation | |
def rc(e, pi, X): return cost(e) + sum([p for p, x in zip(pi, X) if x in e]) | |
def master(X: List, K: List[List]) -> cplex.Cplex: | |
""" Restricted master problem """ | |
prob = cplex.Cplex() | |
numvar = len(K) | |
names = list(map(func, K)) | |
var_type = [prob.variables.type.continuous] * numvar | |
prob.variables.add(names=names, | |
lb=[0.0] * numvar, | |
ub=[1.0] * numvar, | |
types=var_type) | |
prob.objective.set_sense(prob.objective.sense.minimize) | |
prob.objective.set_linear([(n, cost(kj)) for n, kj in zip(names, K)]) | |
lhs = [] | |
for i in X: | |
coeffs = [1.0 if i in kj else 0.0 for kj in K] | |
lhs.append(cplex.SparsePair(names, coeffs)) | |
prob.linear_constraints.add(lin_expr=lhs, | |
rhs=[1.0] * len(lhs), | |
senses=['E'] * len(lhs), | |
names=X) | |
prob.set_problem_type(cplex.Cplex.problem_type.LP) | |
print(f"{len(lhs)} constraints and {len(names)} variables.") | |
return prob | |
def pricing(X: List, pi: List, ncols: int=5) -> Tuple[List, float]: | |
""" Heuristic to generate new partitions """ | |
# ordered set of elements that have positive dual variables | |
K_new = [x for p, x in sorted(zip(pi, X), reverse=True) if p < 0.0] | |
# reduced cost of new partition | |
rc_new = rc(K_new, pi, X) | |
return sorted(K_new), rc_new | |
# Initial partitions to start with | |
K = [X[i:i + 10] for i in range(0, len(X), 10)] | |
p = master(X, K) | |
_ = p.set_log_stream(None) | |
_ = p.set_results_stream(None) | |
# %% The main column generation procedure | |
i, maxiter = 0, 1000 | |
while (i< maxiter): | |
p.solve() | |
pi = [-p for p in p.solution.get_dual_values()] | |
# Generate new partitions/columns | |
K_new, rc_new = pricing(X, pi) | |
# generate new column | |
if rc_new <= -eps: | |
# add the columns to the problem (if they don't exist already) | |
names = p.variables.get_names() | |
newvar = func(K_new) | |
if newvar not in names: | |
print(f"Iteration {i}: Adding '{''.join(K_new)}' column with rc: {rc_new}.") | |
p.variables.add(names=[newvar], | |
lb=[0.0], ub=[1.0], | |
types=[p.variables.type.continuous]) | |
p.objective.set_linear(newvar, cost(K_new)) | |
for c in K_new: | |
p.linear_constraints.set_coefficients(str(c), newvar, 1.0) | |
p.set_problem_type(cplex.Cplex.problem_type.LP) | |
i+=1 | |
else: | |
print("No improvement partition found - Terminating column generation.") | |
break | |
# Do the final (integer) solve | |
names = p.variables.get_names() | |
p.variables.set_types(list(zip(names, [p.variables.type.binary] * len(names)))) | |
#p.write("test.lp") | |
p.solve() | |
if p.solution.get_status() in [101, 102, 105, 107, 111, 113]: | |
x_opt = p.solution.get_values() | |
print(f"Optimal value: {p.solution.get_objective_value()}") | |
for vr, val in zip(names, x_opt): | |
if val >=1-eps: | |
s = list(vr) | |
print(f"Partition: '{''.join(s)}' with cost: {cost(s)}.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment