Skip to content

Instantly share code, notes, and snippets.

@udf
Created May 9, 2024 22:12
Show Gist options
  • Save udf/8b156d8c9fc6f7ae0778a2194aad9108 to your computer and use it in GitHub Desktop.
Save udf/8b156d8c9fc6f7ae0778a2194aad9108 to your computer and use it in GitHub Desktop.
from pathlib import Path
from collections import defaultdict
import matplotlib.axes
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import matplotlib.animation as animation
# Simulation / rendering configuration.
#
# number of ticks to simulate per rendered frame (1 is real speed, 2 is 2x speed, etc);
# update_bars() calls do_update() this many times for every frame
TICKS_PER_FRAME = 1
# the maximum number of items to process per tick
# (default cap of process_first_queue(); the plot's y axis spans 0..2x this value)
MAX_PER_TICK = 100
# the minimum number of items to process per tick
# (lower bound applied when an update method caps processing at the running average)
MIN_PER_TICK = 10
# number of bars in the plot; bar i shows how many items are due at current tick + i
NUM_BARS = 120
# if set, will render the animation to the file instead of displaying it
OUTPUT_VIDEO = None # 'combined2.mp4'
# how long to run the animation for when saving a video, in frames
# (the video is saved at 60 fps, so 60 * 60 frames is 60 seconds of video)
VIDEO_LENGTH = 60 * 60
# if set, the number of items processed per tick is written to this file, one count per line,
# once the video render ends or the window is closed
# NOTE(review): this is derived from OUTPUT_VIDEO, so as written it is None (and nothing is
# recorded or written) whenever the animation is only displayed in a window
OUTPUT_DATA = Path(OUTPUT_VIDEO).with_suffix('.txt') if OUTPUT_VIDEO else None
# Pending work, keyed by the tick at which it falls due: {due tick: list of items}.
# Seeded with 1000 integer items that all become due on tick 120.
queue = defaultdict(list, {120: list(range(1000))})
# number of items processed on each simulated tick (only filled in when OUTPUT_DATA is set)
n_processed_history = list()
# --- figure and artists -------------------------------------------------------
ax: matplotlib.axes.Axes
fig, ax = plt.subplots()
# 16:9 figure, 8 x 4.5 inches
fig.set_size_inches(16 / 2, 9 / 2)

# total queue size seen on each of the most recent ticks (oldest first), used for the plain average
history = [0] * 120
# exponentially weighted running "average" of the per-tick totals, updated in do_update()
average2 = 0

# tick counter drawn in the top-left corner of the axes
ticks_text = ax.annotate(
    'Tick: 0',
    (0, 1),
    fontsize='x-large',
    xycoords='axes fraction',
    xytext=(10, -10),
    textcoords='offset points',
    ha='left',
    va='top',
    animated=True,
)

# horizontal guide lines; their y values are replaced on every update, 1000 is only a placeholder
(avg_line,) = ax.plot(
    range(NUM_BARS),
    [1000] * NUM_BARS,
    color='grey',
    linestyle='--',
    label=f'{len(history)} tick avg',
)
(avg_line2,) = ax.plot(
    range(NUM_BARS),
    [1000] * NUM_BARS,
    color='green',
    linestyle='--',
    label='weighted "avg"',
)

# one bar per upcoming tick, all starting empty
bars = ax.bar(range(NUM_BARS), [0] * NUM_BARS, label='queue size')

ax.set(
    xlim=[0, NUM_BARS],
    ylim=[0, MAX_PER_TICK * 2],
    xlabel='Queue due tick',
    ylabel='Items in queue',
)
# x positions are offsets from the current tick, so label them as "+N"
ax.xaxis.set_major_formatter(ticker.FormatStrFormatter('+%d'))
ax.legend(loc='upper right')
fig.tight_layout()
def process_item(item, delay=120):
    '''
    Simulate processing a single item.

    item: the item being processed (unused by this stub).
    delay: number of ticks after which the item wants to be processed again.
        Defaults to 120, the value that used to be hard-coded here.

    Returns the delay, in ticks, until the item is next due.
    '''
    return delay
def process_first_queue(tick, force=False, max_to_process=MAX_PER_TICK, reschedule=True):
    '''
    Process items from the earliest-due list in the global queue.

    tick: the current simulation tick.
    force: if true, process the earliest list even when it is not due yet.
    max_to_process: upper bound on the number of items processed by this call.
    reschedule: what to do with items left over once the bound is hit; if true
        they are moved to tick + 1, otherwise they are put back on the tick
        they were originally due.

    Returns (num_processed, num_items), where num_items is the size of the
    earliest list before anything was processed. Returns (0, 0) when the
    queue is empty.
    '''
    # nothing is scheduled at all; min() below would raise ValueError on an empty queue
    if not queue:
        return 0, 0

    # this is a O(n) (slow) way to find the smallest element,
    # it could be done in constant time with a min heap
    # but this is fine for testing
    next_update_tick = min(queue)
    num_items = len(queue[next_update_tick])
    if not force and next_update_tick > tick:
        return 0, num_items

    print(f'tick {tick}, processing updates for {next_update_tick} (+{next_update_tick - tick})')

    # process and schedule items for later, how later depends on what
    # the process_item function returns
    num_processed = 0
    items = queue.pop(next_update_tick)
    for _ in range(min(len(items), max_to_process)):
        # note that this is popping items from the right side of the list
        # so that previously delayed items are processed first
        # this prevents items from being delayed indefinitely
        item = items.pop()
        delay = process_item(item)
        queue[tick + delay].append(item)
        num_processed += 1

    # move anything not processed to the right side of the next tick's queue
    # (or back onto its original tick when rescheduling is disabled)
    if items:
        if reschedule:
            print(f'  delaying {len(items)} to tick {tick + 1}')
            queue[tick + 1].extend(items)
        else:
            queue[next_update_tick].extend(items)

    return num_processed, num_items
def update_standard(tick, average_items):
    '''
    Baseline update method: keeps processing due queues, taking at most
    MAX_PER_TICK items from each, until a call processes nothing.

    average_items is accepted so every update method has the same signature;
    it is not used here.

    Returns (total_items, total_processed).
    '''
    items_seen = 0
    processed_count = 0

    done, bucket_size = process_first_queue(tick, max_to_process=MAX_PER_TICK)
    while done:
        items_seen += bucket_size
        processed_count += done
        done, bucket_size = process_first_queue(tick, max_to_process=MAX_PER_TICK)

    return items_seen, processed_count
def update_overflow_to_avg(tick, average_items):
    '''
    Update method that caps processing at the running average.

    The cap is average_items clamped to the range MIN_PER_TICK..MAX_PER_TICK
    and is passed to every process_first_queue call made for this tick.

    Returns (total_items, total_processed).
    '''
    per_queue_cap = max(MIN_PER_TICK, min(average_items, MAX_PER_TICK))
    items_seen = 0
    processed_count = 0

    done, bucket_size = process_first_queue(tick, max_to_process=per_queue_cap)
    while done:
        processed_count += done
        items_seen += bucket_size
        done, bucket_size = process_first_queue(tick, max_to_process=per_queue_cap)

    return items_seen, processed_count
def update_extra_to_avg(tick, average_items):
    '''
    Update method that processes due items as usual and then pulls extra items
    forward from future queues, up to the running average.

    Returns (total_items, total_processed).
    '''
    items_seen = 0
    processed_count = 0

    # phase 1: everything that is due, with the default per-call limit
    done, bucket_size = process_first_queue(tick)
    while done:
        processed_count += done
        items_seen += bucket_size
        done, bucket_size = process_first_queue(tick)

    if not queue:
        return items_seen, processed_count

    # phase 2: process some items early
    # (up to the average, minus how far in the future the next queue is)
    next_update_tick = min(queue)
    target = max(MIN_PER_TICK, min(average_items, MAX_PER_TICK))
    ticks_until_next = next_update_tick - tick
    budget = max(0, target - processed_count - ticks_until_next - 1)

    while budget > 0:
        # forced, and leftovers stay on their original tick instead of moving to tick + 1
        done, bucket_size = process_first_queue(
            tick,
            force=True,
            max_to_process=budget,
            reschedule=False,
        )
        if not done:
            break
        budget -= done
        processed_count += done
        items_seen += bucket_size

    return items_seen, processed_count
def update_combined(tick, average_items):
    '''
    Update method combining both strategies: due queues are capped at the
    running average (clamped to MIN_PER_TICK..MAX_PER_TICK), and any budget
    left under that cap is spent on processing future items early.

    Returns (total_items, total_processed).
    '''
    per_queue_cap = max(MIN_PER_TICK, min(average_items, MAX_PER_TICK))
    items_seen = 0
    processed_count = 0

    # phase 1: due items, limited to the cap
    done, bucket_size = process_first_queue(tick, max_to_process=per_queue_cap)
    while done:
        processed_count += done
        items_seen += bucket_size
        done, bucket_size = process_first_queue(tick, max_to_process=per_queue_cap)

    if not queue:
        return items_seen, processed_count

    # phase 2: process some items early
    # (up to the average, minus how far in the future the next queue is)
    next_update_tick = min(queue)
    ticks_until_next = next_update_tick - tick
    budget = max(0, per_queue_cap - processed_count - ticks_until_next - 1)

    while budget > 0:
        # forced, and leftovers stay on their original tick instead of moving to tick + 1
        done, bucket_size = process_first_queue(
            tick,
            force=True,
            max_to_process=budget,
            reschedule=False,
        )
        if not done:
            break
        budget -= done
        processed_count += done
        items_seen += bucket_size

    return items_seen, processed_count
def do_update(tick):
    '''
    Simulate one tick with the selected update method, refresh both running
    averages and redraw the plot artists.

    Returns the list of artists that were updated (for blitting).
    '''
    global average2

    # pick the update method to simulate:
    # update_func = update_standard
    # update_func = update_overflow_to_avg
    # update_func = update_extra_to_avg
    update_func = update_combined
    total_items, num_processed = update_func(tick, round(average2))

    if OUTPUT_DATA:
        n_processed_history.append(num_processed)

    # weighted "average": old value decays by 119/120, the new sample contributes 1/120
    average2 = average2 * (119/120) + total_items * (1/120)

    # plain average over the fixed-length history window (drop the oldest, add the newest)
    del history[0]
    history.append(total_items)
    avg = sum(history) / len(history)

    xs = range(NUM_BARS)
    avg_line.set_data(xs, [avg] * NUM_BARS)
    avg_line2.set_data(xs, [average2] * NUM_BARS)

    # bar at offset N shows how many items are due N ticks from now
    for offset, rect in enumerate(bars.patches):
        rect.set_height(len(queue.get(tick + offset, ())))

    # 60 ticks per second: split the tick count into minutes, seconds and leftover ticks
    whole_secs, rem_ticks = divmod(tick, 60)
    mins, secs = divmod(whole_secs, 60)
    ticks_text.set_text(f'Tick: {tick}\n{mins:02d}m{secs:02d}s+{rem_ticks}')

    return [*bars.patches, avg_line, avg_line2, ticks_text]
def update_bars(tick):
    '''
    Animation callback: simulate TICKS_PER_FRAME ticks for frame number `tick`.

    Frame 0 covers ticks 1..TICKS_PER_FRAME, frame 1 the next TICKS_PER_FRAME
    ticks, and so on. Returns the artists from the last simulated tick
    (an empty list if no tick was simulated).
    '''
    artists = []
    first_tick = tick * TICKS_PER_FRAME + 1
    for update_tick in range(first_tick, first_tick + TICKS_PER_FRAME):
        artists = do_update(update_tick)
    return artists
def init_animation():
    '''
    Return the animated artists in their current state without simulating anything.

    Without an init function FuncAnimation draws its initial frame by calling
    update_bars with the first frame (and does so again whenever the blit
    background has to be rebuilt). update_bars advances the simulation, so that
    would simulate the first tick more than once and skew the averages and the
    recorded history.
    '''
    return bars.patches + [avg_line, avg_line2, ticks_text]


# when rendering a video the frame count is fixed and the interval is irrelevant,
# otherwise run forever at 60 frames per second
ani = animation.FuncAnimation(
    fig=fig,
    func=update_bars,
    frames=range(VIDEO_LENGTH) if OUTPUT_VIDEO else None,
    init_func=init_animation,
    interval=1 if OUTPUT_VIDEO else 1000 / 60,
    cache_frame_data=False,
    blit=True,
    repeat=False
)

if OUTPUT_VIDEO:
    ani.save(OUTPUT_VIDEO, fps=60, dpi=100)
else:
    plt.show()

# dump the number of items processed on each tick, one count per line
if OUTPUT_DATA:
    with open(OUTPUT_DATA, 'w') as f:
        f.write('\n'.join(str(n) for n in n_processed_history))
from pathlib import Path
import numpy as np
import matplotlib.axes
import matplotlib.pyplot as plt
# {dataset name: rolling standard deviation of the per-tick processed counts}
# Every .txt file in the current directory is treated as one dataset holding
# whitespace/newline separated integers.
datasets = {}
for file in Path('.').glob('*.txt'):
    name = file.stem.replace('_', ' ')
    with open(file) as f:
        # split() without arguments ignores blank lines and a trailing newline,
        # which would otherwise make int('') raise ValueError
        data = [int(n) for n in f.read().split()]
    if len(data) < 120:
        # a 120-sample window does not fit, sliding_window_view would raise ValueError
        print(f'skipping {file}: only {len(data)} samples, need at least 120')
        continue
    # one row per tick, each row holding a window of 120 consecutive samples
    data = np.lib.stride_tricks.sliding_window_view(data, 120)
    datasets[name] = np.std(data, axis=1)
def _final_value(entry):
    # sort key: last value of a (name, data) pair
    return entry[1][-1]


ax: matplotlib.axes.Axes
fig, ax = plt.subplots()
# 16:9 figure, 8 x 4.5 inches
fig.set_size_inches(16 / 2, 9 / 2)

# plot sorted by the last entry, descending (largest final value first)
for name, data in sorted(datasets.items(), key=_final_value, reverse=True):
    ax.plot(range(len(data)), data, label=name)

ax.set(
    ylim=0,
    xlabel='Time (ticks)',
    ylabel='Standard deviation',
)
ax.legend(loc='upper right')
fig.tight_layout()
# fit the x axis tightly to the data
plt.autoscale(tight=True, axis='x')
fig.savefig('stdev.png', dpi=300)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment