@kracekumar
Created January 9, 2018 19:22
tf.data Experiments
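The gist collects a few experiments in feeding the fizz buzz data into TensorFlow 1.x: plain feed_dict batching, tf.data.Dataset.from_generator backed by generators that read from PostgreSQL, and a parallel Dataset.map over tf.py_func. All of the variants below lean on the same basic pattern, shown here as a minimal sketch (the toy generator and its shapes are illustrative, not taken from the gist files):

import tensorflow as tf

def toy_gen():
    # Illustrative generator: yields (features, one-hot label) pairs.
    yield [0.0, 1.0], [1.0, 0.0, 0.0, 0.0]

dataset = tf.data.Dataset.from_generator(
    toy_gen, (tf.float32, tf.float32)).repeat(1).batch(1)
next_batch = dataset.make_one_shot_iterator().get_next()

with tf.Session() as sess:
    try:
        while True:
            x, y = sess.run(next_batch)  # pull batches until the iterator is exhausted
    except tf.errors.OutOfRangeError:
        pass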
# Fizz Buzz in Tensorflow!
# see http://joelgrus.com/2016/05/23/fizz-buzz-in-tensorflow/
import time
import sys

import numpy as np
import tensorflow as tf

from utils import binary_encode, fizz_buzz_encode, fizz_buzz
#from models import gen, gen_all

NUM_DIGITS = 10


# We'll want to randomly initialize weights.
def init_weights(shape):
    return tf.Variable(tf.random_normal(shape, stddev=0.01))


# Our model is a standard 1-hidden-layer multi-layer-perceptron with ReLU
# activation. The softmax (which turns arbitrary real-valued outputs into
# probabilities) gets applied in the cost function.
def model(X, w_h, w_o):
    h = tf.nn.relu(tf.matmul(X, w_h))
    return tf.matmul(h, w_o)


# Our variables. The input has width NUM_DIGITS, and the output has width 4.
X = tf.placeholder("float", [None, NUM_DIGITS])
Y = tf.placeholder("float", [None, 4])

# How many units in the hidden layer.
NUM_HIDDEN = 100

# Initialize the weights.
w_h = init_weights([NUM_DIGITS, NUM_HIDDEN])
w_o = init_weights([NUM_HIDDEN, 4])

# The fizz_buzz helper (imported from utils) turns a prediction and the
# original number into the fizz buzz output.
BATCH_SIZE = 128


# Launch the graph in a session.
def main():
    # Training data: binary encodings of 101 .. 2 ** NUM_DIGITS - 1.
    trX = np.array([binary_encode(i, NUM_DIGITS) for i in range(101, 2 ** NUM_DIGITS)])
    trY = np.array([fizz_buzz_encode(i) for i in range(101, 2 ** NUM_DIGITS)])

    # Predict y given x using the model.
    py_x = model(X, w_h, w_o)

    # We'll train our model by minimizing a cost function.
    cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=py_x, labels=Y))
    train_op = tf.train.GradientDescentOptimizer(0.05).minimize(cost)

    # And we'll make predictions by choosing the largest output.
    predict_op = tf.argmax(py_x, 1)

    with tf.Session() as sess:
        tf.global_variables_initializer().run()

        for epoch in range(1):
            # Shuffle the data before each training iteration.
            p = np.random.permutation(range(len(trX)))
            trX, trY = trX[p], trY[p]

            # Train in batches of BATCH_SIZE inputs.
            for start in range(0, len(trX), BATCH_SIZE):
                end = start + BATCH_SIZE
                sess.run(train_op, feed_dict={X: trX[start:end], Y: trY[start:end]})

            # And print the current accuracy on the training data.
            print(epoch, np.mean(np.argmax(trY, axis=1) ==
                                 sess.run(predict_op, feed_dict={X: trX, Y: trY})))

        # And now for some fizz buzz on 1..100.
        numbers = np.arange(1, 101)
        teX = np.transpose(binary_encode(numbers, NUM_DIGITS))
        teY = sess.run(predict_op, feed_dict={X: teX})
        output = np.vectorize(fizz_buzz)(numbers, teY)
        tf.summary.FileWriter('plain', sess.graph)
        print(output)


if __name__ == '__main__':
    main()
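For comparison with the generator-based variants below, the in-memory arrays in this first script could also go through tf.data instead of the manual start:end slicing. A sketch under that assumption (reusing trX, trY, X, Y, train_op and sess from main above):

# Sketch only: tf.data over the in-memory arrays, as an alternative to the
# hand-rolled batching above. Assumes trX, trY, X, Y, train_op and sess exist.
dataset = (tf.data.Dataset.from_tensor_slices((trX, trY))
           .shuffle(buffer_size=len(trX))
           .batch(BATCH_SIZE))
next_batch = dataset.make_one_shot_iterator().get_next()
try:
    while True:
        bx, by = sess.run(next_batch)
        sess.run(train_op, feed_dict={X: bx, Y: by})
except tf.errors.OutOfRangeError:
    pass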
"""Illustrate use of generator
"""
import sys
import os
import numpy as np
import tensorflow as tf
from functools import partial
from models import gen_test, gen_training, gen_range, get_training
from utils import binary_encode, fizz_buzz_encode, fizz_buzz, check_op
NUM_DIGITS = 20
HIDDEN_UNITS = 100
def init_weights(shape):
return tf.Variable(tf.random_normal(shape, stddev=0.01))
class FizzBuzzGenModel:
def create_placeholders(self):
self.X = tf.placeholder("float", [None, NUM_DIGITS])
self.Y = tf.placeholder("float", [None, 4])
def build_graph(self):
w_h = init_weights([NUM_DIGITS, HIDDEN_UNITS])
w_o = init_weights([HIDDEN_UNITS, 4])
h = tf.nn.relu(tf.matmul(self.X, w_h))
py_x = tf.matmul(h, w_o)
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(
logits=py_x, labels=self.Y))
train_op = tf.train.GradientDescentOptimizer(0.05).minimize(cost)
predict_op = tf.argmax(py_x, 1)
self.train_op, self.predict_op = train_op, predict_op
def evaluate(self):
dataset = tf.data.Dataset.from_generator(gen_test, (tf.float32, tf.float32, tf.float32)).batch(100)
value = dataset.make_one_shot_iterator().get_next()
try:
while True:
res = self.sess.run(value)
Y = self.sess.run(self.predict_op, feed_dict={self.X: res[0]})
except tf.errors.OutOfRangeError as e:
pass
print("Value, Predicted, Expected")
for idx, v in enumerate(res[2]):
print(v, fizz_buzz(v, Y[idx]), check_op(pos=Y[idx], val=res[1][idx]))
def remove_pad(self, pack):
items = []
for item in pack:
items.append([item[0], item[1][:4]])
return items
def train(self):
self.sess.run(tf.global_variables_initializer())
j = 0
try:
while True:
res = self.sess.run(self.iterator)
if len(res) == 2:
x, y = res
self.sess.run(self.train_op, feed_dict={self.X: x, self.Y: y})
print(j, np.mean(np.argmax(y, axis=1) ==
self.sess.run(self.predict_op, feed_dict={self.X: x, self.Y:y})))
j += 1
else:
for item in res:
vals = self.remove_pad(item)
x, y = zip(*vals)
self.sess.run(self.train_op, feed_dict={self.X: x, self.Y: y})
print(j, np.mean(np.argmax(y, axis=1) ==
self.sess.run(self.predict_op, feed_dict={self.X: x, self.Y: y})))
j += 1
except tf.errors.OutOfRangeError:
pass
def create_dataset(self):
if '--parallel' in sys.argv:
num_calls = 10
dataset = tf.data.Dataset.from_generator(
gen_range, (tf.int32, tf.int32))
self.dataset = dataset.map(lambda x, y: tf.py_func(get_training, [x, y], [tf.int64, tf.int64] * 64),
num_parallel_calls=10).repeat(1).batch(128)
else:
self.dataset = tf.data.Dataset.from_generator(
gen_training, (tf.float32, tf.float32)).repeat(1).batch(128)
def run(self):
with tf.Session() as sess:
self.sess = sess
self.create_dataset()
self.iterator = self.dataset.make_one_shot_iterator().get_next()
self.create_placeholders()
self.build_graph()
writer = tf.summary.FileWriter("iterator", sess.graph)
self.train()
self.evaluate()
def main():
model = FizzBuzzGenModel()
model.run()
if __name__ == '__main__':
main()
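Usage note: running this script with --parallel switches create_dataset from the plain gen_training generator to the gen_range + tf.py_func(get_training) pipeline with num_parallel_calls=10; without the flag, batches arrive directly as (x, y) pairs and the pad/remove_pad step never runs.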
# ---------------------------------------------------------------------------
# models.py: stores the encoded numbers in PostgreSQL via SQLAlchemy and
# exposes the generators consumed by tf.data.Dataset.from_generator above.
# ---------------------------------------------------------------------------
from sqlalchemy import Column, create_engine, Integer
from sqlalchemy.sql.expression import func
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects import postgresql
from sqlalchemy.ext.declarative import declarative_base

import numpy as np

from utils import binary_encode, fizz_buzz_encode

NUM_DIGITS = 20
Base = declarative_base()


class Data(Base):
    __tablename__ = 'data'

    id = Column(Integer, primary_key=True)
    value = Column(Integer)
    # 0 => training, 1 => test
    label = Column(Integer, default=0, index=True)
    x = Column(postgresql.ARRAY(Integer))
    y = Column(postgresql.ARRAY(Integer))


def create_session():
    engine = create_engine('postgresql+psycopg2://username:password@localhost/db')
    Session = sessionmaker(bind=engine)
    return Session()


def main():
    sess = create_session()
    Base.metadata.create_all(sess.get_bind())

    # Training data: 101 .. 2 ** NUM_DIGITS - 1.
    for i in range(101, 2 ** NUM_DIGITS):
        x = [int(item) for item in binary_encode(i, NUM_DIGITS)]
        y = [int(item) for item in fizz_buzz_encode(i)]
        data = Data(x=x, y=y, value=i, label=0)
        sess.add(data)
        if i % 1000 == 0:
            sess.commit()
            print(i)
    sess.commit()

    # Test data: 1 .. 100.
    for i in range(1, 101):
        x = [int(item) for item in binary_encode(i, NUM_DIGITS)]
        y = [int(item) for item in fizz_buzz_encode(i)]
        data = Data(x=x, y=y, value=i, label=1)
        sess.add(data)
    sess.commit()


def gen_range(groups=10, limit=1280):
    # Yield (start, stop) offsets that split `limit` rows into `groups` slices.
    jump = limit // groups
    start, stop = 0, 0
    while stop != limit:
        stop = start + jump
        yield start, stop
        start = stop


def get_training(start=None, stop=None, limit=128):
    # Dataset.map doesn't like a generator, so return all values at once.
    def pad(x, y):
        """Tensorflow expects both values in the list to have the same shape,
        so pad y now and strip the padding later (see remove_pad above).
        """
        length_missing = len(x) - len(y)
        for _ in range(length_missing):
            y.append(-1)
        return [x, y]

    sess = create_session()
    try:
        res = sess.query(Data.x, Data.y).filter_by(label=0).order_by(func.random())
        if start is not None and stop is not None:
            res = res.slice(start, stop)
        return [pad(item[0], item[1]) for item in res.limit(limit)]
    finally:
        sess.close()
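# Illustration (not part of the original gist) of the pad/strip round trip:
# get_training pads y up to len(x) with -1 so tf.py_func sees uniformly shaped
# int64 values, and FizzBuzzGenModel.remove_pad later keeps only y[:4].
#
#     x = [1, 0, 1, 0] + [0] * 16        # 20-digit binary encoding
#     y = [0, 0, 0, 1]                   # one-hot fizz buzz label
#     padded = y + [-1] * (len(x) - len(y))
#     assert padded[:4] == y             # what remove_pad recovers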
def gen_training(start=None, stop=None, limit=1280):
    sess = create_session()
    try:
        res = sess.query(Data.x, Data.y).filter_by(label=0).order_by(func.random()).limit(limit)
        if start is not None and stop is not None:
            res = res.slice(start, stop)
        for item in res:
            yield item[0], item[1]
    finally:
        sess.close()


def gen_test():
    sess = create_session()
    try:
        for item in sess.query(Data.x, Data.y, Data.value).filter_by(label=1).order_by(func.random()):
            yield item[0], item[1], item[2]
    finally:
        sess.close()


if __name__ == '__main__':
    main()
# ---------------------------------------------------------------------------
# utils.py: encoding helpers shared by the scripts above.
# ---------------------------------------------------------------------------
import numpy as np


def fizz_buzz(i, prediction):
    # Turn a prediction (and the original number) into the fizz buzz output.
    return [str(i), "fizz", "buzz", "fizzbuzz"][prediction]


# Represent each input by an array of its binary digits (least significant bit first).
def binary_encode(i, num_digits):
    return np.array([i >> d & 1 for d in range(num_digits)])


# One-hot encode the desired outputs: [number, "fizz", "buzz", "fizzbuzz"].
def fizz_buzz_encode(i):
    if i % 15 == 0: return np.array([0, 0, 0, 1])
    elif i % 5 == 0: return np.array([0, 0, 1, 0])
    elif i % 3 == 0: return np.array([0, 1, 0, 0])
    else: return np.array([1, 0, 0, 0])


def check_op(pos, val):
    # True when the predicted class `pos` matches the stored one-hot label `val`.
    return val[pos] == 1
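# Quick sanity checks for the helpers above (illustrative, not in the gist):
#   binary_encode(7, 4)                -> array([1, 1, 1, 0])  (least significant bit first)
#   fizz_buzz_encode(15)               -> array([0, 0, 0, 1])  ("fizzbuzz")
#   fizz_buzz(7, 0)                    -> '7'
#   check_op(pos=3, val=[0, 0, 0, 1])  -> True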