Created
October 10, 2021 15:52
-
-
Save csiki/dd2c990aa2cbdf5533d0daba6fb04b8b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import torch | |
import random | |
import time | |
import numpy as np | |
import matplotlib.pyplot as plt | |
from copy import deepcopy | |
from controller.controller import Controller # hardware control code | |
from vizdoom import DoomGame, Button, GameVariable, ScreenFormat, ScreenResolution, AutomapMode | |
def init_corridor(cfg, wad_path):
    """Create, configure and initialise a ViZDoom game for the corridor map.

    Args:
        cfg: settings dict; the keys read here are 'res', 'render_hud',
            'automap', 'buttons', 'ep_timeout', 'win_visible' and 'living_r'.
        wad_path: path of the scenario WAD file to load.

    Returns:
        A ``(game, actions)`` tuple: the initialised ``DoomGame`` and a list
        holding one one-hot action (list of 0/1 ints) per configured button.
    """
    game = DoomGame()
    game.set_doom_scenario_path(wad_path)
    game.set_doom_map("map01")

    # rendering options
    game.set_screen_resolution(cfg['res'])
    game.set_screen_format(ScreenFormat.RGB24)
    game.set_render_hud(cfg['render_hud'])
    game.add_game_args("+fullscreen 1 +viz_nocheat 0")
    game.set_render_crosshair(False)
    game.set_render_weapon(True)
    game.set_render_decals(True)
    game.set_render_particles(True)

    # automap options
    game.set_automap_buffer_enabled(cfg['automap'])
    game.set_automap_rotate(False)
    game.set_automap_render_textures(False)
    game.set_automap_mode(AutomapMode.OBJECTS_WITH_SIZE)

    # buttons the agent may press, in the order given by the config
    enabled_buttons = cfg['buttons']
    for button in enabled_buttons:
        game.add_available_button(button)

    # game variables exposed through state.game_variables, in this exact order
    tracked_vars = (
        GameVariable.AMMO3, GameVariable.DAMAGE_TAKEN, GameVariable.HEALTH,
        GameVariable.POSITION_X, GameVariable.POSITION_Y, GameVariable.ANGLE,
        GameVariable.VELOCITY_X, GameVariable.VELOCITY_Y,
        GameVariable.USER1, GameVariable.USER2,
        GameVariable.FRAGCOUNT, GameVariable.HITCOUNT,
        GameVariable.ATTACK_READY,
    )
    for game_var in tracked_vars:
        game.add_available_game_variable(game_var)

    game.set_episode_timeout(cfg['ep_timeout'])
    game.set_episode_start_time(10)
    game.set_window_visible(cfg['win_visible'])
    game.set_living_reward(cfg['living_r'])
    game.set_console_enabled(True)
    game.init()

    # after-init settings
    game.set_console_enabled(True)
    if cfg['automap']:
        for command in ('am_showmonsters true', 'am_colorset 2'):
            game.send_game_command(command)

    # extra command settings
    game.send_game_command('sv_cheats 1')

    # one one-hot action per button
    n_buttons = len(enabled_buttons)
    actions = [[int(col == row) for col in range(n_buttons)]
               for row in range(n_buttons)]
    return game, actions
if __name__ == '__main__':
    # Closed-loop training script: ball motion read from the hardware
    # controller drives the player through the corridor map, and the
    # controller dispenses rewards for reaching checkpoints, registering hits
    # and staying on the ball.
    cfg = {
        'res': ScreenResolution.RES_1920X1080,
        'win_visible': True,
        'render_hud': False,
        'automap': False,
        'buttons': [Button.MOVE_FORWARD_BACKWARD_DELTA, Button.TURN_LEFT_RIGHT_DELTA,
                    Button.MOVE_LEFT_RIGHT_DELTA, Button.ATTACK],  # action set
        'ep_timeout': 1000 * 45,  # in Doom ticks, episode timeout
        'skiprate': 4,  # frames to skip after each action
        'living_r': -.1 / 4. / 2.,  # reward for just being
    }
    map_id = 0

    # CTRL options
    f_train_t = 2000  # forward roll time
    f_train_pwm = 220  # forward roll motor pwm
    t_between_train = 22  # time between rolling the ball to train forward movement
    checkpoint_reward = 255
    idle_reward = 8  # amount of water just for standing on the ball
    idle_reward_interval = 4
    mov_reward_scaler = 0.1  # currently unused
    t_between_shots = 3  # currently unused

    # turn: increases to left, decreases to right (counterclockwise)
    move_2d = False
    forward_angle = 90  # direction in game to run towards
    t_between_straighten = 5

    # shoot - different levels have the monster at different y positions
    stop_2_shoot_y = [np.inf, 1250, 1400, 1550, 1670, 1800, 1930, 2060, 2350]
    t_between_raise = 6
    raise_t = 1500

    sleep_time = 0.01
    speed_win_t = 0.05  # in sec, window of time to calculate speed in
    mov_scaler = 0.0008  # scale motion data to in-game movement speed
    turn_scaler = 0.0002

    # ctrl init
    ctrl = Controller()
    game = None
    try:
        # game init
        map_path = 'doom/scenarios/corridor_straight.wad'
        if map_id > 0:  # load maps with an imp spawned at different positions depending on map_id
            map_path = f'doom/scenarios/corridor_imp_{map_id}.wad'
        print(f'init {map_path}')
        game, _ = init_corridor(cfg, map_path)

        episodes = 1000
        # each episode is a replay of the corridor map
        for i in range(episodes):
            print(f'episode #{i + 1}')
            game.new_episode()

            # further controller init: per-episode timers
            last_forward = time.time()
            last_straighten = time.time()
            last_reward = time.time()
            last_raised = time.time()
            prev_step_t = 0.01  # time between game iterations

            checkpoints_y = np.arange(1300, 2500, 150)  # corridor positions as checkpoints
            checkpoint_rewards = np.zeros_like(checkpoints_y) + checkpoint_reward
            checkpoint_i = 0  # index of the next checkpoint to reach
            prev_hits = 0

            # run game
            while not game.is_episode_finished():
                step_start = time.time()

                # retrieve game state
                state = game.get_state()
                game_vars = state.game_variables

                if state.number == 1:  # settings when the game just started
                    game.send_game_command('fov 120')
                    game.send_game_command('vid_setmode 3440 1440')

                # calculate speed of ball movement == rat's movement,
                # averaged over at least speed_win_t seconds
                since = time.time() - max(prev_step_t, speed_win_t)
                speed_fb_x, speed_fb_y = ctrl.rel_mov1.get_avg_speed_since(since)
                speed_lr_x, _ = ctrl.rel_mov2.get_avg_speed_since(since)

                speed_fb = speed_fb_y * mov_scaler
                # both sensors give a turn estimate; use the one with the larger magnitude
                speed_tlr_estimates = [speed_fb_x, speed_lr_x]
                speed_tlr = speed_tlr_estimates[np.argmax(np.abs(speed_tlr_estimates))] * turn_scaler

                # ignore motion sensor jiggle
                speed_fb = 0 if -.8 < speed_fb < .8 else speed_fb
                speed_tlr = 0 if -.05 < speed_tlr < .05 else speed_tlr

                speed_tlr = speed_tlr if move_2d else 0  # turn off left-right turn if not allowed
                speed_mlr = 0  # strafing is always disabled

                # get game variables; indices follow the order in which
                # init_corridor registers them
                ypos, angle = game_vars[4], game_vars[5]
                hits = game_vars[-2]  # GameVariable.HITCOUNT

                # shot detection
                shoot = 1 if ctrl.get_btn_state() else 0
                shoot = 0 if map_id == 0 else shoot  # no shooting on the map without monsters

                # take action
                action = [speed_fb, speed_tlr, speed_mlr, shoot]
                game.make_action(action, cfg['skiprate'])

                # train forward roll:
                # extend lin. actuator, roll motor forward, contract lin. actuator, dispense reward
                if abs(forward_angle - angle) < 30 and time.time() - last_forward > t_between_train \
                        and (hits > 0 or (ypos < stop_2_shoot_y[map_id] and hits == 0)):  # designed for 1 kill only
                    ctrl.train_forward_mov(f_train_pwm, f_train_t)
                    last_forward = time.time()

                # straighten: if the rat is not looking towards the exit,
                # rotate the ball in the right direction
                if abs(forward_angle - angle) < 20:
                    last_straighten = time.time()
                if time.time() - last_straighten > t_between_straighten:
                    # NOTE(review): for angles in [270, 360) this yields a turn
                    # larger than 180 degrees (angle - forward_angle) rather
                    # than the shorter 360 + forward_angle - angle -- confirm
                    # against what ctrl.turn expects before changing.
                    if forward_angle < angle < forward_angle + 180:
                        dangle = forward_angle - angle
                    else:
                        dangle = abs(forward_angle - angle)
                    ctrl.turn(dangle)
                    last_straighten = time.time()

                # train to shoot
                # raise push-pull solenoid if close to monster and haven't shot this round
                if hits == 0 and ypos > stop_2_shoot_y[map_id] \
                        and time.time() - last_raised > t_between_raise:
                    ctrl.pull_sol(raise_t)
                    last_raised = time.time()

                # give reward for passing checkpoints
                if checkpoint_i < len(checkpoints_y) and ypos >= checkpoints_y[checkpoint_i]:
                    ctrl.dispense(checkpoint_rewards[checkpoint_i])
                    last_forward = time.time()
                    last_reward = time.time()
                    checkpoint_i += 1

                # give reward for killing
                if hits > prev_hits:
                    prev_hits = hits
                    for _ in range(4):
                        ctrl.dispense(255)  # whole lotta water

                # give idle reward for staying on the ball
                if time.time() - last_reward > idle_reward_interval:
                    ctrl.dispense(idle_reward)
                    last_reward = time.time()

                if sleep_time > 0:
                    time.sleep(sleep_time)
                prev_step_t = time.time() - step_start

            print('episode finished')
    finally:
        # Always release the game and the hardware, even when the run is
        # interrupted (e.g. Ctrl-C) or an error occurs mid-episode.
        try:
            if game is not None:
                game.close()
        finally:
            ctrl.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment