Created May 9, 2024 22:12
from pathlib import Path
from collections import defaultdict
import matplotlib.axes
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import matplotlib.animation as animation
# number of ticks to simulate per frame (1 is real speed, 2 is 2x speed, etc)
# the maximum number of items to process per tick
# the minimum number of items to process per tick
# number of bars in the plot
NUM_BARS = 120
# if set, will render the animation to the file instead of displaying it
OUTPUT_VIDEO = None # 'combined2.mp4'
# how long to run the animation for when saving a video
VIDEO_LENGTH = 60 * 60
# if set, will write the the number of items proceed per tick to the specified file
# (when the video render ends, or when the window is closed)
OUTPUT_DATA = Path(OUTPUT_VIDEO).with_suffix('.txt') if OUTPUT_VIDEO else None
# {due tick: list of items}
queue = defaultdict(list)
queue[120] = list(range(1000))
n_processed_history = []
ax: matplotlib.axes.Axes
fig, ax = plt.subplots()
fig.set_size_inches(16 / 2, 9 / 2)
history = [0 for _ in range(120)]
average2 = 0
ticks_text = ax.annotate('Tick: 0', (0, 1), fontsize='x-large', xycoords='axes fraction', xytext=(10, -10), textcoords='offset points', ha='left', va='top', animated=True,)
avg_line = ax.plot(range(NUM_BARS), [1000 for _ in range(NUM_BARS)], color='grey', linestyle='--', label=f'{len(history)} tick avg')[0]
avg_line2 = ax.plot(range(NUM_BARS), [1000 for _ in range(NUM_BARS)], color='green', linestyle='--', label='weighted "avg"')[0]
bars =, [0 for _ in range(NUM_BARS)], label='queue size')
xlim=[0, NUM_BARS],
ylim=[0, MAX_PER_TICK * 2],
xlabel='Queue due tick',
ylabel='Items in queue'
ax.legend(loc='upper right')
def process_item(item):
return 120
def process_first_queue(tick, force=False, max_to_process=MAX_PER_TICK, reschedule=True):
# this is a O(n) (slow) way to find the smallest element,
# it could be done in constant time with a min heap
# but this is fine for testing
next_update_tick = min(queue.keys())
num_items = len(queue[next_update_tick])
if not force and next_update_tick > tick:
return 0, num_items
print(f'tick {tick}, processing updates for {next_update_tick} (+{next_update_tick - tick})')
# process and schedule items for later, how later depends on what
# the process_item function returns
num_processed = 0
items = queue.pop(next_update_tick)
for i in range(min(len(items), max_to_process)):
# note that this is popping items from the right side of the list
# so that previously delayed items are processed first
# this prevents items from being delayed indefinitely
item = items.pop()
delay = process_item(item)# - (i % 2)
queue[tick + delay].append(item)
num_processed += 1
# move anything not processed to the right side of the next tick's queue
if items:
if reschedule:
print(f' delaying {len(items)} to tick {tick + 1}')
queue[tick + 1].extend(items)
return num_processed, num_items
def update_standard(tick, average_items):
Update method that updates up to MAX_PER_TICK items every tick
total_processed = 0
total_items = 0
while 1:
num_processed, num_items = process_first_queue(tick, max_to_process=MAX_PER_TICK)
if not num_processed:
total_items += num_items
total_processed += num_processed
return total_items, total_processed
def update_overflow_to_avg(tick, average_items):
Update method that limits the number of processed items to the average
new_max = max(MIN_PER_TICK, min(average_items, MAX_PER_TICK))
total_processed = 0
total_items = 0
while 1:
num_processed, num_items = process_first_queue(tick, max_to_process=new_max)
if not num_processed:
total_processed += num_processed
total_items += num_items
return total_items, total_processed
def update_extra_to_avg(tick, average_items):
'''Update method that processes extra items up to the average'''
total_processed = 0
total_items = 0
while 1:
num_processed, num_items = process_first_queue(tick)
if not num_processed:
total_processed += num_processed
total_items += num_items
if not queue:
return total_items, total_processed
# process some items early (up to the average, minus how far in the future the next queue is)
next_update_tick = min(queue.keys())
new_max = max(MIN_PER_TICK, min(average_items, MAX_PER_TICK))
want_to_process = new_max - total_processed - (next_update_tick - tick) - 1
num_extra_to_process = max(0, want_to_process)
while num_extra_to_process > 0:
num_processed, num_items = process_first_queue(tick, force=True, max_to_process=num_extra_to_process, reschedule=False)
if not num_processed:
num_extra_to_process -= num_processed
total_processed += num_processed
total_items += num_items
return total_items, total_processed
def update_combined(tick, average_items):
Update method that limits the number of processed items to the average as well as
processes extra items up to the average
new_max = max(MIN_PER_TICK, min(average_items, MAX_PER_TICK))
total_processed = 0
total_items = 0
while 1:
num_processed, num_items = process_first_queue(tick, max_to_process=new_max)
if not num_processed:
total_processed += num_processed
total_items += num_items
if not queue:
return total_items, total_processed
# process some items early (up to the average, minus how far in the future the next queue is)
next_update_tick = min(queue.keys())
want_to_process = new_max - total_processed - (next_update_tick - tick) - 1
num_extra_to_process = max(0, want_to_process)
while num_extra_to_process > 0:
num_processed, num_items = process_first_queue(tick, force=True, max_to_process=num_extra_to_process, reschedule=False)
if not num_processed:
num_extra_to_process -= num_processed
total_processed += num_processed
total_items += num_items
return total_items, total_processed
def do_update(tick):
global average2
# update_func = update_standard
# update_func = update_overflow_to_avg
# update_func = update_extra_to_avg
update_func = update_combined
total_items, num_processed = update_func(tick, round(average2))
average2 = average2 * (119/120) + total_items * (1/120)
avg = sum(history) / len(history)
avg_line.set_data(range(NUM_BARS), [avg for _ in range(NUM_BARS)])
avg_line2.set_data(range(NUM_BARS), [average2 for _ in range(NUM_BARS)])
for i, bar_artist in enumerate(bars.patches):
bar_artist.set_height(len(queue.get(tick + i, [])))
secs, rem_ticks = divmod(tick, 60)
mins, secs = divmod(secs, 60)
ticks_text.set_text(f'Tick: {tick}\n{mins:02d}m{secs:02d}s+{rem_ticks}')
return bars.patches + [avg_line, avg_line2, ticks_text]
def update_bars(tick):
artists = []
for i in range(TICKS_PER_FRAME):
update_tick = tick * TICKS_PER_FRAME + i + 1
artists = do_update(update_tick)
return artists
ani = animation.FuncAnimation(
frames=range(VIDEO_LENGTH) if OUTPUT_VIDEO else None,
interval=1 if OUTPUT_VIDEO else 1000 / 60,
if OUTPUT_VIDEO:, fps=60, dpi=100)
with open(OUTPUT_DATA, 'w') as f:
f.write('\n'.join(str(n) for n in n_processed_history))
datasets = {}
for file in Path('.').glob('*.txt'):
name = file.stem.replace('_', ' ')
with open(file) as f:
data = [int(n) for n in'\n')]
data = np.lib.stride_tricks.sliding_window_view(data, 120)
datasets[name] = np.std(data, axis=1)
ax: matplotlib.axes.Axes
fig, ax = plt.subplots()
fig.set_size_inches(16 / 2, 9 / 2)
# plot sorted by the last entry, ascending
for name, data in sorted(datasets.items(), key=lambda x: x[1][-1], reverse=True):
ax.plot(range(len(data)), data, label=name)
xlabel='Time (ticks)',
ylabel='Standard deviation'
ax.legend(loc='upper right')
plt.autoscale(tight=True, axis='x')
fig.savefig('stdev.png', dpi=300)
