Totally amateur TensorFlow experiment, made largely by modifying other scripts, to train a neural network (an LSTM) to compose songs (we used Disney songs as input)
# Scans in the MIDI files and converts them to the format expected by the
# neural network.
#
# We encode the MIDI note number as a one-hot vector, and the duration of the
# note as another one-hot vector. We combine all these vectors inside a big
# NumPy array and save it as X.npy.
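#
# For illustration (hypothetical sizes): with 3 unique notes and 2 unique
# durations, the event (note index 1, tick index 0) becomes the row
# [0, 1, 0 | 1, 0] -- the note one-hot followed by the duration one-hot.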
import os
import struct
import numpy as np
from collections import defaultdict

# Map to standard notes. (Note: this table is not used anywhere in this script.)
map_notes = {
    21: 44, 22: 42, 24: 46, 25: 46, 26: 46,
    60: 46, 62: 44, 63: 42, 75: 41, 80: 47, 82: 48,
}

def read_32bit(f):
    return struct.unpack(">I", f.read(4))[0]

def read_16bit(f):
    return struct.unpack(">H", f.read(2))[0]

def skip_bytes(f, length):
    global byte_count
    f.seek(length, 1)
    byte_count -= length

def peek_byte(f):
    byte = f.read(1)
    f.seek(-1, 1)
    return struct.unpack("B", byte)[0]

def next_byte(f):
    global byte_count
    byte_count -= 1
    return struct.unpack("B", f.read(1))[0]

def read_var_length(f):
    value = next_byte(f)
    if value & 0x80 != 0:
        value &= 0x7F
        while True:
            byte = next_byte(f)
            value = (value << 7) + (byte & 0x7F)
            if byte & 0x80 == 0: break
    return value
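# Worked example of MIDI's variable-length encoding: the bytes 0x81 0x48
# decode to ((0x81 & 0x7F) << 7) + 0x48 = 128 + 72 = 200 ticks.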
def read_track(f):
    global current_track
    global byte_count
    global event_count
    global ticks_until_next_bar
    status = 0
    total_ticks = 0
    extra_ticks = 0
    last_tick = 0    # in case the track contains no notes
    track_events = []
    while byte_count > 0:
        ticks = read_var_length(f)
        total_ticks += ticks
        if peek_byte(f) & 0x80 != 0:
            status = next_byte(f)
        code = status & 0xF0
        # NOTE_OFF
        if code == 0x80:
            channel = status & 0x0F
            note_number = next_byte(f)
            velocity = next_byte(f)
            extra_ticks += ticks
        # NOTE_ON
        elif code == 0x90:
            channel = status & 0x0F
            note_number = next_byte(f)
            velocity = next_byte(f)
            # The first note of a new file needs to be moved up to the next bar.
            if len(track_events) == 0:
                ticks += ticks_until_next_bar
                ticks_until_next_bar = 0
            ticks += extra_ticks
            extra_ticks = 0
            note_counts[note_number] += 1
            tick_counts[ticks] += 1
            event_count += 1
            last_tick = total_ticks
            track_events.append((note_number, ticks))
        # KEY_PRESSURE, CONTROL_CHANGE, PITCH_BEND
        elif code in [0xA0, 0xB0, 0xE0]:
            data1 = next_byte(f)
            data2 = next_byte(f)
            # print("Event %u" % status)
            extra_ticks += ticks
        # PROGRAM_CHANGE, CHANNEL_PRESSURE
        elif code in [0xC0, 0xD0]:
            data1 = next_byte(f)
            # print("Event %u" % status)
            extra_ticks += ticks
        # SYS_EX
        elif status == 0xF0:
            length = read_var_length(f)
            skip_bytes(f, length)
            # print("SysEx")
            extra_ticks += ticks
        # META_EVENT (inside a MIDI file, 0xFF is a meta event, not SYSTEM_RESET)
        elif status == 0xFF:
            typ = next_byte(f)
            length = read_var_length(f)
            skip_bytes(f, length)
            # print("Meta type", typ, "length", length)
            extra_ticks += ticks
        else:
            print("Unsupported event:", status)
            exit()
    global midi_events, stats
    midi_events += track_events
    stats.append(len(track_events))
    ticks_until_next_bar = 480 - (last_tick % 480)
    # print("Ticks left until next bar", ticks_until_next_bar)
    current_track += 1
def read_chunk(f):
    global byte_count
    fourcc = f.read(4)
    byte_count = read_32bit(f)
    if fourcc == b"MTrk":
        read_track(f)
    else:
        print("Skipping chunk '%s', %u bytes" % (fourcc, byte_count))
        skip_bytes(f, byte_count)

def read_midi(f):
    global current_track
    fourcc = f.read(4)
    if fourcc != b"MThd":
        print("Expected MThd header")
        return
    if read_32bit(f) != 6:
        print("Expected '6'")
        return
    fmt = read_16bit(f)
    if fmt != 0:
        print("Cannot handle format", fmt)
        return
    num_tracks = read_16bit(f)
    if num_tracks != 1:
        print("Cannot handle multiple tracks")
        return
    ticks_per_beat = read_16bit(f)
    if ticks_per_beat & 0x8000 != 0:
        print("SMPTE time codes not supported")
        return
    current_track = 0
    while current_track < num_tracks:
        read_chunk(f)

def import_midi_file(filename):
    print("Importing '%s'" % filename)
    with open(filename, "rb") as f:
        read_midi(f)
################################################################################

# This array will store all the MIDI events we're interested in.
midi_events = []

# For gathering statistics on length etc.
stats = []

# We're gluing all the input files together.
ticks_until_next_bar = 0

# To count how often each note / tick value occurs.
note_counts = defaultdict(int)
tick_counts = defaultdict(int)

# Scan all MIDI files.
file_count = 0
event_count = 0
for root, directories, filenames in os.walk("Data"):
    for filename in filenames:
        if filename.endswith(".mid"):
            import_midi_file(os.path.join(root, filename))
            file_count += 1

print("Done! Scanned %d files, %d MIDI events" % (file_count, event_count))

unique_notes = len(note_counts)
print("Unique notes:", unique_notes)

unique_ticks = len(tick_counts)
print("Unique ticks:", unique_ticks)

print("Statistics: min %g, max %g, average %g events per MIDI file" %
      (np.min(stats), np.max(stats), np.mean(stats)))

# These lookup tables are used for converting the notes and durations
# to one-hot encoded vectors.
ix_to_note = sorted(note_counts.keys())
note_to_ix = {n: i for i, n in enumerate(ix_to_note)}
ix_to_tick = sorted(tick_counts.keys())
tick_to_ix = {t: i for i, t in enumerate(ix_to_tick)}

# Save these tables because we'll need them to convert back to MIDI notes
# when sampling from the trained LSTM.
import pickle
pickle.dump(ix_to_note, open("ix_to_note.p", "wb"))
pickle.dump(ix_to_tick, open("ix_to_tick.p", "wb"))

# Encode the data as a matrix of unique_notes + unique_ticks columns and
# event_count rows. The notes and ticks are one-hot encoded.
X = np.zeros((len(midi_events), unique_notes + unique_ticks), dtype=np.float32)
print("Training file shape:", X.shape)
for i, (note, tick) in enumerate(midi_events):
    note_onehot = np.zeros(unique_notes)
    note_onehot[note_to_ix[note]] = 1.0
    X[i, 0:unique_notes] = note_onehot
    tick_onehot = np.zeros(unique_ticks)
    tick_onehot[tick_to_ix[tick]] = 1.0
    X[i, unique_notes:] = tick_onehot
np.save("X.npy", X)
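
# Optional sanity check (not part of the original script): decode the first
# row of X back into a (note, tick) pair and confirm the encoding round-trips.
if len(midi_events) > 0:
    first_note, first_tick = midi_events[0]
    assert ix_to_note[int(np.argmax(X[0, :unique_notes]))] == first_note
    assert ix_to_tick[int(np.argmax(X[0, unique_notes:]))] == first_tick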
# LSTM for training.
#
# The training procedure is based on Andrej Karpathy's min-char-rnn script from
# https://gist.github.com/karpathy/d4dee566867f8291f086

import os
import sys
import numpy as np
import tensorflow as tf
import pickle
import struct

checkpoint_dir = "checkpoints"
summaries_dir = "logs"

hidden_size = 200       # number of neurons in the hidden layer
unroll_steps = 21       # number of steps to unroll the RNN for
training_steps = 100000

################################################################################

def usage():
    script_name = sys.argv[0]
    print("Usage:")
    print("  %s train                     train a new model" % script_name)
    print("  %s train <checkpoint_file>   resume training" % script_name)
    print("  %s sample <checkpoint_file>  sample from saved model" % script_name)
    print("  %s export <checkpoint_file>  save the weights" % script_name)
    print("  %s random                    drum like a monkey" % script_name)
    sys.exit(1)
mode = None
if len(sys.argv) >= 2:
    if sys.argv[1] == "train":
        mode = "train"
        if len(sys.argv) >= 3:
            model_file = sys.argv[2]
            print("Resuming training from model %s" % model_file)
        else:
            model_file = None
            print("Training new model")
        print("Saving model to %s" % checkpoint_dir)
    elif sys.argv[1] == "sample":
        if len(sys.argv) >= 3:
            mode = "sample"
            model_file = sys.argv[2]
            print("Sampling from model %s" % model_file)
    elif sys.argv[1] == "export":
        if len(sys.argv) >= 3:
            mode = "export"
            model_file = sys.argv[2]
            print("Exporting from model %s" % model_file)
    elif sys.argv[1] == "random":
        mode = "random"

if mode is None:
    usage()
################################################################################

def weight_variable(shape):
    return tf.Variable(tf.truncated_normal(shape, stddev=0.01))
class RNN:
    def __init__(self, note_vector_size, tick_vector_size, hidden_size, unroll_steps):
        """Creates a new RNN object.

        Parameters
        ----------
        note_vector_size: int
            number of elements in each (one-hot encoded) input note
        tick_vector_size: int
            number of elements in each (one-hot encoded) input duration
        hidden_size: int
            size of the hidden layer of neurons
        unroll_steps: int
            number of steps to unroll the RNN for
        """
        self.note_vector_size = note_vector_size
        self.tick_vector_size = tick_vector_size
        self.vector_size = self.note_vector_size + self.tick_vector_size
        self.hidden_size = hidden_size
        self.unroll_steps = unroll_steps
        self.build_graph()

    def build_graph(self):
        print("Creating graph...")

        with tf.name_scope("hyperparameters"):
            self.learning_rate = tf.placeholder(tf.float32, name="learning-rate")

        # The dimensions of the input tensor x and the target tensor y are
        # (unroll_steps, vector_size) but we leave the first dimension as None,
        # so that in sample() we can pass in a single value at a time.
        with tf.name_scope("inputs"):
            self.x = tf.placeholder(tf.float32, [None, self.vector_size], name="x-input")

            # Because we train to predict the next element, y contains almost the
            # same elements as x but shifted one step in time: y[t] = x[t+1].
            self.y = tf.placeholder(tf.float32, [None, self.vector_size], name="y-input")

            # Inputs for the initial memory state of the LSTM. This is the last
            # memory state from the previous time rnn.train() was called.
            self.h = tf.placeholder(tf.float32, [1, self.hidden_size], name="h-prev")
            self.c = tf.placeholder(tf.float32, [1, self.hidden_size], name="c-prev")

        # Model parameters for a single LSTM layer. This is what the network will learn.
        with tf.name_scope("lstm-cell"):
            # This matrix combines the weights for x, h, and the bias.
            self.Wx = weight_variable([self.vector_size + self.hidden_size + 1, self.hidden_size*4])

        # Parameters of hidden (h) to output (y). This is also what the network will learn.
        with tf.name_scope("lstm-to-output"):
            # This matrix combines the weights and the bias.
            self.Wy = weight_variable([self.hidden_size + 1, self.vector_size])

        # The logic for the LSTM cell. We unroll the network into unroll_steps steps,
        # each with its own cell. The cell stores hidden state ("h") but also cell
        # state ("c").
        hs = [self.h]
        cs = [self.c]
        ys_note = []
        ys_tick = []
        for t in range(self.unroll_steps):
            # Create an input vector of size [x + h + 1]. The 1 is for the bias.
            h_flat = tf.reshape(hs[t], [self.hidden_size])
            combined = tf.concat([self.x[t], h_flat, tf.ones(1)], axis=0)

            # Turn the vector into a matrix with shape (1, size) so we can matmul() it.
            reshaped = tf.reshape(combined, [1, self.vector_size + self.hidden_size + 1])

            # Compute the new hidden state and cell state, which depend on the "current"
            # input x[t] and the previous states h[t-1] and c[t-1]. Formulas from
            # https://en.wikipedia.org/wiki/Long_short-term_memory; after the reshape,
            # cell[0] is the forget gate, cell[1] the input gate, cell[2] the output
            # gate, and cell[3] the candidate cell state.
            cell = tf.matmul(reshaped, self.Wx)
            cell = tf.reshape(cell, [4, self.hidden_size])
            cell_c = tf.sigmoid(cell[0]) * cs[t] + tf.sigmoid(cell[1]) * tf.tanh(cell[3])
            cell_h = tf.sigmoid(cell[2]) * tf.tanh(cell_c)

            # Store the new hidden and cell state, which we need to compute the
            # output for this time step, ys[t].
            hs.append(cell_h)
            cs.append(cell_c)

            # Add 1 for the bias.
            combined = tf.concat([cell_h, tf.ones((1, 1))], axis=1)
            y_pred = tf.matmul(combined, self.Wy)

            # Each ys[t] is the predicted element for step t in the RNN, a matrix of shape
            # (1, vector_size). We reshape it so that ys will be (unroll_steps, vector_size)
            # and so we can more easily compare it to self.y, which also has that shape.
            y_pred = tf.reshape(y_pred, [self.vector_size])

            # Predict the next note.
            y_pred_note = tf.nn.softmax(y_pred[:self.note_vector_size])
            ys_note.append(y_pred_note)

            # Predict the next duration.
            y_pred_tick = tf.nn.softmax(y_pred[self.note_vector_size:])
            ys_tick.append(y_pred_tick)

        # We don't need to remember any of the intermediate steps, only the first
        # one (for sampling) and the last one (for training the next batch).
        self.y_pred_note = ys_note[0]
        self.y_pred_tick = ys_tick[0]
        self.first_h = hs[1]    # since hs[0] is the old one
        self.last_h = hs[-1]
        self.first_c = cs[1]    # since cs[0] is the old one
        self.last_c = cs[-1]
        # The following operations are only used during training, not for inference.

        # Split up the expected output into note and duration.
        y_note = self.y[:, :self.note_vector_size]
        y_tick = self.y[:, self.note_vector_size:]

        with tf.name_scope("loss-function"):
            # The outputs go through a softmax, so use the cross-entropy loss.
            self.loss = (tf.reduce_mean(-tf.reduce_sum(y_note * tf.log(ys_note), reduction_indices=[1]))
                       + tf.reduce_mean(-tf.reduce_sum(y_tick * tf.log(ys_tick), reduction_indices=[1])))

        with tf.name_scope("train"):
            optimizer = tf.train.RMSPropOptimizer(self.learning_rate)
            # Apply gradient clipping.
            grads_and_vars = optimizer.compute_gradients(self.loss)
            clipped = [(tf.clip_by_value(grad, -5.0, 5.0), var) for grad, var in grads_and_vars]
            self.train_op = optimizer.apply_gradients(clipped)

        # The accuracy op computes the percentage of correct predictions. This is
        # the accuracy across a single unrolled chunk of data.
        with tf.name_scope("accuracy"):
            # Combine notes and ticks into a new tensor that looks like this:
            # [[note1,tick1], [note2,tick2], ..., [note_n,tick_n]]
            y_stacked = tf.stack([tf.argmax(y_note, 1), tf.argmax(y_tick, 1)], axis=1)
            ys_stacked = tf.stack([tf.argmax(ys_note, 1), tf.argmax(ys_tick, 1)], axis=1)

            # Then compare the predictions with the truth. We count success
            # only if both the note and the tick are correct.
            correct_prediction = tf.to_float(tf.reduce_all(tf.equal(y_stacked, ys_stacked), axis=1))
            self.accuracy = tf.reduce_mean(correct_prediction)

        self.init = tf.global_variables_initializer()
    def prepare_for_training(self, sess):
        sess.run(self.init)

        # Compute the loss at iteration 0. This is the "ideal" loss when the
        # weights are all 0, i.e. when the predictions are uniform. Because we
        # initialize the weights with small random numbers, the true initial
        # loss will be slightly different.
        initial_loss = -np.log(1.0/self.note_vector_size) + -np.log(1.0/self.tick_vector_size)
        print("Expected initial loss:", initial_loss)
    def train(self, sess, x, y, h, c, learning_rate):
        feed = {self.x: x, self.y: y, self.h: h, self.c: c, self.learning_rate: learning_rate}
        ops = [self.train_op, self.loss, self.last_h, self.last_c]
        _, loss_value, h, c = sess.run(ops, feed_dict=feed)
        return loss_value, h, c

    def sample(self, sess, h, c, seed_ix_note, seed_ix_tick, n):
        x = np.zeros((1, self.vector_size))
        ixes = []
        for t in range(n):
            # One-hot encode the input values.
            x[0, seed_ix_note] = 1
            x[0, self.note_vector_size + seed_ix_tick] = 1

            # Do the forward pass. Note that we don't need the entire "unrolled"
            # RNN now. We only feed in a single example and we compute a single
            # output.
            feed = {self.x: x, self.h: h, self.c: c}
            ops = [self.y_pred_note, self.y_pred_tick, self.first_h, self.first_c]
            predicted_note, predicted_tick, h, c = sess.run(ops, feed_dict=feed)

            # Randomly sample from the output probability distributions.
            ix_note = np.random.choice(range(self.note_vector_size), p=predicted_note.ravel())
            ix_tick = np.random.choice(range(self.tick_vector_size), p=predicted_tick.ravel())
            ixes.append((ix_note, ix_tick))

            # Use the output as the next input.
            x[0, seed_ix_note] = 0
            x[0, self.note_vector_size + seed_ix_tick] = 0
            seed_ix_note = ix_note
            seed_ix_tick = ix_tick
        return ixes
################################################################################

class Data:
    def __init__(self, filename):
        print("Loading data...")
        self.ix_to_note = pickle.load(open("ix_to_note.p", "rb"))
        self.ix_to_tick = pickle.load(open("ix_to_tick.p", "rb"))
        self.unique_notes = len(self.ix_to_note)
        self.unique_ticks = len(self.ix_to_tick)
        self.note_to_ix = {n: i for i, n in enumerate(self.ix_to_note)}
        self.tick_to_ix = {t: i for i, t in enumerate(self.ix_to_tick)}
        self.X = np.load(filename)
        self.data_size = self.X.shape[0]
        self.reset()

    def reset(self):
        self.p = 0

    def next_batch(self, unroll_steps):
        # Reached the end? Then go back to the start of the data.
        new_epoch = False
        if self.p + unroll_steps + 1 >= self.data_size:
            new_epoch = True
            self.p = 0
        x, y = self.get_range(self.p, unroll_steps)
        # Move the data pointer ahead.
        self.p += unroll_steps
        return x, y, new_epoch

    def get_range(self, start, length):
        x = self.X[start     : start+length  ]
        y = self.X[start + 1 : start+length+1]
        return x, y

    def to_text(self, ixes):
        return ",".join(str(self.ix_to_note[ix_note]) + ":" +
                        str(self.ix_to_tick[ix_tick]) for ix_note, ix_tick in ixes)
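
# to_text() renders each event as note:ticks, separated by commas, producing
# output like "36:480,38:240" (hypothetical values).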
################################################################################

def write_32bit(f, value):
    f.write(struct.pack(">I", value))

def write_16bit(f, value):
    f.write(struct.pack(">H", value & 0xffff))

def write_byte(f, value):
    f.write(struct.pack("B", value & 0xff))

def write_var_length(f, value):
    count = 0
    buf = value & 0x7f
    value >>= 7
    while value != 0:
        buf <<= 8
        buf |= (value & 0x7f) | 0x80
        value >>= 7
    while True:
        write_byte(f, buf)
        count += 1
        if buf & 0x80:
            buf >>= 8
        else:
            return count
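
# Example: write_var_length(f, 200) emits the two bytes 0x81 0x48, the exact
# inverse of read_var_length in the import script.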
def write_midi_file(filename, notes_and_ticks):
    print("Saving MIDI file '%s'" % filename)
    with open(filename, "wb") as f:
        f.write(bytes([0x4D, 0x54, 0x68, 0x64]))  # MThd
        write_32bit(f, 6)
        write_16bit(f, 0)    # format 0
        write_16bit(f, 1)    # one track
        write_16bit(f, 480)  # ticks per beat

        f.write(bytes([0x4D, 0x54, 0x72, 0x6b]))  # MTrk

        # Remember this position to write the chunk length afterwards.
        length_offset = f.tell()
        write_32bit(f, 0)

        byte_count = 0
        for note, ticks in notes_and_ticks:
            # Write the delta time for this event. Subtract the 1 tick
            # from the previous NOTE_OFF event.
            delta = max(0, ticks - 1)
            byte_count += write_var_length(f, delta)

            # Write a NOTE_ON event for the new note.
            write_byte(f, 0x9A)  # channel 10 (0-based)
            write_byte(f, note)  # MIDI note number
            write_byte(f, 0x64)  # velocity
            byte_count += 3

            # Write a delta time of 1 tick.
            byte_count += write_var_length(f, 1)

            # Write a NOTE_OFF event for the note.
            write_byte(f, 0x8A)  # channel 10 (0-based)
            write_byte(f, note)  # MIDI note number
            write_byte(f, 0x64)  # velocity
            byte_count += 3

        # Write the end-of-track marker.
        byte_count += write_var_length(f, 0)
        write_byte(f, 0xff)
        write_byte(f, 0x2f)
        write_byte(f, 0x00)
        byte_count += 3

        # Fill in the byte_count in the chunk length header.
        f.seek(length_offset)
        write_32bit(f, byte_count)
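
# Hypothetical usage: with 480 ticks per beat, this writes two quarter-note
# drum hits to test.mid:
#   write_midi_file("test.mid", [(36, 480), (38, 480)])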
################################################################################

def train(rnn, data, steps):
    print("Training RNN...")
    tf.gfile.MakeDirs(checkpoint_dir)

    with tf.Session() as sess:
        # For writing training checkpoints and reading them back in.
        saver = tf.train.Saver()

        rnn.prepare_for_training(sess)
        h = np.zeros((1, rnn.hidden_size))
        c = np.zeros((1, rnn.hidden_size))

        # Continue training from a previously saved checkpoint.
        if model_file is not None:
            saver.restore(sess, model_file)

        # Compute the initial loss over the first batch, so we have a starting
        # point for smoothing the loss.
        x, y, _ = data.next_batch(rnn.unroll_steps)
        feed = {rnn.x: x, rnn.y: y, rnn.h: h, rnn.c: c}
        smooth_loss = sess.run(rnn.loss, feed_dict=feed)
        print("Initial loss: %f" % smooth_loss)

        tf.summary.scalar("cross-entropy-loss", rnn.loss)
        summary_op = tf.summary.merge_all()
        summary_writer = tf.summary.FileWriter(summaries_dir, sess.graph)

        epoch = 1
        start_n = 0
        lr = 1e-2
        for n in range(start_n, steps + 1):
            # Get the next chunk of data.
            x, y, new_epoch = data.next_batch(rnn.unroll_steps)
            if new_epoch:
                # Reset the RNN's memory on every new epoch.
                h = np.zeros((1, rnn.hidden_size))
                c = np.zeros((1, rnn.hidden_size))
                epoch += 1

            # Train the RNN.
            loss_value, h, c = rnn.train(sess, x, y, h, c, learning_rate=lr)
            smooth_loss = smooth_loss * 0.999 + loss_value * 0.001

            # Update the summaries.
            if n % 100 == 0:
                feed = {rnn.x: x, rnn.y: y, rnn.h: h, rnn.c: c}
                summary = sess.run(summary_op, feed_dict=feed)
                summary_writer.add_summary(summary, n)
                summary_writer.flush()

            if n % 100 == 0:
                print("step %d, epoch: %d, loss: %f (smoothed %f), lr: %g" %
                      (n, epoch, loss_value, smooth_loss, lr))

            # Sample from the model now and then to see how well it works.
            if n % 1000 == 0:
                seed_ix_note = np.argmax(x[0, :data.unique_notes])
                seed_ix_tick = np.argmax(x[0, data.unique_notes:])
                sampled = rnn.sample(sess, h, c, seed_ix_note, seed_ix_tick, 400)
                print("----\n%s\n----" % data.to_text(sampled))

            # Compute the accuracy across the entire dataset. (The -1 leaves
            # room for the targets, which are shifted one step ahead.)
            if n % 1000 == 0:
                num_chunks = (data.data_size - 1) // rnn.unroll_steps
                print("Computing accuracy over %d chunks... " % num_chunks, end="")
                scores = np.zeros(num_chunks)
                for b in range(num_chunks):
                    x, y = data.get_range(b*rnn.unroll_steps, rnn.unroll_steps)
                    feed = {rnn.x: x, rnn.y: y, rnn.h: h, rnn.c: c}
                    scores[b] = sess.run(rnn.accuracy, feed_dict=feed)
                print("score: %f" % scores.mean())

            # Save the model.
            if n % 500 == 0:
                checkpoint_file = os.path.join(checkpoint_dir, "model-%d" % n)
                saver.save(sess, checkpoint_file)
                print("*** SAVED MODEL '%s' ***" % checkpoint_file)

        summary_writer.close()
################################################################################

def sample(rnn, data):
    print("Sampling...")
    with tf.Session() as sess:
        # Load the saved model back into the session.
        saver = tf.train.Saver()
        saver.restore(sess, model_file)

        # Start with an empty memory.
        h = np.zeros((1, rnn.hidden_size))
        c = np.zeros((1, rnn.hidden_size))

        first_ix_note = data.note_to_ix[36]  # note 36 is the bass drum in the GM drum map
        first_ix_tick = 0
        sampled = rnn.sample(sess, h, c, first_ix_note, first_ix_tick, 100)
        print("----\n%s\n----" % data.to_text(sampled))

        notes = []
        for ix_note, ix_tick in sampled:
            notes.append((data.ix_to_note[ix_note], data.ix_to_tick[ix_tick]*30))
        write_midi_file("generated.mid", notes)

################################################################################

def export_weights(rnn):
    with tf.Session() as sess:
        saver = tf.train.Saver()
        saver.restore(sess, model_file)
        print("Wx shape:", rnn.Wx.shape)
        print("Wy shape:", rnn.Wy.shape)
        rnn.Wx.eval().tofile("Wx.bin")
        rnn.Wy.eval().tofile("Wy.bin")

################################################################################

def random_notes(data):
    notes = []
    for i in range(200):
        note_ix = np.random.randint(data.unique_notes)
        tick_ix = np.random.randint(data.unique_ticks)
        notes.append((data.ix_to_note[note_ix], data.ix_to_tick[tick_ix]))
    write_midi_file("random.mid", notes)

################################################################################

data = Data("X.npy")
rnn = RNN(data.unique_notes, data.unique_ticks, hidden_size, unroll_steps)

if mode == "train":
    train(rnn, data, steps=training_steps)
elif mode == "sample":
    sample(rnn, data)
elif mode == "export":
    export_weights(rnn)
elif mode == "random":
    random_notes(data)
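
# Typical invocations (assuming this script is saved as lstm.py):
#   python lstm.py train                          # train a new model
#   python lstm.py train checkpoints/model-5000   # resume training
#   python lstm.py sample checkpoints/model-5000  # write generated.mid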