Skip to content

Instantly share code, notes, and snippets.

Avatar

Confluent, Inc. confluentgist

View GitHub Profile
@confluentgist
confluentgist / gist:63871738ff2df410b8d488d0235ec908
Created February 5, 2019 05:01
Create KSQL STREAM for preprocessing with Python
View gist:63871738ff2df410b8d488d0235ec908
# Derive the preprocessed stream from creditcardfraud_source: project Time,
# V1..V28, Amount and Class, skip rows where Class is NULL, and write the
# result in AVRO format to the creditcardfraud_preprocessed_avro topic.
client.create_stream_as(
    table_name='creditcardfraud_preprocessed_avro',
    select_columns=(
        ['Time']
        + ['V%d' % i for i in range(1, 29)]
        + ['Amount', 'Class']
    ),
    src_table='creditcardfraud_source',
    conditions='Class IS NOT NULL',
    kafka_topic='creditcardfraud_preprocessed_avro',
    value_format='AVRO',
)
@confluentgist
confluentgist / gist:fec77a553824119bfef29119c62826e8
Last active March 15, 2019 23:23
Connect to KSQL and CREATE STREAM Using Python
View gist:fec77a553824119bfef29119c62826e8
from ksql import KSQLAPI

# Client for the KSQL server listening on localhost:8088.
client = KSQLAPI('http://localhost:8088')

# Register the raw stream over the creditcardfraud_source topic. The schema is
# Id, Timestamp, User and Time, followed by V1..V28 (all double), then Amount
# and Class; values are read in DELIMITED format.
client.create_stream(
    table_name='creditcardfraud_source',
    columns_type=(
        ['Id bigint', 'Timestamp varchar', 'User varchar', 'Time int']
        + ['V%d double' % i for i in range(1, 29)]
        + ['Amount double', 'Class string']
    ),
    topic='creditcardfraud_source',
    value_format='DELIMITED',
)
View gist:375dc18cc721ef57a123df508ff64500
-- Persist the rows of creditcardfraud_source that have a non-NULL Class as an
-- AVRO stream backed by the creditcardfraud_preprocessed_avro topic.
CREATE STREAM creditcardfraud_preprocessed_avro
  WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='creditcardfraud_preprocessed_avro') AS
SELECT
  Time,
  V1, V2, V3, V4, V5, V6, V7,
  V8, V9, V10, V11, V12, V13, V14,
  V15, V16, V17, V18, V19, V20, V21,
  V22, V23, V24, V25, V26, V27, V28,
  Amount,
  Class
FROM creditcardfraud_source
WHERE Class IS NOT NULL;
@confluentgist
confluentgist / gist:5c4870e2af590465093fe89a9e0fa5b9
Last active March 15, 2019 23:23
Data Anonymization in KSQL
View gist:5c4870e2af590465093fe89a9e0fa5b9
-- Anonymize the User column: MASK_LEFT masks its first two characters.
SELECT
  Id,
  MASK_LEFT(User, 2)
FROM creditcardfraud_source;
@confluentgist
confluentgist / gist:fba5c442d047270ac63a8499066651d4
Last active March 15, 2019 23:23
Data Augmentation with KSQL
View gist:fba5c442d047270ac63a8499066651d4
-- Substitute a sentinel for missing Class labels. Class is declared as a
-- string column in the creditcardfraud_source definition, so the fallback is
-- the string literal '-1' rather than the integer -1, keeping both IFNULL
-- arguments the same type.
SELECT
  Id,
  IFNULL(Class, '-1')
FROM creditcardfraud_source;
View gist:cfcef6393cb2e2476dec25608b9de7a9
-- Join creditcardfraud_enhanced to USERS on userid and keep rows with V1 > 5,
-- a non-NULL V2, and a user CITY that starts with 'Premium'.
-- The source stream name was misspelled as "creditcardfraud_enahnced".
-- NOTE(review): KAFKA_TOPIC is the same topic that backs the
-- creditcardfraud_preprocessed_avro stream; confirm sharing it is intended.
CREATE STREAM creditcardfraud_per_user
  WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='creditcardfraud_preprocessed_avro') AS
SELECT
  Time,
  V1, V2, V3, V4, V5, V6, V7,
  V8, V9, V10, V11, V12, V13, V14,
  V15, V16, V17, V18, V19, V20, V21,
  V22, V23, V24, V25, V26, V27, V28,
  Amount,
  Class
FROM creditcardfraud_enhanced c
INNER JOIN USERS u ON c.userid = u.userid
WHERE V1 > 5
  AND V2 IS NOT NULL
  AND u.CITY LIKE 'Premium%';
@confluentgist
confluentgist / 00_numeric.mapping_README.md
Last active August 30, 2022 11:44 — forked from rmoff/00_numeric.mapping_README.md
Kafka Connect JDBC Connector - numeric.mapping
View 00_numeric.mapping_README.md
View suppress.java
// Count events per key in two-minute windows and materialize the running
// counts under the store name "count-metric".
events
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(2)))
.count(Materialized.as("count-metric"))
// graph servlet queries "count-metric"
View use-case-alerting.java
// Count events per key in two-minute windows (materialized as "count-metric")
// and trigger an email action for counts below 4.
// NOTE(review): no suppression is applied here, so the filter presumably sees
// every intermediate count update, not just a window's final count - confirm
// that alerts on partial results are acceptable.
events
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(2)))
.count(Materialized.as("count-metric"))
// Pseudocode predicate: keep only counts lower than 4.
.filter( _ < 4 )
.toStream()
// Placeholder for the alerting side effect.
.foreach( /* Send that email! */)
// graph servlet queries "count-metric"
View metrics-app-with-alerts.java
// Count events per key in two-minute windows with a two-minute grace period,
// materialize the counts as "count-metric", and trigger the email action only
// for final window counts below 4.
events
  .groupByKey()
  .windowedBy(
    // grace() belongs to TimeWindows, so it is chained after of(...) has been
    // closed rather than onto the Duration argument.
    TimeWindows.of(Duration.ofMinutes(2)).grace(Duration.ofMinutes(2))
  )
  .count(Materialized.as("count-metric"))
  // Hold back all updates until the window closes so the filter below sees
  // one final count per window; the buffer is configured as unbounded.
  .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
  // Pseudocode predicate: keep only counts lower than 4.
  .filter( _ < 4 )
  .toStream()
  // Placeholder for the alerting side effect.
  .foreach( /* Send that email! */)