This is a series of gists documenting testing done with the numeric.mapping
option in Kafka Connect.
- Oracle
- MS SQL Server
- Postgres
- MySQL - n/a because of #563
—@rmoff January 9, 2019
This is a series of gists documenting testing done with the numeric.mapping
option in Kafka Connect.
—@rmoff January 9, 2019
import numpy as np | |
import tensorflow as tf | |
import tensorflow_io.kafka as kafka_io | |
# 1. Consume streaming data with Kafka and TensorFlow I/O | |
def func_x(x): | |
# Decode image to (28, 28) | |
x = tf.io.decode_raw(x, out_type=tf.uint8) | |
x = tf.reshape(x, [28, 28]) | |
# Convert to float32 for tf.keras |
--- | |
version: '2' | |
services: | |
zookeeper: | |
image: confluentinc/cp-zookeeper:5.3.1 | |
hostname: zookeeper | |
container_name: zookeeper | |
ports: | |
- "2181:2181" |
CREATE STREAM products ...; | |
CREATE STREAM products_repartitioned | |
WITH (PARTITIONS=30) AS | |
SELECT * FROM products | |
EMIT CHANGES; |
// Continuously aggregating a KStream into a KTable. | |
KStream<String, String> locationUpdatesStream = ...; | |
KTable<String, Long> locationsPerUser | |
= locationUpdatesStream | |
.groupBy((k, v) -> v.username) | |
.count(); |
-- Continuously aggregating a stream into a table with a ksqlDB push query. | |
CREATE STREAM locationUpdatesStream ...; | |
CREATE TABLE locationsPerUser AS | |
SELECT username, COUNT(*) | |
FROM locationUpdatesStream | |
GROUP BY username | |
EMIT CHANGES; |
// Create KTable from Kafka topic. | |
KTable<String, String> table = builder.table("input-topic", Consumed.with(Serdes.String(), Serdes.String())); |
-- Create ksqlDB table from Kafka topic. | |
CREATE TABLE myTable (username VARCHAR, location VARCHAR) | |
WITH (KAFKA_TOPIC='input-topic', KEY='username', VALUE_FORMAT='...'); |
// Create KStream from Kafka topic. | |
StreamsBuilder builder = new StreamsBuilder(); | |
KStream<String, String> stream = | |
builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())); |
-- Create ksqlDB stream from Kafka topic. | |
CREATE STREAM myStream (username VARCHAR, location VARCHAR) | |
WITH (KAFKA_TOPIC='input-topic', VALUE_FORMAT='...'); |