Skip to content

Instantly share code, notes, and snippets.

@rpicatoste
Created March 20, 2018 17:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rpicatoste/dc68e89c69c169fb254d6621e9ab6438 to your computer and use it in GitHub Desktop.
Save rpicatoste/dc68e89c69c169fb254d6621e9ab6438 to your computer and use it in GitHub Desktop.
Example of the quadcopter flying
# %%
import csv
import numpy as np
from task import Task
from agents.agent import DDPG
from plot_functions import plot_results
from collections import defaultdict
import copy
import matplotlib.pyplot as plt
from agents.ou_noise import OUNoise
# Output file for the per-step results written by run_test_episode.
# (The quadcopter's starting position is configured further down via init_pose.)
file_output = 'data.txt' # file name for saved results
# Close all open matplotlib figures before producing new plots.
plt.close('all')
# Run task with agent
def run_test_episode(agent: DDPG, task: Task, file_output,
                     fixed_rotor_speeds=(640, 605, 605, 605)):
    """Run a single episode of ``task`` and log every step to a CSV file.

    The agent's noise object is temporarily replaced by
    ``OUNoise(agent.action_size, 0.0, 0.0, 0.0)`` for the duration of the
    episode (presumably a zero-noise process -- confirm against the OUNoise
    signature) and is restored afterwards, even if the episode raises.

    Args:
        agent: Agent providing ``noise``, ``action_size``, ``reset_episode()``
            and ``act(state)``.
        task: Task providing ``step(rotor_speeds)`` and a ``sim`` object with
            ``time``, ``pose``, ``v`` and ``angular_v``.
        file_output: Path of the CSV file to (over)write with one header row
            followed by one row per simulation step.
        fixed_rotor_speeds: Rotor speeds sent to the task at every step
            instead of the agent's action. The default reproduces the
            constant command this example has always used. Pass ``None`` to
            fly with the actions returned by ``agent.act``.

    Returns:
        A ``(results, rewards_lists)`` tuple. ``results`` maps each column
        label to the list of values logged for it; ``rewards_lists`` is a
        ``defaultdict(list)`` accumulating, per key, the values of the fourth
        element returned by ``task.step``.
    """
    print('\nRunning test episode ...')
    labels = ['time', 'x', 'y', 'z', 'phi', 'theta', 'psi', 'x_velocity',
              'y_velocity', 'z_velocity', 'phi_velocity', 'theta_velocity',
              'psi_velocity', 'rotor_speed1', 'rotor_speed2', 'rotor_speed3',
              'rotor_speed4', 'reward']
    results = {label: [] for label in labels}
    rewards_lists = defaultdict(list)

    # Swap the agent's noise for the test run; the finally clause below puts
    # the saved copy back so a failing episode cannot leave the agent altered.
    saved_noise = copy.copy(agent.noise)
    agent.noise = OUNoise(agent.action_size, 0.0, 0.0, 0.0)
    try:
        state = agent.reset_episode()  # start a new episode
        print('state', state)
        print('state.shape', state.shape)

        # Run the simulation, and save the results.
        # newline='' is what the csv module requires of files it writes to;
        # without it, extra blank rows appear on platforms that use \r\n.
        with open(file_output, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(labels)
            done = False
            while not done:
                # The agent is always queried, even when its action is
                # overridden, to keep the original call sequence.
                action = agent.act(state)
                if fixed_rotor_speeds is None:
                    rotor_speeds = action
                else:
                    rotor_speeds = list(fixed_rotor_speeds)

                next_state, reward, done, new_rewards = task.step(rotor_speeds)
                for key, value in new_rewards.items():
                    rewards_lists[key].append(value)

                # Row layout must follow the order of `labels`.
                to_write = ([task.sim.time]
                            + list(task.sim.pose)
                            + list(task.sim.v)
                            + list(task.sim.angular_v)
                            + list(rotor_speeds)
                            + [reward])
                for label, value in zip(labels, to_write):
                    results[label].append(value)
                writer.writerow(to_write)

                state = next_state
    finally:
        # Restore noise
        agent.noise = copy.copy(saved_noise)

    print('Finished test episode!\n')
    return results, rewards_lists
# %% Parameters
# Exploration settings forwarded to the DDPG agent
# (presumably used for its OUNoise process -- confirm in agents.agent).
exploration_mu = 405
exploration_theta = 0.15 * 3
exploration_sigma = 0.2 * 5

# Replay-buffer and learning hyper-parameters forwarded to the DDPG agent.
buffer_size = 100000
batch_size = 64
gamma = 0.99
tau = 0.01  # 0.001
actor_learning_rate = 0.0001 * 0.10
critic_learning_rate = 0.001 * 0.10

num_episodes = 35  # 1000

# %% Training with agent
# NOTE: no training loop runs in this section; the freshly constructed agent
# is only flown for one test episode and the result is plotted.
print('\n\nStart training...')

# Integer division keeps this an int; true division would produce a float
# once num_episodes / 5 exceeds 100.
num_episodes_to_plot = max(100, num_episodes // 5)

target_pos = np.array([0.0, 0.0, 10.0])
init_pose = np.array([0.0, 0.0, 5.0, 0.0, 0.0, 0.0])
init_velocities = np.array([0.0, 0.0, 0.0])

task = Task(
    init_pose=init_pose,
    init_velocities=init_velocities,
    target_pos=target_pos,
)
agent = DDPG(
    task,
    exploration_mu=exploration_mu,
    exploration_theta=exploration_theta,
    exploration_sigma=exploration_sigma,
    buffer_size=buffer_size,
    batch_size=batch_size,
    gamma=gamma,
    tau=tau,
    actor_learning_rate=actor_learning_rate,
    critic_learning_rate=critic_learning_rate,
)

# Fly one episode with the untrained agent, log it to file_output and plot it.
results, rewards_lists = run_test_episode(agent, task, file_output)
plot_results(results, target_pos, 'Run without training', rewards_lists)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment