Gist battlecook/8db93bc4087e495e32303242f880a8d1 — training a Keras convolutional
denoising autoencoder on per-user matrices streamed from MySQL via a custom
keras.utils.Sequence data generator.
from MySQLdb import _mysql
from tensorflow import keras
from tensorflow.keras import layers, losses
from tensorflow.keras.models import Model
import numpy as np
import tensorflow as tf
import random
# --- MySQL connection and synthetic training data --------------------------
# Placeholder credentials — replace with real values before running.
mysql_host = 'host'
mysql_user = 'user'
mysql_pw = 'password'
mysql_db = 'db'
mysql_client = _mysql.connect(host=mysql_host, user=mysql_user, passwd=mysql_pw, db=mysql_db)

table_name = 'USER_MATRIX'

user_count = 100
size_x = 4   # matrix rows
size_y = 4   # matrix columns

# Seed USER_MATRIX: one random (x, y, value) cell per user.
for user_id in range(user_count):
    x = random.randrange(0, size_x)
    y = random.randrange(0, size_y)
    value = random.randrange(-7, 7)
    # Format-built SQL is acceptable here only because every interpolated
    # value is a locally generated int; escape/parameterize if any of these
    # ever come from outside.
    insert_query = '''insert into {} (user_id, x, y, value) VALUES({},{},{},{})'''.format(table_name, str(user_id), str(x), str(y), value)
    mysql_client.query(insert_query)
# https://www.tensorflow.org/tutorials/generative/autoencoder
class Denoise(Model):
    """Convolutional denoising autoencoder for (row_count, column_count, 1) inputs.

    The encoder halves each spatial dimension twice (stride 2 twice), so
    row_count and column_count should be divisible by 4 for the decoder to
    reproduce the input shape exactly.

    NOTE(review): the final sigmoid constrains reconstructions to [0, 1],
    but the seeded cell values range over -7..6 — confirm targets are meant
    to be normalized before training.
    """

    def __init__(self, row_count, column_count):
        super(Denoise, self).__init__()
        # Encoder: two strided convs compress each spatial dim by 4x overall.
        self.encoder = tf.keras.Sequential([
            layers.Input(shape=(row_count, column_count, 1)),
            layers.Conv2D(16, (3, 3), activation='relu', padding='same', strides=2),
            layers.Conv2D(8, (3, 3), activation='relu', padding='same', strides=2)])
        # Decoder: mirrors the encoder; final 1-channel conv restores depth.
        self.decoder = tf.keras.Sequential([
            layers.Conv2DTranspose(8, kernel_size=3, strides=2, activation='relu', padding='same'),
            layers.Conv2DTranspose(16, kernel_size=3, strides=2, activation='relu', padding='same'),
            layers.Conv2D(1, kernel_size=(3, 3), activation='sigmoid', padding='same')])

    def call(self, x):
        """Encode then decode `x`; returns the reconstruction."""
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded
class SqlDataGenerator(keras.utils.Sequence):
    """Keras Sequence that fetches one batch of per-user matrices from MySQL.

    Each __getitem__ call queries the module-level `mysql_client` for the
    batch's user ids and builds a dense (rows, cols, 1) matrix per user,
    returning `(data, data)` so the autoencoder reconstructs its input.
    """

    def __init__(self, ids, data_size, batch_size=10):
        # ids:        list of user-id strings (joined verbatim into SQL).
        # data_size:  (rows, cols) of each user's matrix.
        self.user_ids = ids
        self.batch_size = batch_size
        self.indexes = np.arange(len(self.user_ids))
        self.data_size = data_size

    def __len__(self):
        # Number of full batches; any trailing partial batch is dropped.
        return int(np.floor(len(self.user_ids) / self.batch_size))

    def __getitem__(self, index):
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        batch_user_ids = [self.user_ids[k] for k in indexes]
        # NOTE(review): ids are interpolated into the IN(...) clause
        # unescaped — safe only while they are locally generated numeric
        # strings; escape them if they ever come from untrusted input.
        in_query = ','.join(batch_user_ids)
        query = '''
SELECT user_id, x, y, value FROM {} WHERE user_id IN ( {} )
'''.format(table_name, in_query)
        user_matrix = {}
        mysql_client.query(query)
        res = mysql_client.store_result()
        for i in range(res.num_rows()):
            row = res.fetch_row()[0]
            # Low-level _mysql returns bytes; decode then coerce each column.
            user_id = str(row[0].decode("utf-8"))
            x = int(row[1].decode("utf-8"))
            y = int(row[2].decode("utf-8"))
            value = int(row[3].decode("utf-8"))
            if user_id not in user_matrix:
                user_matrix[user_id] = np.zeros((self.data_size[0], self.data_size[1]))
            user_matrix[user_id][x][y] = value
        # Stack each user's matrix with a trailing channel axis: (H, W, 1).
        data = np.array([np.array(matrix)[..., tf.newaxis]
                         for matrix in user_matrix.values()])
        # Autoencoder training pair: input and target are the same tensor.
        return data, data

    def on_epoch_end(self):
        # Re-creating the identity ordering is effectively a no-op; shuffling
        # here would be the usual intent but would change training behavior.
        self.indexes = np.arange(len(self.user_ids))
# Ids are strings because the generator joins them straight into SQL.
user_ids = [str(i) for i in range(0, user_count)]

autoencoder = Denoise(size_x, size_y)
autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())

batch_size = 20
train_generator = SqlDataGenerator(user_ids, (size_x, size_y), batch_size)

# Fix: `batch_size` must NOT be passed to fit() when x is a
# keras.utils.Sequence — Keras raises ValueError, since the Sequence itself
# owns batching (it is already constructed with batch_size above).
# NOTE(review): validation_data is the training generator, so the validation
# metric only measures fit to the training data — confirm this is intended.
autoencoder.fit(
    x=train_generator,
    validation_data=train_generator,
    epochs=1,
)
autoencoder.encoder.summary()
autoencoder.decoder.summary()
(End of gist content.)