Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import torch
import random
import time
import numpy as np
import matplotlib.pyplot as plt
from copy import deepcopy
from controller.controller import Controller # hardware control code
from vizdoom import DoomGame, Button, GameVariable, ScreenFormat, ScreenResolution, AutomapMode
def init_corridor(cfg, wad_path):
    """Build, configure and initialise a ViZDoom game for a corridor scenario.

    Args:
        cfg: mapping read for the keys 'res', 'render_hud', 'automap',
            'buttons', 'ep_timeout', 'win_visible' and 'living_r'.
        wad_path: scenario file path passed to ``set_doom_scenario_path``.

    Returns:
        A ``(game, actions)`` tuple: the initialised ``DoomGame`` and a list
        holding one one-hot action vector per entry of ``cfg['buttons']``.
    """
    game = DoomGame()

    # scenario + rendering
    game.set_doom_scenario_path(wad_path)
    game.set_doom_map("map01")
    game.set_screen_resolution(cfg['res'])
    game.set_screen_format(ScreenFormat.RGB24)
    game.set_render_hud(cfg['render_hud'])
    game.add_game_args("+fullscreen 1 +viz_nocheat 0")
    game.set_render_crosshair(False)
    game.set_render_weapon(True)
    game.set_render_decals(True)
    game.set_render_particles(True)

    # automap
    game.set_automap_buffer_enabled(cfg['automap'])
    game.set_automap_rotate(False)
    game.set_automap_render_textures(False)
    game.set_automap_mode(AutomapMode.OBJECTS_WITH_SIZE)

    # inputs the agent may use
    available_buttons = cfg['buttons']
    for button in available_buttons:
        game.add_available_button(button)

    # game variables exposed in every state; callers index them by position,
    # so the order below must not change
    tracked_variables = (
        GameVariable.AMMO3, GameVariable.DAMAGE_TAKEN, GameVariable.HEALTH,
        GameVariable.POSITION_X, GameVariable.POSITION_Y, GameVariable.ANGLE,
        GameVariable.VELOCITY_X, GameVariable.VELOCITY_Y,
        GameVariable.USER1, GameVariable.USER2,
        GameVariable.FRAGCOUNT, GameVariable.HITCOUNT,
        GameVariable.ATTACK_READY,
    )
    for variable in tracked_variables:
        game.add_available_game_variable(variable)

    # episode / window / reward settings
    game.set_episode_timeout(cfg['ep_timeout'])
    game.set_episode_start_time(10)
    game.set_window_visible(cfg['win_visible'])
    game.set_living_reward(cfg['living_r'])
    game.set_console_enabled(True)
    game.init()

    # after-init settings
    game.set_console_enabled(True)
    if cfg['automap']:
        game.send_game_command('am_showmonsters true')
        game.send_game_command('am_colorset 2')

    # extra command settings
    game.send_game_command('sv_cheats 1')

    # one-hot action table: row k presses only button k
    n_buttons = len(available_buttons)
    actions = [
        [int(slot == pressed) for slot in range(n_buttons)]
        for pressed in range(n_buttons)
    ]
    return game, actions
if __name__ == '__main__':
    # Closed-loop experiment script: ball-motion sensors drive the player
    # through the corridor map, while the hardware controller dispenses
    # rewards and runs the training routines (forward roll, straighten,
    # solenoid raise) based on the in-game state.
    cfg = {
        'res': ScreenResolution.RES_1920X1080,
        'win_visible': True,
        'render_hud': False,
        'automap': False,
        'buttons': [Button.MOVE_FORWARD_BACKWARD_DELTA, Button.TURN_LEFT_RIGHT_DELTA,
                    Button.MOVE_LEFT_RIGHT_DELTA, Button.ATTACK],  # action set
        'ep_timeout': 1000 * 45,  # in Doom ticks, episode timeout
        'skiprate': 4,  # frames to skip after each action
        'living_r': -.1 / 4. / 2.,  # reward for just being
    }
    map_id = 0

    # CTRL options
    f_train_t = 2000  # forward roll time
    f_train_pwm = 220  # forward roll motor pwm
    t_between_train = 22  # time between rolling the ball to train forward movement
    checkpoint_reward = 255
    idle_reward = 8  # amount of water just for standing on the ball
    idle_reward_interval = 4
    mov_reward_scaler = 0.1  # not read anywhere in this script
    t_between_shots = 3  # not read anywhere in this script

    # turn: increases to left, decreases to right (counterclockwise)
    move_2d = False
    forward_angle = 90  # direction in game to run towards
    t_between_straighten = 5

    # shoot - different levels have the monster at different y positions
    stop_2_shoot_y = [np.inf, 1250, 1400, 1550, 1670, 1800, 1930, 2060, 2350]
    t_between_raise = 6
    raise_t = 1500

    sleep_time = 0.01
    speed_win_t = 0.05  # in sec, window of time to calculate speed in
    mov_scaler = 0.0008  # scale motion data to in-game movement speed
    turn_scaler = 0.0002

    # map selection
    map_path = 'doom/scenarios/corridor_straight.wad'
    if map_id > 0:  # load maps with an imp spawned at different positions depending on map_id
        map_path = f'doom/scenarios/corridor_imp_{map_id}.wad'

    # ctrl init
    ctrl = Controller()
    game = None
    try:
        # game init
        print(f'init {map_path}')
        game, _ = init_corridor(cfg, map_path)  # the one-hot action table is not used here

        episodes = 1000
        # each episode is a replay of the corridor map
        for i in range(episodes):
            print(f'episode #{i + 1}')
            game.new_episode()

            # further controller init
            last_forward = time.time()
            last_straighten = time.time()
            last_reward = time.time()
            last_raised = time.time()
            prev_step_t = 0.01  # time between game iterations
            checkpoints_y = np.arange(1300, 2500, 150)  # corridor positions as checkpoints
            checkpoint_rewards = np.zeros_like(checkpoints_y) + checkpoint_reward
            checkpoint_i = 0  # index of the next checkpoint to reach
            prev_fragz = 0

            # run game
            while not game.is_episode_finished():
                step_start = time.time()

                # retrieve game state
                state = game.get_state()
                game_vars = state.game_variables
                if state.number == 1:  # settings when the game just started
                    game.send_game_command('fov 120')
                    game.send_game_command('vid_setmode 3440 1440')

                # calculate speed of ball movement == rat's movement
                t = time.time()
                window_start = t - max(prev_step_t, speed_win_t)
                speed_fb_x, speed_fb_y = ctrl.rel_mov1.get_avg_speed_since(window_start)
                speed_lr_x, speed_lr_y = ctrl.rel_mov2.get_avg_speed_since(window_start)
                speed_fb = speed_fb_y * mov_scaler
                # both sensors give a turn estimate; use the one with the larger magnitude
                speed_tlr_estimates = [speed_fb_x, speed_lr_x]
                speed_tlr = speed_tlr_estimates[np.argmax(np.abs(speed_tlr_estimates))] * turn_scaler
                speed_mlr = speed_lr_y * mov_scaler

                # ignore motion sensor jiggle
                speed_fb = 0 if -.8 < speed_fb < .8 else speed_fb
                speed_mlr = 0 if -.8 < speed_mlr < .8 else speed_mlr
                speed_tlr = 0 if -.05 < speed_tlr < .05 else speed_tlr

                # get game variables (order is fixed by init_corridor)
                xpos, ypos, angle = game_vars[3:6]
                fragz = game_vars[-2]  # its the hitcount really
                speed_tlr = speed_tlr if move_2d else 0  # turn off left-right turn if not allowed
                speed_mlr = 0  # strafing is always disabled

                # shot detection
                shoot = 1 if ctrl.get_btn_state() else 0
                shoot = 0 if map_id == 0 else shoot  # no shooting on the map without monsters

                # take action
                action = [speed_fb, speed_tlr, speed_mlr, shoot]
                game.make_action(action, cfg['skiprate'])

                # train forward roll:
                # extend lin. actuator, roll motor forward, contract lin. actuator, dispense reward
                facing_forward = abs(forward_angle - angle) < 30
                may_advance = fragz > 0 or (ypos < stop_2_shoot_y[map_id] and fragz == 0)  # designed for 1 kill only
                if facing_forward and time.time() - last_forward > t_between_train and may_advance:
                    ctrl.train_forward_mov(f_train_pwm, f_train_t)
                    last_forward = time.time()

                # straighten: if the rat is not looking towards the exit,
                # rotate the ball in the right direction
                if abs(forward_angle - angle) < 20:
                    last_straighten = time.time()
                if time.time() - last_straighten > t_between_straighten:
                    # NOTE(review): this arithmetic is written for forward_angle == 90.
                    # For angles in (270, 360) abs(90 - angle) is larger than 180 —
                    # confirm against Controller.turn's sign/wrap convention.
                    if 90 < angle < 270:
                        dangle = 90 - angle
                    else:
                        dangle = abs(90 - angle)
                    ctrl.turn(dangle)
                    last_straighten = time.time()

                # train to shoot
                # raise push-pull solenoid if close to monster and haven't shot this round
                if fragz == 0 and ypos > stop_2_shoot_y[map_id] and time.time() - last_raised > t_between_raise:
                    ctrl.pull_sol(raise_t)
                    last_raised = time.time()

                # give reward for passing checkpoints
                if checkpoint_i < len(checkpoints_y) and ypos >= checkpoints_y[checkpoint_i]:
                    ctrl.dispense(checkpoint_rewards[checkpoint_i])
                    # a checkpoint also postpones the forward-roll training and the idle reward
                    last_forward = time.time()
                    last_reward = time.time()
                    checkpoint_i += 1

                # give reward for killing
                if fragz > prev_fragz:
                    prev_fragz = fragz
                    for _ in range(4):
                        ctrl.dispense(255)  # whole lotta water

                # give idle reward for staying on the ball
                if time.time() - last_reward > idle_reward_interval:
                    ctrl.dispense(idle_reward)
                    last_reward = time.time()

                if sleep_time > 0:
                    time.sleep(sleep_time)
                prev_step_t = time.time() - step_start

            print('episode finished')
    finally:
        # always release the game and the hardware, even after an error or Ctrl-C
        try:
            if game is not None:
                game.close()
        finally:
            ctrl.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment