#!/usr/bin/env python
# -*- coding: utf-8 -*-
#----------------------------------------------------------
# lunar-lander-3.py - A demo of a Deep Q-Network (DQN) agent learning
#                     to land on the moon, using Gymnasium, TensorFlow and pygame.
#----------------------------------------------------------
import os, sys
import random, tempfile
import time
import numpy as np
import winsound
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from collections import deque
import gymnasium as gym
from gymnasium import wrappers
#----------------------------------------------------------
# Global Constants
#----------------------------------------------------------
HL = '-'*60
WEIGHT_FILE = './weights/ckpt'       # Checkpoint file prefix used when saving weights
CHECKPOINT_DIR = './weights'         # Directory holding the saved checkpoints
TRAINING = False                     # Set to True to train, False to replay saved weights
#TRAINING = True                     #
WIN_SIZE = (1280,720)                # Set window Size to (1280 x 720)
DEBUG = False                        #
#----------------------------------------------------------
# Set Log Level: [DEBUG, INFO, WARN, ERROR, FATAL]
#----------------------------------------------------------
# 0 | INFO    | [Default] Print all messages
# 1 | WARNING | Filter out INFO messages
# 2 | ERROR   | Filter out INFO & WARNING messages
# 3 | NONE    | Filter out all messages
#----------------------------------------------------------
#tf.get_logger().setLevel('INFO')    # ok (Default: INFO)
#tf.get_logger().setLevel('ERROR')   # ok (Default: INFO)
# Get OS environmental variables for TF:
LOGL = os.getenv('TF_CPP_MIN_LOG_LEVEL')    # Set to 1 to disable CPU instruction warnings
ODNN = os.getenv('TF_ENABLE_ONEDNN_OPTS')   # Default: 1
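#----------------------------------------------------------
# A minimal sketch (not part of the original script): the C++-side
# TF_CPP_MIN_LOG_LEVEL filter only takes effect if the variable is set
# before TensorFlow is imported, e.g. in the shell or at the very top
# of the file:
#
#   import os
#   os.environ['TF_CPP_MIN_LOG_LEVEL'] = '1'   # hide INFO messages
#   import tensorflow as tf
#----------------------------------------------------------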
#----------------------------------------------------------
# EPSILON_DECAY
# The per-episode decay factors below are chosen so that, after roughly
# 5,000 episodes, epsilon (the exploration rate) has decayed to the
# following values:
#
#   0.99910 - final epsilon ~ 0.01  (~99% exploitation,  ~1% exploration)
#   0.99941 - final epsilon ~ 0.05  (~95% exploitation,  ~5% exploration)
#   0.99954 - final epsilon ~ 0.10  (~90% exploitation, ~10% exploration)
#   0.99973 - final epsilon ~ 0.25  (~75% exploitation, ~25% exploration)
#   0.99987 - final epsilon ~ 0.50  (~50% exploitation, ~50% exploration)
#----------------------------------------------------------
LEARNING_RATE = [0.01, 0.001, 0.0001]                            # Candidate Adam learning rates (alpha)
DISCOUNT_FACTOR = [0.9, 0.99, 0.999]                             # Candidate discount factors (gamma)
EPSILON_DECAY = [0.99910, 0.99941, 0.99954, 0.99973, 0.99987]    # Candidate per-episode epsilon decays
LEARNING_EPISODES = 10               # 5000 # <for testing only>
TESTING_EPISODES = 5                 # 100  # <for testing only>
REPLAY_BUFFER_SIZE = 250000          # Max stored transitions
REPLAY_BUFFER_BATCH_SIZE = 32        # Mini-batch size per optimization step
MINIMUM_REWARD = -250                # Abort the episode below this cumulative reward
STATE_SIZE = 8                       # LunarLander observation vector length
NUMBER_OF_ACTIONS = 4                # noop, fire left, fire main, fire right
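#----------------------------------------------------------
# Quick sanity check (a sketch added for illustration; not used by the
# agent): the exploration rate after N episodes is roughly decay**N,
# which reproduces the table above for N = 5000.
#----------------------------------------------------------
if DEBUG:
    for d in EPSILON_DECAY:
        print('decay={:.5f} -> epsilon after 5000 episodes ~ {:.3f}'.format(d, d ** 5000))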
#----------------------------------------------------------
# Helper Functions
#----------------------------------------------------------
def usage():
    myName = os.path.basename(__file__)
    print('\n Usage: python.exe -qu -X utf8=1 ./{}\n'.format(myName))
def ansi_vt_hack():
    print(' Enabling ANSI colors (issue) by hack.')
    os.system('')
def color(text, color_code):
    # Text Coloring
    # Usage: print(yellow("This is yellow"))
    #if self.nposix:
    #if not is_posix():
    #    return text
    # for brighter colors, use "1;" in front of "color_code"
    bright = ''                                  # '1;'
    return '\x1b[%s%sm%s\x1b[0m' % (bright, color_code, text)
def red(text): return color(text, 31)            #                  # noqa
def gre(text): return color(text, 32)            # '1;49;32'        # noqa
def bgr(text): return color(text, '1;49;32')     # bright green     # noqa
def ora(text): return color(text, '0;49;91')     # 31 - looks bad!  # noqa
def yel(text): return color(text, 33)            #                  # noqa
def bye(text): return color(text, '1;49;33')     # bright yellow    # noqa
def blu(text): return color(text, '1;49;34')     # bright blue      # noqa
def pur(text): return color(text, 35)            # aka. magenta     # noqa
def cya(text): return color(text, '0;49;96')     # 36               # noqa
def whi(text): return color(text, '0;49;97')     # bright white     # noqa
#----------------------------------------------------------
# Class - AI Training Agent
#----------------------------------------------------------
class Agent:
    def __init__(self, training, learning_rate, discount_factor, epsilon_decay):
        self.training = training
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon_decay = epsilon_decay
        self.epsilon = 1.0 if self.training else 0.0
        self.replay_buffer = deque(maxlen=REPLAY_BUFFER_SIZE)
        self.model = self._create_network()                      # model-1: online network
        self.target_model = self._create_network()               # model-2: separate target network for stable TD targets
        self.target_model.set_weights(self.model.get_weights())  # Start both networks with identical weights
        self.optimizer = Adam(learning_rate=self.learning_rate)
        if not training:
            self._load_weights()
    def choose_action(self, s):
        if isinstance(s, tuple): s = s[0]                         # Extract the NumPy array from the tuple if 's' is a tuple
        s = np.array(s, dtype=np.float32).reshape(1, STATE_SIZE)  # Ensure that 's' is a NumPy array with the correct shape
        if not self.training or np.random.rand() > self.epsilon:
            return np.argmax(self.model.predict(s)[0])
        return np.random.choice(NUMBER_OF_ACTIONS)
    def store(self, s, a, r, s_, is_terminal):
        if self.training:
            self.replay_buffer.append((np.reshape(s, [1, STATE_SIZE]), a, r, np.reshape(s_, [1, STATE_SIZE]), is_terminal))
    def optimize(self, s, a, r, s_, is_terminal):
        if self.training and len(self.replay_buffer) > REPLAY_BUFFER_BATCH_SIZE:
            batch = np.array(random.sample(list(self.replay_buffer), REPLAY_BUFFER_BATCH_SIZE), dtype=object)
            s = np.vstack(batch[:, 0])                            # States
            a = np.array(batch[:, 1], dtype=int)                  # Actions
            r = np.array(batch[:, 2], dtype=float)                # Rewards
            s_ = np.vstack(batch[:, 3])                           # Next states
            nts = np.where(batch[:, 4] == False)                  # Indices of Non-Terminal States ('non_terminal_states')
            if len(nts[0]) > 0:
                a_ = np.argmax(self.model.predict(s_)[nts, :][0], axis=1)
                r[nts] += np.multiply(self.discount_factor, self.target_model.predict(s_)[nts, a_][0])
            # Build the TD targets: current predictions, with the entries for
            # the taken actions replaced by the bootstrapped returns above
            y = self.model.predict(s)
            y[np.arange(REPLAY_BUFFER_BATCH_SIZE), a] = r
            with tf.GradientTape() as tape:
                loss = tf.reduce_mean(tf.square(y - self.model(s, training=True)))
            grads = tape.gradient(loss, self.model.trainable_variables)
            self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
    def close(self):
        if self.training:
            self._save_weights()
    def update(self):
        # Sync the target network and decay epsilon once per episode
        if self.training:
            self.target_model.set_weights(self.model.get_weights())
            if self.epsilon > 0.01:
                self.epsilon *= self.epsilon_decay
    def _load_weights(self):
        print(' [INFO] Loading agent weights from checkpoint directory: {}'.format(CHECKPOINT_DIR))
        latest_checkpoint = tf.train.latest_checkpoint(CHECKPOINT_DIR)
        if latest_checkpoint:
            # Weights were written by save_weights(..., save_format='tf'), so load them the same way
            self.model.load_weights(latest_checkpoint)
            print(" [INFO] Weights loaded successfully from:", latest_checkpoint)
        else:
            print(" [INFO] No weights found in directory, starting fresh.")
    def _save_weights(self):
        print(" [INFO] Saving agent weights to disk in file: {}".format(WEIGHT_FILE))
        self.model.save_weights(WEIGHT_FILE, save_format='tf')
    def _create_network(self):
        # Defining the network layers...
        model = tf.keras.Sequential()
        model.add(keras.Input(shape=(STATE_SIZE,)))                   # 8 inputs
        model.add(layers.Dense(32, activation="relu"))                # Layer-1
        model.add(layers.Dense(32, activation="relu"))                # Layer-2
        model.add(layers.Dense(32, activation="relu"))                # Layer-3
        model.add(layers.Dense(NUMBER_OF_ACTIONS, activation=None))   # Output layer: 4 Q-values, one per action
        if (DEBUG):
            # This is not working well...
            print('\nPrinting Layer Weights:\n')
            for layer in model.layers:
                print(layer.weights)
            print('\nPrinting Model Weights:\n')
            print(model.weights)
            print('\nPrinting Model Summary:\n')
            model.summary()
            print()
        return model
'''
# in Box2D
class LunarLander():
    def __init__(self):
        self.verbose = 0
        VIEWPORT_W = 500    # 1280 # 600
        VIEWPORT_H = 500    # 720  # 400
'''
# @keras_export("keras.utils.Progbar")
'''
class Progbar:
    # target:   Total number of steps expected, None if unknown.
    # width:    Progress bar width on screen.
    # verbose:  Verbosity mode: 0 (silent), 1 (verbose), 2 (semi-verbose)
    # interval: Minimum visual progress update interval (in seconds).
    # stateful_metrics:
    #     Iterable of string names of metrics that should *not* be averaged over time.
    #     Metrics in this list will be displayed as-is. All others will be averaged by the progbar before display.
    # unit_name: Display name for step counts (usually "step" or "sample").
    def __init__(self, target, width=20, verbose=1, interval=0.05, stateful_metrics=None, unit_name="step"):
        super(self)
'''
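#----------------------------------------------------------
# Minimal Progbar usage sketch (added for reference; standalone and
# not called by this script):
#
#   bar = tf.keras.utils.Progbar(target=100, width=20, verbose=1)
#   for i in range(100):
#       bar.update(i + 1)
#----------------------------------------------------------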
#----------------------------------------------------------
# MAIN
#----------------------------------------------------------
def main():
    ansi_vt_hack()
    WIDTH = 600                                      # 1024 # 500 # 600
    HEIGHT = 500                                     # 768 # 500 # 400
    WIN_SIZE = (WIDTH, HEIGHT)
    VIEWPORT_W, VIEWPORT_H = WIN_SIZE
    SCALE = 30                                       # 30
    #------------------------------------------------------------
    print('\n{}\nStarting Lunar Lander\n{}'.format(HL, HL))
    print(f' TensorFlow Version : {tf.__version__}')
    print(f' TF Physical Devices : {tf.config.list_physical_devices()[0][0]}')                   # '/physical_device:CPU:0'
    print(' TF Build Info : ')
    print(f'  is_cuda_build : {tf.sysconfig.get_build_info()["is_cuda_build"]}')                 # False
    print(f'  is_rocm_build : {tf.sysconfig.get_build_info()["is_rocm_build"]}')                 # False
    print(f'  is_tensorrt_build : {tf.sysconfig.get_build_info()["is_tensorrt_build"]}')         # False
    print(f' TF MSVCP DLL Names : {tf.sysconfig.get_build_info()["msvcp_dll_names"]}')           # 'msvcp140.dll,msvcp140_1.dll'
    print(f' TF Compile Flags : {tf.sysconfig.get_compile_flags()}')
    print(f'Setting Window Size to : {WIN_SIZE}')
    print('\nUsing Environment variable(s):')
    #if ODNN is None or len(ODNN) == 0:
    #if not ODNN:
    #    ODNN = yel('n/a')
    #    ODNN = yel(ODNN)
    print(' {}={}\n'.format(bye('TF_ENABLE_ONEDNN_OPTS'), ODNN))
    #------------------------------------------------------------
    # Setup pygame stuff
    #------------------------------------------------------------
    import pygame
    from pygame import display
    display.init()
    display.set_mode(WIN_SIZE, display=1)            # WIN_SIZE (display=1 targets a second monitor; use display=0 for the primary)
    display.set_caption("Lunar Lander 3")
    print('\npygame.display.Info: {}\n'.format(display.Info()))
    expected_steps = 100                             # None
    bar = tf.keras.utils.Progbar(target=expected_steps, width=20, verbose=0)   # Set up the progress bar
    tf.keras.utils.disable_interactive_logging()     # Disable Keras' interactive (per-call) logging
    #------------------------------------------------------------
    np.set_printoptions(precision=2)                 # Limit printed float precision to 2 decimals
    env = gym.make('LunarLander-v2', render_mode='human')   # Gym: Make - 'LunarLanderContinuous-v2' has continuous engines
    average_reward = deque(maxlen=100)
    agent = Agent(TRAINING, LEARNING_RATE[2], DISCOUNT_FACTOR[1], EPSILON_DECAY[1])
    print('\n Alpha: {:.4f}\n Gamma: {:.3f}\n Epsilon decay: {:.5f}\n'.format(agent.learning_rate, agent.discount_factor, agent.epsilon_decay))
    if TRAINING:
        MAX_EPS = LEARNING_EPISODES
    else:
        MAX_EPS = TESTING_EPISODES
    for episode in range(MAX_EPS):
        current_reward = 0
        s, _ = env.reset()                           # Gym: reset returns (observation, info)
        #display.set_mode(WIN_SIZE)                  # Reset WIN_SIZE from default (600 x 400) in box2d/lunar_lander.py
        step = 0
        MAXS = 1000                                  # Max number of Steps
        for t in range(MAXS):
            if not TRAINING:
                env.render()                         # Gym: Render
            a = agent.choose_action(s)
            step_results = env.step(a)               # Gym: Step -> (observation, reward, terminated, truncated, info)
            s_, r, terminated, truncated, info = step_results
            is_terminal = terminated or truncated    # End the episode on either termination or truncation
            agent.store(s, a, r, s_, is_terminal)
            agent.optimize(s, a, r, s_, is_terminal)
            s = s_                                   # Advance to the next state
            step += 1
            print(' N:{: 4d}/{}'.format(step, MAXS), end='\x0d')   # Progress info, using: \x0d = \r = [CR]
            bar.update(step)                         # Progbar update (if enabled).
            current_reward += r
            if is_terminal or current_reward < MINIMUM_REWARD:
                break
        agent.update()
        average_reward.append(current_reward)
        episode_results = (step, MAXS, episode, current_reward, np.average(average_reward), agent.epsilon)
        print(' N:{: 4d}/{} E:{:04d}, cR:{:.2f}, aR:{:.2f}, e:{:.2f}'.format(*episode_results))
    env.close()                                      # Close Gym
    agent.close()                                    # Close Agent (Class)
    print(yel('\nok\n'))
#----------------------------------------------------------
# Main Thread
#----------------------------------------------------------
if __name__ == "__main__":
    # Catch a keyboard interruption (e.g. CTRL-C)
    # Signals: 2=SIGINT, 3=SIGQUIT (from 'kill -l')
    try:
        main()
    except KeyboardInterrupt as e:
        winsound.Beep(500, 500)                      # Beep(frequency, duration)
        print('\n\n[{}] Caught Keyboard Interrupt'.format(red('SIGINT')), flush=True)
        print('[{}] Interrupted by User: {}\n'.format(yel('INFO'), cya('Exiting')))
        sys.exit(3)
#----------------------------------------------------------
# END
#----------------------------------------------------------
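#----------------------------------------------------------
# Example run (a sketch; the flags match what usage() above suggests):
#
#   python -X utf8=1 lunar-lander-3.py
#
# Set TRAINING = True near the top of the file to train and save
# weights under ./weights/, then set it back to False to replay the
# learned policy in the pygame window.
#----------------------------------------------------------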