Skip to content

Instantly share code, notes, and snippets.

FROM confluentinc/cp-kafka-connect:5.5.1

# Remove the bundled (older) S3/GCS kafka-connect plugins so they cannot
# shadow the versions installed below via confluent-hub.
RUN rm -rf /usr/share/java/kafka-connect-s3/ \
           /usr/share/java/kafka-connect-gcs/ \
           /usr/share/confluent-hub-components/confluentinc-kafka-connect-gcs/

# Install the official kafka-connect-s3 plugin, pinned to match the base image.
# `--no-prompt` answers the installer's questions non-interactively and
# replaces the fragile `(echo 1 && yes) |` pipe, whose exit status could mask
# an install failure.
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-s3:5.5.1

# Install the kafka-connect Single Message Transforms plugin.
# NOTE(review): `:latest` is not reproducible (hadolint DL3007) -- pin a
# concrete version once the required transform set is confirmed.
RUN confluent-hub install --no-prompt confluentinc/connect-transforms:latest
# S3 event types accepted for processing; all other event types are discarded.
TRIGGERING_EVENTS = [
    'ObjectCreated:Put',
    'ObjectCreated:CompleteMultipartUpload',
]
# S3 key inference regexp: env/topics/domain/raw/format/table/partition_scheme.
# Adjacent string literals inside the parentheses are concatenated into one
# pattern; each named group captures one logical component of the key.
EXTRACTION_REGEXP = (
    '(?P<env>.*)/topics/(?P<domain>.*)/raw/(?P<format>.*)/(?P<short_name>.*)/'
    '(?P<version_partition>.*)=(?P<version>.*)/'
    'event_date=(?P<event_date>.*)/event_hour=(?P<event_hour>.*)/'
    '(?P<file>.*)'
)
{
"typology": "job-queue",
"compaction": "none",
"scope": "public",
"encoding": "avro",
"throughput": "normal",
"ownership": "team-ads",
"pii": "HIGH",
"producers": [
"producer_app_1"
.
├── leboncoin
│ ├── ads
│ │ ├── accepted-ad
│ │ │ ├── prod
│ │ │ │ └── schemas
│ │ │ └── staging
│ │ │ └── schemas
│ │ ├── action-states
│ │ │ ├── prod
"rotate.interval.ms": "3600000",
"rotate.schedule.interval.ms": "1200000",
"flush.size": "500000",
"transforms": "tombstoneHandler,insertFields",
"transforms.tombstoneHandler.type": "io.confluent.connect.transforms.TombstoneHandler",
"transforms.tombstoneHandler.behavior": "ignore",
"transforms.insertFields.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertFields.partition.field": "_kafka_partition",
"transforms.insertFields.offset.field": "_kafka_offset",
"transforms.insertFields.timestamp.field": "_kafka_timestamp",
"partitioner.class": "fr.leboncoin.data.archiver.partitioner.CustomTimePartitioner",
"timestamp.extractor": "fr.leboncoin.data.archiver.parser.LeboncoinTimestampExtractor",
"topics": "leboncoin_staging_ads_ad-publications_public_avro",
"s3.bucket.name": "leboncoin_kafka_events_bucket",
"topics.dir": "staging/topics/ads/raw/parquet",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"path.format": "'schema_version'=VV/'event_date'=YYYY-MM-dd/'event_hour'=HH",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"errors.tolerance": "none",
"errors.log.enable": "true",
"errors.log.include.messages": "true"
{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"errors.tolerance": "none",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"topics": "leboncoin_staging_ads_ad-publications_public_avro",
"s3.bucket.name": "leboncoin_kafka_events_bucket",
"topics.dir": "staging/topics/ads/raw/parquet",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",