This is a series of gists documenting testing done with the numeric.mapping
option in Kafka Connect.
- Oracle
- MS SQL Server
- Postgres
- MySQL - n/a because of #563
—@rmoff January 9, 2019
// Create KTable from Kafka topic. | |
KTable<String, String> table = builder.table("input-topic", Consumed.with(Serdes.String(), Serdes.String())); |
CREATE STREAM products ...; | |
CREATE STREAM products_repartitioned | |
WITH (PARTITIONS=30) AS | |
SELECT * FROM products | |
EMIT CHANGES; |
--- | |
version: '2' | |
services: | |
zookeeper: | |
image: confluentinc/cp-zookeeper:5.3.1 | |
hostname: zookeeper | |
container_name: zookeeper | |
ports: | |
- "2181:2181" |
/* | |
* Licensed to the Apache Software Foundation (ASF) under one or more | |
* contributor license agreements. See the NOTICE file distributed with | |
* this work for additional information regarding copyright ownership. | |
* The ASF licenses this file to You under the Apache License, Version 2.0 | |
* (the "License"); you may not use this file except in compliance with | |
* the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* |
import numpy as np | |
import tensorflow as tf | |
import tensorflow_io.kafka as kafka_io | |
# 1. Consume streaming data with Kafka and TensorFlow I/O | |
def func_x(x): | |
# Decode image to (28, 28) | |
x = tf.io.decode_raw(x, out_type=tf.uint8) | |
x = tf.reshape(x, [28, 28]) | |
# Convert to float32 for tf.keras |
// Continuously aggregating a KStream into a KTable. | |
KStream<String, String> locationUpdatesStream = ...; | |
KTable<String, Long> locationsPerUser | |
= locationUpdatesStream | |
.groupBy((k, v) -> v.username) | |
.count(); |
This is a series of gists documenting testing done with the numeric.mapping
option in Kafka Connect.
—@rmoff January 9, 2019
-- Continuously aggregating a stream into a table with a ksqlDB push query. | |
CREATE STREAM locationUpdatesStream ...; | |
CREATE TABLE locationsPerUser AS | |
SELECT username, COUNT(*) | |
FROM locationUpdatesStream | |
GROUP BY username | |
EMIT CHANGES; |
-- Create ksqlDB table from Kafka topic. | |
CREATE TABLE myTable (username VARCHAR, location VARCHAR) | |
WITH (KAFKA_TOPIC='input-topic', KEY='username', VALUE_FORMAT='...'); |
-- Create ksqlDB stream from Kafka topic. | |
CREATE STREAM myStream (username VARCHAR, location VARCHAR) | |
WITH (KAFKA_TOPIC='input-topic', VALUE_FORMAT='...'); |