Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
PyTorch RNN training example
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.autograd import Variable
from torch import optim
import numpy as np
import math, random
# Generating a noisy multi-sin wave
def sine_2(X, signal_freq=60.):
    """Return the mean of a sine wave and its second harmonic evaluated at ``X``.

    Args:
        X: Scalar or NumPy array of sample positions.
        signal_freq: Period of the fundamental, in the same units as ``X``
            (``X`` is divided by this value, so it acts as a period).

    Returns:
        ``(sin(2*pi*X/signal_freq) + sin(4*pi*X/signal_freq)) / 2``,
        with the same shape as ``X``.
    """
    return (np.sin(2 * np.pi * (X) / signal_freq) + np.sin(4 * np.pi * (X) / signal_freq)) / 2.0
def noisy(Y, noise_range=(-0.05, 0.05)):
    """Return ``Y`` with independent uniform noise added to every element.

    Args:
        Y: NumPy array holding the clean signal.
        noise_range: ``(low, high)`` bounds passed to ``np.random.uniform``.

    Returns:
        A new array with the same shape as ``Y``; ``Y`` itself is not modified.
    """
    noise = np.random.uniform(noise_range[0], noise_range[1], size=Y.shape)
    return Y + noise
def sample(sample_size):
    """Generate one noisy multi-sine window of ``sample_size`` points.

    The window starts at a random integer offset drawn from
    ``[0, sample_size]`` (both ends inclusive), so successive calls see
    different phases of the same waveform.

    Args:
        sample_size: Number of consecutive points to generate.

    Returns:
        1-D NumPy array of length ``sample_size`` containing the noisy signal.
    """
    random_offset = random.randint(0, sample_size)
    X = np.arange(sample_size)
    Y = noisy(sine_2(X + random_offset))
    return Y
# Define the model
class SimpleRNN(nn.Module):
    """Scalar sequence model: Linear(1 -> H) -> 2-layer LSTM -> Linear(H -> 1).

    The sequence is processed one time step at a time (sequence length 1,
    batch size 1 per LSTM call) so that the model can either be fed the
    provided inputs at every step or be run free, feeding each prediction
    back in as the next input.
    """

    def __init__(self, hidden_size):
        """Build the layers.

        Args:
            hidden_size: Width of the input projection and of the LSTM state.
        """
        super(SimpleRNN, self).__init__()
        self.hidden_size = hidden_size
        self.inp = nn.Linear(1, hidden_size)
        self.rnn = nn.LSTM(hidden_size, hidden_size, 2, dropout=0.05)
        self.out = nn.Linear(hidden_size, 1)

    def step(self, input, hidden=None):
        """Advance the model by a single time step.

        Args:
            input: Tensor holding exactly one scalar (it is reshaped to (1, 1)).
            hidden: LSTM state ``(h, c)`` from the previous step, or ``None``
                to start from zeros.

        Returns:
            ``(output, hidden)`` where ``output`` has shape (1, 1) and
            ``hidden`` is the updated LSTM state.
        """
        # (1, 1) -> (1, H) -> (seq_len=1, batch=1, H), the layout nn.LSTM expects.
        projected = self.inp(input.view(1, -1)).unsqueeze(1)
        output, hidden = self.rnn(projected, hidden)
        output = self.out(output.squeeze(1))
        return output, hidden

    def forward(self, inputs, hidden=None, force=True, steps=0):
        """Run the model over a sequence.

        Args:
            inputs: Sequence of scalars, indexable along dimension 0.
            hidden: Initial LSTM state, or ``None`` for zeros.
            force: If true, every step is fed ``inputs[i]``. If false, only
                the first step reads ``inputs[0]``; later steps are fed the
                previous prediction.
            steps: Number of steps to run when ``force`` is false. ``0`` (or
                ``force=True``) means ``len(inputs)``.

        Returns:
            ``(outputs, hidden)`` where ``outputs`` has shape (steps, 1, 1)
            and ``hidden`` is the final LSTM state.
        """
        if force or steps == 0:
            steps = len(inputs)
        outputs = torch.zeros(steps, 1, 1)
        output = None
        for i in range(steps):
            step_input = inputs[i] if force or i == 0 else output
            output, hidden = self.step(step_input, hidden)
            outputs[i] = output
        return outputs, hidden
n_epochs = 100
n_iters = 50
hidden_size = 10
model = SimpleRNN(hidden_size)
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)
losses = np.zeros(n_epochs)  # Summed loss per epoch, for plotting

for epoch in range(n_epochs):
    for iteration in range(n_iters):
        _inputs = sample(50)
        # Next-step prediction: the target is the input shifted one step ahead.
        inputs = torch.from_numpy(_inputs[:-1]).float()
        targets = torch.from_numpy(_inputs[1:]).float()

        # Use teacher forcing 50% of the time
        force = random.random() < 0.5
        outputs, hidden = model(inputs, None, force)

        optimizer.zero_grad()
        # The model returns shape (steps, 1, 1) while targets is 1-D; flatten
        # so MSELoss compares element-wise instead of broadcasting.
        loss = criterion(outputs.view(-1), targets)
        loss.backward()
        optimizer.step()

        # loss is a 0-dim tensor; .item() extracts the Python float
        # (indexing it with [0] raises on PyTorch >= 0.4).
        losses[epoch] += loss.item()

    if epoch > 0:
        print(epoch, loss.item())

    # Use some plotting library
    # if epoch % 10 == 0:
    #     show_plot('inputs', _inputs, True)
    #     show_plot('outputs', outputs.data.view(-1), True)
    #     show_plot('losses', losses[:epoch] / n_iters)

    #     # Generate a test (free-running: hidden=None, force=False, steps=50)
    #     outputs, hidden = model(inputs, None, False, 50)
    #     show_plot('generated', outputs.data.view(-1), True)
# Online training
# NOTE(review): get_latest_sample() is not defined anywhere in this file; it
# must be provided before this loop can run. It is assumed to return a float
# tensor holding one scalar per time step -- confirm against the data source.
hidden = None
while True:
    inputs = get_latest_sample()
    outputs, hidden = model(inputs, hidden)
    # Carry the state values forward but cut the autograd graph here;
    # otherwise the next backward() would try to traverse this iteration's
    # already-freed graph. hidden is the (h, c) pair returned by nn.LSTM.
    hidden = tuple(state.detach() for state in hidden)

    optimizer.zero_grad()
    # Flatten both sides so MSELoss compares element-wise rather than
    # broadcasting the (steps, 1, 1) outputs against the inputs.
    loss = criterion(outputs.view(-1), inputs.view(-1))
    loss.backward()
    optimizer.step()
@Dhanachandra
Copy link

Dhanachandra commented Apr 6, 2020

With teacher forcing, we are supposed to feed the ground-truth value to the RNN at each step. But in this implementation, I don't see where the ground-truth value is fed to the RNN.

@ulassbin
Copy link

ulassbin commented Nov 26, 2020

Worked on this idea, fixed some of the issues. It was a great idea to use sine to do stuff with RNN, thanks!

import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.autograd import Variable
from torch import optim
import numpy as np
import math, random
import matplotlib.pyplot as plt
# Generating a noisy multi-sin wave 

def sine_2(X, signal_freq=60.):
    """Evaluate a two-component sine mixture at ``X``.

    The result is the average of a sine whose period is ``signal_freq`` and
    a second sine at twice that rate.

    Args:
        X: Scalar or NumPy array of sample positions.
        signal_freq: Period of the slower component, in units of ``X``.

    Returns:
        The mixture value(s), with the same shape as ``X``.
    """
    fundamental = np.sin(2 * np.pi * X / signal_freq)
    harmonic = np.sin(4 * np.pi * X / signal_freq)
    return (fundamental + harmonic) / 2.0

def noisy(Y, noise_range=(-0.05, 0.05)):
    """Perturb ``Y`` with element-wise uniform noise.

    Args:
        Y: NumPy array holding the clean signal.
        noise_range: ``(low, high)`` bounds of the uniform distribution.

    Returns:
        A new array shaped like ``Y``; the input array is left untouched.
    """
    return Y + np.random.uniform(
        low=noise_range[0],
        high=noise_range[1],
        size=Y.shape,
    )

def sample(sample_size):
    """Draw one clean/noisy pair of multi-sine windows.

    A random integer offset from ``[0, sample_size]`` (inclusive) selects
    where the window of ``sample_size`` consecutive points starts.

    Args:
        sample_size: Number of points in the window.

    Returns:
        ``(x_base, Y)``: the clean signal and the same signal with noise
        added, both 1-D arrays of length ``sample_size``.
    """
    offset = random.randint(0, sample_size)
    positions = np.arange(sample_size) + offset
    clean = sine_2(positions)
    return clean, noisy(clean)

# Define the model

class SimpleRNN(nn.Module):
    """Stacked ``nn.RNN`` followed by a linear read-out applied at every step.

    Inputs are batch-first, shaped (batch, seq_len, hidden_size): there is no
    separate input projection, so the feature width must equal ``hidden_size``.
    """

    def __init__(self, hidden_size, n_layers, batch_size):
        """Build the layers.

        Args:
            hidden_size: Feature width of the input, the RNN state and the output.
            n_layers: Number of stacked RNN layers.
            batch_size: Default batch size used by ``__init__hidden`` when no
                explicit size is given. ``forward`` sizes the state from its
                input instead.
        """
        super(SimpleRNN, self).__init__()
        self.hidden_size = hidden_size
        self.n_layers = n_layers
        self.batch_size = batch_size
        # The layer count must match the first dimension of the hidden state
        # built in __init__hidden, so it comes from n_layers, not a literal.
        self.rnn = nn.RNN(hidden_size, hidden_size, n_layers, batch_first=True)
        self.out = nn.Linear(hidden_size, hidden_size)

    def step(self, input, hidden=None):
        """Run the RNN on ``input`` and apply the read-out.

        Dimension 1 of the RNN output is squeezed before the read-out, so this
        is intended for inputs whose sequence length is 1.

        Returns:
            ``(output, hidden)``.
        """
        output, hidden = self.rnn(input, hidden)
        output = self.out(output.squeeze(1))
        return output, hidden

    def forward(self, inputs, hidden=None):
        """Run the model over a batch of sequences.

        Args:
            inputs: Tensor of shape (batch, seq_len, hidden_size); any floating
                dtype (it is cast to float32).
            hidden: Initial state of shape (n_layers, batch, hidden_size), or
                ``None`` to start from zeros.

        Returns:
            ``(output, hidden)`` where ``output`` has shape
            (batch, seq_len, hidden_size) and ``hidden`` is the final state.
        """
        inputs = inputs.float()
        if hidden is None:
            # Size the state from the actual batch so any batch size works.
            hidden = self.__init__hidden(inputs.size(0))
        hidden = hidden.to(device=inputs.device, dtype=inputs.dtype)
        output, hidden = self.rnn(inputs, hidden)
        output = self.out(output)
        return output, hidden

    def __init__hidden(self, batch_size=None):
        """Return an all-zero float64 state of shape (n_layers, batch, hidden_size).

        Args:
            batch_size: Batch dimension of the state; defaults to the value
                given to the constructor.
        """
        if batch_size is None:
            batch_size = self.batch_size
        return torch.zeros(self.n_layers, batch_size, self.hidden_size, dtype=torch.float64)

n_epochs = 100
n_iters = 50
hidden_size = 1
n_layers = 2
seq_length = 10
n_sample_size = 50
# Each sample is cut into consecutive windows of seq_length points; the
# windows of one sample form the batch.
batch_size = n_sample_size // seq_length
model = SimpleRNN(hidden_size, n_layers, batch_size)
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)

losses = np.zeros(n_epochs)  # Summed loss per epoch, for plotting

plt.ion()  # Interactive mode only needs to be switched on once.

for epoch in range(n_epochs):

    for iteration in range(n_iters):
        # sample() returns (clean, noisy): the model learns to denoise.
        _targets, _inputs = sample(n_sample_size)
        # (n_sample_size,) -> (batch_size, seq_length, 1)
        inputs = torch.from_numpy(_inputs.reshape(batch_size, seq_length)).float().unsqueeze(2)
        targets = torch.from_numpy(_targets.reshape(batch_size, seq_length)).float().unsqueeze(2)

        outputs, hidden = model(inputs, None)

        optimizer.zero_grad()
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        # Accumulate the Python float, not the grad-tracking tensor.
        losses[epoch] += loss.item()
        if iteration % 10 == 0:
            plt.clf()
            plt.title("Epoch {}, iter {}".format(epoch, iteration))
            plt.plot(outputs.detach().flatten().numpy(), 'r-', linewidth=1, label='Output')
            plt.plot(targets.flatten().numpy(), 'c-', linewidth=1, label='Label')
            plt.plot(inputs.flatten().numpy(), 'g-', linewidth=1, label='Input')
            plt.draw()
            plt.pause(0.05)

    if epoch > 0:
        print(epoch, loss.item())

    # Use some plotting library
    # if epoch % 10 == 0:
        # show_plot('inputs', _inputs, True)
        # show_plot('outputs', outputs.data.view(-1), True)
        # show_plot('losses', losses[:epoch] / n_iters)

        # Generate a test
        # outputs, hidden = model(inputs, False, 50)
        # show_plot('generated', outputs.data.view(-1), True)

# Online training
hidden = None

# while True:
#     inputs = get_latest_sample()
#     outputs, hidden = model(inputs, hidden)

#     optimizer.zero_grad()
#     loss = criterion(outputs, inputs)
#     loss.backward()
#     optimizer.step()
 ~~~

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment