Place the agent in your `mmlf/agents` directory.
Place the configuration file in `~/.mmlf/config`.
Run with: `./run_mmlf --config world_simple_q.yaml`
"""MMLF agent that implements Q-Learning. """ | |
__author__ = "Adriano Di Luzio & Danilo Francati" | |
__copyright__ = "Copyright 2015, Sapienza University of Rome, CS" | |
__credits__ = ['Mark Edgington'] | |
__license__ = "GPLv3" | |
__version__ = "1.0" | |
__maintainer__ = "Adriano Di Luzio" | |
__email__ = "adrianodl@hotmail.it" | |
import pprint
import random

import mmlf.framework.protocol
from mmlf.agents.agent_base import AgentBase
# Each agent has to inherit directly or indirectly from AgentBase
class QAgent(AgentBase):
    """Tabular Q-Learning agent for discrete state and action spaces.

    The agent keeps a table ``Q[state][action]`` that is refreshed every
    time a reward arrives (see ``_updateQ``).  Actions are drawn uniformly
    at random from the action list (see ``getAction``); the table is not
    consulted when choosing an action.
    """

    # Default configuration for this agent:
    #   epsilon -- weight given to the new estimate in the Q update; the old
    #              value is weighted with (1 - epsilon)
    #   gamma   -- factor applied to the best Q value of the follow-up state
    DEFAULT_CONFIG_DICT = {
        'epsilon': 0.5,
        'gamma': 0.9,
    }

    def __init__(self, *args, **kwargs):
        """Declare the agent's capabilities and set up its learning state.

        All positional and keyword arguments are forwarded unchanged to the
        constructor of ``AgentBase``.
        """
        # Capabilities of this agent.  The info object is created before the
        # base-class constructor is invoked, as in the original code
        # (presumably the base class expects it -- TODO confirm).
        self.agentInfo = mmlf.framework.protocol.AgentInfo(
            # Which communication protocol version can the agent handle?
            versionNumber="0.3",
            # Name of the agent (can be chosen arbitrarily)
            agentName="Q",
            # Can the agent be used in environments with continuous
            # state spaces?
            continuousState=False,
            # Can the agent be used in environments with continuous
            # action spaces?
            continuousAction=False,
            # Can the agent be used in environments with discrete
            # action spaces?
            discreteAction=True,
            # Can the agent be used in non-episodic environments?
            nonEpisodicCapable=False
        )

        # Calls constructor of base class.  After this call, the agent has
        # an attribute "self.configDict" (read below).
        super(QAgent, self).__init__(*args, **kwargs)

        # Filled in by setStateSpace() / setActionSpace()
        self.stateSpace = None
        self.actionSpace = None

        # Flat lists of all states / actions; defined up front so that
        # _initQ() can be called regardless of which space is set first.
        self.states = None
        self.actions = None

        # State/action pair of the step before the most recent one; defined
        # up front so that a reward arriving before the first action is
        # ignored by _updateQ() instead of raising AttributeError.
        self.previousState = None
        self.previousAction = None

        # Parameters of the Q update (see _updateQ)
        self.gamma = self.configDict.get('gamma', 0.9)
        self.epsilon = self.configDict.get('epsilon', 0.5)

        # The Q matrix; built by _initQ() once both spaces are known
        self.Q = None

    # ##################### BEGIN COMMAND-HANDLING METHODS ###################

    def setStateSpace(self, stateSpace):
        """Informs the agent about the state space of the environment

        More information about state spaces can be found in
        :ref:`state_and_action_spaces`
        """
        super(QAgent, self).setStateSpace(stateSpace)

        # Get a list of all the possible states
        self.states = self.stateSpace.getStateList()

        # (Re)build the Q matrix; a no-op until the actions are known too
        self._initQ()

    def setActionSpace(self, actionSpace):
        """Informs the agent about the action space of the environment

        More information about action spaces can be found in
        :ref:`state_and_action_spaces`
        """
        super(QAgent, self).setActionSpace(actionSpace)

        # We can only deal with one-dimensional action spaces
        assert self.actionSpace.getNumberOfDimensions() == 1, \
            "Action space must be one-dimensional"

        # Get a list of all actions this agent might take
        self.actions = self.actionSpace.getActionList()

        # (Re)build the Q matrix; a no-op until the states are known too
        self._initQ()

    def getAction(self):
        """Request the next action the agent wants to execute.

        Returns the object produced by ``_generateActionObject`` for a
        randomly chosen action.
        """
        # Keep the pair from one step earlier; _updateQ() updates its entry.
        self.previousAction = self.lastAction
        self.previousState = self.lastState

        # Each action of the agent corresponds to one step
        self.action = self._chooseRandomAction()

        # Create an action dictionary
        # that maps action dimension to chosen action
        actionDictionary = dict()
        for index, actionName in enumerate(self.actionSpace.iterkeys()):
            actionDictionary[actionName] = self.action[index]

        # Call super class method since this updates some internal
        # information (self.lastState, self.lastAction, self.reward,
        # self.state, self.action)
        super(QAgent, self).getAction()

        return self._generateActionObject(actionDictionary)

    def giveReward(self, reward):
        """Provides a reward to the agent and updates the Q matrix with it."""
        self._updateQ(reward)

    def nextEpisodeStarted(self):
        """Informs the agent that a new episode has started.

        Delegates to the superclass and then logs the current Q matrix.
        """
        super(QAgent, self).nextEpisodeStarted()
        self.agentLog.info("Q matrix: \n%s", pprint.pformat(self.Q))

    # ####################### END COMMAND-HANDLING METHODS ###################

    def _initQ(self):
        """Initialize the Q matrix.

        Builds a dictionary whose keys are the possible states.  Each value
        is again a dictionary, whose keys are the possible actions and whose
        values are the Q values (all starting at 0).

        Nothing happens until both the state list and the action list are
        known and non-empty, so the two space setters may be called in any
        order.
        """
        if not self.states or not self.actions:
            return

        self.Q = {s: {a: 0 for a in self.actions} for s in self.states}
        self.agentLog.info("New Q matrix: \n%s", pprint.pformat(self.Q))

    def _chooseRandomAction(self):
        """Chooses an action randomly from the action space."""
        assert self.actionSpace, \
            "Error: Action requested before actionSpace was specified"
        return random.choice(self.actions)

    def _updateQ(self, reward):
        """Update the Q value after an action from a state.

        The entry for (previousState, previousAction) becomes a blend of its
        old value, weighted (1 - epsilon), and of
        ``reward + gamma * max(Q[lastState])``, weighted epsilon.
        Nothing is updated while no previous state/action pair exists.
        """
        if self.previousState is None or self.previousAction is None:
            return

        qRow = self.Q[self.previousState]
        oldValue = qRow[self.previousAction]
        bestNext = max(self.Q[self.lastState].values())

        qRow[self.previousAction] = \
            (1 - self.epsilon) * oldValue + \
            self.epsilon * (reward + self.gamma * bestNext)
# Each module that implements an agent must have a module-level attribute
# "AgentClass" that is set to the class that inherits from AgentBase
AgentClass = QAgent

# Furthermore, the name of the agent has to be assigned to "AgentName". This
# name is used in the GUI.
AgentName = "Q"
# World configuration for running the Q-Learning agent in the maze2d world.
worldPackage : maze2d
environment:
    moduleName : "maze2d_environment"
    configDict:
        episodesUntilDoorChange : 10000
        MAZE : "maze_simple.cfg"
agent:
    # NOTE(review): must match the file name of the agent module placed in
    # mmlf/agents -- confirm.
    moduleName : "academic_agent"
    configDict:
        # Read by the agent through its configDict ('gamma', 'epsilon').
        gamma : 0.9
        epsilon : 0.5
monitor:
    policyLogFrequency : 1000