Skip to content

Instantly share code, notes, and snippets.

View fhussonnois's full-sized avatar

Florian Hussonnois fhussonnois

View GitHub Profile
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
@fhussonnois
fhussonnois / .java
Created November 26, 2019 20:26
Kafka Streams WordCount Example
package example;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
@fhussonnois
fhussonnois / example-file-pulse-log4j.properties
Created January 22, 2020 11:41
example-file-pulse-log4j.properties
# List of aliases for the filter chain, specifying the order in which the filters will be applied.
filters=GroupMultilineException, ParseLog4jLog
# Configuration for the filter with alias 'GroupMultilineException'
filters.GroupMultilineException.negate=false
filters.GroupMultilineException.pattern="^[\\t]"
filters.GroupMultilineException.type=io.streamthoughts.kafka.connect.filepulse.filter.MultiRowFilter
# Configuration for the filter with alias 'ParseLog4jLog'
filters.ParseLog4jLog.match="%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}"
@fhussonnois
fhussonnois / example-file-pulse-conditional.properties
Last active January 22, 2020 13:41
example-file-pulse-conditional.properties
# The type of the filter
filters.TagSecurityException.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
# The name of the field to add
filters.TagSecurityException.field=tags
# The value(s) to append to the field
filters.TagSecurityException.values=SecurityAlert
# The filter condition expression (using Simple Connect Expression Language)
filters.TagSecurityException.if="{{ contains($value.logmessage, BadCredentialsException) }}"
# Invert the boolean value returned by the if expression
filters.TagSecurityException.invert=false
{
"config": {
"connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"filters": "GroupMultilineException, ParseLog4jLog",
"filters.GroupMultilineException.negate": "false",
"filters.GroupMultilineException.pattern": "^[\\t]",
"filters.GroupMultilineException.type": "io.streamthoughts.kafka.connect.filepulse.filter.MultiRowFilter",
"filters.ParseLog4jLog.match": "%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}",
"filters.ParseLog4jLog.overwrite": "message",
| | Connect FilePulse | Connect Spooldir | Connect FileStreams |
|---|---|---|---|
| Connector Type | source | source | source / sink |
| License | Apache License 2.0 | Apache License 2.0 | Apache License 2.0 |
| Available on Confluent Hub | YES | YES | YES |
| Docker image | YES | NO | NO |
| Delivery semantics | At-least-once | At-least-once | At-most-once |
| Usable in production | YES | YES | NO |
| Supported file formats (out-of-the-box) | Delimited, Binary, JSON, Avro, XML (limited) | Delimited, JSON | Text file |
@fhussonnois
fhussonnois / azkarra-streams-cloud-events.json
Last active May 12, 2020 12:21
Azkarra Streams - Streams State Cloud Event - Sample
{
"id": "appid:word-count;appsrv:localhost:8080;ts:1588976019636",
"source": "azkarra/ks/localhost:8080",
"specversion": "1.0",
"type": "io.streamthoughts.azkarra.streams.stateupdateevent",
"time": "2020-05-08T22:13:39.636+0000",
"datacontenttype": "application/json",
"ioazkarramonitorintervalms": 10000,
"ioazkarrastreamsappid": "word-count",
"ioazkarraversion": "0.7.0",
@fhussonnois
fhussonnois / csv-example-00.sh
Created August 13, 2020 19:18
connect-file-pulse-csv-example-00
curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-00/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.csv$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
@fhussonnois
fhussonnois / azkarra-cloudevents-example.json
Created September 24, 2020 15:00
Azkarra Streams CloudEvents Example
{
"id": "appid:basic-word-count-0-8-0;appsrv:localhost:8080;ts:1600959526290",
"source": "arn://kafka=s0wwlkImS6CZ4SQ7gOM9eA/host=localhost/port=8080",
"subject": "arn://kafka=s0wwlkImS6CZ4SQ7gOM9eA/host=localhost/port=8080/streams=basic-word-count-0-8-0",
"specversion": "1.0",
"type": "io.streamthoughts.azkarra.streams.stateupdateevent",
"time": "2020-09-24T14:58:46.290+0000",
"datacontenttype": "application/json",
"data": {
"state": "RUNNING",
@Component
public class ServerSentEventsWordCountTopology
// (0) Extend EventStreamSupport or implement the EventStreamProvider interface
extends EventStreamSupport
implements TopologyProvider {
private EventStream<String, Long> wordCountStream;
public ServerSentEventsWordCountTopology() {
// (1) Create a new EventStream for word-count updates.