Skip to content

Instantly share code, notes, and snippets.

@cty-yyds
Created April 8, 2021 21:17
Show Gist options
  • Save cty-yyds/04d1c3f0a8340a83024a33835a4757ed to your computer and use it in GitHub Desktop.
Save cty-yyds/04d1c3f0a8340a83024a33835a4757ed to your computer and use it in GitHub Desktop.
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler, StandardScaler
def initialize_external_states():
    """Load the external (time-indexed) states and a standardized copy.

    Reads 'some_datasheet.xlsx', using its first column as the index, and
    passes the underlying values through scikit-learn's StandardScaler.

    Returns:
        A 2-tuple ``(raw_table, standardized)`` where ``raw_table`` is the
        DataFrame as read from the sheet and ``standardized`` is whatever
        ``StandardScaler.fit_transform`` returns for ``raw_table.values``
        (an array, not a DataFrame).
    """
    # external states related to time, loaded from an external resource
    raw_table = pd.read_excel('some_datasheet.xlsx', index_col=0)
    standardized = StandardScaler().fit_transform(raw_table.values)
    return raw_table, standardized
class TemplateEnvironment:
    """Template simulation environment with two internal and two external states.

    The observable state is the vector
    ``[internal_state1, internal_state2, external_state1, external_state2]``
    where the external entries come from the normalized data table and the
    internal entries are updated by ``make_action``.
    """

    def __init__(self, variable_a, variable_b):
        """Create the environment and load the external state tables.

        Args:
            variable_a: upper limit of ``internal_state1`` (e.g. battery capacity).
            variable_b: upper limit of ``internal_state2``.
        """
        self.n_states = 4  # no. of states
        self.n_actions = 2  # no. of actions
        self.action1_max = 1  # action max
        self.action2_max = 1  # action max
        # system variables we want to change between simulations
        self.variable_a = variable_a  # e.g. battery capacity
        self.variable_b = variable_b
        # internal states
        self.internal_state1 = 0  # e.g. battery level
        self.internal_state2 = 0
        # external states: raw table and its normalized counterpart
        self.states_info, self.normalized_states_info = initialize_external_states()
        # keep track of step for loading external states
        self.step = 0  # trading time step

    def _normalized_external_states(self, index):
        """Return the first two normalized external states of row ``index``.

        The normalized table may be a NumPy array (the scaler output) or a
        DataFrame, so both indexing styles are supported.
        """
        table = self.normalized_states_info
        if isinstance(table, pd.DataFrame):
            return table.iloc[index, 0], table.iloc[index, 1]
        return table[index, 0], table[index, 1]

    @staticmethod
    def _clamp_with_violation(value, upper):
        """Clamp ``value`` into ``[0, upper]``.

        Returns:
            ``(clamped_value, violation)`` where ``violation`` is the
            non-negative amount by which ``value`` exceeded the limits.
            It is measured BEFORE clamping, otherwise it would always be 0.
        """
        violation = 0
        if value > upper:
            violation += value - upper
            value = upper
        if value < 0:
            violation += -value
            value = 0
        return value, violation

    def reset(self):
        """Reset internal states and the step counter; return the initial state.

        Returns:
            1-D array ``[0, 0, external_state1, external_state2]`` using the
            normalized external states of the first data row.
        """
        self.internal_state1 = 0
        self.internal_state2 = 0
        external_state1, external_state2 = self._normalized_external_states(0)
        initial_states = np.hstack((self.internal_state1, self.internal_state2,
                                    external_state1, external_state2))  # reset state
        self.step = 0  # reset step
        return initial_states

    def sample(self):
        """Draw a random action: one uniform value per action in ``[0, action_max)``."""
        # random select actions
        action_1 = np.random.uniform(0, self.action1_max)  # *2-1==>(-1, 1) sigmoid to tanh
        action_2 = np.random.uniform(0, self.action2_max)
        return np.array([action_1, action_2])

    def make_action(self, action):
        """Apply ``action`` for the current step and advance to the next one.

        Args:
            action: indexable with two entries; ``action[0]`` is scaled by 100
                and ``action[1]`` by 50 before being applied.

        Returns:
            ``(reward, next_state)`` where ``next_state`` is the 1-D array
            ``[internal_state1, internal_state2, next_external_state1,
            next_external_state2]`` (external entries normalized).

        Raises:
            IndexError: if there is no data row after the current step. The
                check happens before any state is modified.
        """
        # ----------------------in scope parameters--------------------#
        para_a = 1.2  # e.g. price coefficient buying from main power grid
        para_b = 0.8  # e.g. price coefficient selling to main power grid
        para_c = 0.0091  # e.g. battery wear cost
        reward = 0
        variable_reward = 0
        # Fail before mutating anything if the next row does not exist.
        next_step = self.step + 1
        if next_step >= len(self.normalized_states_info):
            raise IndexError(
                "no external state available after step %d" % self.step)
        # -----------------------load external states------------------#
        external_state1 = self.states_info.iloc[self.step, 0]
        external_state2 = self.states_info.iloc[self.step, 1]
        # -----------------------load actions------------------#
        action_1 = action[0] * 100  # multiply some scale
        action_2 = action[1] * 50  # *2-1==>(-1, 1) sigmoid to tanh
        # ---------------------calculate next internal state----------------#
        # some random system functions and limits
        self.internal_state1, violation = self._clamp_with_violation(
            self.internal_state1 + external_state1 * para_a * action_1,
            self.variable_a)
        variable_reward += violation
        # NOTE(review): this update is proportional to the current value, so
        # starting from 0 (as after reset) internal_state2 never changes.
        # Looks like placeholder dynamics -- confirm the intended formula.
        self.internal_state2, violation = self._clamp_with_violation(
            self.internal_state2 + self.internal_state2 * action_2 * para_b,
            self.variable_b)
        variable_reward += violation
        # --------------------calculate reward----------------------------------#
        reward += action_2 * external_state2
        reward -= variable_reward * para_c  # e.g. penalty
        # -------------------return reward and next state------------------------#
        self.step = next_step  # next step
        next_external_state1, next_external_state2 = \
            self._normalized_external_states(self.step)
        next_state = np.hstack((self.internal_state1, self.internal_state2,
                                next_external_state1, next_external_state2))
        # for debugging, you can also return any in-scope variable e.g. variable_reward
        return reward, next_state
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment