Skip to content

Instantly share code, notes, and snippets.

@icook
Last active December 28, 2015 17:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save icook/7534827 to your computer and use it in GitHub Desktop.
Save icook/7534827 to your computer and use it in GitHub Desktop.
A thread based runner for robot game
"""
This is a thread based comparison runner for robot game. Essentially it takes the latest
x number of scripts in a folder called "engines" and runs every combination against eachother
repeatedly. It shows simple W/L ratio to help figure out which algorithms work the best.
"""
import curses
import path
import sys
import threading
import traceback
import datetime
import pprint
import os
import ast
import game
import itertools
from tabulate import tabulate
from settings import settings
from time import sleep
def make_player(fname):
return game.Player(open(fname).read())
# how many files to run together
latest_count = 5
class GameRunner(threading.Thread):
""" Run a single instance of the robot game against two ai engines """
def __init__(self, run_record, map_data):
super(GameRunner, self).__init__()
self._stop = threading.Event()
self.run_record = run_record
self.map_data = map_data
self.timer = {}
def st(self, *args):
for a in args:
self.timer[a] = datetime.datetime.now()
def gt(self, key):
""" update an average for this timing sample """
# get the time sample
now = datetime.datetime.now()
rt = (now - self.timer[key]).microseconds
self.timer[key] = now
# update the average with a moving average value (avoids keeping a list)
ct = self.run_record[key + 's']
avg = self.run_record['avg_' + key]
self.run_record['avg_' + key] = ((ct * avg) + rt) / (ct + 1)
self.run_record[key + 's'] += 1
def run(self):
game.init_settings(self.map_data)
self.st('turn')
while not self._stop.is_set():
players = [make_player(self.run_record['p1']['filename']),
make_player(self.run_record['p2']['filename'])]
g = game.Game(*players, record_turns=False)
for i in range(settings.max_turns):
g.run_turn()
self.gt('turn')
scores = g.get_scores()
if scores[0] > scores[1]:
self.run_record['p1']['wins'] += 1
self.run_record['p2']['losses'] += 1
else:
self.run_record['p1']['losses'] += 1
self.run_record['p2']['wins'] += 1
self.run_record['games'] += 1
def stop(self):
self._stop.set()
def main(stdscr, threads):
# Clear screen
stdscr.clear()
# map data to be used in workers
map_name = os.path.join(os.path.dirname(__file__), 'maps/default.py')
map_data = ast.literal_eval(open(map_name).read())
# record all details of workers
run_records = []
# get the most recently modified x objects
d = path.path('./engines/').files('*.py')
d.sort(key=lambda x: x.mtime, reverse=True)
latest = d[:latest_count]
# create a thread for each combination and tally results
i = 0
for p1, p2 in itertools.combinations(latest, 2):
dct = {'p1': {'filename': p1,
'wins': 0,
'losses': 0},
'p2': {'filename': p2,
'wins': 0,
'losses': 0},
'avg_turn': 1,
'turns': 0,
'start': datetime.datetime.now(),
'games': 0
}
run_records.append(dct)
thread = GameRunner(run_records[i], map_data)
thread.start()
run_records[i]['thread'] = thread
threads.append(thread)
i += 1
while True:
data = {'P1 fname': [],
'P2 fname': [],
'P1 W/L': [],
'P2 W/L': [],
'Games': [],
'Turns/Sec': [],
'Turns': [],
'Seconds Running': [],
'Overall Turns/Sec': [],
}
for val in run_records:
p1 = val['p1']
p2 = val['p2']
try:
wl1 = float(p1['wins']) / p1['losses']
except ZeroDivisionError:
wl1 = -1
try:
wl2 = float(p2['wins']) / p2['losses']
except ZeroDivisionError:
wl2 = -1
secs = (datetime.datetime.now() - val['start']).seconds
if secs == 0: secs = 1
data['P1 fname'].append(p1['filename'])
data['P2 fname'].append(p2['filename'])
data['P1 W/L'].append(wl1)
data['P2 W/L'].append(wl2)
data['Games'].append(val['games'])
data['Turns/Sec'].append(1000000 / val['avg_turn'])
data['Turns'].append(val['turns'])
data['Overall Turns/Sec'].append(val['turns'] / secs)
data['Seconds Running'].append(secs)
tab = tabulate(data, headers="keys", floatfmt=".4f").replace("\n", "\r")
for i, t in enumerate(tab.splitlines(True)):
stdscr.addstr(i, 0, t)
stdscr.refresh()
sleep(1)
if __name__ == "__main__":
threads = []
try:
# Initialize curses
stdscr = curses.initscr()
# Turn off echoing of keys, and enter cbreak mode,
# where no buffering is performed on keyboard input
curses.noecho()
curses.cbreak()
# In keypad mode, escape sequences for special keys
# (like the cursor keys) will be interpreted and
# a special value like curses.KEY_LEFT will be returned
stdscr.keypad(1)
# Start color, too. Harmless if the terminal doesn't have
# color; user can test with has_color() later on. The try/catch
# works around a minor bit of over-conscientiousness in the curses
# module -- the error return from C start_color() is ignorable.
try:
curses.start_color()
except:
pass
main(stdscr, threads)
finally:
traceback.print_exc(file=sys.stdout)
# Set everything back to normal
if 'stdscr' in locals():
stdscr.keypad(0)
curses.echo()
curses.nocbreak()
curses.endwin()
for t in threads:
t.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment