gym-building with DQN Agent
import gym
import gym_building
import random
import numpy as np
from collections import namedtuple

import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn import init
import torch.nn.functional as F
from torch.autograd import Variable

use_cuda = torch.cuda.is_available()
FloatTensor = torch.cuda.FloatTensor if use_cuda else torch.FloatTensor
LongTensor = torch.cuda.LongTensor if use_cuda else torch.LongTensor
ByteTensor = torch.cuda.ByteTensor if use_cuda else torch.ByteTensor
Tensor = FloatTensor
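
# Build a single-room building-v0 environment: one uncontrolled laptop load,
# one agent-controlled light, and one agent-controlled HVAC unit.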
def make_env():
    from gym_building.envs.climate import Climate, DummyClimateModel
    from gym_building.envs.rooms import Room
    from gym_building.envs.blackbox import LaptopComputer
    from gym_building.envs.action_consumer import FluorescentLight
    from gym_building.envs.heaters import SmallHVAC

    env = gym.make("building-v0")
    env.setTimestep(60.0)  # simulation timestep, in seconds
    env.setMaxEpisodeTime(60.0*60.0*3.0)  # seconds per "episode", here 3 hours

    # Create a climate and have it act on our building.
    env.addClimate(
        Climate("Utopia", DummyClimateModel())
    )

    # Create a room that is 10 ft wide, 5 ft high, and 10 ft deep. It will be
    # 15.0 C initially and will be called MainRoom.
    env.addRoom(name="MainRoom", lengths={"W": 10.0, "H": 5.0, "Z": 10.0}, init_temperature=15.0)

    # There will be a laptop plugged into the MainRoom. The agent has no
    # control over this entity; the laptop transitions between being plugged
    # in and unplugged with random probability.
    env.addBlackbox(
        LaptopComputer(),
        name="Macbook",
        room="MainRoom",
    )

    # There will be a light in the MainRoom that is on by default.
    light = FluorescentLight(default_action=1)  # default to on
    # We will let our agent control this light (turning it on and off).
    env.addActionConsumer(
        light,
        name="Light0",
        room="MainRoom",
    )

    # There will be a small HVAC system that heats only the MainRoom.
    hvac = SmallHVAC(default_action=0)
    # We will let our agent control the output level of this HVAC.
    # It has 4 output levels: off, 33%, 66%, and 100%.
    env.addHeater(
        hvac,
        name="HVAC0",
        outputRooms=["MainRoom"],
    )

    # Finally we define a custom reward. We want to keep the MainRoom
    # temperature at 15 C and minimize the power used by the entire building.
    def rewardFn(env, state):
        tempDiff = -1.0*abs(15.0 - env.rooms["MainRoom"].temperature)
        totalPwr = -1.0*state["total_power"]
        return totalPwr + tempDiff

    # Set the reward function.
    env.setRewardFn(rewardFn)
    return env
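
# A single transition in the environment: it maps a (state, action) pair to
# the resulting (next_state, reward).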
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))
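
# Bounded cyclic buffer that holds recently observed transitions and supports
# uniform random sampling of minibatches.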
class ReplayMemory(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)
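
# Example usage of ReplayMemory (a sketch; the 1x15 state shape is assumed
# from the DQN input size below, and next states are stored as numpy arrays
# as in the training loop):
#
#   rm = ReplayMemory(1000)
#   rm.push(torch.zeros(1, 15), LongTensor([[0]]), np.zeros((1, 15)), torch.ones(1, 1))
#   s, a, s_next, r = rm.sample(1)[0]

# Fully connected Q-network mapping the 15-dimensional building state to one
# Q-value per action.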
class DQN(nn.Module):
    def __init__(self, n_actions=4, *args, **kwargs):
        super(DQN, self).__init__(*args, **kwargs)
        self.fc1 = nn.Linear(15, 60)
        self.fc2 = nn.Linear(60, 30)
        self.fc3 = nn.Linear(30, n_actions)
        self.initParams()

    def initParams(self):
        for p in self.parameters():
            if p.dim() == 1:
                init.constant_(p.data, 0.0)  # zero the biases
            elif p.dim() > 1:
                init.kaiming_normal_(p.data, 0.1)  # Kaiming-normal weights

    def forward(self, x):
        z = F.relu(self.fc1(x))
        z = F.relu(self.fc2(z))
        z = self.fc3(z)
        return z
if __name__ == "__main__":
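    # Training hyper-parameters. Epsilon is annealed linearly from EPS down to
    # EPS_MIN over EPS_STEPS environment steps; the target network is synced
    # every FREEZE_INTERVAL steps.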
    NUM_STEPS = int(4e6)
    FREEZE_INTERVAL = int(10e3)
    REPLAY_MEMORY_SIZE = int(1e6)
    BATCH_SIZE = 64
    UPDATE_INTERVAL = 4

    EPS = 1.0
    EPS_STEPS = 1e6
    EPS_MIN = 0.1
    EPS_DECAY = (EPS - EPS_MIN)/EPS_STEPS
    GAMMA = 0.99

    rm = ReplayMemory(REPLAY_MEMORY_SIZE)
    env = make_env()

    # Online network and frozen target network start with identical weights.
    # Move them to the GPU only when one is available.
    model = DQN(n_actions=env.nActions())
    model_target = DQN(n_actions=env.nActions())
    if use_cuda:
        model = model.cuda()
        model_target = model_target.cuda()
    model_target.load_state_dict(model.state_dict())

    opt = optim.RMSprop(model.parameters(), lr=1e-5)
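
    # Epsilon-greedy action selection: with probability EPS take a uniformly
    # random action, otherwise act greedily with respect to the online network.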
    def select_action(state):
        sample = random.random()
        if sample > EPS:
            with torch.no_grad():
                return model(Variable(torch.from_numpy(state)).type(FloatTensor)).data.max(1)[1].view(1, 1)
        else:
            return LongTensor([[random.randrange(env.nActions())]])
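
    # One DQN optimization step: sample a minibatch from replay memory, form
    # the one-step bootstrapped targets with the frozen target network, and
    # minimize the Huber loss between predicted and target Q-values.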
    def optimize_model():
        if len(rm) < BATCH_SIZE:
            print("not enough data")
            return
        transitions = rm.sample(BATCH_SIZE)
        batch = Transition(*zip(*transitions))

        # Compute a mask of non-final states and concatenate the batch elements.
        non_final_mask = ByteTensor(tuple(map(lambda s: s is not None, batch.next_state)))
        ml = [torch.from_numpy(s) for s in batch.next_state if s is not None]

        state_batch = Variable(torch.cat(batch.state)).type(Tensor)
        action_batch = Variable(torch.cat(batch.action)).type(LongTensor)
        reward_batch = Variable(torch.cat(batch.reward)).type(Tensor)

        # Compute Q(s_t, a): the model computes Q(s_t), then we select the
        # columns corresponding to the actions that were actually taken.
        state_action_values = model(state_batch).gather(1, action_batch)

        # Compute V(s_{t+1}) for all next states; final states keep a value of 0.
        next_state_values = Variable(torch.zeros(BATCH_SIZE).type(Tensor))
        with torch.no_grad():
            non_final_next_states = Variable(torch.cat(ml)).type(Tensor)
            next_state_values[non_final_mask] = model_target(non_final_next_states).max(1)[0]
        next_state_values = next_state_values.unsqueeze(1)

        # Compute the expected Q values.
        expected_state_action_values = (next_state_values * GAMMA) + reward_batch
        # Detach the targets so no gradients flow through the target network path.
        expected_state_action_values = Variable(expected_state_action_values.data)

        # Compute Huber loss.
        loss = F.smooth_l1_loss(state_action_values, expected_state_action_values)

        # Optimize the model, clipping gradients to [-1, 1].
        opt.zero_grad()
        loss.backward()
        for param in model.parameters():
            param.grad.data.clamp_(-1, 1)
        opt.step()
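
    # Main training loop: act epsilon-greedily, store the transition, and
    # periodically run an optimization step and sync the target network.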
    state = env.reset()
    episode_rewards = []
    for step in range(NUM_STEPS):
        action = select_action(state)
        # select_action already returns the action index as a 1x1 LongTensor,
        # so read it out directly instead of taking an argmax over it.
        action_idx = int(action[0, 0])
        state_next, reward, done, _ = env.step(action_idx)
        if done:
            state_next = None
        rm.push(torch.from_numpy(state), action, state_next, torch.ones(1, 1)*reward)
        episode_rewards.append(reward)
        state = state_next

        if done:
            print("n_steps={}; ep_reward={}; avg_ep_reward={}; eps={}; rm.n={}; elapsedEpisodeTime={}".format(
                step, np.sum(episode_rewards), np.sum(episode_rewards)/float(len(episode_rewards)), EPS, len(rm), env.elapsedEpisodeTime))
            episode_rewards = []
            state = env.reset()
        # Anneal epsilon once per environment step so it reaches EPS_MIN after
        # EPS_STEPS steps, matching the hyper-parameters above.
        EPS = max(EPS - EPS_DECAY, EPS_MIN)

        if (step + 1) % UPDATE_INTERVAL == 0:
            optimize_model()

        if (step + 1) % FREEZE_INTERVAL == 0:
            model_target.load_state_dict(model.state_dict())
Hello, where is the gym_building package? Could you send it to me? Thanks.