Created
May 13, 2024 23:01
-
-
Save bengardner/34d5746ef13c261139636a4782a4d14f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Direct implementation if DeadlineQueue from Lua. | |
''' | |
from game_tick import game | |
import math | |
class DeadlineQueue: | |
def __init__(self, slice_ticks, slice_count): | |
self.SLICE_TICKS = slice_ticks | |
self.SLICE_COUNT = slice_count | |
self.cur_slice = math.floor(game.tick / slice_ticks) | |
self.cur_index = 1 + (self.cur_slice % slice_count) | |
self.deadline = (1 + self.cur_slice) * slice_ticks | |
# NOTE: using a dict instead of an array to match lua | |
self.slices = dict() | |
for xx in range(1, slice_count + 1): | |
self.slices[xx] = dict() | |
def _queue(self, key, value, deadline, wrap_index): | |
rel_slice = max(0, math.floor(deadline / self.SLICE_TICKS) - self.cur_slice) | |
if rel_slice >= self.SLICE_COUNT: | |
if wrap_index: | |
rel_slice = self.SLICE_COUNT - 1 | |
else: | |
return False | |
abs_index = 1 + (self.cur_index + rel_slice - 1) % self.SLICE_COUNT | |
value['_deadline'] = deadline | |
self.slices[abs_index][key] = value | |
return True | |
def queue(self, key, value, deadline): | |
self._queue(key, value, deadline, True) | |
def queue_maybe(self, key, value, deadline): | |
self._queue(key, value, deadline, False) | |
def purge(self, key): | |
for _, qq in self.slices.items(): | |
if key in qq: | |
del qq[key] | |
def _advance_index(self): | |
self.cur_index = 1 + (self.cur_index % self.SLICE_COUNT) | |
self.cur_slice += 1 | |
self.deadline += self.SLICE_TICKS | |
def next(self): | |
now = game.tick | |
while True: | |
deadline = self.deadline | |
qq = self.slices[self.cur_index] | |
#print("now:", now, "deadline:", deadline, "cur_index:", self.cur_index, "qq:", qq) | |
for key, val in list(qq.items()): # have to convert to a list to delete while iterating | |
if now >= val['_deadline']: | |
del qq[key] | |
return key, val | |
elif val['_deadline'] > deadline: | |
del qq[key] | |
self._queue(key, val, val['_deadline'], True) | |
if qq or now < deadline: | |
return None, None | |
self._advance_index() | |
def next_coarse(self): | |
now = game.tick | |
while True: | |
deadline = self.deadline | |
qq = self.slices[self.cur_index] | |
for key, val in list(qq.items()): # have to convert to a list to delete while iterating | |
if val['_deadline'] > deadline: | |
del qq[key] | |
self.queue(key, val, val['_deadline'], True) | |
else: | |
del qq[key] | |
return key, val | |
if now < deadline: | |
return None, None | |
self._advance_index() | |
def get_current_count(self): | |
return len(self.slices[self.cur_index]) | |
def get_count(self, offset=None): | |
if offset is None: | |
offset = 0 | |
cur_index = 1 + ((self.cur_index - 1 + offset) % self.SLICE_COUNT) | |
return len(self.slices[cur_index]) | |
# some usage example code | |
if __name__ == '__main__': | |
dlq = DeadlineQueue(1, 60) | |
for x in range(60): | |
val = { x: x } | |
dlq.queue(x, val, x) | |
for x in range(200): | |
game.elapse() | |
print('Tick:', game.tick) | |
while True: | |
key, val = dlq.next() | |
if key is None: | |
break | |
print(" -", key, val) | |
dlq.queue(key, val, game.tick + 30) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pathlib import Path | |
from collections import defaultdict | |
import math | |
import matplotlib.axes | |
import matplotlib.pyplot as plt | |
import matplotlib.ticker as ticker | |
import matplotlib.animation as animation | |
from DeadlineQueue import DeadlineQueue | |
# Factorio emulation -- just the tick. defines global 'game.tick' | |
from game_tick import game | |
# number of ticks to simulate per frame (1 is real speed, 2 is 2x speed, etc) | |
TICKS_PER_FRAME = 1 | |
# the maximum number of items to process per tick | |
MAX_PER_TICK = 70 | |
# the minimum number of items to process per tick | |
MIN_PER_TICK = 10 | |
# number of bars in the plot | |
NUM_BARS = 240 | |
# size of the averaging window | |
AVE_LENGTH = 180 | |
# if set, will render the animation to the file instead of displaying it | |
OUTPUT_VIDEO = None # 'combined2.mp4' | |
# how long to run the animation for when saving a video | |
VIDEO_LENGTH = 60 * 60 | |
# if set, will write the the number of items proceed per tick to the specified file | |
# (when the video render ends, or when the window is closed) | |
OUTPUT_DATA = Path(OUTPUT_VIDEO).with_suffix('.txt') if OUTPUT_VIDEO else None | |
#-----------------------------------------------------------------------------# | |
# Set up the fake entities | |
def preload_entities(dl_queue): | |
# { period: count } | |
preload_entities = { | |
#120: 1000, | |
120: MAX_PER_TICK * 120 / 2, | |
#60: 1000, | |
#180: 1000, | |
#10*60: 1000, | |
} | |
# set up a bunch of 120-tick reloads | |
key = 1 | |
for period, count in preload_entities.items(): | |
for xx in range(math.ceil(count)): | |
key += 1 | |
dl_queue.queue(key, { "key": key, "period": period }, period) | |
dl_queue = DeadlineQueue(1, NUM_BARS) | |
preload_entities(dl_queue) | |
#-----------------------------------------------------------------------------# | |
# {due tick: list of items} | |
queue = defaultdict(list) | |
queue[120] = list(range(1000)) | |
n_processed_history = [] | |
ax: matplotlib.axes.Axes | |
fig, ax = plt.subplots() | |
fig.set_size_inches(16 / 2, 9 / 2) | |
history = [0 for _ in range(120)] | |
average2 = 0 | |
ticks_text = ax.annotate('Tick: 0', (0, 1), fontsize='x-large', xycoords='axes fraction', xytext=(10, -10), textcoords='offset points', ha='left', va='top', animated=True,) | |
#avg_line = ax.plot(range(NUM_BARS), [1000 for _ in range(NUM_BARS)], color='grey', linestyle='--', label=f'{len(history)} tick avg')[0] | |
avg_line = ax.plot(range(NUM_BARS), [1000 for _ in range(NUM_BARS)], color='grey', linestyle='--', label=f'{AVE_LENGTH} tick avg')[0] | |
avg_line2 = ax.plot(range(NUM_BARS), [1000 for _ in range(NUM_BARS)], color='green', linestyle='--', label='weighted "avg"')[0] | |
bars = ax.bar(range(NUM_BARS), [0 for _ in range(NUM_BARS)], label='queue size') | |
ax.set( | |
xlim=[0, NUM_BARS], | |
ylim=[0, MAX_PER_TICK * 2], | |
xlabel='Queue due tick', | |
ylabel='Items in queue' | |
) | |
ax.xaxis.set_major_formatter(ticker.FormatStrFormatter('+%d')) | |
ax.legend(loc='upper right') | |
fig.tight_layout() | |
def process_item(item): | |
period = item.get('period') | |
if period: | |
return period | |
return 120 | |
class DlqState: | |
def __init__(self): | |
self.ave_sum = 0 | |
self.ave_idx = 0 | |
self.ave_arr = [0] * AVE_LENGTH | |
def ave_add_sample(self, count): | |
self.ave_sum -= self.ave_arr[self.ave_idx] | |
self.ave_arr[self.ave_idx] = count | |
self.ave_sum += count | |
self.ave_idx = (self.ave_idx + 1) % len(self.ave_arr) | |
def ave_get(self): | |
return self.ave_sum / len(self.ave_arr) | |
def get_ipt(self, tick_delta): | |
ipt = self.ave_get() | |
if tick_delta > 1: | |
ipt += tick_delta | |
return max(MIN_PER_TICK, min(MAX_PER_TICK, math.ceil(ipt))) | |
dlq_state = DlqState() | |
def update_deadline_queue(tick, average_items): | |
''' | |
Update method that updates up to MAX_PER_TICK items every tick | |
''' | |
game.tick = tick | |
global dlq_state | |
# grab the items-per-tick, which is the running average serviced plus | |
# a bonus depending on how late we are running. | |
ipt = dlq_state.get_ipt(tick - dl_queue.deadline) | |
total_processed = 0 | |
total_items = None | |
# process up to ipt items | |
for _ in range(ipt): | |
key, val = dl_queue.next() | |
if key is None: | |
break | |
if total_items is None: | |
total_items = dl_queue.get_count() + 1 | |
dt = process_item(val) | |
dl_queue.queue(key, val, tick + dt) | |
total_processed += 1 | |
# update the running average | |
dlq_state.ave_add_sample(total_processed) | |
if total_items is None: | |
total_items = 0 | |
print('{}/{}'.format(tick, dl_queue.deadline), | |
'ipt:', ipt, | |
'ave:', | |
math.floor(dlq_state.ave_get()), | |
"processed:", total_processed, total_items, | |
'next:', dl_queue.get_count(), | |
'dl:', dl_queue.deadline, dl_queue.deadline < tick) | |
return total_items, total_processed | |
def do_update(tick): | |
game.tick = tick | |
global average2 | |
update_func = update_deadline_queue | |
total_items, num_processed = update_func(tick, round(average2)) | |
if OUTPUT_DATA: | |
n_processed_history.append(num_processed) | |
average2 = average2 * (119/120) + total_items * (1/120) | |
history.pop(0) | |
history.append(total_items) | |
avg = sum(history) / len(history) | |
dlq_ave = dlq_state.ave_get() | |
avg_line.set_data(range(NUM_BARS), [dlq_ave for _ in range(NUM_BARS)]) | |
avg_line2.set_data(range(NUM_BARS), [average2 for _ in range(NUM_BARS)]) | |
# set the height of each bar | |
for i, bar_artist in enumerate(bars.patches): | |
hh = dl_queue.get_count(i) | |
# hh = len(queue.get(tick + i, [])) | |
bar_artist.set_height(hh) | |
secs, rem_ticks = divmod(tick, 60) | |
mins, secs = divmod(secs, 60) | |
tick_delta = -(1 + game.tick - dl_queue.deadline) | |
ticks_text.set_text(f'Tick: {tick} ({tick_delta})\n{mins:02d}m{secs:02d}s+{rem_ticks}') | |
return bars.patches + [avg_line, avg_line2, ticks_text] | |
def update_bars(tick): | |
artists = [] | |
for i in range(TICKS_PER_FRAME): | |
update_tick = tick * TICKS_PER_FRAME + i + 1 | |
artists = do_update(update_tick) | |
return artists | |
ani = animation.FuncAnimation( | |
fig=fig, | |
func=update_bars, | |
frames=range(VIDEO_LENGTH) if OUTPUT_VIDEO else None, | |
interval=1 if OUTPUT_VIDEO else 1000 / 60, | |
cache_frame_data=False, | |
blit=True, | |
repeat=False | |
) | |
if OUTPUT_VIDEO: | |
ani.save(OUTPUT_VIDEO, fps=60, dpi=100) | |
else: | |
plt.show() | |
if OUTPUT_DATA: | |
with open(OUTPUT_DATA, 'w') as f: | |
f.write('\n'.join(str(n) for n in n_processed_history)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Implements game.tick. | |
''' | |
class FactorioGame: | |
def __init__(self): | |
self.tick = 0 | |
def elapse(self): | |
self.tick += 1 | |
game = FactorioGame() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment