Skip to content

Instantly share code, notes, and snippets.

View kaiwaehner's full-sized avatar

Kai Waehner kaiwaehner

View GitHub Profile
// Configure the Kafka Streams application.
// The broker address is taken from the first command-line argument when one is
// supplied; otherwise it falls back to "localhost:9092".
final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
final Properties streamsConfiguration = new Properties();
// Give the Streams application a unique name. The name must be unique
// in the Kafka cluster against which the application is run.
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-tensorflow-serving-gRPC-example");
// Where to find Kafka broker(s).
// NOTE(review): the snippet is cut off here; presumably the next statement stores
// bootstrapServers under StreamsConfig.BOOTSTRAP_SERVERS_CONFIG — confirm against the full source.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import com.github.megachucky.kafka.streams.machinelearning.TensorflowObjectRecogniser;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import com.github.megachucky.kafka.streams.machinelearning.TensorflowObjectRecogniser;
@kaiwaehner
kaiwaehner / gist:a5d6710c2d038bada2d3dfd7a66d6450
Created January 9, 2019 09:01
Create KSQL STREAM for Preprocessing with Python
# Derive the stream 'creditcardfraud_preprocessed_avro' from 'creditcardfraud_source',
# selecting the columns listed below and keeping only rows where Class IS NOT NULL.
# The result is written to the Kafka topic of the same name in AVRO format.
client.create_stream_as(
    table_name='creditcardfraud_preprocessed_avro',
    select_columns=[
        'Time',
        'V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7',
        'V8', 'V9', 'V10', 'V11', 'V12', 'V13', 'V14',
        'V15', 'V16', 'V17', 'V18', 'V19', 'V20', 'V21',
        'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28',
        'Amount', 'Class',
    ],
    src_table='creditcardfraud_source',
    conditions='Class IS NOT NULL',
    kafka_topic='creditcardfraud_preprocessed_avro',
    value_format='AVRO',
)
@kaiwaehner
kaiwaehner / gist:6a16c2b5fb88ff1d09534309343e5dd0
Created January 9, 2019 09:00
Connect to KSQL and CREATE STREAM using Python
from ksql import KSQLAPI

# Create a KSQLAPI client pointed at http://localhost:8088.
client = KSQLAPI('http://localhost:8088')

# Register the stream 'creditcardfraud_source' on the topic of the same name,
# declaring each column with its type; values are in DELIMITED format.
client.create_stream(
    table_name='creditcardfraud_source',
    columns_type=[
        'Id bigint', 'Timestamp varchar', 'User varchar', 'Time int',
        'V1 double', 'V2 double', 'V3 double', 'V4 double',
        'V5 double', 'V6 double', 'V7 double', 'V8 double',
        'V9 double', 'V10 double', 'V11 double', 'V12 double',
        'V13 double', 'V14 double', 'V15 double', 'V16 double',
        'V17 double', 'V18 double', 'V19 double', 'V20 double',
        'V21 double', 'V22 double', 'V23 double', 'V24 double',
        'V25 double', 'V26 double', 'V27 double', 'V28 double',
        'Amount double', 'Class string',
    ],
    topic='creditcardfraud_source',
    value_format='DELIMITED',
)
-- Creates the stream creditcardfraud_per_user (AVRO values) by joining creditcardfraud_enahnced
-- with USERS on userid, keeping rows where V1 > 5, V2 IS NOT NULL and u.CITY starts with 'Premium'.
-- NOTE(review): the source name "creditcardfraud_enahnced" looks like a misspelling of "enhanced";
-- confirm the actual stream name before changing it.
-- NOTE(review): KAFKA_TOPIC here is 'creditcardfraud_preprocessed_avro', the same topic used by the
-- creditcardfraud_preprocessed_avro stream — confirm that sharing the topic is intended.
CREATE STREAM creditcardfraud_per_user WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='creditcardfraud_preprocessed_avro') AS SELECT Time, V1 , V2 , V3 , V4 , V5 , V6 , V7 , V8 , V9 , V10 , V11 , V12 , V13 , V14 , V15 , V16 , V17 , V18 , V19 , V20 , V21 , V22 , V23 , V24 , V25 , V26 , V27 , V28 , Amount , Class FROM creditcardfraud_enahnced c INNER JOIN USERS u on c.userid = u.userid WHERE V1 > 5 AND V2 IS NOT NULL AND u.CITY LIKE 'Premium%';
@kaiwaehner
kaiwaehner / gist:128053794064d7709028449ca42b8c21
Created January 9, 2019 08:56
Data Augmentation with KSQL
-- Selects Id and Class from creditcardfraud_source, substituting -1 where Class is NULL.
-- NOTE(review): Class is declared as 'string' where the source stream is created, while the
-- default here is the numeric literal -1 — confirm the types are compatible.
SELECT Id, IFNULL(Class, -1) FROM creditcardfraud_source;
@kaiwaehner
kaiwaehner / gist:46ec08597794bb827162751365f52695
Created January 9, 2019 08:55
Data Anonymisation in KSQL
-- Selects Id and User from creditcardfraud_source with MASK_LEFT applied to the first 2 characters of User.
SELECT Id, MASK_LEFT(User, 2) FROM creditcardfraud_source;
-- Creates the stream creditcardfraud_preprocessed_avro (AVRO values, topic of the same name) from the
-- rows of creditcardfraud_source whose Class IS NOT NULL.
CREATE STREAM creditcardfraud_preprocessed_avro WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='creditcardfraud_preprocessed_avro') AS SELECT Time, V1 , V2 , V3 , V4 , V5 , V6 , V7 , V8 , V9 , V10 , V11 , V12 , V13 , V14 , V15 , V16 , V17 , V18 , V19 , V20 , V21 , V22 , V23 , V24 , V25 , V26 , V27 , V28 , Amount , Class FROM creditcardfraud_source WHERE Class IS NOT NULL;
-- Creates the stream AnomalyDetectionWithFilter from carsensor, keeping only rows where
-- anomaly(sensorinput) is greater than 5.
-- NOTE(review): anomaly() is not defined in this file; presumably a user-defined function — confirm.
create stream AnomalyDetectionWithFilter as select rowtime, eventid, anomaly(sensorinput) as Anomaly from carsensor where anomaly(sensorinput) > 5;