Created
January 29, 2017 13:49
-
-
Save g00dgame/dc3fa93caddcd75af31245277ab423c5 to your computer and use it in GitHub Desktop.
Skiing-v0 Heuristic AI
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python2.7 | |
import os
import random
from collections import deque
from itertools import combinations

import gym
import numpy as np
from gym import wrappers
from scipy import ndimage
# Name of the game this agent is written for.
GAME = "Skiing"

# Region of interest cropped out of every frame before detection:
# rows roi_y .. roi_y + roi_h and columns roi_x .. roi_x + roi_w.
roi_h = 180
roi_w = 144
roi_y = 64
roi_x = 8

# The skier is searched for only in the first skier_roi_h rows of the ROI;
# skier_border_y is the middle row of that band and is subtracted from a
# target's row when estimating the time needed to reach it.
skier_roi_h = 22
skier_border_y = skier_roi_h / 2.0

# Flags only take part in gate pairing when their centroid row (in ROI
# coordinates) is greater than this value.
flags_y_threshold = 45.0

# Maximum number of samples kept per track (skier positions, time steps and
# per-gate position histories).
history_length = 10

# Per-channel pixel values matched exactly by mask_by_color
# (presumably RGB order, as delivered by the environment -- confirm).
skier_color = [214, 92, 92]
flag1_color = [66, 72, 200]
flag2_color = [184, 50, 50]
def mask_by_color(image, color):
    """Return a boolean mask marking the pixels of *image* that equal *color*.

    Args:
        image: array indexed as ``image[row, column, channel]``; only the
            first three channels are compared.
        color: sequence whose first three items are the channel values to
            match exactly.

    Returns:
        A 2-D boolean array, True where all three channels match.
    """
    channel0_hit = image[:, :, 0] == color[0]
    channel1_hit = image[:, :, 1] == color[1]
    channel2_hit = image[:, :, 2] == color[2]
    return (channel0_hit & channel1_hit) & channel2_hit
def detect_objects(observation):
    """Locate the skier and the flag gates in a single game frame.

    The frame is cropped to the region of interest (``roi_*`` constants).
    The skier is looked for by colour in the first ``skier_roi_h`` rows of
    the crop.  Flags are the connected components of pixels matching either
    flag colour; two flags form a gate when their centroid rows differ by
    less than 10 and both rows are greater than ``flags_y_threshold``.

    Args:
        observation: frame indexed as ``[row, column, channel]``.

    Returns:
        A tuple ``(roi, skier_x, targets)`` where ``roi`` is the cropped
        frame, ``skier_x`` is the column of the skier's centre of mass and
        ``targets`` is a list of ``[row, column]`` gate midpoints, all in
        ROI coordinates.
    """
    roi = observation[roi_y:roi_y + roi_h, roi_x:roi_x + roi_w]

    skier_mask = mask_by_color(roi[0:skier_roi_h, :], skier_color)
    # NOTE(review): when no skier-coloured pixel is present the centre of
    # mass is presumably NaN; callers receive that value unchanged.
    # Uses the public ndimage.center_of_mass; the ndimage.measurements
    # namespace is deprecated.
    _, skier_x = ndimage.center_of_mass(skier_mask)

    flags_mask = np.bitwise_or(
        mask_by_color(roi, flag1_color),
        mask_by_color(roi, flag2_color),
    )
    flags_cc, flags_count = ndimage.label(flags_mask)
    flags_coordinates = ndimage.center_of_mass(
        flags_mask, flags_cc, list(range(1, flags_count + 1))
    )

    # Every unordered pair of flags is considered once, in the same order
    # the nested index loops (i < j) would visit them.
    targets = []
    for (y1, x1), (y2, x2) in combinations(flags_coordinates, 2):
        same_row = abs(y1 - y2) < 10
        if same_row and y1 > flags_y_threshold and y2 > flags_y_threshold:
            targets.append([(y1 + y2) / 2.0, (x1 + x2) / 2.0])
    return roi, skier_x, targets
def compute_speed(positions, time_step_track):
    """Estimate a speed from a newest-first position history.

    For every older sample ``positions[i]`` (``i >= 1``) the displacement
    ``positions[0] - positions[i]`` is divided by the sum of
    ``time_step_track[0] .. time_step_track[i - 1]``; the quotients are
    averaged and multiplied by a fixed factor of 5.0 (the meaning of that
    factor is not documented in the source).

    Args:
        positions: indexable sequence of scalar positions, newest first.
        time_step_track: indexable sequence of step durations, newest first,
            with at least ``len(positions)`` entries.

    Returns:
        The scaled average speed, or 0.0 when fewer than two positions are
        available (a speed cannot be derived from a single sample).
    """
    sample_count = len(positions)
    if sample_count < 2:
        # Avoids dividing by zero below when there is nothing to compare.
        return 0.0

    newest = positions[0]
    elapsed = time_step_track[0]
    total = 0.0
    # range (not xrange) keeps this working on both Python 2 and Python 3.
    for i in range(1, sample_count):
        total += np.float64(newest - positions[i]) / elapsed
        elapsed += time_step_track[i]
    return total / (sample_count - 1) * 5.0
def compute_flags_yspeed(tracked_targets, time_step_track):
    """Estimate the row-direction speed of the gates from the longest track.

    Args:
        tracked_targets: list of tracks; each track is a newest-first list
            of ``[row, column]`` positions.
        time_step_track: newest-first step durations, passed through to
            ``compute_speed``.

    Returns:
        ``None`` when there are no tracks, ``0.0`` when the longest track
        has fewer than two samples, otherwise the value of
        ``compute_speed`` applied to the row coordinates of the longest
        track (the first one wins on ties).
    """
    if not tracked_targets:
        return None

    # max() returns the first longest track, matching argmax on the lengths,
    # and unlike map() it behaves the same on Python 2 and Python 3.
    longest_track = max(tracked_targets, key=len)
    if len(longest_track) < 2:
        return 0.0

    rows = [coord[0] for coord in longest_track]
    return compute_speed(rows, time_step_track)
def get_nearest_target(tracked_targets):
    """Return the latest position of the track with the smallest row value.

    Args:
        tracked_targets: non-empty list of tracks; each track is a
            newest-first list of ``[row, column]`` positions.

    Returns:
        The newest ``[row, column]`` entry of the track whose newest row is
        smallest (the first such track on ties).

    Raises:
        ValueError: if ``tracked_targets`` is empty.
    """
    # min() with a key works on any Python version; feeding a map() iterator
    # to np.argmin does not select the minimum on Python 3.
    return min(tracked_targets, key=lambda track: track[0][0])[0]
def _choose_action(t, flags_yspeed, nearest_target, skier_x, skier_xspeed):
    """Pick the action index for frame *t* from the current tracking state."""
    # A decision is taken only on every fourth frame; all other frames
    # send action 0.
    if t % 4 != 1:
        return 0

    if flags_yspeed > 0.0:
        # Extrapolate where the skier will be when the gate reaches it.
        time_to_target = (nearest_target[0] - skier_border_y) / flags_yspeed
        prediction_x = time_to_target * skier_xspeed + skier_x
        aim = nearest_target[1]
        # A 2-pixel dead band around the aim point avoids oscillating.
        # Presumably action 2 steers toward smaller x and action 1 toward
        # larger x in Skiing's action set -- confirm.
        if prediction_x > aim + 2:
            return 2
        if prediction_x < aim - 2:
            return 1
        return 0

    # No usable gate speed estimate: pick action 1 or 2 at random.
    return random.randrange(1, 3)


def _update_tracked_targets(tracked_targets, targets):
    """Return the tracks extended with this frame's detected *targets*."""
    updated = []
    for target in targets:
        for track in tracked_targets:
            # A detection continues the first track whose newest column is
            # within 5 pixels of it.
            if abs(target[1] - track[0][1]) < 5:
                # Drop the oldest sample once the track is full.
                kept = track if len(track) < history_length else track[0:-1]
                updated.append([target] + kept)
                break
        else:
            updated.append([target])
    return updated


def main():
    """Play Skiing-v0 with the heuristic controller, recording with Monitor.

    Runs up to 1,000,000 episodes of at most 5000 steps each, rendering
    every frame and printing the episode reward and the running mean after
    each episode.  The environment is closed when the loop ends or is
    interrupted.
    """
    env = gym.make('Skiing-v0')
    monitor_dir = 'Skiing-v0-monitor'
    if not os.path.exists(monitor_dir):
        os.makedirs(monitor_dir)
    env = wrappers.Monitor(env, monitor_dir)

    total_reward = 0.0
    try:
        # range (not xrange) keeps this runnable on Python 2 and Python 3.
        for session_t in range(1000000):
            observation = env.reset()
            _, initial_skier_x, targets = detect_objects(observation)

            # Histories are newest-first and pre-filled so speed estimates
            # are defined from the first step on.
            skier_x_track = deque([initial_skier_x] * history_length)
            time_step_track = deque([3.0] * history_length)
            tracked_targets = [[target] for target in targets]

            sum_reward = 0.0
            done = False
            skier_x = 0.0
            skier_xspeed = 0.0
            nearest_target = [0.0, 0.0]
            flags_yspeed = 0.0
            use_heuristic_strategy = True

            for t in range(5000):
                action_index = 0
                if use_heuristic_strategy:
                    action_index = _choose_action(
                        t, flags_yspeed, nearest_target, skier_x, skier_xspeed
                    )

                env.render()
                observation, reward, done, info = env.step(action_index)
                sum_reward += reward

                _, skier_x, targets = detect_objects(observation)
                tracked_targets = _update_tracked_targets(
                    tracked_targets, targets
                )

                skier_x_track.pop()
                skier_x_track.appendleft(skier_x)

                # The negated reward is stored as the step duration
                # (presumably the reward is minus the elapsed time --
                # confirm); near-zero rewards are ignored.
                if abs(reward) > 0.1:
                    time_step_track.pop()
                    time_step_track.appendleft(-reward)

                if done:
                    break

                if tracked_targets:
                    nearest_target = get_nearest_target(tracked_targets)
                    flags_yspeed = abs(
                        compute_flags_yspeed(tracked_targets, time_step_track)
                    )
                skier_xspeed = compute_speed(skier_x_track, time_step_track)

            total_reward += sum_reward
            print("Race Complete, reward: {}, mean: {}".format(sum_reward, total_reward/(session_t+1)))
    finally:
        # Release the emulator and let the monitor finalise its recordings,
        # including when the run is interrupted.
        env.close()
# Start the agent only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment