Created
December 19, 2013 19:20
-
-
Save mpeterv/8044646 to your computer and use it in GitHub Desktop.
Rewritten game.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import copy
import imp
import random
import sys
import traceback
import types
from collections import defaultdict

from rgkit import rg
from rgkit.settings import settings, AttrDict

sys.modules['rg'] = rg  # preserve backwards compatible robot imports
def init_settings(map_data):
    """Load a map description into the shared settings and hand it to rg.

    Reads the 'spawn', 'obstacle', 'start1' and 'start2' entries of
    `map_data`, stores them on the module-level `settings` object, passes
    that object to `rg.set_settings` and returns it.
    """
    # (settings attribute, map_data key) pairs, applied in this order.
    attribute_sources = (
        ('spawn_coords', 'spawn'),  # FIX NAMING
        ('obstacles', 'obstacle'),
        ('start1', 'start1'),
        ('start2', 'start2'),
    )
    for attribute, map_key in attribute_sources:
        setattr(settings, attribute, map_data[map_key])

    rg.set_settings(settings)
    return settings
class GameState:
    """Snapshot of the board at one turn: robots keyed by their location.

    Attributes:
        robots: dict mapping a location to an AttrDict record with the keys
            'location', 'hp', 'player_id' and 'robot_id'.
        turn: number of the turn this state represents.
    """

    def __init__(self, settings, use_start=False, turn=0, next_robot_id=0):
        """Create an empty state, optionally populated from the start lists.

        Args:
            settings: game settings object read by the other methods.
            use_start: when true, place a robot for player 0 on every
                location in `settings.start1` and for player 1 on every
                location in `settings.start2`.
            turn: turn number of this state.
            next_robot_id: first id handed out by `add_robot`.
        """
        self._settings = settings
        self.robots = {}
        self.turn = turn
        self._next_robot_id = next_robot_id

        if use_start:
            for loc in self._settings.start1:
                self.add_robot(loc, 0)
            for loc in self._settings.start2:
                self.add_robot(loc, 1)

    def add_robot(self, loc, player_id, hp=None, robot_id=None):
        """Place a robot at `loc`, replacing whatever was stored there.

        `hp` defaults to `settings.robot_hp`. When `robot_id` is omitted a
        fresh id is taken from the internal counter.
        """
        if hp is None:
            hp = self._settings.robot_hp

        if robot_id is None:
            robot_id = self._next_robot_id
            self._next_robot_id += 1

        self.robots[loc] = AttrDict({
            'location': loc,
            'hp': hp,
            'player_id': player_id,
            'robot_id': robot_id
        })

    def remove_robot(self, loc):
        """Delete the robot at `loc`; do nothing if the square is empty."""
        if self.is_robot(loc):
            del self.robots[loc]

    def is_robot(self, loc):
        """Return True when a robot currently occupies `loc`."""
        return loc in self.robots

    def _clear_spawn(self):
        """Remove every robot standing on a spawn square."""
        # Use this state's own settings (not the module-level global) so the
        # method agrees with the rest of the class.
        for loc in self._settings.spawn_coords:
            if self.is_robot(loc):
                self.remove_robot(loc)

    def _spawn_robots(self):
        """Spawn `spawn_per_player` new robots for each player.

        Locations are drawn without replacement from `spawn_coords`.
        `random.sample` returns them in random order, so neither player is
        favoured for squares that appear early in `spawn_coords`.

        Raises:
            ValueError: if there are fewer than 2 * spawn_per_player spawn
                squares.
        """
        per_player = self._settings.spawn_per_player
        locations = random.sample(
            list(self._settings.spawn_coords), per_player * 2)

        for i in range(per_player):
            self.add_robot(locations[i], 0)
            self.add_robot(locations[i + per_player], 1)

    def _clear_dead(self):
        """Remove every robot whose hp is zero or below."""
        # Collect first: robots must not be deleted while the dict is being
        # iterated.
        dead = [loc for loc, robot in self.robots.items() if robot.hp <= 0]
        for loc in dead:
            self.remove_robot(loc)

    # actions = map (loc) -> (action)
    # all actions must be valid
    # returns new GameState
    def apply_actions(self, actions, spawn=True):
        """Resolve one turn and return the resulting GameState.

        This state is left untouched; robots are re-created in the new one.

        Args:
            actions: dict mapping every robot location to its action, a
                sequence whose first item is 'move', 'attack', 'guard' or
                'suicide' ('move' and 'attack' carry a target location as
                the second item). All actions must already be valid.
            spawn: when true, spawn squares are cleared and new robots are
                spawned whenever this state's turn is a multiple of 10.

        Returns:
            A new GameState for turn `self.turn + 1`.
        """
        new_state = GameState(self._settings,
                              next_robot_id=self._next_robot_id,
                              turn=self.turn + 1)

        def dest(loc):
            # Square the robot at `loc` wants to end the turn on.
            if actions[loc][0] == 'move':
                return actions[loc][1]
            return loc

        # hitpoints[square] = locations of the robots claiming that square.
        hitpoints = defaultdict(set)

        def stuck(loc):
            # we are not moving anywhere
            # inform others
            old_hitpoints = hitpoints[loc]
            hitpoints[loc] = set([loc])

            for rival in old_hitpoints:
                if rival != loc:
                    stuck(rival)

        for loc in self.robots:
            hitpoints[dest(loc)].add(loc)

        for loc in self.robots:
            contested = len(hitpoints[dest(loc)]) > 1
            # Two robots trying to swap squares with each other.
            swapping = (self.is_robot(dest(loc)) and
                        dest(loc) != loc and
                        dest(dest(loc)) == loc)
            if contested or swapping:
                # we've got a problem
                stuck(loc)

        # calculate new locations
        for loc, robot in self.robots.items():
            if actions[loc][0] == 'move' and loc in hitpoints[loc]:
                new_loc = loc
            else:
                new_loc = dest(loc)

            new_state.add_robot(new_loc,
                                robot.player_id, robot.hp, robot.robot_id)

        # Unordered pairs of robots that bumped into each other.
        collisions = set()
        for loc in self.robots:
            for other in hitpoints[dest(loc)]:
                if (other, loc) not in collisions:
                    collisions.add((loc, other))

        # apply collision damage
        for (loc, loc2) in collisions:
            if self.robots[loc].player_id != self.robots[loc2].player_id:
                damage = self._settings.collision_damage
                if actions[loc][0] != 'guard':
                    new_state.robots[loc].hp -= damage
                if actions[loc2][0] != 'guard':
                    new_state.robots[loc2].hp -= damage

        # damage_map[square][p] = damage dealt to that square by player p.
        damage_map = defaultdict(lambda: [0, 0])

        for loc, robot in self.robots.items():
            actor_id = robot.player_id

            if actions[loc][0] == 'attack':
                target = actions[loc][1]
                damage = random.randint(*self._settings.attack_range)
                damage_map[target][actor_id] += damage

            if actions[loc][0] == 'suicide':
                # Kill the suiciding robot itself: a robot takes the damage
                # recorded under the *other* player's index, so charge a
                # full robot_hp to its own square there.
                damage_map[loc][1 - actor_id] += self._settings.robot_hp
                damage = self._settings.suicide_damage
                for target in rg.locs_around(loc):
                    damage_map[target][actor_id] += damage

        # apply damage
        for loc, robot in new_state.robots.items():
            damage_taken = damage_map[loc][1 - robot.player_id]
            if self.is_robot(loc) and actions[loc][0] == 'guard':
                # Floor division keeps the integer halving identical on
                # Python 2 and 3 (assumes integer damage values).
                damage_taken //= 2
            robot.hp -= damage_taken

        # clear dead bots
        new_state._clear_dead()

        if spawn:
            if self.turn % 10 == 0:
                new_state._clear_spawn()
                new_state._spawn_robots()

        return new_state

    def get_scores(self):
        """Return [robots of player 0, robots of player 1]."""
        scores = [0, 0]
        for robot in self.robots.values():
            scores[robot.player_id] += 1
        return scores

    # export GameState to be used by a robot
    def get_game_info(self, player_id):
        """Build the view of this state that is handed to a player's robots.

        Every robot record is copied; the 'robot_id' field is removed from
        robots that do not belong to `player_id`.

        Returns:
            An AttrDict with `robots` (location -> record copy) and `turn`.
        """
        game_info = AttrDict()
        game_info.robots = {loc: AttrDict(robot)
                            for loc, robot in self.robots.items()}
        for robot in game_info.robots.values():
            if robot.player_id != player_id:
                del robot.robot_id
        game_info.turn = self.turn
        return game_info

    # actor = location
    # action = well, action
    def is_valid_action(self, actor, action):
        """Return True when `action` is legal for the robot at `actor`.

        'move' and 'attack' need a target among the squares around `actor`
        that are neither invalid nor obstacles; 'guard' and 'suicide' are
        always legal. Anything else, including a malformed action that
        raises while being inspected, is reported as invalid.
        """
        try:
            if action[0] in ['move', 'attack']:
                return action[1] in rg.locs_around(
                    actor, filter_out=['invalid', 'obstacle'])
            elif action[0] in ['guard', 'suicide']:
                return True
            else:
                return False
        except Exception:
            # Robot code is untrusted: any garbage it returns is just
            # "not a valid action".
            return False
class Player:
    """One side of a match, backed by user-supplied robot source code.

    The code is executed once in a private module and must define a class
    named `Robot`; a single instance of it is reused for all of the
    player's robots.
    """

    def __init__(self, code):
        """Execute `code` in a fresh module and instantiate its `Robot`.

        Args:
            code: robot source (anything `exec` accepts).

        Raises:
            KeyError: if the code does not define `Robot`.
        """
        # types.ModuleType replaces the deprecated imp.new_module.
        self._module = types.ModuleType('usercode%d' % id(self))
        # SECURITY: this runs arbitrary user code with full interpreter
        # privileges; only feed it code you trust or sandbox the process.
        # The call form of exec works on both Python 2.7 and Python 3.
        exec(code, self._module.__dict__)
        self._robot = self._module.__dict__['Robot']()

    def set_player_id(self, player_id):
        """Record which player id this player controls."""
        self._player_id = player_id

    def reload(self):
        """Replace the robot with a new `Robot` instance."""
        self._robot = self._module.__dict__['Robot']()

    def _get_action(self, game_state, game_info, robot):
        """Ask the user robot for an action on behalf of `robot`.

        The robot record's location, hp, player_id and robot_id are copied
        onto the shared user `Robot` instance before `act(game_info)` is
        called. If the user code raises, or returns an action that
        `game_state.is_valid_action` rejects, the traceback is printed to
        stdout and the robot guards instead.
        """
        try:
            self._robot.location = robot.location
            self._robot.hp = robot.hp
            self._robot.player_id = robot.player_id
            self._robot.robot_id = robot.robot_id
            action = self._robot.act(game_info)

            if not game_state.is_valid_action(robot.location, action):
                raise Exception(
                    'Bot {0}: {1} is not a valid action from {2}'.format(
                        robot.robot_id + 1, action, robot.location)
                )

        except Exception:
            # Boundary around untrusted robot code: report and fall back to
            # a harmless action rather than aborting the match.
            traceback.print_exc(file=sys.stdout)
            action = ['guard']

        return action

    # returns map (loc) -> (action) for all bots of this player
    # 'fixes' invalid actions
    def get_actions(self, game_state):
        """Return {location: action} for every robot this player owns.

        Invalid or failing actions are replaced with ['guard'].
        """
        game_info = game_state.get_game_info(self._player_id)
        actions = {}

        for loc, robot in game_state.robots.items():
            if robot.player_id == self._player_id:
                actions[loc] = self._get_action(game_state, game_info, robot)

        return actions
class Match:
    """Runs a game between two players and optionally records each turn."""

    def __init__(self, settings, player1, player2, record_history=True):
        """Bind the players to ids 0 and 1 and prepare the history list.

        `history` is only created when `record_history` is true.
        """
        self._settings = settings
        self._record_history = record_history
        self._player1, self._player2 = player1, player2

        # player1 always plays as id 0, player2 as id 1.
        for player_id, player in enumerate((player1, player2)):
            player.set_player_id(player_id)

        if record_history:
            self.history = []

    def run(self):
        """Play turns from the starting position until `max_turns`.

        Afterwards `scores` holds the result of the final state's
        `get_scores()`; with history enabled the final state is recorded
        as well (with no actions).
        """
        self.state = GameState(self._settings, use_start=True)

        while True:
            if not self.state.turn < self._settings.max_turns:
                break
            self.run_turn()

        if self._record_history:
            self._save_to_history()

        self.scores = self.state.get_scores()

    def _save_to_history(self, actions=None):
        """Append the current state and the given actions to `history`."""
        entry = AttrDict()
        entry.state = self.state
        entry.actions = actions
        self.history.append(entry)

    def run_turn(self):
        """Collect both players' actions and advance the state one turn."""
        current = self.state

        # Player 2's actions are merged into the dict player 1 returned.
        combined = self._player1.get_actions(current)
        combined.update(self._player2.get_actions(current))

        if self._record_history:
            self._save_to_history(combined)

        self.state = current.apply_actions(combined)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment