Florian Hussonnois fhussonnois

@fhussonnois
fhussonnois / DeduplicationTransformer.java
Created March 11, 2024 09:42
Kafka Streams : DeduplicationTransformer
public class DeduplicationTransformer<K, V, E> implements ValueTransformerWithKey<K, V, V> {
    private ProcessorContext context;
    private WindowStore<E, Long> eventIdStore;
    private final String storeName;
    private final KeyValueMapper<K, V, E> idExtractor;
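The preview above stops before any transform logic is shown. As a rough illustration of the idea the field names suggest (remember each event id with a timestamp, drop values whose id was already seen inside a retention window), here is a minimal sketch in plain Java with no Kafka dependency. Everything in it is an assumption made for illustration: the class name DeduplicationSketch, the HashMap standing in for the WindowStore<E, Long>, the BiFunction standing in for the KeyValueMapper<K, V, E>, the retentionMs parameter, and the choice not to refresh the timestamp on duplicates. It is not the gist's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical, Kafka-free sketch of window-based deduplication. */
final class DeduplicationSketch<K, V, E> {
    // Stands in for the WindowStore<E, Long>: event id -> timestamp of first sighting.
    private final Map<E, Long> seenEventIds = new HashMap<>();
    // Stands in for the KeyValueMapper<K, V, E> that extracts the event id.
    private final BiFunction<K, V, E> idExtractor;
    // Assumed parameter: how long an event id is remembered, in milliseconds.
    private final long retentionMs;

    DeduplicationSketch(BiFunction<K, V, E> idExtractor, long retentionMs) {
        this.idExtractor = idExtractor;
        this.retentionMs = retentionMs;
    }

    /** Returns the value on first sighting of its id within the window, otherwise null. */
    V transform(K key, V value, long timestampMs) {
        // Forget ids first seen before the start of the retention window.
        long cutoffMs = timestampMs - retentionMs;
        seenEventIds.values().removeIf(firstSeenMs -> firstSeenMs < cutoffMs);

        E eventId = idExtractor.apply(key, value);
        if (eventId == null) {
            return value; // nothing to deduplicate on: pass the value through
        }
        Long firstSeenMs = seenEventIds.putIfAbsent(eventId, timestampMs);
        return firstSeenMs == null ? value : null; // null marks a dropped duplicate
    }

    public static void main(String[] args) {
        DeduplicationSketch<String, String, String> dedup =
                new DeduplicationSketch<>((key, value) -> value, 1000L);
        System.out.println(dedup.transform("k", "a", 0L));    // a
        System.out.println(dedup.transform("k", "a", 500L));  // null
        System.out.println(dedup.transform("k", "a", 2000L)); // a
    }
}
```

In a real Kafka Streams topology the eviction would be handled by the window store's retention rather than by scanning a map, so the removeIf call is only a convenience of this in-memory stand-in.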
fhussonnois / build.gradle
Created March 25, 2021 16:05
Azkarra Gradle Fat Jar
group 'org.example'
version '1.0'
buildscript {
    repositories {
        jcenter()
    }
    dependencies {
        classpath 'com.github.jengelman.gradle.plugins:shadow:6.1.0'
    }
fhussonnois / example-streams-store-api-response.json
Created March 10, 2021 23:33
Azkarra Streams Framework - StateStore Lag and Offsets
[
  {
    "name": "Count",
    "partition_restore_infos": [
      {
        "starting_offset": 0,
        "ending_offset": 0,
        "total_restored": 0,
        "duration": "PT0.102756S",
        "partition": 0,
@Component
public class ServerSentEventsWordCountTopology
        // (0) Extend EventStreamSupport or implement the EventStreamProvider interface
        extends EventStreamSupport
        implements TopologyProvider {
    private EventStream<String, Long> wordCountStream;
    public ServerSentEventsWordCountTopology() {
        // (1) Create a new EventStream for word-count updates.
fhussonnois / azkarra-cloudevents-example.json
Created September 24, 2020 15:00
Azkarra Streams CloudEvents Example
{
  "id": "appid:basic-word-count-0-8-0;appsrv:localhost:8080;ts:1600959526290",
  "source": "arn://kafka=s0wwlkImS6CZ4SQ7gOM9eA/host=localhost/port=8080",
  "subject": "arn://kafka=s0wwlkImS6CZ4SQ7gOM9eA/host=localhost/port=8080/streams=basic-word-count-0-8-0",
  "specversion": "1.0",
  "type": "io.streamthoughts.azkarra.streams.stateupdateevent",
  "time": "2020-09-24T14:58:46.290+0000",
  "datacontenttype": "application/json",
  "data": {
    "state": "RUNNING",
fhussonnois / csv-example-00.sh
Created August 13, 2020 19:18
connect-file-pulse-csv-example-00
curl \
  -i -X PUT -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-00/config \
  -d '{
    "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
    "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
    "fs.scan.interval.ms":"10000",
    "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
    "file.filter.regex.pattern":".*\\.csv$",
    "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
fhussonnois / azkarra-streams-cloud-events.json
Last active May 12, 2020 12:21
Azkarra Streams - Streams State Cloud Event - Sample
{
  "id": "appid:word-count;appsrv:localhost:8080;ts:1588976019636",
  "source": "azkarra/ks/localhost:8080",
  "specversion": "1.0",
  "type": "io.streamthoughts.azkarra.streams.stateupdateevent",
  "time": "2020-05-08T22:13:39.636+0000",
  "datacontenttype": "application/json",
  "ioazkarramonitorintervalms": 10000,
  "ioazkarrastreamsappid": "word-count",
  "ioazkarraversion": "0.7.0",
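The "id" value in the sample above looks like a semicolon-separated list of name:value segments (appid, appsrv, ts), with ts apparently holding epoch milliseconds that agree with the "time" attribute. That layout is inferred from the sample only and is not a documented contract. Under that assumption, a minimal Java sketch for splitting such an id could look like the following; the class name StateEventIdSketch and the parse helper are hypothetical.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: splits an id shaped like "appid:...;appsrv:...;ts:..." into fields. */
final class StateEventIdSketch {

    static Map<String, String> parse(String id) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String segment : id.split(";")) {
            // Split on the first ':' only, because values such as "localhost:8080" contain ':' too.
            String[] nameAndValue = segment.split(":", 2);
            if (nameAndValue.length != 2) {
                throw new IllegalArgumentException("Malformed id segment: " + segment);
            }
            fields.put(nameAndValue[0], nameAndValue[1]);
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> fields =
                parse("appid:word-count;appsrv:localhost:8080;ts:1588976019636");
        System.out.println(fields.get("appid"));  // word-count
        System.out.println(fields.get("appsrv")); // localhost:8080
        // Assumes ts is epoch milliseconds; prints 2020-05-08T22:13:39.636Z
        System.out.println(Instant.ofEpochMilli(Long.parseLong(fields.get("ts"))));
    }
}
```

The printed instant matches the sample's "time" value of 2020-05-08T22:13:39.636+0000, which is what supports reading ts as epoch milliseconds.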
| | Connect FilePulse | Connect Spooldir | Connect FileStreams |
|---|---|---|---|
| Connector Type | source | source | source / sink |
| License | Apache License 2.0 | Apache License 2.0 | Apache License 2.0 |
| Available on Confluent Hub | YES | YES | YES |
| Docker image | YES | NO | NO |
| Delivery semantics | At-least-once | At-least-once | At-most-once |
| Usable in production | YES | YES | NO |
| Supported file formats (out-of-the-box) | Delimited, Binary, JSON, Avro, XML (limited) | Delimited, JSON | Text file |
{
  "config": {
    "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
    "filters": "GroupMultilineException, ParseLog4jLog",
    "filters.GroupMultilineException.negate": "false",
    "filters.GroupMultilineException.pattern": "^[\\t]",
    "filters.GroupMultilineException.type": "io.streamthoughts.kafka.connect.filepulse.filter.MultiRowFilter",
    "filters.ParseLog4jLog.match": "%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}",
    "filters.ParseLog4jLog.overwrite": "message",
fhussonnois / example-file-pulse-conditional.properties
Last active January 22, 2020 13:41
example-file-pulse-conditional.properties
# The type of the filter
filters.TagSecurityException.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
# The name of the field to add
filters.TagSecurityException.field=tags
# The value of the field
filters.TagSecurityException.values=SecurityAlert
# The filter condition expression (using Simple Connect Expression Language)
filters.TagSecurityException.if="{{ contains($value.logmessage, BadCredentialsException) }}"
# Invert the boolean value returned by the if expression
filters.TagSecurityException.invert=false
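
The properties above describe a filter that is applied only when a condition holds, with an invert flag that flips the condition's result. The decision logic can be sketched in plain Java as follows. This is an illustration under stated assumptions and not the connector's code: the class ConditionalAppendSketch, the apply method, and the CONTAINS_BAD_CREDENTIALS predicate are hypothetical stand-ins, and a record is modeled as a simple Map.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical sketch of "apply the filter if (condition XOR invert)". */
final class ConditionalAppendSketch {

    // Stand-in for the expression: contains($value.logmessage, BadCredentialsException)
    static final Predicate<Map<String, Object>> CONTAINS_BAD_CREDENTIALS =
            record -> String.valueOf(record.get("logmessage")).contains("BadCredentialsException");

    /** Returns a copy of the record with tags=[SecurityAlert] when the (possibly inverted) condition holds. */
    static Map<String, Object> apply(Map<String, Object> record,
                                     Predicate<Map<String, Object>> condition,
                                     boolean invert) {
        // invert=false keeps the condition's result; invert=true flips it.
        boolean shouldApply = condition.test(record) != invert;
        if (!shouldApply) {
            return record; // filter skipped, record unchanged
        }
        Map<String, Object> tagged = new HashMap<>(record);
        tagged.put("tags", List.of("SecurityAlert")); // field and value taken from the properties above
        return tagged;
    }

    public static void main(String[] args) {
        Map<String, Object> failedLogin = Map.of("logmessage", "BadCredentialsException: bad password");
        Map<String, Object> okLogin = Map.of("logmessage", "login succeeded");

        System.out.println(apply(failedLogin, CONTAINS_BAD_CREDENTIALS, false).get("tags")); // [SecurityAlert]
        System.out.println(apply(okLogin, CONTAINS_BAD_CREDENTIALS, false).containsKey("tags")); // false
        System.out.println(apply(okLogin, CONTAINS_BAD_CREDENTIALS, true).get("tags")); // [SecurityAlert]
    }
}
```

With invert=false, as in the properties file, only records whose logmessage mentions BadCredentialsException receive the tag.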