Skip to content

Instantly share code, notes, and snippets.

  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
Star You must be signed in to star a gist
What would you like to do?
import numpy as np
import tensorflow as tf
import tensorflow_io.kafka as kafka_io
# 1. Consume streaming data with Kafka and TensorFlow I/O
def func_x(x):
# Decode image to (28, 28)
x =, out_type=tf.uint8)
x = tf.reshape(x, [28, 28])
# Convert to float32 for tf.keras
x = tf.image.convert_image_dtype(x, tf.float32)
return x
def func_y(y):
# Decode image to (,)
y =, out_type=tf.uint8)
y = tf.reshape(y, [])
return y
train_images = kafka_io.KafkaDataset(['xx:0'], group='xx', eof=True).map(func_x)
train_labels = kafka_io.KafkaDataset(['yy:0'], group='yy', eof=True).map(func_y)
train_kafka =, train_labels)).batch(1)
# 2. Train Analytic Model with TensorFlow and Keras without an additional Data Store
model = tf.keras.Sequential([
tf.keras.layers.Flatten(input_shape=(28, 28)),
tf.keras.layers.Dense(128, activation=tf.nn.relu),
tf.keras.layers.Dense(10, activation=tf.nn.softmax)
# default: steps_per_epoch=12000, epochs=5, steps_per_epoch=1000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment