Skip to content

Instantly share code, notes, and snippets.

View confluentgist's full-sized avatar

Confluent, Inc. confluentgist

View GitHub Profile
@confluentgist
confluentgist / reconfiguring-buffer-size-at-runtime.java
Last active March 19, 2019 03:48 — forked from vvcephei/suppression-blog-figure-5.java
Reconfiguring Buffer Size at Runtime
builder
.table("users")
.suppress(Suppressed.untilTimeLimit(
BufferConfig.maxBytes(myConfig.getUsersBufferSize())
))
...
driver.pipeInput(recordFactory.create(
/* topic */ "input",
/* key */ "A",
/* value */ "v1",
/* timestamp */ 10L
));
// Stream time is now 10L
driver.pipeInput(recordFactory.create("input", "A", "v2", 11L));
// Stream time is now 11L
events
.groupByKey()
.windowedBy(
TimeWindows.of(Duration.ofMinutes(2).withGrace(Duration.ofMinutes(2))
)
.count(Materialized.as("count-metric"))
.suppress(Suppressed.untilWindowClose(BufferConfig.unbounded()))
.filter( _ < 4 )
.toStream()
.foreach( /* Send that email! */)
events
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(2)))
.count(Materialized.as("count-metric"))
.filter( _ < 4 )
.toStream()
.foreach( /* Send that email! */)
// graph servlet queries "count-metric"
events
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(2)))
.count(Materialized.as("count-metric"))
// graph servlet queries "count-metric"
@confluentgist
confluentgist / 00_numeric.mapping_README.md
Last active August 30, 2022 11:44 — forked from rmoff/00_numeric.mapping_README.md
Kafka Connect JDBC Connector - numeric.mapping
CREATE STREAM creditcardfraud_per_user WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='creditcardfraud_preprocessed_avro') AS SELECT Time, V1 , V2 , V3 , V4 , V5 , V6 , V7 , V8 , V9 , V10 , V11 , V12 , V13 , V14 , V15 , V16 , V17 , V18 , V19 , V20 , V21 , V22 , V23 , V24 , V25 , V26 , V27 , V28 , Amount , Class FROM creditcardfraud_enahnced c INNER JOIN USERS u on c.userid = u.userid WHERE V1 > 5 AND V2 IS NOT NULL AND u.CITY LIKE 'Premium%';
@confluentgist
confluentgist / gist:fba5c442d047270ac63a8499066651d4
Last active March 15, 2019 23:23
Data Augmentation with KSQL
SELECT Id, IFNULL(Class, -1) FROM creditcardfraud_source;
@confluentgist
confluentgist / gist:5c4870e2af590465093fe89a9e0fa5b9
Last active March 15, 2019 23:23
Data Anonymization in KSQL
SELECT Id, MASK_LEFT(User, 2) FROM creditcardfraud_source;
CREATE STREAM creditcardfraud_preprocessed_avro WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='creditcardfraud_preprocessed_avro') AS SELECT Time, V1 , V2 , V3 , V4 , V5 , V6 , V7 , V8 , V9 , V10 , V11 , V12 , V13 , V14 , V15 , V16 , V17 , V18 , V19 , V20 , V21 , V22 , V23 , V24 , V25 , V26 , V27 , V28 , Amount , Class FROM creditcardfraud_source WHERE Class IS NOT NULL;