@uuhnaut69
Last active March 5, 2020 06:40
A tutorial on setting up an ETL pipeline: Postgres CDC -> Kafka Connect -> ES

postgres-cdc-kafka-connect-es-setup-guide

Topology


                   +-------------+
                   |             |
                   |   Postgres  |
                   |             |
                   +------+------+
                          |
                          |
                          |
          +---------------v------------------+
          |                                  |
          |           Kafka Connect          |
          |    (Debezium, ES connectors)     |
          |                                  |
          +---------------+------------------+
                          |
                          |
                          |
                          |
                  +-------v--------+
                  |                |
                  | Elasticsearch  |
                  |                |
                  +----------------+

Installation Guide

Installing the wal2json plugin

wal2json is a logical decoding output plugin for Postgres that encodes changes in JSON format.

  • Go to the lib folder of your Postgres installation: /usr/pgsql-9.6/lib/ (the path may differ depending on whether you installed Postgres with Homebrew or with Postgres.app)

  • Add the Postgres bin directory to your PATH so that make can find pg_config: export PATH="$PATH:/usr/pgsql-9.6/bin"

  • Run the wal2json installation commands:

    git clone https://github.com/eulerto/wal2json -b master --single-branch \
    && cd wal2json \
    && git checkout d2b7fef021c46e0d429f2c1768de361069e58696 \
    && make && make install \
    && cd .. \
    && rm -rf wal2json
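
To confirm that the build above installed the plugin, a check along these lines may help. It is a minimal sketch: `check_wal2json` is a hypothetical helper name, and it assumes the module is installed as `wal2json.so` and that `pg_config` is on your PATH when no directory is passed.

```shell
# Sketch: confirm that `make install` placed wal2json.so in the Postgres library directory.
# check_wal2json is a hypothetical helper. Pass the lib directory as $1,
# or omit it to fall back to `pg_config --pkglibdir`.
check_wal2json() {
    libdir="${1:-$(pg_config --pkglibdir)}"
    if [ -f "$libdir/wal2json.so" ]; then
        echo "wal2json found in $libdir"
    else
        echo "wal2json.so not found in $libdir" >&2
        return 1
    fi
}
```

For the installation described here, the call would be `check_wal2json /usr/pgsql-9.6/lib`.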
    

Or use the Docker images: Here

Enable Replication on the PostgreSQL server

  • Open the postgresql.conf file, then find and edit the following settings. Postgres must be restarted for them to take effect.
    # LOGGING
    log_min_error_statement = fatal
    # CONNECTION
    listen_addresses = '*'
    # MODULES
    shared_preload_libraries = 'wal2json'
    # REPLICATION
    wal_level = logical             # minimal, archive, hot_standby, or logical (change requires restart)
    max_wal_senders = 1             # max number of walsender processes (change requires restart)
    #wal_keep_segments = 4          # in logfile segments, 16MB each; 0 disables
    #wal_sender_timeout = 60s       # in milliseconds; 0 disables
    max_replication_slots = 1       # max number of replication slots (change requires restart)
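
Before restarting Postgres, it can be useful to check that the settings above are actually active (not commented out) in the file you edited. The following is a rough sketch: `check_setting` and `check_replication_conf` are hypothetical helpers, and the check assumes each setting is written in `name = value` form on its own line.

```shell
# Sketch: succeed only if a postgresql.conf has the replication settings enabled.
# Both functions are hypothetical helpers, not part of Postgres.

# $1 = conf file, $2 = setting name, $3 = regex the value must start with.
# Anchoring at the start of the line skips commented-out settings.
check_setting() {
    grep -Eq "^[[:space:]]*$2[[:space:]]*=[[:space:]]*$3" "$1"
}

check_replication_conf() {
    conf="$1"
    check_setting "$conf" wal_level logical &&
    check_setting "$conf" max_wal_senders "[1-9]" &&
    check_setting "$conf" max_replication_slots "[1-9]" &&
    check_setting "$conf" shared_preload_libraries "'[^']*wal2json"
}
```

Example: `check_replication_conf /path/to/postgresql.conf && echo "replication settings look good"`. After the restart, `psql -U postgres -c "SHOW wal_level;"` should report `logical`.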
    

Install Kafka Connect (Confluent Platform)

  • The Confluent installation tutorial can be found here

Install the connector

  • Debezium Connector Postgresql:

    confluent-hub install debezium/debezium-connector-postgresql:latest

  • Elasticsearch Sink Connector:

    confluent-hub install confluentinc/kafka-connect-elasticsearch:latest

(You must use ES version >= 4.0; otherwise, when a row is deleted in Postgres, the corresponding record will not be deleted in ES.)
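
One way to confirm that both connectors were picked up is to ask the Kafka Connect REST API for its plugin list. This is a sketch under assumptions: `has_plugin` and `list_plugins` are hypothetical helper names, `CONNECT_URL` defaults to the worker address used later in this guide, and a worker generally needs a restart before newly installed plugins appear.

```shell
# Sketch: check that a connector class is listed by GET /connector-plugins.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Fetch the plugin list from the Kafka Connect worker.
list_plugins() {
    curl -s "$CONNECT_URL/connector-plugins"
}

# Read the plugin list JSON on stdin; succeed if class $1 appears in it.
has_plugin() {
    grep -q "\"class\" *: *\"$1\""
}
```

Example: `list_plugins | has_plugin io.debezium.connector.postgresql.PostgresConnector && echo "Debezium connector available"`. The same check works for `io.confluent.connect.elasticsearch.ElasticsearchSinkConnector`.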

Connect & Run

  • Register the Postgres CDC connector
[POST] http://localhost:8083/connectors/

Request body:

{
    "name": "jpwork-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "plugin.name": "pgoutput",
        "database.hostname": "127.0.0.1",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "postgres",
        "database.server.name": "postgres",
        "schema.whitelist": "public"
    }
}
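
The request above can be sent with curl. The sketch below assumes the JSON body has been saved to a file; `register_connector` and `connector_status` are hypothetical helper names, and the file name in the usage example is only an illustration.

```shell
# Sketch: register a connector and read back its status via the Kafka Connect REST API.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# POST the connector definition stored in the JSON file $1.
register_connector() {
    curl -s -X POST -H "Content-Type: application/json" \
         --data @"$1" "$CONNECT_URL/connectors/"
}

# GET the status of the connector named $1 (connector and task state).
connector_status() {
    curl -s "$CONNECT_URL/connectors/$1/status"
}
```

Example: `register_connector postgres-cdc.json` followed by `connector_status jpwork-connector`. A healthy connector should report its state as `RUNNING`.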
  • Register the Elasticsearch sink connector
[POST] http://localhost:8083/connectors/

Request body:

{
    "name": "elastic-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "postgres.public.category",
        "connection.url": "http://localhost:9200",
        "transforms": "unwrap,key",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.drop.deletes": "false",
        "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.key.field": "id",
        "key.ignore": "false",
        "type.name": "category",
        "behavior.on.null.values": "delete"
    }
}
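
Once the sink is running, the replicated rows should be searchable in Elasticsearch. The sketch below assumes the sink writes to an index named after the topic (`postgres.public.category` in the config above), which is the connector's default as far as I know; `search_index` is a hypothetical helper and `ES_URL` mirrors the `connection.url` setting.

```shell
# Sketch: query the index that the Elasticsearch sink connector writes to.
ES_URL="${ES_URL:-http://localhost:9200}"

# Run a match-all search against index $1 and pretty-print the response.
search_index() {
    curl -s "$ES_URL/$1/_search?pretty"
}
```

Example: `search_index postgres.public.category`. After inserting, updating, or deleting a row in the `category` table, rerunning the search should reflect the change.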