Skip to content

Instantly share code, notes, and snippets.

@g00dgame
Created January 29, 2017 13:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save g00dgame/dc3fa93caddcd75af31245277ab423c5 to your computer and use it in GitHub Desktop.
Save g00dgame/dc3fa93caddcd75af31245277ab423c5 to your computer and use it in GitHub Desktop.
Skiing-v0 Heuristic AI
#!/usr/bin/python2.7
import gym
import os
import numpy as np
import random
from gym import wrappers
from scipy import ndimage
from collections import deque
# Name of the game; not referenced by the rest of the visible code.
GAME = "Skiing"

# Region of interest cropped out of every frame before detection:
# rows roi_y .. roi_y + roi_h, columns roi_x .. roi_x + roi_w.
roi_x, roi_y = 8, 64
roi_w, roi_h = 144, 180

# The skier is searched for only in the first skier_roi_h rows of the ROI.
skier_roi_h = 22
# Row (half of the skier band) used as the skier's vertical reference when
# estimating the time left until a gate is reached.
skier_border_y = skier_roi_h / 2.0

# Flags whose centroid row (inside the ROI) is not greater than this value
# are ignored when pairing flags into gates.
flags_y_threshold = 45.0

# Number of samples kept in each position / time-step history.
history_length = 10

# Exact per-channel pixel values (channels 0, 1, 2 -- presumably RGB) used to
# pick out the skier and the two flag colors.
skier_color = [214, 92, 92]
flag1_color = [66, 72, 200]
flag2_color = [184, 50, 50]
def mask_by_color(image, color):
    """Return a boolean mask of the pixels whose first three channels equal *color*.

    Args:
        image: array whose last axis holds the color channels (at least
            three). A single H x W x C frame yields an H x W mask; any extra
            leading axes (e.g. a stack of frames) are preserved in the result.
        color: sequence whose first three entries are the channel values to
            match exactly.

    Returns:
        Boolean array shaped like ``image`` without its last axis; True where
        channels 0, 1 and 2 all match.
    """
    wanted = np.asarray(color)[:3]
    # Only channels 0..2 take part in the match, so an extra channel (such as
    # alpha) is ignored.
    return np.all(image[..., :3] == wanted, axis=-1)
def detect_objects(observation):
    """Locate the skier and the gates in one game frame.

    The frame is cropped to the region of interest, the skier is found by
    color in the top ``skier_roi_h`` rows, and flags are found by color over
    the whole crop. Flags are grouped into connected components; every pair
    of components whose centroid rows differ by less than 10 and both lie
    below ``flags_y_threshold`` (row index greater than it) forms a gate.

    Args:
        observation: frame array indexed as [row, column, channel].

    Returns:
        A tuple ``(roi, skier_x, targets)``:
          * ``roi`` -- the cropped frame;
          * ``skier_x`` -- column of the skier's center of mass inside the
            crop (expected to be NaN when no skier-colored pixel is present,
            since the center of mass of an empty mask is undefined);
          * ``targets`` -- list of ``[y, x]`` gate midpoints in crop
            coordinates.
    """
    observation = observation[roi_y:roi_y + roi_h, roi_x:roi_x + roi_w]

    skier_mask = mask_by_color(observation[:skier_roi_h], skier_color)
    # Use the top-level ndimage function; the ``ndimage.measurements``
    # namespace is deprecated in recent SciPy releases.
    _, skier_x = ndimage.center_of_mass(skier_mask)

    flags_mask = np.bitwise_or(
        mask_by_color(observation, flag1_color),
        mask_by_color(observation, flag2_color),
    )
    flags_cc, flags_count = ndimage.label(flags_mask)
    flags_coordinates = ndimage.center_of_mass(
        flags_mask, flags_cc, range(1, flags_count + 1)
    )

    targets = []
    for i, (first_y, first_x) in enumerate(flags_coordinates):
        if not first_y > flags_y_threshold:
            continue
        # Only look at later components so each unordered pair is seen once.
        for second_y, second_x in flags_coordinates[i + 1:]:
            if second_y > flags_y_threshold and abs(first_y - second_y) < 10:
                targets.append(
                    [(first_y + second_y) / 2.0, (first_x + second_x) / 2.0]
                )
    return observation, skier_x, targets
def compute_speed(positions, time_step_track):
    """Estimate a speed from a newest-first history of positions.

    For every older sample ``i`` the displacement ``positions[0] -
    positions[i]`` is divided by the time accumulated over
    ``time_step_track[0] .. time_step_track[i - 1]``; the resulting rates are
    averaged and multiplied by 5.0 (a scale factor whose meaning is not
    documented in this file).

    Args:
        positions: indexable sequence of scalar positions, newest first.
        time_step_track: indexable sequence of step durations, newest first,
            with at least ``len(positions)`` entries.

    Returns:
        The scaled average speed; positive when the position has been
        increasing. Returns 0.0 when fewer than two positions are given,
        because no displacement can be measured (the previous code divided
        by zero in that case).
    """
    sample_count = len(positions)
    if sample_count < 2:
        return 0.0

    newest = positions[0]
    elapsed = time_step_track[0]
    rate_sum = 0.0
    for i in range(1, sample_count):
        # np.float64 keeps this a float division even for integer inputs and
        # yields inf/nan instead of raising if the elapsed time is zero.
        rate_sum += np.float64(newest - positions[i]) / elapsed
        elapsed += time_step_track[i]
    return rate_sum / (sample_count - 1) * 5.0
def compute_flags_yspeed(tracked_targets, time_step_track):
    """Estimate the vertical speed of the gates from the longest track.

    Args:
        tracked_targets: list of tracks; each track is a newest-first list of
            ``[y, x]`` gate positions.
        time_step_track: newest-first step durations, forwarded to
            ``compute_speed``.

    Returns:
        ``None`` when there are no tracks, ``0.0`` when the longest track has
        fewer than two samples, otherwise the value of ``compute_speed`` over
        the y coordinates of the longest track (the first one on ties).
    """
    if not tracked_targets:
        return None

    # max() keeps the first track among equally long ones, like np.argmax.
    longest_track = max(tracked_targets, key=len)
    if len(longest_track) < 2:
        return 0.0

    # Build a real list (not a lazy map) because compute_speed needs len()
    # and indexing.
    y_positions = [coord[0] for coord in longest_track]
    return compute_speed(y_positions, time_step_track)
def get_nearest_target(tracked_targets):
    """Return the newest position of the track with the smallest y.

    Args:
        tracked_targets: non-empty list of tracks; each track is a
            newest-first list of ``[y, x]`` positions.

    Returns:
        The newest ``[y, x]`` entry of the track whose newest y coordinate is
        smallest (the first such track on ties).

    Raises:
        ValueError: if ``tracked_targets`` is empty.
    """
    # min() with a key avoids handing a lazy map object to NumPy and keeps
    # the first-minimum tie-breaking of np.argmin.
    return min(tracked_targets, key=lambda track: track[0][0])[0]
def main():
    """Play Skiing-v0 episodes with a hand-written steering heuristic.

    Every frame is passed to detect_objects to find the skier's x position
    and the gate midpoints. Gates are tracked from frame to frame so their
    vertical speed can be estimated; together with the skier's horizontal
    speed this gives a predicted skier x at the moment the nearest gate is
    reached, and the action is chosen to bring that prediction to within 2
    of the gate's x. After each episode the episode reward and the running
    mean reward are printed.
    """
    env = gym.make('Skiing-v0')
    monitor_dir = 'Skiing-v0-monitor'
    if not os.path.exists(monitor_dir):
        os.makedirs(monitor_dir)
    # Wrap the environment with gym's Monitor, pointed at monitor_dir.
    env = wrappers.Monitor(env, monitor_dir)
    nearest_target = [0, 0]
    # Despite the name this accumulates the SUM of episode rewards; it is
    # divided by the episode count only when printed.
    mean_reward = 0.0
    for session_t in xrange(1000000):
        observation = env.reset()
        roi, skier_x, targets = detect_objects(observation)
        # Newest-first histories: skier x positions and per-step durations.
        skier_x_track = deque()
        time_step_track = deque()
        # One track (newest-first list of [y, x]) per gate seen so far.
        tracked_targets = []
        for target in targets:
            tracked_targets.append([target])
        # Pre-fill both histories to history_length so that they can be
        # updated with a pop()/appendleft() pair on every step.
        # NOTE(review): 3.0 is presumably the nominal duration of one step --
        # confirm against the environment's reward scale.
        for i in xrange(history_length):
            skier_x_track.appendleft(skier_x)
            time_step_track.appendleft(3.0)
        sum_reward = 0.0
        done = False
        idle_start_time = 0  # never read in the visible code
        skier_x = 0.0
        skier_xspeed = 0.0
        nearest_target = [0.0, 0.0]
        flags_yspeed = 0.0
        use_heuristic_strategy = True
        for t in range(5000):
            action_index = 0
            # NOTE(review): the source lost its indentation; the nesting of
            # the else branches below is a reconstruction. Attaching the
            # last else to use_heuristic_strategy instead of to the t%4
            # test would yield the same actions.
            if(use_heuristic_strategy):
                # A steering decision is made on one step out of four.
                if(t%4 == 1):
                    if flags_yspeed > 0.0:
                        # Extrapolate the skier's x to the moment the
                        # nearest gate reaches the skier's row.
                        time_to_target = (nearest_target[0] - skier_border_y)/flags_yspeed
                        prediction_x = time_to_target*skier_xspeed + skier_x
                        aim = nearest_target[1]
                        # Tolerance of 2 on either side of the gate centre.
                        # NOTE(review): actions 1 and 2 are presumably the
                        # two steering directions and 0 a no-op -- confirm
                        # against the environment's action set.
                        if( prediction_x > aim + 2 ):
                            action_index = 2
                        elif( prediction_x < aim - 2 ):
                            action_index = 1
                        else:
                            action_index = 0
                    else:
                        # No usable gate speed estimate: pick action 1 or 2
                        # at random.
                        action_index = random.randrange(1, 3)
                else:
                    action_index = 0
            env.render()
            observation, reward, done, info = env.step(action_index)
            sum_reward += reward
            roi, skier_x, targets = detect_objects(observation)
            # Rebuild the tracks from this frame's detections. A detection
            # extends the first existing track whose newest x is within 5 of
            # it (tracks are capped at history_length entries); otherwise it
            # starts a new track. Tracks with no matching detection are
            # dropped.
            new_tracked_targets = []
            for target in targets:
                is_append_to_exist_target = False
                for tracked_target in tracked_targets:
                    if(abs(target[1] - tracked_target[0][1]) < 5):
                        if len(tracked_target) < history_length:
                            new_tracked_targets.append([target] + tracked_target)
                        else:
                            new_tracked_targets.append([target] + tracked_target[0:-1])
                        is_append_to_exist_target = True
                        break
                if not is_append_to_exist_target:
                    new_tracked_targets.append([target])
            tracked_targets = new_tracked_targets
            # Slide the skier history: drop the oldest, add the newest.
            skier_x_track.pop()
            skier_x_track.appendleft(skier_x)
            # Record -reward as the newest step duration, but only for
            # rewards larger than 0.1 in magnitude.
            # NOTE(review): assumes the per-step reward is the negative of
            # the elapsed time -- confirm.
            if(abs(reward) > 0.1):
                time_step_track.pop()
                time_step_track.appendleft(-reward)
            if done:
                break
            # Refresh the estimates used by the next steering decision. When
            # no gate is tracked, nearest_target and flags_yspeed keep their
            # previous values.
            # NOTE(review): whether the skier_xspeed update belongs inside
            # this if cannot be determined from the unindented source; it is
            # placed outside because it does not depend on the gates.
            if(len(tracked_targets) > 0):
                nearest_target = get_nearest_target(tracked_targets)
                flags_yspeed = compute_flags_yspeed(tracked_targets, time_step_track)
                flags_yspeed = abs(flags_yspeed)
            skier_xspeed = compute_speed(skier_x_track, time_step_track)
        mean_reward += sum_reward
        print("Race Complete, reward: {}, mean: {}".format(sum_reward, mean_reward/(session_t+1)))
# Start the agent only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment