# NOTE(review): stray GitHub gist page text ("Create a gist now ...") was
# captured at the top of this file; commented out so the file parses as Python.
# -*- coding: utf-8 -*-
"""
Created on Wed Jan 25 10:12:34 2017
@author: Adam Green
adam.green@adgefficiency.com
adgefficiency.com
"""
import os
import pandas as pd
import numpy as np
import random
# importing plotly
import plotly as py
import plotly.graph_objs as go
import plotly.tools as tls
# Plotly credentials for publishing charts.
# SECURITY NOTE(review): the API key is hard-coded in source — anyone who can
# read this file can use it. It should be revoked and loaded from an
# environment variable or a local credentials file instead.
tls.set_credentials_file(username="adamg33", api_key="MkcK97tvoMFcmLH4cus0")
# setting directories — all paths are resolved relative to this script's location
base_path = os.path.dirname(os.path.abspath(__file__))
prices_path = os.path.join(base_path, 'prices.csv')
fig1_path = os.path.join(base_path, 'Figure 1.html')
def initStateSpace():
    """Enumerate every possible state as a (charge, settlement period) tuple.

    Charge runs from 0 to 100 MWh in 10 MWh steps; settlement periods run
    from 1 to 48 (one half-hourly trading day).
    """
    charge_levels = range(0, 110, 10)
    settlement_periods = range(1, 49)
    return [(level, sp) for level in charge_levels for sp in settlement_periods]
def initStateActions(states):
    """Build the action-value table and the legal-action lookup.

    Action codes: 0 = discharge, 1 = charge, 2 = do nothing.

    Returns:
        av: dict keyed by (state, action) -> Q value, initialised to 0.0
        sa: dict keyed by state -> list of legal action codes
    """
    av, sa = {}, {}
    for s in states:
        level = s[0]
        if level == 0:
            legal = [1, 2]       # empty battery cannot discharge
        elif level == 100:
            legal = [0, 2]       # full battery cannot charge
        else:
            legal = [0, 1, 2]    # interior charge level: all actions legal
        for action in legal:
            av[(s, action)] = 0.0
        sa[s] = legal
    return av, sa
def initSAcount(stateActions):
    """Return a visit counter with one zero entry per state/action key."""
    return dict.fromkeys(stateActions, 0)
def initState():
    """Return the fixed starting state: 50 MWh of charge at settlement period 1."""
    initial_charge, first_period = 50, 1
    return (initial_charge, first_period)
def getprices():
    """Return the 48 half-hourly electricity prices (all in £/MWh).

    Prices are quoted hourly, so each hourly value is repeated for the two
    settlement periods it covers.
    """
    hourly = [31.19, 30.94, 30.55, 29.98, 30.03, 30.94, 33.0, 35.4,
              37.82, 45.72, 45.79, 79.4, 76.78, 74.87, 35.79, 36.95,
              74.79, 74.74, 69.82, 41.8, 41.69, 46.07, 37.9, 43.11]
    return [price for price in hourly for _ in range(2)]
def updateQtable(av_table, av_count, returns):
    """Move each visited Q (action-value) entry toward its observed return.

    Incremental-mean update: Q <- Q + (G - Q) / N, where N is the running
    visit count for that state/action pair. Mutates and returns av_table.
    """
    for key, observed_return in returns.items():
        step_size = 1 / av_count[key]
        av_table[key] += step_size * (observed_return - av_table[key])
    return av_table
def qsv(state, av_table):
    """Return the Q values for [discharge, charge, do nothing] in this state.

    Actions that are illegal in the state (discharging when empty, charging
    when full) are assigned a value far below the legal ones, so an argmax
    over the result never selects them.
    """
    PENALTY = 10000
    level = state[0]
    if level == 0:      # empty: discharging is illegal
        charge = av_table[(state, 1)]
        nothing = av_table[(state, 2)]
        discharge = min(charge, nothing) - PENALTY
    elif level == 100:  # full: charging is illegal
        discharge = av_table[(state, 0)]
        nothing = av_table[(state, 2)]
        charge = min(discharge, nothing) - PENALTY
    else:               # interior: all three actions legal
        discharge, charge, nothing = (av_table[(state, a)] for a in (0, 1, 2))
    return np.array([discharge, charge, nothing])
def battery(state, action):
    """Simulate one settlement period of battery operation (the technical model).

    Args:
        state: (charge in MWh, settlement period 1-48)
        action: 0 = discharge 10 MWh, 1 = charge 10 MWh, 2 = do nothing

    Returns:
        (new_state, reward, charge_delta) where reward is £ earned
        (negative when buying energy to charge) and charge_delta is the
        MWh exported from the battery (negative when charging).

    Raises:
        ValueError: if action is not 0, 1 or 2. (The original used three
        independent `if`s and hit a NameError on an unknown action.)
    """
    charge, SP = state
    prices = getprices()
    price = prices[SP - 1]  # £/MWh for the current settlement period

    if action == 0:    # discharging, clipped so charge never drops below 0
        new_charge = max(0, charge - 10)
    elif action == 1:  # charging, clipped so charge never exceeds 100
        new_charge = min(100, charge + 10)
    elif action == 2:  # doing nothing
        new_charge = charge
    else:
        raise ValueError('action must be 0 (discharge), 1 (charge) or 2 (nothing)')

    charge_delta = charge - new_charge  # MWh exported (+) or imported (-)
    reward = charge_delta * price       # selling earns, buying costs

    return (new_charge, SP + 1), reward, charge_delta
def single_episode():
    """Play one full day (48 settlement periods) under epsilon-greedy control.

    Reads the module-level epsilon, sa_table, av_table and av_count.
    Returns the state/actions visited this episode, the updated visit
    counts, the total reward and the net energy discharged.
    """
    state = initState()       # every episode starts from the same state
    returns = {}              # state/action pairs visited this episode
    total_reward = 0
    total_net_charge = 0
    for _ in range(1, 49):
        # epsilon-greedy action selection
        if random.random() < epsilon:
            action = random.choice(sa_table[state])   # explore: any legal action
        else:
            action = np.argmax(qsv(state, av_table))  # exploit: best known action
        key = (state, action)
        returns[key] = 0       # placeholder; overwritten with the episode reward later
        av_count[key] += 1     # one more visit to this state/action
        # run one half hour through the battery model
        state, reward, net_charge = battery(state, action)
        total_reward += reward
        total_net_charge += net_charge
    return returns, av_count, total_reward, total_net_charge
# setting up our experiment
epochs = 10000          # number of episodes to run
epsilon = .9            # initial exploration rate
epsilon_min = 0.1       # exploration floor
epsilon_decay = 0.9     # multiplicative decay applied after each episode
state_space = initStateSpace()
av_table, sa_table = initStateActions(state_space)  # Q table + legal actions per state
av_count = initSAcount(av_table)  # initializing the count of each s/a
rewards, max_reward, net_charge, epsilons = [], [], [], []
print('Running episodes')
# BUG FIX: the original used `k % (epochs / 10)` — a float modulo that raises
# ZeroDivisionError when epochs < 10 and is fragile for non-multiples of 10.
progress_step = max(1, epochs // 10)
for k in range(epochs):
    returns, av_count, episode_reward, episode_net_charge = single_episode()
    # Monte Carlo: every state/action visited gets the whole episode's reward
    for key in returns:
        returns[key] = episode_reward
    rewards.append(episode_reward)
    max_reward.append(max(rewards))
    net_charge.append(episode_net_charge)
    epsilons.append(epsilon)
    # updating Q table with this episode's returns
    av_table = updateQtable(av_table, av_count, returns)
    # decaying epsilon towards its floor
    if epsilon > epsilon_min:
        epsilon *= epsilon_decay
    if k % progress_step == 0:  # printing progress
        print(str(100 * k / epochs) + ' % done')
# results processing
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
fig = plt.figure(figsize=(8, 6))
ax = fig.add_subplot(111, projection='3d', )
ax.azim = 230
ax.set_xlabel('Settlement Period')
ax.set_ylabel('Charge')
ax.set_zlabel('State-Value')
x, y, z = [], [], []  # initializing the 3-d surface axes data
lookup_table_actions = pd.DataFrame(index=range(1, 49), columns=list(range(0, 110, 10)))
lookup_table_values = pd.DataFrame(index=range(1, 49), columns=list(range(0, 110, 10)))
actions_L = ['Discharge', 'Charge', 'Do Nothing']  # indexed by action code 0/1/2
for state in state_space:
    # creating our 3-d plot
    y.append(state[0])  # charge
    x.append(state[1])  # settlement period
    # finding the optimal action for each state.
    # BUG FIX: the original `zip([state, state], sa_table[state])` truncated the
    # legal-action list to two entries (silently dropping 'Do Nothing' for
    # interior states) and then indexed actions_L by list *position* rather
    # than by action code, mislabeling actions for empty/full states.
    legal_actions = sa_table[state]
    sa_values = [av_table[(state, action)] for action in legal_actions]
    optimal_index = int(np.argmax(sa_values))
    optimal_value = sa_values[optimal_index]
    optimal_action = actions_L[legal_actions[optimal_index]]  # label by action code
    lookup_table_actions.loc[state[1], state[0]] = optimal_action
    lookup_table_values.loc[state[1], state[0]] = optimal_value
    z.append(optimal_value)  # the state value for this s/a pair
# creating two new columns in lookup_table_values:
# one for the maximum value & one for the charge level this corresponds to
lookup_table_values['Maximum value'] = lookup_table_values.max(axis=1)
lookup_table_values['Optimal charge'] = lookup_table_values.idxmax(axis=1)
ax.plot_trisurf(x, y, z, linewidth=.02, cmap=cm.jet)
fig.show()
base_path = os.path.dirname(os.path.abspath(__file__))
lookup_table_path = os.path.join(base_path, 'Results', 'action table.csv')
lookup_table_values_path = os.path.join(base_path, 'Results', 'value table.csv')
rewards_path = os.path.join(base_path, 'Results', 'rewards.csv')
prices_DF = pd.DataFrame(getprices(), index=range(1, 49))
prices_DF.columns = ['Price']
lookup_table_values = pd.concat([lookup_table_values, prices_DF], axis=1)
results_DF = pd.DataFrame(index=list(range(1, epochs + 1)))
results_DF.index.rename('Epochs', inplace=True)
results_DF.loc[:, 'Episode Rewards'] = rewards
results_DF.loc[:, 'Maximum Reward'] = max_reward
results_DF.loc[:, 'Net Charge'] = net_charge
results_DF.loc[:, 'Epsilon'] = epsilons
lookup_table_actions.to_csv(lookup_table_path)
lookup_table_values.to_csv(lookup_table_values_path)
results_DF.to_csv(rewards_path)
# NOTE(review): stray GitHub page footer text removed — it was not Python code.