Skip to content

Instantly share code, notes, and snippets.

@sharmaeklavya2
Last active February 19, 2024 23:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sharmaeklavya2/4e4c52565b2d7aeceb074e870e6ad4aa to your computer and use it in GitHub Desktop.
Save sharmaeklavya2/4e4c52565b2d7aeceb074e870e6ad4aa to your computer and use it in GitHub Desktop.
Use code2dtree to find the tight approximation factor for the LPT algorithm for makespan minimization
#!/usr/bin/env python3
"""
Uses code2dtree (https://github.com/sharmaeklavya2/code2dtree) to find
the tight approximation factor of the LPT algorithm for makespan minimization
for a given number of jobs and machines.
Specifically, for every possible scenario, we find out the maximum load
a machine in the LPT schedule can have if the optimal makespan is at most 1.
We only consider minimal hard examples, i.e., the last good was assigned to the least loaded machine.
This is because otherwise we can remove the jobs appearing after the max loaded machine's last job.
Then the makespan of LPT remains the same but the optimal makespan may reduce.
"""
import argparse
from collections.abc import Iterable, Mapping, Sequence
import sys
import itertools
import random
from typing import Optional
from code2dtree import Var, Expr, func2dtree, getVarList
from code2dtree.node import LeafNode, ReturnNode
from code2dtree.interval import Interval
from code2dtree.linExpr import LinConstrTreeExplorer
from code2dtree.linProg import LinProg, LpStatus
from code2dtree.types import Real
ISeqSeq = Sequence[Sequence[int]]
try:
from float2frac import farey
# float2frac.py: https://gist.github.com/sharmaeklavya2/87af6715f783d9e440df5dd410dff6fc
except ImportError:
def farey(x: object, N: int = 0, eps: float = 0) -> object:
return x
def argmin(x: Sequence[Real | Expr]) -> int:
"""Largest index of minimum value in non-empty list."""
n = len(x)
minIndex, minValue = 0, x[0]
for i in range(1, n):
if x[i] <= minValue:
minIndex, minValue = i, x[i]
return minIndex
def argmax(x: Sequence[Real | Expr]) -> int:
"""Smallest index of maximum value in non-empty list."""
n = len(x)
maxIndex, maxValue = 0, x[0]
for i in range(1, n):
if x[i] > maxValue:
maxIndex, maxValue = i, x[i]
return maxIndex
def greedy(x: Iterable[Real | Expr], m: int) -> tuple[Sequence[int], int]:
"""Jobs have sizes x. There are m machines."""
assn = []
loads = []
jobGen = iter(x)
for j, xj in zip(range(m), jobGen):
assn.append(j)
loads.append(xj)
for j, xj in enumerate(jobGen, m):
i = argmin(loads)
assn.append(i)
loads[i] += xj
return (assn, argmax(loads))
def check1(leaf: LeafNode, x: Sequence[Var]) -> bool:
assert isinstance(leaf, ReturnNode)
assn: Sequence[int] = leaf.expr[0] # type: ignore[index]
maxLoaded: int = leaf.expr[1] # type: ignore[index]
return maxLoaded == assn[-1]
def check2(leaf: LeafNode, x: Sequence[Var], m: int) -> bool:
assn: Sequence[int] = leaf.expr[0] # type: ignore[index]
maxLoaded: int = leaf.expr[1] # type: ignore[index]
maxLoad = sum([xj for j, xj in enumerate(x) if assn[j] == maxLoaded])
lp = LinProg('max', x, maxLoad, Interval(0, 1, True, True))
lp.addDoubleConstraints(leaf.explorerOutput) # type: ignore[arg-type]
leaf.userData['lp'] = lp
lp = lp.copy()
lp.addConstraintExpr(sum(x) <= m)
solution = lp.solve(tol=0.0)
if solution.status == LpStatus.success:
leaf.userData['mkspLb'] = 1
leaf.userData['mkspUb'] = solution.optVal
return True
elif solution.status == LpStatus.infeasible:
return False
else:
raise ValueError(f'unknown error while solving LP: {solution}')
def genPartitions(n: int, m: int) -> Iterable[ISeqSeq]:
partial: list[list[int]] = [[] for i in range(m)]
partial[0].append(0)
def f(first_remaining: int) -> Iterable[ISeqSeq]:
if first_remaining == n:
yield [x.copy() for x in partial]
else:
for i in range(m):
if i == 0 or len(partial[i-1]) > 0:
partial[i].append(first_remaining)
yield from f(first_remaining+1)
partial[i].pop()
yield from f(1)
def getLb(leaf: LeafNode, M: ISeqSeq, x: Sequence[Var]) -> tuple[Optional[float], Optional[Sequence[float]]]:
# assn: Sequence[int] = leaf.expr[0] # type: ignore[index]
# print(f'getLb(assn={assn}, M={M})')
lp = leaf.userData['lp'].copy()
for bundle in M:
load = sum([x[j] for j in bundle])
lp.addConstraintExpr(load <= 1)
solution = lp.solve(tol=0.0)
if solution.status == LpStatus.success:
# print('\tsuccess:', solution.optVal, solution.optSol)
leaf.userData['mkspLb'] = min(leaf.userData['mkspLb'], solution.optVal)
return (solution.optVal, solution.optSol)
elif solution.status != LpStatus.infeasible:
raise ValueError(f'unknown error while solving LP: {solution}')
else:
# print('\tinfeas')
return (None, None)
def printProgress(it: int, totalIt: int, d: Mapping[object, object]) -> None:
dStr = ' '.join('{}={}'.format(k, v) for k, v in d.items())
print('[{}/{} = {}%] {}'.format(it, totalIt, int(it * 100 / totalIt), dStr),
file=sys.stderr, end='\r')
def showTightApprox(n: int, m: int) -> None:
# get decision tree
x = getVarList('x', n, 'small')
te = LinConstrTreeExplorer([x[i-1] >= x[i] for i in range(1, n)])
dtree = func2dtree(greedy, (x, m), te)
leaves = list(dtree.getLeaves())
print('leaves:', len(leaves))
leaves = [leaf for leaf in leaves if check1(leaf, x) and check2(leaf, x, m)]
print('leaves after check2:', len(leaves))
partitions = list(genPartitions(n=n, m=m))
print('partitions:', len(partitions))
bestLb: float = 1.0
worstSizes: Optional[Sequence[float]] = None
worstAssn: Optional[Sequence[int]] = None
worstOpt: Optional[ISeqSeq] = None
leavesPartitions = list(itertools.product(leaves, partitions))
random.shuffle(leavesPartitions)
lpsRun = 0
try:
for i, (leaf, M) in enumerate(leavesPartitions):
if i % 200 == 0:
printProgress(i, len(leavesPartitions), {'lpsRun': lpsRun, 'score': bestLb})
if leaf.userData['mkspUb'] > bestLb:
assn: Sequence[int] = leaf.expr[0] # type: ignore[index]
lb, sizes = getLb(leaf, M, x)
lpsRun += 1
if lb is not None and lb > bestLb:
bestLb, worstSizes, worstAssn, worstOpt = lb, sizes, assn, M
printProgress(len(leavesPartitions), len(leavesPartitions), {'lpsRun': lpsRun, 'score': bestLb})
print(file=sys.stderr)
except KeyboardInterrupt:
printProgress(i, len(leavesPartitions), {'lpsRun': lpsRun, 'score': bestLb})
print('\nEnding optimization prematurely', file=sys.stderr)
print()
print('bestFactor: {} = {}'.format(farey(bestLb), bestLb))
print('worstSizes:', farey(worstSizes))
print('worstAssn:', worstAssn)
print('worstOpt:', worstOpt)
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('-n', type=int, required=True, help='number of jobs')
parser.add_argument('-m', type=int, required=True, help='number of machines')
parser.add_argument('--seed', default='0', help='seed for random number generator')
args = parser.parse_args()
print(f'n={args.n} jobs, m={args.m} machines')
random.seed(args.seed)
showTightApprox(n=args.n, m=args.m)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment