Created
June 7, 2022 03:41
-
-
Save lakshith-403/206368e186dd7c288418cb71d58eaa95 to your computer and use it in GitHub Desktop.
bitcoin-RNN
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import torch | |
import torch.nn as nn | |
from torch.utils.data import Dataset | |
import pandas as pd | |
import numpy as np | |
import torch.optim as optim | |
import matplotlib.pyplot as plt | |
from sklearn.preprocessing import StandardScaler | |
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Report the device that was actually selected ('cuda' or 'cpu').
print(device.type)

# Hyperparameters.
sequence_length = 12   # number of time steps in each input window
batch_size = 1024      # samples per DataLoader batch
epochs = 10            # passes over the training loader
learning_rate = 5e-5   # step size handed to the Adam optimizer
def create_seq_tuples(input_data, l):
    """Split a sequence into sliding windows and their next-step labels.

    Args:
        input_data: Sliceable sequence with a length (list, NumPy array, ...).
        l: Window length; must be at least 1.

    Returns:
        A tuple ``(x, y)`` of two equal-length lists. ``x[i]`` is
        ``input_data[i:i + l]`` and ``y[i]`` is the one-element slice
        ``input_data[i + l:i + l + 1]`` that immediately follows it. Both
        lists are empty when ``input_data`` has ``l`` or fewer elements.

    Raises:
        ValueError: If ``l`` is smaller than 1.
    """
    # A zero or negative window would silently produce meaningless slices.
    if l < 1:
        raise ValueError(f"window length must be >= 1, got {l}")

    n_windows = len(input_data) - l  # range() of a negative count is empty
    x = [input_data[i:i + l] for i in range(n_windows)]
    # Labels are kept as length-1 slices (not scalars) so each label retains
    # a dimension of its own.
    y = [input_data[i + l:i + l + 1] for i in range(n_windows)]
    return x, y
class FeatureDataset(Dataset):
    """Sliding-window dataset over the ``Open`` column of a CSV file.

    Each sample is a ``(window, label)`` pair: ``window`` holds
    ``sequence_length`` consecutive standardized values and ``label`` is the
    single standardized value that follows the window.
    """

    def __init__(self, file_name):
        """Load ``file_name``, standardize the series and build the windows.

        Args:
            file_name: Path to a CSV file with a header row and an ``Open``
                column.
        """
        data_csv = pd.read_csv(file_name, header=0)
        open_vector = np.array(data_csv['Open'])

        # Standardize the raw series *before* windowing so inputs and labels
        # share one scale; scaling only the windows would leave the labels in
        # raw units while the inputs are zero-mean/unit-variance.
        # The fitted scaler is kept so predictions can be mapped back with
        # ``self.scaler.inverse_transform``.
        # NOTE(review): the scaler is fit on the whole series, so statistics
        # of samples later placed in a held-out split are included.
        self.scaler = StandardScaler()
        # StandardScaler expects a 2-D (n_samples, n_features) array.
        scaled = self.scaler.fit_transform(open_vector.reshape(-1, 1)).ravel()

        x, y = create_seq_tuples(scaled, sequence_length)
        self.x = np.asarray(x)  # one row per window, sequence_length columns
        self.y = np.asarray(y)  # one row per window, a single label column

    def __len__(self):
        """Return the number of (window, label) samples."""
        return len(self.x)

    def __getitem__(self, idx):
        """Return the ``(window, label)`` pair stored at position ``idx``."""
        return self.x[idx], self.y[idx]
# Build the windowed dataset from 'bitstamp_cleaned.csv'.
data_set = FeatureDataset('bitstamp_cleaned.csv')

# Random split: 10% of the samples for training, the remainder for testing.
# NOTE(review): a 10/90 train/test ratio is unusual - confirm it is intended.
train_size = int(0.1 * len(data_set))
test_size = len(data_set) - train_size
train_set, test_set = torch.utils.data.random_split(
    dataset=data_set,
    lengths=[train_size, test_size],
)

# Both loaders shuffle and drop the trailing incomplete batch, so every batch
# they yield contains exactly batch_size samples.
train_loader = torch.utils.data.DataLoader(
    dataset=train_set,
    batch_size=batch_size,
    shuffle=True,
    drop_last=True,
)
test_loader = torch.utils.data.DataLoader(
    dataset=test_set,
    batch_size=batch_size,
    shuffle=True,
    drop_last=True,
)
# Feature shape -> (batch_size, seq_len)
# Label shape -> (batch_size, 1)
class RNN(nn.Module):
    """Stacked ``nn.RNN`` followed by a linear head that emits one value.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the hidden state of each recurrent layer.
        num_layers: Number of stacked recurrent layers.
    """

    def __init__(self, input_size, hidden_size, num_layers):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # batch_first=True: inputs and outputs are (batch, seq_len, features).
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Predict one value per sequence from its final time step.

        Args:
            x: Tensor of shape ``(batch, seq_len, input_size)``.

        Returns:
            Tensor of shape ``(batch, 1)``.
        """
        # Create the zero initial state directly on the input's device and in
        # its dtype, so the module does not depend on a module-level device
        # and also works after ``.double()`` / ``.half()``.
        h0 = torch.zeros(
            self.num_layers,
            x.size(0),
            self.hidden_size,
            device=x.device,
            dtype=x.dtype,
        )
        out, _ = self.rnn(x, h0)
        # Keep only the last time step: (batch, seq_len, hidden) -> (batch, hidden).
        return self.fc(out[:, -1, :])
# One input feature per time step, 256 hidden units, 2 stacked RNN layers.
model = RNN(1, 256, 2).to(device)
model = model.float()
# The spelling "criterian" is kept because code outside this section may
# refer to the name.
criterian = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
losses = []  # training loss of every batch, in iteration order

for epoch in range(epochs):
    for batch_idx, (x, y) in enumerate(train_loader):
        # (batch, seq_len) -> (batch, seq_len, 1). unsqueeze infers the batch
        # size from the tensor, so a short final batch would still work,
        # unlike a view hard-coded to batch_size.
        x = x.to(device=device).unsqueeze(-1)
        y = y.to(device=device)

        # The loader yields NumPy-derived tensors; cast to the model's float32.
        pred = model(x.float())
        loss = criterian(pred, y.float())
        losses.append(loss.item())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch_idx % 50 == 0:
            print(f"epoch: {epoch} \t batch: {batch_idx}")

# Plot the per-batch training loss curve.
plt.plot(losses)
plt.show()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment