#!/usr/bin/env python
# -*- coding: utf-8 -*-
#----------------------------------------------------------------------
# lunar-lander-3.py - A demo of an AI agent learning to land a lunar module (Gymnasium LunarLander-v2), rendered with pygame.
#----------------------------------------------------------
import os, sys
import random, tempfile
import time
import numpy as np
import winsound
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from collections import deque
import gymnasium as gym
from gymnasium import wrappers
#----------------------------------------------------------
# Global Constants
#----------------------------------------------------------
HL = '-'*60
WEIGHT_FILE = './weights/ckpt' # Saved weights file (checkpoint prefix)
CHECKPOINT_DIR = './weights' # Directory holding the saved checkpoints
TRAINING = False #
#TRAINING = True #
WIN_SIZE = (1280,720) # Set window Size to (1280 x 720)
DEBUG = False #
#----------------------------------------------------------
# Set Log Level: [DEBUG, INFO, WARN, ERROR, FATAL]
#----------------------------------------------------------
# TF_CPP_MIN_LOG_LEVEL:
# 0 | INFO    | [Default] Print all messages
# 1 | WARNING | Filter out INFO messages
# 2 | ERROR   | Filter out INFO & WARNING messages
# 3 | NONE    | Filter out all messages
#----------------------------------------------------------
#tf.get_logger().setLevel('INFO') # ok (Default: INFO)
#tf.get_logger().setLevel('ERROR') # ok (Default: INFO)
# Get OS environmental variables for TF:
LOGL = os.getenv('TF_CPP_MIN_LOG_LEVEL') # Set 1 to Disable CPU instruction warnings
ODNN = os.getenv('TF_ENABLE_ONEDNN_OPTS') # Default: 1
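# Example (note: these variables only take effect if they are set before TensorFlow
# is imported, e.g. in the shell that launches this script):
#   set TF_CPP_MIN_LOG_LEVEL=1       (Windows cmd)
#   export TF_CPP_MIN_LOG_LEVEL=1    (POSIX shells)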
#----------------------------------------------------------
# The EPSILON_DECAY constant
# Approximate epsilon remaining after 5,000 episodes for each value, i.e. the
# exploitation/exploration balance reached by the end of training:
#
# 0.99910 - epsilon ~ 0.01 (99% exploitation + 1% exploration)
# 0.99941 - epsilon ~ 0.05 (95% exploitation + 5% exploration)
# 0.99954 - epsilon ~ 0.10 (90% exploitation + 10% exploration)
# 0.99973 - epsilon ~ 0.25 (75% exploitation + 25% exploration)
# 0.99987 - epsilon ~ 0.50 (50% exploitation + 50% exploration)
#----------------------------------------------------------
LEARNING_RATE = [0.01, 0.001, 0.0001] #
DISCOUNT_FACTOR = [0.9, 0.99, 0.999] #
EPSILON_DECAY = [0.99910, 0.99941, 0.99954, 0.99973, 0.99987] #
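# Sketch of the decay schedule (assuming one multiplicative update per episode,
# as done in Agent.update() below, and ignoring the 0.01 floor):
#   epsilon_n = epsilon_0 * EPSILON_DECAY**n
#   e.g. 0.99941**5000 ~= 0.05, which is where the table above comes from.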
LEARNING_EPISODES = 10 # 5000 # <for testing only>
TESTING_EPISODES = 5 # 100 # <for testing only>
REPLAY_BUFFER_SIZE = 250000 #
REPLAY_BUFFER_BATCH_SIZE = 32 #
MINIMUM_REWARD = -250 #
STATE_SIZE = 8 #
NUMBER_OF_ACTIONS = 4 #
#----------------------------------------------------------
# Helper Functions
#----------------------------------------------------------
def usage():
myName = os.path.basename(__file__)
print('\n Usage: python.exe -qu -X utf8=1 ./{}\n'.format(myName))
def ansi_vt_hack():
    print(' Enabling ANSI colors (Windows console hack).')
    os.system('') # An empty os.system() call enables VT/ANSI escape processing in the Windows console
def color(text, color_code):
# Text Coloring
    # Usage: print(yel("This is yellow"))
#if self.nposix:
#if not is_posix():
# return text
# for brighter colors, use "1;" in front of "color_code"
bright = '' # '1;'
return '\x1b[%s%sm%s\x1b[0m' % (bright, color_code, text)
def red(text): return color(text, 31) # # noqa
def gre(text): return color(text, 32) # '1;49;32' # noqa
def bgr(text): return color(text, '1;49;32') # bright green # noqa
def ora(text): return color(text, '0;49;91') # 31 - looks bad! # noqa
def yel(text): return color(text, 33) # # noqa
def bye(text): return color(text, '1;49;33') # bright yellow # noqa
def blu(text): return color(text, '1;49;34') # bright blue # noqa
def pur(text): return color(text, 35) # aka. magenta # noqa
def cya(text): return color(text, '0;49;96') # 36 # noqa
def whi(text): return color(text, '0;49;97') # bright white # noqa
#----------------------------------------------------------
# Class - AI Training Agent
#----------------------------------------------------------
class Agent:
def __init__(self, training, learning_rate, discount_factor, epsilon_decay):
self.training = training
self.learning_rate = learning_rate
self.discount_factor = discount_factor
self.epsilon_decay = epsilon_decay
self.epsilon = 1.0 if self.training else 0.0
self.replay_buffer = deque(maxlen=REPLAY_BUFFER_SIZE)
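        # Experience replay buffer: stores (s, a, r, s', done) transitions so that
        # optimize() can train on randomly sampled, de-correlated mini-batches.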
self.model = self._create_network() # model-1
        self.target_model = self._create_network() # model-2: separate target network (aliasing it to self.model would make update() a no-op)
        self.target_model.set_weights(self.model.get_weights()) # Start the target network in sync with the online network
self.optimizer = Adam(learning_rate=self.learning_rate)
if not training:
self._load_weights()
def choose_action(self, s):
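        # Epsilon-greedy policy: with probability epsilon take a random action (exploration),
        # otherwise take the action with the highest predicted Q-value (exploitation).
        # When not training, epsilon is 0.0, so the policy is purely greedy.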
if isinstance(s, tuple): s = s[0] # Extract the NumPy array from the tuple if 's' is a tuple
s = np.array(s, dtype=np.float32).reshape(1, STATE_SIZE) # Ensure that 's' is a NumPy array with the correct shape
if not self.training or np.random.rand() > self.epsilon:
return np.argmax(self.model.predict(s)[0])
return np.random.choice(NUMBER_OF_ACTIONS)
def store(self, s, a, r, s_, is_terminal):
if self.training:
self.replay_buffer.append((np.reshape(s, [1, STATE_SIZE]), a, r, np.reshape(s_, [1, STATE_SIZE]), is_terminal))
def optimize(self, s, a, r, s_, is_terminal):
if self.training and len(self.replay_buffer) > REPLAY_BUFFER_BATCH_SIZE:
            batch = np.array(random.sample(list(self.replay_buffer), REPLAY_BUFFER_BATCH_SIZE), dtype=object) # Ragged tuples, so force an object array
s = np.vstack(batch[:, 0]) #
a = np.array(batch[:, 1], dtype=int) #
r = np.array(batch[:, 2], dtype=float) #
s_ = np.vstack(batch[:, 3]) #
nts = np.where(batch[:, 4] == False) # Non Terminal States (ex. 'non_terminal_states')
if len(nts[0]) > 0:
a_ = np.argmax(self.model.predict(s_)[nts, :][0], axis=1)
r[nts] += np.multiply(self.discount_factor, self.target_model.predict(s_)[nts, a_][0])
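                # The two lines above form a Double-DQN style target for non-terminal transitions:
                #   r <- r + gamma * Q_target(s', argmax_a Q_online(s', a))
                # the online network picks the follow-up action, the target network scores it.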
            with tf.GradientTape() as tape:
                q = self.model(s, training=True)                         # Predicted Q-values for the batch
                target_q = q.numpy()                                     # Copy the predictions as a constant target...
                target_q[np.arange(REPLAY_BUFFER_BATCH_SIZE), a] = r     # ...and overwrite the taken actions with the TD target
                loss = tf.reduce_mean(tf.square(q - target_q))           # MSE between prediction and target
            grads = tape.gradient(loss, self.model.trainable_variables)
            self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
def close(self):
if self.training:
self._save_weights()
def update(self):
#
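        # Called once per episode: sync the target network with the online network
        # and decay epsilon geometrically until it reaches the 0.01 floor.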
if self.training:
self.target_model.set_weights(self.model.get_weights())
if self.epsilon > 0.01:
self.epsilon *= self.epsilon_decay
def _load_weights(self):
        print(' [INFO] Loading agent weights from checkpoint directory: {}'.format(CHECKPOINT_DIR))
checkpoint = tf.train.Checkpoint(model=self.model)
latest_checkpoint = tf.train.latest_checkpoint(CHECKPOINT_DIR)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
print(" [INFO] Weights loaded successfully from:", latest_checkpoint)
else:
print(" [INFO] No weights found in directory, starting fresh.")
def _save_weights(self):
print(" [INFO] Saving agent weights to disk in file: ".format(WEIGHT_FILE))
self.model.save_weights(WEIGHT_FILE, save_format='tf')
def _create_network(self):
# Defining the network layers...
model = tf.keras.Sequential()
model.add(keras.Input(shape=(STATE_SIZE,))) # 8
model.add(layers.Dense(32, activation="relu")) # Layer-1
model.add(layers.Dense(32, activation="relu")) # Layer-2
model.add(layers.Dense(32, activation="relu")) # Layer-3
        model.add(layers.Dense(NUMBER_OF_ACTIONS, activation=None)) # Output layer: one linear Q-value per action (4)
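        # Resulting architecture (sketch): 8 -> 32 -> 32 -> 32 -> 4 fully-connected layers,
        # roughly 2,500 trainable parameters in total.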
        if DEBUG:
            # Verbose debug dump of the freshly initialized weights (the output is noisy)
print('\nPrinting Layer Weights:\n')
for layer in model.layers:
print(layer.weights)
print('\nPrinting Model Weights:\n')
print(model.weights)
print('\nPrinting Model Summary:\n')
model.summary()
print()
return model
'''
# in Box2D
class LunarLander():
def __init__(self):
self.verbose = 0
VIEWPORT_W = 500 # 1280 # 600
VIEWPORT_H = 500 # 720 # 400
'''
# @keras_export("keras.utils.Progbar")
'''
class Progbar:
# target: Total number of steps expected, None if unknown.
# width: Progress bar width on screen.
# verbose: Verbosity mode : 0 (silent), 1 (verbose), 2 (semi-verbose)
# interval: Minimum visual progress update interval (in seconds).
# stateful_metrics:
# Iterable of string names of metrics that should *not* be averaged over time.
# Metrics in this list will be displayed as-is. All others will be averaged by the progbar before display.
# unit_name: Display name for step counts (usually "step" or "sample").
def __init__(self, target, width=20, verbose=1, interval=0.05, stateful_metrics=None, unit_name="step"):
super(self)
'''
#----------------------------------------------------------
# MAIN
#----------------------------------------------------------
def main():
ansi_vt_hack()
WIDTH = 600 # 1024 # 500 # 600
HEIGHT = 500 # 768 # 500 # 400
WIN_SIZE = (WIDTH, HEIGHT)
VIEWPORT_W, VIEWPORT_H = WIN_SIZE
SCALE = 30 # 30
#------------------------------------------------------------
print('\n{}\nStarting Lunar Lander\n{}'.format(HL,HL))
print(f' TensorFlow Version : {tf.__version__}')
print(f' TF Physical Devices : {tf.config.list_physical_devices()[0][0]}') # '/physical_device:CPU:0'
print(f' TF Build Info : ') #
    print(f' is_cuda_build : {tf.sysconfig.get_build_info()["is_cuda_build"]}') # False
    print(f' is_rocm_build : {tf.sysconfig.get_build_info()["is_rocm_build"]}') # False
    print(f' is_tensorrt_build : {tf.sysconfig.get_build_info()["is_tensorrt_build"]}') # False
    print(f' TF MSVCP DLL Names : {tf.sysconfig.get_build_info()["msvcp_dll_names"]}') # 'msvcp140.dll,msvcp140_1.dll'
print(f' TF Compile Flags : {tf.sysconfig.get_compile_flags()}') #
print(f'Setting Window Size to : {WIN_SIZE}')
print('\nUsing Environment variable(s):')
#if ODNN is None or len(ODNN) == 0:
#if not ODNN:
# ODNN = yel('n/a')
# ODNN = yel(ODNN)
print(' {}={}\n'.format(bye('TF_ENABLE_ONEDNN_OPTS'), ODNN))
#------------------------------------------------------------
# Setup pygame stuff
#------------------------------------------------------------
import pygame
from pygame import display
display.init() #
    display.set_mode(WIN_SIZE, display=1) # Open on display index 1 (second monitor); use display=0 on single-monitor setups
display.set_caption("Lunar Lander 3") #
print('\npygame.display.Info: {}\n'.format(display.Info()))
expected_steps = 100 # None
bar = tf.keras.utils.Progbar(target=expected_steps, width=20, verbose=0) # Set the progress bar
tf.keras.utils.disable_interactive_logging() # Disable the logging function!
#------------------------------------------------------------
    np.set_printoptions(precision=2) # Limit printed NumPy float precision to 2 decimals
    env = gym.make('LunarLander-v2', render_mode='human') # Gym: Make - 'LunarLanderContinuous-v2' has continuous engine control
average_reward = deque(maxlen=100)
agent = Agent(TRAINING, LEARNING_RATE[2], DISCOUNT_FACTOR[1], EPSILON_DECAY[1])
    print('\n Alpha: {:.4f}\n Gamma: {:.3f}\n Epsilon decay: {:.5f}\n'.format(agent.learning_rate, agent.discount_factor, agent.epsilon_decay))
if TRAINING:
MAX_EPS = LEARNING_EPISODES
else:
MAX_EPS = TESTING_EPISODES
for episode in range(MAX_EPS):
current_reward = 0
        s, _ = env.reset() # Gym: reset (gymnasium returns (observation, info))
#display.set_mode(WIN_SIZE) # Reset WIN_SIZE from default (600 x 400) in box2d/lunar_lander.py
step = 0 #
MAXS = 1000 # Max number of Steps
for t in range(MAXS):
if not TRAINING:
env.render() # Gym: Render
a = agent.choose_action(s) #
step_results = env.step(a) # Gym: Step
            s_, r, terminated, truncated, info = step_results # The environment step returns: (observation, reward, terminated, truncated, info)
            is_terminal = terminated or truncated # End the episode on landing/crash or on time-limit truncation
agent.store(s, a, r, s_, is_terminal) #
agent.optimize(s, a, r, s_, is_terminal) #
            s = s_ # Advance to the next state for the following step
step += 1
            print(' N:{: 4d}/{}'.format(step, MAXS), end='\x0d') # Progress info, using: \x0d = \r = [CR]
bar.update(step) # Progbar update (if enabled).
current_reward += r
if is_terminal or current_reward < MINIMUM_REWARD:
break
agent.update()
average_reward.append(current_reward)
episode_results = (step, MAXS, episode, current_reward, np.average(average_reward), agent.epsilon)
print(' N:{: 4d}/{} E:{:04d}, cR:{:.2f}, aR:{:.2f}, e:{:.2f}'.format(*episode_results))
env.close() # Close Gym
agent.close() # Close Agent (Class)
print(yel('\nok\n'))
#----------------------------------------------------------
# Main Thread
#----------------------------------------------------------
if __name__ == "__main__":
# Catch a keyboard interruption (E.g. CTRL-C)
# Signals: 2=SIGINT, 3=SIGQUIT (from 'kill -l')
try:
main()
except KeyboardInterrupt as e:
winsound.Beep(500, 500) # Beep(frequency, duration)
print('\n\n[{}] Caught Keyboard Interrupt'.format(red('SIGINT')), flush=True)
print('[{}] Interrupted by User: {}\n'.format(yel('INFO'), cya('Exiting')))
sys.exit(3)
#----------------------------------------------------------
# END
#----------------------------------------------------------