Skip to content

Instantly share code, notes, and snippets.

@bengardner
Created May 13, 2024 23:01
Show Gist options
  • Save bengardner/34d5746ef13c261139636a4782a4d14f to your computer and use it in GitHub Desktop.
Save bengardner/34d5746ef13c261139636a4782a4d14f to your computer and use it in GitHub Desktop.
'''
Direct implementation of DeadlineQueue from Lua.
'''
from game_tick import game
import math
class DeadlineQueue:
    '''
    Deadline queue built from a ring of time slices.

    Time is divided into slices of SLICE_TICKS ticks. The queue keeps
    SLICE_COUNT buckets (dicts of key -> value) and files each entry in the
    bucket for the slice that contains its deadline. Bucket indices are
    1-based to mirror the Lua implementation. The current tick is read from
    the module-level ``game.tick``.

    Every queued value must be a dict: the queue stores the entry's deadline
    in it under the ``'_deadline'`` key.
    '''

    def __init__(self, slice_ticks, slice_count):
        '''
        Create an empty queue anchored at the current ``game.tick``.

        slice_ticks: number of ticks covered by one slice.
        slice_count: number of slices (buckets) in the ring.
        '''
        self.SLICE_TICKS = slice_ticks
        self.SLICE_COUNT = slice_count
        # absolute number of the slice that contains the current tick
        self.cur_slice = math.floor(game.tick / slice_ticks)
        # 1-based bucket index of the current slice
        self.cur_index = 1 + (self.cur_slice % slice_count)
        # first tick that is past the end of the current slice
        self.deadline = (1 + self.cur_slice) * slice_ticks
        # NOTE: using a dict instead of an array to match lua
        self.slices = {index: {} for index in range(1, slice_count + 1)}

    def _queue(self, key, value, deadline, wrap_index):
        '''
        File ``value`` under ``key`` in the bucket that matches ``deadline``.

        Deadlines in the past land in the current bucket. A deadline beyond
        the ring is clamped to the farthest bucket when ``wrap_index`` is
        true, otherwise the entry is rejected.

        Returns True if the entry was stored, False if it was rejected.
        '''
        rel_slice = max(0, math.floor(deadline / self.SLICE_TICKS) - self.cur_slice)
        if rel_slice >= self.SLICE_COUNT:
            if not wrap_index:
                return False
            rel_slice = self.SLICE_COUNT - 1
        abs_index = 1 + (self.cur_index + rel_slice - 1) % self.SLICE_COUNT
        # remember the real deadline so next()/next_coarse() can re-file the
        # entry if it was clamped into a bucket that comes up too early
        value['_deadline'] = deadline
        self.slices[abs_index][key] = value
        return True

    def queue(self, key, value, deadline):
        '''
        Queue an entry; a deadline beyond the ring is clamped to the farthest
        bucket, so the entry is always stored.
        '''
        self._queue(key, value, deadline, True)

    def queue_maybe(self, key, value, deadline):
        '''
        Queue an entry only if its deadline fits inside the ring.

        Returns True if the entry was stored, False if the deadline was too
        far in the future and nothing was queued.
        '''
        return self._queue(key, value, deadline, False)

    def purge(self, key):
        '''
        Remove ``key`` from every bucket. Unknown keys are ignored.
        '''
        for bucket in self.slices.values():
            bucket.pop(key, None)

    def _advance_index(self):
        '''
        Step to the next slice: move the bucket index around the ring and
        push the slice number and slice deadline forward by one slice.
        '''
        self.cur_index = 1 + (self.cur_index % self.SLICE_COUNT)
        self.cur_slice += 1
        self.deadline += self.SLICE_TICKS

    def next(self):
        '''
        Pop one entry whose own deadline has been reached.

        Returns (key, value), or (None, None) when nothing is due yet.
        Entries found in the current bucket whose deadline lies beyond the
        current slice are re-filed into the bucket they belong to.
        '''
        now = game.tick
        while True:
            deadline = self.deadline
            qq = self.slices[self.cur_index]
            # iterate over a snapshot because entries are deleted on the way
            for key, val in list(qq.items()):
                if now >= val['_deadline']:
                    del qq[key]
                    return key, val
                if val['_deadline'] > deadline:
                    del qq[key]
                    self._queue(key, val, val['_deadline'], True)
            # stop while the bucket still holds not-yet-due entries, or while
            # the current slice has not ended
            if qq or now < deadline:
                return None, None
            self._advance_index()

    def next_coarse(self):
        '''
        Pop one entry from the current bucket at slice granularity.

        Unlike next(), an entry is returned as soon as its deadline falls
        inside the current slice; it is not compared against the current
        tick. Entries whose deadline lies beyond the current slice are
        re-filed.

        Returns (key, value), or (None, None) when the current bucket is
        exhausted and the current slice has not ended.
        '''
        now = game.tick
        while True:
            deadline = self.deadline
            qq = self.slices[self.cur_index]
            # iterate over a snapshot because entries are deleted on the way
            for key, val in list(qq.items()):
                del qq[key]
                if val['_deadline'] > deadline:
                    # must use _queue(): queue() does not accept wrap_index
                    self._queue(key, val, val['_deadline'], True)
                else:
                    return key, val
            if now < deadline:
                return None, None
            self._advance_index()

    def get_current_count(self):
        '''
        Return the number of entries in the current bucket.
        '''
        return len(self.slices[self.cur_index])

    def get_count(self, offset=None):
        '''
        Return the number of entries in the bucket ``offset`` slices after
        the current one (wrapping around the ring). ``None`` means 0.
        '''
        if offset is None:
            offset = 0
        index = 1 + ((self.cur_index - 1 + offset) % self.SLICE_COUNT)
        return len(self.slices[index])
# some usage example code
if __name__ == '__main__':
    # Seed the queue with one entry per tick for deadlines 0..59.
    dlq = DeadlineQueue(1, 60)
    for x in range(60):
        val = {x: x}
        dlq.queue(x, val, x)

    # Run 200 ticks; every entry that comes due is printed and rescheduled
    # 30 ticks into the future.
    for x in range(200):
        game.elapse()
        print('Tick:', game.tick)
        key, val = dlq.next()
        while key is not None:
            print(" -", key, val)
            dlq.queue(key, val, game.tick + 30)
            key, val = dlq.next()
from pathlib import Path
from collections import defaultdict
import math
import matplotlib.axes
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import matplotlib.animation as animation
from DeadlineQueue import DeadlineQueue
# Factorio emulation -- just the tick. defines global 'game.tick'
from game_tick import game
# number of ticks to simulate per animation frame (1 is real speed, 2 is 2x speed, etc)
TICKS_PER_FRAME = 1
# the maximum number of items to process per tick (upper clamp in DlqState.get_ipt)
MAX_PER_TICK = 70
# the minimum number of items to process per tick (lower clamp in DlqState.get_ipt)
MIN_PER_TICK = 10
# number of bars in the plot; also used as the number of slices in the deadline queue
NUM_BARS = 240
# size of the averaging window, in samples (one sample is added per simulated tick)
AVE_LENGTH = 180
# if set, will render the animation to the file instead of displaying it
OUTPUT_VIDEO = None # 'combined2.mp4'
# how long to run the animation for when saving a video, in frames
VIDEO_LENGTH = 60 * 60
# if set, will write the number of items processed per tick to the specified file
# (when the video render ends, or when the window is closed)
OUTPUT_DATA = Path(OUTPUT_VIDEO).with_suffix('.txt') if OUTPUT_VIDEO else None
#-----------------------------------------------------------------------------#
# Set up the fake entities
def preload_entities(dl_queue):
    '''
    Fill dl_queue with the fake entities used by the simulation.

    Each entity is a dict holding its own "key" and its reload "period",
    and is queued with its first deadline equal to that period.
    '''
    # { period: count } -- other mixes that were tried: 120/60/180/600-tick
    # periods with 1000 entities each
    population = {
        120: MAX_PER_TICK * 120 / 2,
    }

    # keys are handed out sequentially; the increment happens before use, so
    # the first entity gets key 2
    next_key = 1
    for period, count in population.items():
        for _ in range(math.ceil(count)):
            next_key += 1
            entity = {"key": next_key, "period": period}
            dl_queue.queue(next_key, entity, period)
# The queue under test: one-tick slices, one slice per plotted bar.
dl_queue = DeadlineQueue(1, NUM_BARS)
preload_entities(dl_queue)
#-----------------------------------------------------------------------------#
# {due tick: list of items}
# NOTE(review): `queue` is only referenced from a commented-out line in
# do_update(); it looks like a leftover from an earlier version -- confirm.
queue = defaultdict(list)
queue[120] = list(range(1000))
# items processed per tick; appended to by do_update() when OUTPUT_DATA is set
n_processed_history = []
ax: matplotlib.axes.Axes
fig, ax = plt.subplots()
fig.set_size_inches(16 / 2, 9 / 2)
# sliding window of the last 120 `total_items` values (maintained by do_update)
history = [0 for _ in range(120)]
# weighted running value of `total_items`, updated by do_update with a 1/120 weight
average2 = 0
ticks_text = ax.annotate('Tick: 0', (0, 1), fontsize='x-large', xycoords='axes fraction', xytext=(10, -10), textcoords='offset points', ha='left', va='top', animated=True,)
#avg_line = ax.plot(range(NUM_BARS), [1000 for _ in range(NUM_BARS)], color='grey', linestyle='--', label=f'{len(history)} tick avg')[0]
# Both lines start at a placeholder height of 1000; do_update() replaces the data on every tick.
avg_line = ax.plot(range(NUM_BARS), [1000 for _ in range(NUM_BARS)], color='grey', linestyle='--', label=f'{AVE_LENGTH} tick avg')[0]
avg_line2 = ax.plot(range(NUM_BARS), [1000 for _ in range(NUM_BARS)], color='green', linestyle='--', label='weighted "avg"')[0]
# one bar per queue slice; heights are set from dl_queue.get_count(i) in do_update()
bars = ax.bar(range(NUM_BARS), [0 for _ in range(NUM_BARS)], label='queue size')
ax.set(
xlim=[0, NUM_BARS],
ylim=[0, MAX_PER_TICK * 2],
xlabel='Queue due tick',
ylabel='Items in queue'
)
# x tick labels are shown as relative offsets ("+N")
ax.xaxis.set_major_formatter(ticker.FormatStrFormatter('+%d'))
ax.legend(loc='upper right')
fig.tight_layout()
def process_item(item):
    '''
    "Process" a fake entity and return the number of ticks until it should
    be serviced again: its 'period' if that is set and non-zero, else 120.
    '''
    return item.get('period') or 120
class DlqState:
    '''
    Running average of the number of items serviced per tick, kept in a
    fixed-size ring buffer of AVE_LENGTH samples, plus the items-per-tick
    budget derived from it.
    '''

    def __init__(self):
        self.ave_sum = 0                  # sum of all samples in ave_arr
        self.ave_idx = 0                  # slot the next sample overwrites
        self.ave_arr = [0] * AVE_LENGTH   # ring buffer of samples

    def ave_add_sample(self, count):
        '''
        Record a sample, overwriting the oldest one in the ring buffer.
        '''
        slot = self.ave_idx
        evicted = self.ave_arr[slot]
        self.ave_arr[slot] = count
        self.ave_sum = self.ave_sum - evicted + count
        self.ave_idx = (slot + 1) % len(self.ave_arr)

    def ave_get(self):
        '''
        Return the mean over the whole buffer (unfilled slots count as 0).
        '''
        return self.ave_sum / len(self.ave_arr)

    def get_ipt(self, tick_delta):
        '''
        Return the items-per-tick budget: the running average, plus
        tick_delta when tick_delta is greater than 1, rounded up and clamped
        to [MIN_PER_TICK, MAX_PER_TICK].
        '''
        bonus = tick_delta if tick_delta > 1 else 0
        wanted = math.ceil(self.ave_get() + bonus)
        capped = min(MAX_PER_TICK, wanted)
        return max(MIN_PER_TICK, capped)
# Module-wide running-average state; update_deadline_queue() feeds it one sample per tick.
dlq_state = DlqState()
def update_deadline_queue(tick, average_items):
    '''
    Service the deadline queue for one tick.

    Pops due items from dl_queue until either the per-tick budget from
    dlq_state.get_ipt() is used up or nothing more is due, and requeues each
    item at tick + process_item(item). The number serviced is then fed into
    the running average and a status line is printed.

    tick: the tick being simulated; also written to game.tick.
    average_items: accepted for interface compatibility; not used.

    Returns (total_items, total_processed), where total_items is the size of
    the current slice measured right after the first pop (counting the popped
    item), or 0 if nothing was due.
    '''
    game.tick = tick

    # items-per-tick budget: the running average serviced plus a bonus
    # depending on how late we are running
    budget = dlq_state.get_ipt(tick - dl_queue.deadline)

    handled = 0
    slice_size = None
    while handled < budget:
        key, val = dl_queue.next()
        if key is None:
            break
        if slice_size is None:
            # +1 accounts for the item that was just popped
            slice_size = dl_queue.get_count() + 1
        dl_queue.queue(key, val, tick + process_item(val))
        handled += 1

    # update the running average
    dlq_state.ave_add_sample(handled)

    if slice_size is None:
        slice_size = 0

    print(f'{tick}/{dl_queue.deadline}',
          'ipt:', budget,
          'ave:', math.floor(dlq_state.ave_get()),
          "processed:", handled, slice_size,
          'next:', dl_queue.get_count(),
          'dl:', dl_queue.deadline, dl_queue.deadline < tick)
    return slice_size, handled
def do_update(tick):
    '''
    Simulate one tick and refresh every plot artist.

    Runs update_deadline_queue() for `tick`, records the result in the
    module-level statistics (n_processed_history, average2, history), then
    updates the two average lines, the bar heights and the tick label.

    Returns the list of artists that were modified, as required for
    blitting by FuncAnimation.
    '''
    global average2
    game.tick = tick

    total_items, num_processed = update_deadline_queue(tick, round(average2))
    if OUTPUT_DATA:
        n_processed_history.append(num_processed)

    # weighted running value: 119/120 of the old value, 1/120 of the new sample
    average2 = average2 * (119/120) + total_items * (1/120)

    # keep the sliding window of recent total_items values up to date
    history.pop(0)
    history.append(total_items)

    # both averages are drawn as flat lines across the whole plot
    dlq_ave = dlq_state.ave_get()
    avg_line.set_data(range(NUM_BARS), [dlq_ave] * NUM_BARS)
    avg_line2.set_data(range(NUM_BARS), [average2] * NUM_BARS)

    # set the height of each bar: bar i shows the slice i slices from now
    for i, bar_artist in enumerate(bars.patches):
        bar_artist.set_height(dl_queue.get_count(i))

    secs, rem_ticks = divmod(tick, 60)
    mins, secs = divmod(secs, 60)
    # the value shown in parentheses goes negative once game.tick reaches
    # the queue's current slice deadline
    tick_delta = -(1 + game.tick - dl_queue.deadline)
    ticks_text.set_text(f'Tick: {tick} ({tick_delta})\n{mins:02d}m{secs:02d}s+{rem_ticks}')
    return bars.patches + [avg_line, avg_line2, ticks_text]
def update_bars(tick):
    '''
    Animation callback: simulate TICKS_PER_FRAME ticks for frame number
    `tick` and return the artists from the last simulated tick (an empty
    list if no tick was simulated).
    '''
    artists = []
    # frame 0 covers ticks 1..TICKS_PER_FRAME, frame 1 the next batch, etc.
    first_tick = tick * TICKS_PER_FRAME + 1
    for update_tick in range(first_tick, first_tick + TICKS_PER_FRAME):
        artists = do_update(update_tick)
    return artists
# Build the animation. When rendering to a file the frame count is fixed and
# frames are produced as fast as possible; otherwise it runs open-ended at
# roughly 60 frames per second. The result must stay bound to a name so the
# animation is not garbage collected.
ani = animation.FuncAnimation(
    fig=fig,
    func=update_bars,
    frames=range(VIDEO_LENGTH) if OUTPUT_VIDEO else None,
    interval=1 if OUTPUT_VIDEO else 1000 / 60,
    cache_frame_data=False,
    blit=True,
    repeat=False
)

# Either show the live window or render the video file.
if not OUTPUT_VIDEO:
    plt.show()
else:
    ani.save(OUTPUT_VIDEO, fps=60, dpi=100)

# Dump the per-tick processed counts, one number per line.
if OUTPUT_DATA:
    with open(OUTPUT_DATA, 'w') as f:
        f.write('\n'.join(map(str, n_processed_history)))
'''
Implements game.tick.
'''
class FactorioGame:
    '''
    Minimal stand-in for the Factorio `game` object: it only carries the
    current tick counter.
    '''

    def __init__(self):
        # the simulation starts at tick 0
        self.tick = 0

    def elapse(self):
        '''
        Advance the game clock by one tick.
        '''
        self.tick = self.tick + 1
# Shared instance; the other files obtain it with `from game_tick import game`.
game = FactorioGame()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment