Skip to content

Instantly share code, notes, and snippets.

@jamesanto
Last active February 8, 2024 16:56
Show Gist options
  • Save jamesanto/da27479e15614c270a738881833aa6d6 to your computer and use it in GitHub Desktop.
Stock Prediction
import csv
import math
import os.path
from calendar import monthrange, isleap
from datetime import datetime, date
from typing import List
import lightning as L
import torch
import torch.nn as nn
import torch.nn.functional as F
from lightning.pytorch.utilities.types import STEP_OUTPUT
from torch.utils import data as td
class ParallelLinear(nn.Module):
    """A bank of `parallelism` independent linear layers applied slot-wise.

    Equivalent to `parallelism` separate ``nn.Linear(in_features, out_features)``
    modules, one per slot along dim 1 of the input, fused into a single einsum.

    Input:  [batch, parallelism, time, in_features]
    Output: [batch, parallelism, time, out_features]
    """

    def __init__(self, in_features, out_features, parallelism):
        super().__init__()
        self.parallelism = parallelism
        # One weight matrix per parallel slot: [parallelism, out, in].
        # torch.empty (not the legacy torch.Tensor ctor) for uninitialized storage;
        # values are overwritten by the init calls below.
        self.weights = nn.Parameter(torch.empty(parallelism, out_features, in_features))
        # Bias shaped to broadcast over batch (dim 0) and time (dim 2).
        self.biases = nn.Parameter(torch.empty(1, parallelism, 1, out_features))
        # Same scheme nn.Linear uses for its weight; biases start at zero.
        nn.init.kaiming_uniform_(self.weights, a=math.sqrt(5))
        nn.init.zeros_(self.biases)

    def forward(self, x):
        """Apply each slot's linear map to its slice of `x` and add the bias."""
        return torch.einsum('bsti,soi->bsto', x, self.weights) + self.biases
class StockEncoder(nn.Module):
    """One encoder layer attending over both the time axis and the stock axis.

    Expects and returns tensors shaped [batch_size, num_stocks, time, num_features].
    """

    def __init__(self,
                 num_stocks,
                 num_features,
                 pre_norm: bool = True):
        super(StockEncoder, self).__init__()
        # Q/K/V projections with independent weights per stock slot.
        self.query_proj = ParallelLinear(num_features, num_features, num_stocks)
        self.key_proj = ParallelLinear(num_features, num_features, num_stocks)
        self.value_proj = ParallelLinear(num_features, num_features, num_stocks)
        # Position-wise feed-forward (per stock slot), 2x hidden width.
        self.mlp = nn.Sequential(
            ParallelLinear(num_features, num_features * 2, num_stocks),
            nn.ReLU(),
            ParallelLinear(num_features * 2, num_features, num_stocks)
        )
        self.norm1 = nn.LayerNorm(num_features)
        self.norm2 = nn.LayerNorm(num_features)
        # When True, norm1 is applied to the input; when False, the same
        # norm1 module is instead applied to the final output (see forward).
        self.pre_norm = pre_norm

    def forward(self, x):
        # x = [batch_size, num_stocks, time, num_features]
        if self.pre_norm:
            # NOTE(review): x is rebound here, so the residual below
            # ("x + output") adds the *normalized* input rather than the raw
            # input — this mixes pre-norm and post-norm conventions; confirm
            # this is intended.
            x = self.norm1(x)
        query = self.query_proj(x)
        key = self.key_proj(x)
        value = self.value_proj(x)
        # Scaled Dot Product Attention
        scaling = query.size(-1) ** 0.5
        # Attention over the time axis, independently per stock:
        # [b, s, t, f] @ [b, s, f, t] -> scores [b, s, t, t].
        scores_time = torch.matmul(query, key.transpose(-1, -2)) / scaling
        scores_time = F.softmax(scores_time, dim=-1)
        output_time = torch.matmul(scores_time, value)
        # Attention over the stock axis, independently per time step:
        # [b, t, s, f] @ [b, t, f, s] -> scores [b, t, s, s].
        scores_stocks = torch.matmul(query.transpose(1, 2), key.permute(0, 2, 3, 1)) / scaling
        scores_stocks = F.softmax(scores_stocks, dim=-1)
        output_stocks = torch.matmul(scores_stocks, value.transpose(1, 2))
        # Sum the two attention paths back in [b, s, t, f] layout.
        output = output_time + output_stocks.transpose(1, 2)
        # Residual + norm around attention, then a residual around the MLP.
        output = self.norm2(x + output)
        outputs = self.mlp(output)
        outputs = output + outputs
        if not self.pre_norm:
            outputs = self.norm1(outputs)
        return outputs
class Model(nn.Module):
    """Stock-price predictor.

    Pipeline: per-stock feature projection (scaled by sqrt(num_features)),
    a stack of StockEncoder layers, a final LayerNorm, and a linear head
    over the flattened time window producing one value per stock.
    """

    def __init__(self,
                 num_stocks: int,
                 num_days: int,
                 num_features: int,
                 num_layers: int,
                 max_price: float):
        super().__init__()
        self.feature_scaling = ParallelLinear(in_features=num_features, out_features=num_features,
                                              parallelism=num_stocks)
        self.encoders = nn.ModuleList(
            StockEncoder(num_stocks=num_stocks, num_features=num_features, pre_norm=True)
            for _ in range(num_layers)
        )
        self.norm = nn.LayerNorm(num_features)
        self.output = nn.Linear(in_features=num_days * num_features, out_features=1)
        # Embedding-style scale factor applied after the feature projection.
        self.scaling = num_features ** 0.5
        self.max_price = max_price

    def forward(self, x):
        """Map [batch, stocks, days, features] to per-stock scores [batch, stocks]."""
        hidden = self.feature_scaling(x) * self.scaling
        for layer in self.encoders:
            hidden = layer(hidden)
        hidden = self.norm(hidden)
        # Flatten (days, features) so the head sees the whole window at once.
        batch_size, num_stocks = hidden.shape[0], hidden.shape[1]
        flat = hidden.view(batch_size, num_stocks, -1)
        return self.output(flat).squeeze(dim=-1)

    def lightning(self) -> 'ModelLightning':
        """Wrap this model in its Lightning training module."""
        return ModelLightning(self)
class ModelLightning(L.LightningModule):
    """LightningModule wrapper providing train/val steps and the optimizer for Model."""

    def __init__(self, model: Model):
        super().__init__()
        self.model = model

    def _step(self, batch, train: bool = True):
        """Shared logic for training and validation steps.

        batch is a (features, targets) pair. Returns a dict with the MSE
        'loss' (what Lightning optimizes) and the raw 'predictions'.
        """
        features, targets = batch
        # Drop the model output for slot 0: Dataset.make prepends a
        # calendar pseudo-series as series 0, which has no price target.
        predictions = self.model(features)[:, 1:]
        loss = nn.functional.mse_loss(predictions, targets)
        # L1 error reported in original price units (undo max-price scaling).
        l1_loss = nn.functional.l1_loss(predictions * self.model.max_price,
                                        targets * self.model.max_price)
        stage = 'train' if train else 'val'
        self.log(f'{stage}_loss', loss, prog_bar=True)
        self.log(f'{stage}_l1_loss', l1_loss, prog_bar=True)
        return {
            'loss': loss,
            'predictions': predictions
        }

    def training_step(self, batch) -> STEP_OUTPUT:
        return self._step(batch, train=True)

    def validation_step(self, batch) -> STEP_OUTPUT:
        return self._step(batch, train=False)

    def configure_optimizers(self):
        # Plain Adam with a fixed learning rate.
        return torch.optim.Adam(self.parameters(), lr=1e-3)
class Dataset(td.Dataset):
    """Sliding-window dataset over daily stock features.

    features: [num_series, num_dates, num_cols] tensor. When built via
    `make`, series 0 is a calendar pseudo-series and the remaining rows are
    one per stock (volume, open, high, low, close — all scaled to [0, 1]).
    targets: [num_stocks, num_dates] next-day close prices scaled by max_price.
    Item idx pairs the window features[:, idx:idx+num_days] with the targets
    at the window's last day.
    """

    def __init__(self,
                 stocks: List[str],
                 dates: List[date],
                 features: torch.Tensor,
                 targets: torch.Tensor,
                 num_days: int,
                 max_price: float):
        self.stocks = stocks
        self.dates = dates
        self.features = features
        self.targets = targets
        self.num_days = num_days
        self.max_price = max_price
        # Number of complete num_days windows that fit in the date range.
        self.length = len(self.dates) - num_days + 1

    def with_num_days(self, num_days: int) -> 'Dataset':
        """Return a copy of this dataset re-windowed to a different length."""
        return Dataset(
            stocks=self.stocks,
            dates=self.dates,
            features=self.features,
            targets=self.targets,
            num_days=num_days,
            max_price=self.max_price
        )

    def save(self, path: str):
        """Pickle the whole dataset (tensors included) to `path`."""
        torch.save(self, path)

    @classmethod
    def load(cls, path: str) -> 'Dataset':
        """Load a dataset previously written by `save`."""
        # weights_only=False: the file holds a pickled Dataset instance, not a
        # bare state dict (torch >= 2.6 defaults weights_only to True).
        return torch.load(path, weights_only=False)

    @staticmethod
    def _cell(value: str) -> float:
        """Parse one CSV cell, mapping blank cells to 0.0."""
        value = value.strip()
        return float(value) if value else 0.0

    @classmethod
    def make(cls, path: str, num_days: int):
        """Build a dataset from a CSV with columns date, Name, volume, open, high, low, close.

        Prices are scaled by the global max price and volume by the global max
        volume; a calendar pseudo-series is prepended as series 0. The last
        date is dropped since it has no next-day target.
        """
        _dates = set()
        _stock_features = {}
        max_price = 0.0
        max_volume = 0.0
        with open(path, 'r') as fd:
            for row in csv.DictReader(fd):
                _date = datetime.strptime(row['date'], '%Y-%m-%d').date()
                _dates.add(_date)
                _name = row['Name']
                features = [
                    cls._cell(row['volume']),
                    cls._cell(row['open']),
                    cls._cell(row['high']),
                    cls._cell(row['low']),
                    cls._cell(row['close'])
                ]
                max_price = max(max_price, max(features[1:]))
                max_volume = max(max_volume, features[0])
                features_by_date = _stock_features.get(_name, {})
                features_by_date[_date] = features
                _stock_features[_name] = features_by_date
        stocks = list(_stock_features.keys())
        dates = list(_dates)
        dates.sort()
        # Placeholder for dates on which a stock has no row (e.g. not yet listed).
        _empty_features = [0.0, 0.0, 0.0, 0.0, 0.0]
        features = []
        targets = []
        for stock in stocks:
            stock_features = []
            target_features = []
            for i in range(len(dates) - 1):
                _date = dates[i]
                _features = _stock_features.get(stock, {}).get(_date, _empty_features)
                stock_features.append(_features)
                # Target = next trading day's close (last feature column).
                _next_date = dates[i + 1]
                _next_features = _stock_features.get(stock, {}).get(_next_date, _empty_features)
                target_features.append(_next_features[-1])
            features.append(stock_features)
            targets.append(target_features)
        # Calendar pseudo-series: cyclic/normalized date components.
        _date_features = []
        for _date in dates[:-1]:
            days_in_year = 365 + isleap(_date.year)
            _date_features.append([
                _date.weekday() / 6,  # day of week
                _date.day / monthrange(_date.year, _date.month)[1],  # day of month
                # Fix: actual day-of-year (original divided the day-of-month
                # by the number of days in the year).
                _date.timetuple().tm_yday / days_in_year,  # day of year
                _date.month / 12,  # month of year
                (_date.year - 2000) / 2000
            ])
        features = torch.tensor(features) / torch.tensor([max_volume, max_price, max_price, max_price, max_price])
        features = torch.cat([torch.tensor(_date_features).unsqueeze(0), features])
        targets = torch.tensor(targets) / max_price
        return cls(
            stocks=stocks,
            dates=dates[:-1],
            features=features,
            targets=targets,
            num_days=num_days,
            max_price=max_price
        )

    @classmethod
    def make_or_load(cls,
                     csv_path: str,
                     num_days: int,
                     save_path: str,
                     save_if_not_exists: bool = True) -> 'Dataset':
        """Load a cached dataset from save_path if present, else build it from the CSV.

        A cached dataset is re-windowed to the requested num_days, since the
        cache may have been saved with a different window length.
        """
        if os.path.exists(save_path):
            ret = cls.load(save_path).with_num_days(num_days)
        else:
            ret = cls.make(path=csv_path, num_days=num_days)
            if save_if_not_exists:
                ret.save(save_path)
        return ret

    def __len__(self):
        return self.length

    def __getitem__(self, idx):
        # Map-style datasets signal out-of-range with IndexError. The original
        # checked `idx > self.length` (off by one) and raised StopIteration,
        # which allowed one window shorter than num_days through.
        if idx >= self.length:
            raise IndexError(idx)
        features = self.features[:, idx:idx + self.num_days, :]
        targets = self.targets[:, idx + self.num_days - 1]
        return features, targets

    def split(self, factor: float):
        """Split chronologically into (first `factor` fraction, remainder).

        Windows straddling the split boundary are dropped with the cut.
        """
        len1 = int(len(self) * factor)
        ds1 = Dataset(
            stocks=self.stocks,
            dates=self.dates[:len1],
            features=self.features[:, :len1, :],
            targets=self.targets[:, :len1],
            num_days=self.num_days,
            max_price=self.max_price,
        )
        ds2 = Dataset(
            stocks=self.stocks,
            dates=self.dates[len1:],
            features=self.features[:, len1:, :],
            targets=self.targets[:, len1:],
            num_days=self.num_days,
            max_price=self.max_price
        )
        return ds1, ds2
def main():
    """Build (or load) the dataset, split it 80/20, and train for 5 epochs."""
    batch_size = 4
    num_layers = 2
    num_days = 10
    dataset = Dataset.make_or_load(
        csv_path='data/raw/all_stocks_5yr.csv',
        num_days=num_days,
        save_path='/tmp/t1.ds'
    )
    print(len(dataset))
    train_ds, val_ds = dataset.split(0.8)
    train_loader = td.DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    val_loader = td.DataLoader(val_ds, batch_size=batch_size)
    # +1 stock slot for the calendar pseudo-series prepended by Dataset.make.
    model = Model(
        num_stocks=len(dataset.stocks) + 1,
        num_days=num_days,
        num_features=dataset.features.shape[-1],
        num_layers=num_layers,
        max_price=dataset.max_price
    )
    trainer = L.Trainer(
        max_epochs=5,
        val_check_interval=1.0,
        num_sanity_val_steps=1,
        check_val_every_n_epoch=1
    )
    trainer.fit(
        model=model.lightning(),
        train_dataloaders=train_loader,
        val_dataloaders=val_loader,
    )


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment