Created
March 28, 2023 08:02
-
-
Save Athe-kunal/afb6b68f375ec0ebec8d10e8497b95e9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations

import gymnasium as gym
import numpy as np
from numpy import random as rd
class StockTradingEnv(gym.Env):
    """Multi-stock trading environment following the Gymnasium API.

    Each step advances one row of the price array. The action is one value
    per stock in ``[-1, 1]``; it is scaled by ``max_stock`` and truncated to
    an integer number of shares (negative = sell, positive = buy). The
    per-step reward is the change in total asset value (cash + holdings)
    multiplied by ``reward_scaling``; on the last step the reward is
    replaced by the discounted sum of all per-step rewards.

    Observation layout (length ``state_dim``)::

        [cash, turbulence, turbulence_flag,
         price * stock_dim, holdings * stock_dim, cooldown * stock_dim,
         technical indicators]
    """

    def __init__(
        self,
        config,
        initial_account=1e6,
        gamma=0.99,
        turbulence_thresh=99,
        min_stock_rate=0.1,
        max_stock=1e2,
        initial_capital=1e6,
        buy_cost_pct=1e-3,
        sell_cost_pct=1e-3,
        reward_scaling=2**-11,
        initial_stocks=None,
    ):
        """Build the environment from pre-computed market arrays.

        Args:
            config: Mapping with the keys ``"price_array"``,
                ``"tech_array"``, ``"turbulence_array"`` and ``"if_train"``.
                The price and tech arrays are indexed ``[day]`` and their
                second axis gives the stock / indicator count.
            initial_account: Not used by this class; kept so existing
                callers that pass it keep working.
            gamma: Discount factor for the accumulated episode reward.
            turbulence_thresh: Turbulence value above which all holdings
                are liquidated instead of applying the action.
            min_stock_rate: Fraction of ``max_stock`` that an action must
                exceed (in absolute shares) before a trade is executed.
            max_stock: Share count that an action of magnitude 1 maps to.
            initial_capital: Starting cash (randomised +/-5% in training).
            buy_cost_pct: Proportional fee charged on purchases.
            sell_cost_pct: Proportional fee charged on sales.
            reward_scaling: Multiplier applied to the asset-value change.
            initial_stocks: Starting holdings per stock; zeros if ``None``.
        """
        price_ary = config["price_array"]
        tech_ary = config["tech_array"]
        turbulence_ary = config["turbulence_array"]
        if_train = config["if_train"]

        self.price_ary = price_ary.astype(np.float32)
        # Indicators are scaled down by a fixed factor (presumably to keep
        # observation magnitudes small -- TODO confirm).
        self.tech_ary = tech_ary.astype(np.float32) * 2**-7
        # 1.0 on days whose turbulence exceeds the threshold, else 0.0.
        self.turbulence_bool = (turbulence_ary > turbulence_thresh).astype(np.float32)
        # Squashed and rescaled turbulence used as an observation feature.
        self.turbulence_ary = (
            self.sigmoid_sign(turbulence_ary, turbulence_thresh) * 2**-5
        ).astype(np.float32)

        stock_dim = self.price_ary.shape[1]
        self.gamma = gamma
        self.max_stock = max_stock
        self.min_stock_rate = min_stock_rate
        self.buy_cost_pct = buy_cost_pct
        self.sell_cost_pct = sell_cost_pct
        self.reward_scaling = reward_scaling
        self.initial_capital = initial_capital
        # Coerce to an ndarray so lists/tuples work with .shape/.astype in reset().
        self.initial_stocks = (
            np.zeros(stock_dim, dtype=np.float32)
            if initial_stocks is None
            else np.asarray(initial_stocks)
        )

        # Per-episode state, populated by reset().
        self.day = None
        self.amount = None
        self.stocks = None
        self.stocks_cool_down = None
        self.total_asset = None
        self.gamma_reward = None
        self.initial_total_asset = None

        # Environment information.
        self.env_name = "StockEnv"
        # amount + (turbulence, turbulence_bool)
        # + (price, stock, cool_down) * stock_dim + tech_dim
        self.state_dim = 1 + 2 + 3 * stock_dim + self.tech_ary.shape[1]
        # Not read anywhere in this class; retained for backward compatibility.
        self.stocks_cd = None
        self.action_dim = stock_dim
        self.max_step = self.price_ary.shape[0] - 1
        self.if_train = if_train
        self.if_discrete = False
        self.target_return = 10.0
        self.episode_return = 0.0

        self.observation_space = gym.spaces.Box(
            low=-3000, high=3000, shape=(self.state_dim,), dtype=np.float32
        )
        self.action_space = gym.spaces.Box(
            low=-1, high=1, shape=(self.action_dim,), dtype=np.float32
        )

    def reset(self, seed=None, options=None):
        """Start a new episode at day 0.

        In training mode the starting holdings get a random 0-63 extra
        shares per stock and the portfolio value is jittered by +/-5%; the
        cash balance is whatever remains after valuing those holdings.
        In evaluation mode the configured holdings and capital are used
        unchanged.

        Args:
            seed: Optional seed forwarded to ``gym.Env.reset`` so the
                training-mode randomisation is reproducible.
            options: Accepted for API compatibility; unused.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.
        """
        # Seeds self.np_random when a seed is supplied (Gymnasium convention).
        super().reset(seed=seed)

        self.day = 0
        price = self.price_ary[self.day]

        if self.if_train:
            rng = self.np_random
            self.stocks = (
                self.initial_stocks
                + rng.integers(0, 64, size=self.initial_stocks.shape)
            ).astype(np.float32)
            self.stocks_cool_down = np.zeros_like(self.stocks)
            self.amount = (
                self.initial_capital * rng.uniform(0.95, 1.05)
                - (self.stocks * price).sum()
            )
        else:
            self.stocks = self.initial_stocks.astype(np.float32)
            self.stocks_cool_down = np.zeros_like(self.stocks)
            self.amount = self.initial_capital

        self.total_asset = self.amount + (self.stocks * price).sum()
        self.initial_total_asset = self.total_asset
        self.gamma_reward = 0.0
        return self.get_state(price), {}

    def step(self, actions):
        """Advance one day and apply the trading action.

        Args:
            actions: Array of ``action_dim`` values in ``[-1, 1]``.

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
            ``terminated`` becomes True on the last row of the price array;
            ``truncated`` is always False; ``info`` is an empty dict.
        """
        actions = (np.asarray(actions) * self.max_stock).astype(int)

        self.day += 1
        price = self.price_ary[self.day]
        self.stocks_cool_down += 1

        if self.turbulence_bool[self.day] == 0:
            # Trades smaller than this many shares are ignored.
            min_action = int(self.max_stock * self.min_stock_rate)

            # Sells run first so their proceeds can fund the buys below.
            for index in np.where(actions < -min_action)[0]:
                if price[index] > 0:  # skip days with missing price data
                    sell_num_shares = min(self.stocks[index], -actions[index])
                    self.stocks[index] -= sell_num_shares
                    self.amount += (
                        price[index] * sell_num_shares * (1 - self.sell_cost_pct)
                    )
                    self.stocks_cool_down[index] = 0

            for index in np.where(actions > min_action)[0]:
                if price[index] > 0:  # skip days with missing price data
                    # Affordability includes the buy fee, and is clamped at
                    # zero so a negative cash balance cannot produce a
                    # negative "purchase" (a fee-free sale).
                    unit_cost = price[index] * (1 + self.buy_cost_pct)
                    affordable = max(self.amount // unit_cost, 0)
                    buy_num_shares = min(affordable, actions[index])
                    self.stocks[index] += buy_num_shares
                    self.amount -= unit_cost * buy_num_shares
                    self.stocks_cool_down[index] = 0
        else:
            # High turbulence: liquidate every position, ignoring the action.
            self.amount += (self.stocks * price).sum() * (1 - self.sell_cost_pct)
            self.stocks[:] = 0
            self.stocks_cool_down[:] = 0

        state = self.get_state(price)
        total_asset = self.amount + (self.stocks * price).sum()
        reward = (total_asset - self.total_asset) * self.reward_scaling
        self.total_asset = total_asset

        self.gamma_reward = self.gamma_reward * self.gamma + reward
        done = self.day == self.max_step
        if done:
            # The final reward is the discounted return of the whole episode.
            reward = self.gamma_reward
            self.episode_return = total_asset / self.initial_total_asset

        # Running out of price data is reported as termination only;
        # flagging the same event as truncation too would be contradictory.
        return state, reward, done, False, {}

    def get_state(self, price):
        """Assemble the float32 observation vector for the current day.

        Args:
            price: Price row for ``self.day``.

        Returns:
            1-D float32 array of length ``state_dim`` (see class docstring
            for the layout).
        """
        amount = np.array(self.amount * (2**-12), dtype=np.float32)
        scale = np.array(2**-6, dtype=np.float32)
        state = np.hstack(
            (
                amount,
                self.turbulence_ary[self.day],
                self.turbulence_bool[self.day],
                price * scale,
                self.stocks * scale,
                self.stocks_cool_down,
                self.tech_ary[self.day],
            )
        )
        # Guarantee the dtype declared in observation_space.
        return state.astype(np.float32, copy=False)

    @staticmethod
    def sigmoid_sign(ary, thresh):
        """Squash ``ary`` into ``(-thresh / 2, thresh / 2)`` with a sigmoid.

        Computes ``(1 / (1 + exp(-e * ary / thresh)) - 0.5) * thresh``.
        """

        def sigmoid(x):
            return 1 / (1 + np.exp(-x * np.e)) - 0.5

        return sigmoid(ary / thresh) * thresh
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment