Skip to content

Instantly share code, notes, and snippets.

@technillogue
Last active January 21, 2024 01:51
Show Gist options
  • Save technillogue/08c79eee52ed538bc3cea66612dbc5bb to your computer and use it in GitHub Desktop.
Save technillogue/08c79eee52ed538bc3cea66612dbc5bb to your computer and use it in GitHub Desktop.
import numpy as np
# Fitted constants for the processing-time model (see emperical_model below):
# cost per byte and fixed overhead per file. Presumably milliseconds — TODO confirm
# against the measurements these were fitted to.
SIZE_COEFFICIENT = 0.0294 / 1024
N_FILES_COEFFICIENT = 429
# File sizes to partition, whitespace-separated integers.
# `with` closes the handle promptly (the original leaked an open file object).
with open("sizes.txt") as _sizes_file:
    sizes: list[int] = [int(size) for size in _sizes_file.read().split()]
# Number of bins (parallel workers) to split the files across.
n_bins = 30
# we determined this by experimenting with actual times
def emperical_model(size: int, n_files: int) -> float:
    """Estimate processing time for `n_files` files totalling `size` bytes.

    Linear model fitted empirically against measured run times.
    """
    byte_cost = SIZE_COEFFICIENT * size
    per_file_cost = N_FILES_COEFFICIENT * n_files
    return byte_cost + per_file_cost
# how much time would it take to process all the files in a bin?
def heuristic(bin: list[int]) -> float:
    """Estimated time to process every file in this bin (empirical model)."""
    total_size = sum(bin)
    file_count = len(bin)
    return emperical_model(total_size, file_count)
# how long would the slowest bin take?
def calculate_makespan(bins: list[list[int]]) -> float:
    """The makespan of a partition: the estimated time of its slowest bin."""
    return max(heuristic(single_bin) for single_bin in bins)
# convenience function to turn a list of bin assignments
# into a list of bins containing file sizes
def partition_to_bins(binning: list[int], file_sizes=None, num_bins=None) -> list[list[int]]:
    """Convert a per-file bin assignment into explicit bins of file sizes.

    binning[i] is the bin index assigned to file i.  `file_sizes` and
    `num_bins` default to the module-level `sizes` and `n_bins`, so existing
    one-argument call sites behave exactly as before; passing them explicitly
    makes the function independent of module globals.
    """
    if file_sizes is None:
        file_sizes = sizes
    if num_bins is None:
        num_bins = n_bins
    bins: list[list[int]] = [[] for _ in range(num_bins)]
    for assigned_bin, size in zip(binning, file_sizes):
        bins[assigned_bin].append(size)
    return bins
def improve(bins: list[list[int]]) -> float:
    """Greedy local search: repeatedly move items out of the slowest bin.

    Mutates `bins` in place and returns the final makespan.

    Fixes relative to the original:
    - A candidate move must beat the makespan *before the move*; the original
      compared against the makespan at the start of `improve`, which accepted
      moves that worsen the current solution and could cycle until the
      iteration cap.
    - Bins are compared by identity (`is`), not value equality (`==`), so a
      distinct bin that happens to hold equal contents is not skipped.
    - The final makespan is returned, matching the declared `-> float`
      (the original implicitly returned None; callers that ignore the return
      value are unaffected).
    """
    initial_makespan = calculate_makespan(bins)

    def step() -> bool:
        # makespan before any move this step; a move must strictly beat it
        current_makespan = calculate_makespan(bins)
        # consider the slowest bin
        worst_bin = max(bins, key=heuristic)
        # try items from the slowest bin, biggest to smallest
        for item in sorted(worst_bin, reverse=True):
            # tentatively remove that item
            worst_bin.remove(item)
            # go through other bins, fastest to slowest
            for other_bin in sorted(bins, key=heuristic):
                if other_bin is worst_bin:
                    continue
                # tentatively add the item to the other bin
                other_bin.append(item)
                if calculate_makespan(bins) < current_makespan:
                    # strict improvement: keep the move
                    return True
                # no improvement: undo the move
                other_bin.remove(item)
            # moving the item to any other bin didn't help, put it back
            worst_bin.append(item)
        return False

    # do steps until no single move improves the solution
    # (each accepted move strictly lowers the makespan, so this terminates;
    # the cap is a safety bound)
    for i in range(100000):
        if not step():
            break
    final_makespan = calculate_makespan(bins)
    print(
        f"improved with {i} steps. improved makespan by {initial_makespan - final_makespan:.3f}"
    )
    return final_makespan
def batch_calculate_partition_makespan(
    sizes: list[int], bin_matrix: list[list[int]]
) -> np.ndarray:
    """Compute the (size-only) makespan of each candidate partition.

    sizes: per-file sizes, length num_files.
    bin_matrix: shape [batch_size, num_files]; each row assigns a bin index
        to every file.
    Returns a [batch_size] integer array: for each candidate partition, the
    total size of its largest bin.

    NOTE(review): unlike `heuristic`, this ignores the per-file overhead
    (N_FILES_COEFFICIENT) and ranks partitions by raw byte totals only —
    presumably intentional for the coarse random-search stage; confirm.
    """
    size_array = np.asarray(sizes)
    # one row per candidate partition, one column per file
    assignments = np.asarray(bin_matrix)
    # the biggest bin number present determines how many bins there are
    num_bins = int(assignments.max()) + 1
    batch_size = assignments.shape[0]
    # Running elementwise maximum over bins.  This keeps memory at
    # O(batch * num_files) per iteration instead of materialising the
    # original [num_bins, batch, num_files] mask AND size tensors at once.
    makespans = np.zeros(batch_size, dtype=size_array.dtype)
    for b in range(num_bins):
        # total size assigned to bin b in every candidate partition:
        # the boolean mask zeroes out files that belong to other bins
        bin_totals = ((assignments == b) * size_array).sum(axis=1)
        np.maximum(makespans, bin_totals, out=makespans)
    return makespans
best_partitions = []
best_makespans = []
# Random-restart search: sample large batches of random partitions and keep
# the 10 lowest-makespan candidates from each batch.
for i in range(15):
    # shape [batch_size, num_files]: each row is one candidate partition,
    # assigning a bin index (0..n_bins-1) to every file
    random_partitions = np.random.randint(
        0, n_bins, [1024 * 64, len(sizes)], dtype=np.int8
    )
    # what's the makespan for each of those partitions?
    partition_makespans = batch_calculate_partition_makespan(sizes, random_partitions)
    # argsort gives us the indexes of the sorted array
    # best_partition_indexes is the indexes of the 10 best partitions
    best_partition_indexes = partition_makespans.argsort()[:10]
    # numpy lets us index arrays with arrays of indexes
    # add the best makespans and partitions to the list
    best_makespans.extend(partition_makespans[best_partition_indexes])
    best_partitions.extend(random_partitions[best_partition_indexes])
print("done")
attempts = []
# Refine each surviving random partition with local search.
for partition in best_partitions:
    # convert the partition representation to actual bins
    bins = partition_to_bins(partition)
    # improve the solution (mutates `bins` in place)
    improve(bins)
    # add the improved solution to the list
    attempts.append(bins)
# select the fastest solution: the one with the smallest estimated makespan
final_best = min(attempts, key=calculate_makespan)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment