@ntasfi
Created April 11, 2018 16:38
gym-building with DQN Agent
import gym
import gym_building
import random
import numpy as np
from collections import namedtuple
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn import init
import torch.nn.functional as F
from torch.autograd import Variable
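
# Use CUDA tensor types when a GPU is available, otherwise fall back to CPU tensors.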
use_cuda = torch.cuda.is_available()
FloatTensor = torch.cuda.FloatTensor if use_cuda else torch.FloatTensor
LongTensor = torch.cuda.LongTensor if use_cuda else torch.LongTensor
ByteTensor = torch.cuda.ByteTensor if use_cuda else torch.ByteTensor
Tensor = FloatTensor

def make_env():
    from gym_building.envs.climate import Climate, DummyClimateModel
    from gym_building.envs.rooms import Room
    from gym_building.envs.blackbox import LaptopComputer
    from gym_building.envs.action_consumer import FluorescentLight
    from gym_building.envs.heaters import SmallHVAC

    env = gym.make("building-v0")
    env.setTimestep(60.0)  # simulation timestep, in seconds
    env.setMaxEpisodeTime(60.0*60.0*3.0)  # time in seconds per "episode", here 3 hours

    # Create a climate and have it act on our building.
    env.addClimate(
        Climate("Utopia", DummyClimateModel())
    )

    # Create a room that is 10 ft wide, 5 ft high, and 10 ft deep. It will be 15.0 C
    # initially and will be called MainRoom.
    env.addRoom(name="MainRoom", lengths={"W": 10.0, "H": 5.0, "Z": 10.0}, init_temperature=15.0)

    # There will be a laptop plugged into the MainRoom.
    # The agent has no control over this entity; the laptop will
    # transition between being plugged and unplugged with random probability.
    env.addBlackbox(
        LaptopComputer(),
        name="Macbook",
        room="MainRoom",
    )

    # There will be a light in the MainRoom that is on by default.
    light = FluorescentLight(default_action=1)  # default to on
    # We will let our agent control this light (turning it on and off).
    env.addActionConsumer(
        light,
        name="Light0",
        room="MainRoom",
    )

    # There will be a small HVAC system that heats only the MainRoom.
    hvac = SmallHVAC(default_action=0)
    # We will let our agent control the output level of this HVAC.
    # It has 4 output levels: off, 33%, 66%, and 100%.
    env.addHeater(
        hvac,
        name="HVAC0",
        outputRooms=["MainRoom"],
    )

    # Finally we define a custom reward. We want to keep the MainRoom temperature
    # at 15 C and minimize the amount of power used by the entire building.
    def rewardFn(env, state):
        tempDiff = -1.0 * abs(15.0 - env.rooms["MainRoom"].temperature)
        totalPwr = -1.0 * state["total_power"]
        return totalPwr + tempDiff

    # Set the reward function.
    env.setRewardFn(rewardFn)

    return env
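
# Optional smoke test (not part of the original training script): assuming the
# environment follows the standard gym reset/step API used below, a short random
# rollout can confirm the setup is wired correctly. Uncomment to try it:
#
# _env = make_env()
# _s = _env.reset()
# for _ in range(10):
#     _s, _r, _done, _ = _env.step(random.randrange(_env.nActions()))
#     if _done:
#         _s = _env.reset()
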
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

class ReplayMemory(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)
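
# Note: transitions stored with next_state=None mark terminal steps; optimize_model()
# below masks them out so no bootstrapped value is added for end-of-episode states.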

class DQN(nn.Module):
    def __init__(self, n_actions=4, *args, **kwargs):
        super(DQN, self).__init__(*args, **kwargs)
        # Map the 15-dimensional state vector to one Q-value per discrete action.
        self.fc1 = nn.Linear(15, 60)
        self.fc2 = nn.Linear(60, 30)
        self.fc3 = nn.Linear(30, n_actions)
        self.initParams()

    def initParams(self):
        for p in self.parameters():
            if p.dim() == 1:
                init.constant_(p.data, 0.0)        # biases start at zero
            if p.dim() > 1:
                init.kaiming_normal_(p.data, 0.1)  # Kaiming init for weight matrices

    def forward(self, x):
        z = F.relu(self.fc1(x))
        z = F.relu(self.fc2(z))
        z = self.fc3(z)
        return z

if __name__ == "__main__":
    NUM_STEPS = int(4e6)
    FREEZE_INTERVAL = int(10e3)
    REPLAY_MEMORY_SIZE = int(1e6)
    BATCH_SIZE = 64
    UPDATE_INTERVAL = 4
    EPS = 1.0
    EPS_STEPS = 1e6
    EPS_MIN = 0.1
    EPS_DECAY = (EPS - EPS_MIN) / EPS_STEPS
    GAMMA = 0.99
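
    # Epsilon is annealed linearly: each decay step subtracts
    # EPS_DECAY = (1.0 - 0.1) / 1e6 = 9e-7, so it takes EPS_STEPS = 1e6 decay steps
    # to reach EPS_MIN, after which exploration stays at 10%.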

    rm = ReplayMemory(REPLAY_MEMORY_SIZE)
    env = make_env()

    model = DQN(n_actions=env.nActions()).cuda()
    model_target = DQN(n_actions=env.nActions()).cuda()
    model_target.load_state_dict(model.state_dict())

    opt = optim.RMSprop(model.parameters(), lr=1e-5)

    def select_action(state):
        # Epsilon-greedy: exploit the current Q-network with probability 1 - EPS,
        # otherwise take a uniformly random action.
        sample = random.random()
        if sample > EPS:
            with torch.no_grad():
                return model(Variable(torch.from_numpy(state)).type(FloatTensor)).data.max(1)[1].view(1, 1)
        else:
            return LongTensor([[random.randrange(env.nActions())]])

    def optimize_model():
        if len(rm) < BATCH_SIZE:
            print("not enough data")
            return

        transitions = rm.sample(BATCH_SIZE)
        batch = Transition(*zip(*transitions))

        # Compute a mask of non-final states and concatenate the batch elements.
        non_final_mask = ByteTensor(tuple(map(lambda s: s is not None, batch.next_state)))
        ml = [torch.from_numpy(s) for s in batch.next_state if s is not None]

        state_batch = Variable(torch.cat(batch.state)).cuda()
        action_batch = Variable(torch.cat(batch.action)).cuda()
        reward_batch = Variable(torch.cat(batch.reward)).cuda()

        # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
        # columns corresponding to the actions that were actually taken.
        state_action_values = model(state_batch).gather(1, action_batch)

        # Compute V(s_{t+1}) for all next states using the frozen target network;
        # final states keep a value of zero.
        next_state_values = Variable(torch.zeros(BATCH_SIZE).type(Tensor))
        with torch.no_grad():
            non_final_next_states = Variable(torch.cat(ml)).cuda()
            next_state_values[non_final_mask] = model_target(non_final_next_states).max(1)[0]
        next_state_values = next_state_values.unsqueeze(1)

        # Compute the expected Q values (one-step TD target):
        #   y = r + GAMMA * max_a' Q_target(s', a')  for non-final s'
        #   y = r                                    for final s'
        expected_state_action_values = (next_state_values * GAMMA) + reward_batch
        # Detach the target from the graph so no gradients flow through it.
        expected_state_action_values = Variable(expected_state_action_values.data)

        # Compute Huber loss between Q(s_t, a) and the TD target.
        loss = F.smooth_l1_loss(state_action_values, expected_state_action_values)

        # Optimize the model, clamping gradients to [-1, 1] for stability.
        opt.zero_grad()
        loss.backward()
        for param in model.parameters():
            param.grad.data.clamp_(-1, 1)
        opt.step()
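
    # Training loop: interact with the environment for NUM_STEPS steps, acting
    # epsilon-greedily, storing every transition in replay memory, running a gradient
    # update every UPDATE_INTERVAL steps, and syncing the target network with the
    # online network every FREEZE_INTERVAL steps.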
    state = env.reset()
    episode_rewards = []

    for step in range(NUM_STEPS):
        action = select_action(state)
        action_idx = int(action.item())  # the (1, 1) tensor already holds the chosen action index
        state_next, reward, done, _ = env.step(action_idx)

        if done:
            state_next = None

        rm.push(torch.from_numpy(state), action, state_next, torch.ones(1, 1) * reward)
        episode_rewards.append(reward)
        state = state_next

        if done:
            print("n_steps={}; ep_reward={}; avg_ep_reward={}; eps={}; rm.n={}; elapsedEpisodeTime={}".format(
                step, np.sum(episode_rewards), np.sum(episode_rewards) / float(len(episode_rewards)),
                EPS, len(rm), env.elapsedEpisodeTime))
            episode_rewards = []
            state = env.reset()

        if (step + 1) % UPDATE_INTERVAL == 0:
            optimize_model()

        # Anneal exploration towards EPS_MIN.
        EPS = max(EPS - EPS_DECAY, EPS_MIN)

        if (step + 1) % FREEZE_INTERVAL == 0:
            model_target.load_state_dict(model.state_dict())

ghost commented Nov 8, 2018

Hi ntasfi,
Very interesting project.
Where can I find the gym_building package?
Luca

@skyttuuz

Hello, where is the gym_building package? Could you send it to me? Thanks.
