@rmoff
Created September 26, 2019 11:18

Configuring Kafka Connect with Confluent Cloud

So you want to connect Kafka Connect to Confluent Cloud? Here’s how.

Overview

  1. Configure the worker

  2. Pre-create any topics that your source connector will write to, since auto-topic creation is not enabled on Confluent Cloud (see the example after this list)

  3. If you’re using any licensed connectors from Confluent you need to include the Confluent Cloud details in the connector configuration
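
For step 2, one way to pre-create a topic is with the kafka-topics CLI that ships with Apache Kafka / Confluent Platform. This is a sketch: the topic name (my-source-topic) and partition count are placeholders to substitute, and ccloud.properties is assumed to be a local file holding the same four security settings shown in the worker configuration below.

    kafka-topics --bootstrap-server <CCLOUD_BROKER_HOST>:9092 \
                 --command-config ccloud.properties \
                 --create --topic my-source-topic \
                 --partitions 6 --replication-factor 3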

Configuring the worker

Configuring the worker breaks down into several parts:

  • Specifying the Confluent Cloud broker connection details

  • Optionally, configuring the Kafka Connect Avro converter to connect to Schema Registry in Confluent Cloud

Kafka Connect Worker - configuration items for Confluent Cloud

  • Configure the worker to point to Confluent Cloud: 

    bootstrap.servers=<CCLOUD_BROKER_HOST>
  • Confluent Cloud provides redundancy for your data, so specify a replication factor of 3

    config.storage.replication.factor=3
    offset.storage.replication.factor=3
    status.storage.replication.factor=3
  • This set of four parameters is the necessary security configuration for a client connecting to Confluent Cloud. It’s specified three times: once at the top level (used by the worker itself, including its admin client), and once each with producer. and consumer. prefixes (used by the producers and consumers that Kafka Connect runs for connectors).

    security.protocol=SASL_SSL
    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";
    
    consumer.security.protocol=SASL_SSL
    consumer.ssl.endpoint.identification.algorithm=https
    consumer.sasl.mechanism=PLAIN
    consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";
    
    producer.security.protocol=SASL_SSL
    producer.ssl.endpoint.identification.algorithm=https
    producer.sasl.mechanism=PLAIN
    producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";
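
Before starting the worker you can verify that the broker hostname and credentials are valid with the kafka-broker-api-versions tool, pointing it at a properties file (here assumed to be called ccloud.properties) containing those same four security settings:

    kafka-broker-api-versions --bootstrap-server <CCLOUD_BROKER_HOST>:9092 \
                              --command-config ccloud.properties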

Kafka Connect worker - converter configuration for Schema Registry in Confluent Cloud

This configuration sets the default converter for the Kafka Connect worker when serialising message values to Avro, and defines how to access the Schema Registry in Confluent Cloud.

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>
value.converter.schema.registry.url=https://<SCHEMA_REGISTRY_ENDPOINT>

If you want to use Avro for your message keys then repeat the configuration but with a key. prefix:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.basic.auth.credentials.source=USER_INFO
key.converter.schema.registry.basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>
key.converter.schema.registry.url=https://<SCHEMA_REGISTRY_ENDPOINT>
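
To sanity-check the Schema Registry credentials independently of the worker, you can list subjects via the Schema Registry REST API (a read-only call, which returns an empty array on a fresh cluster):

curl -u <SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET> \
     https://<SCHEMA_REGISTRY_ENDPOINT>/subjects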

Configuring connectors

If you’re using a licensed connector from Confluent you need to include the following in your connector configuration:

"confluent.topic.bootstrap.servers" : "${CCLOUD_BROKER_HOST}:9092",
"confluent.topic.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";",
"confluent.topic.security.protocol": "SASL_SSL",
"confluent.topic.ssl.endpoint.identification.algorithm": "https",
"confluent.topic.sasl.mechanism": "PLAIN"

Example configuration

Docker (with Avro converters)

(Why should you use Avro? This is why.)

In the same folder, create a .env file containing the values for each variable:

CCLOUD_BROKER_HOST=xxxxx.confluent.cloud
CCLOUD_API_KEY=foo
CCLOUD_API_SECRET=zzzz
CCLOUD_SCHEMA_REGISTRY_URL=https://yyyyyy.aws.confluent.cloud
CCLOUD_SCHEMA_REGISTRY_API_KEY=yyyy
CCLOUD_SCHEMA_REGISTRY_API_SECRET=yyyy
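
Docker Compose reads the .env file automatically. You can confirm that the variables are being substituted correctly before starting anything:

docker-compose config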

The following is a complete Docker Compose file that you can use to provision a Kafka Connect worker that connects to Confluent Cloud.

---
version: '3'
services:
  kafka-connect-01:
    image: confluentinc/cp-kafka-connect:5.3.1
    container_name: kafka-connect-01
    ports:
      - 8083:8083
    environment:
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CUB_KAFKA_TIMEOUT: 300
      CONNECT_BOOTSTRAP_SERVERS: "${CCLOUD_BROKER_HOST}:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-01'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-group-01
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "${CCLOUD_SCHEMA_REGISTRY_URL}"
      CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "${CCLOUD_SCHEMA_REGISTRY_API_KEY}:${CCLOUD_SCHEMA_REGISTRY_API_SECRET}"
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "${CCLOUD_SCHEMA_REGISTRY_URL}"
      CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "${CCLOUD_SCHEMA_REGISTRY_API_KEY}:${CCLOUD_SCHEMA_REGISTRY_API_SECRET}"
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'

      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_SASL_MECHANISM: "PLAIN"
      CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";"
      #
      CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";"
      #
      CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";"
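
With the .env file and the Compose file in place, bring the worker up and then hit its REST endpoint; once startup is complete it returns the worker’s version information:

docker-compose up -d
curl -s http://localhost:8083/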

Non-Docker (with Avro converters)

bootstrap.servers=<CCLOUD_BROKER_HOST>

security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";

consumer.security.protocol=SASL_SSL
consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";

producer.security.protocol=SASL_SSL
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>
value.converter.schema.registry.url=<SR_URL>

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.basic.auth.credentials.source=USER_INFO
key.converter.schema.registry.basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>
key.converter.schema.registry.url=<SR_URL>

group.id=connect-cluster

config.storage.topic=_kafka-connect-group-01-configs
offset.storage.topic=_kafka-connect-group-01-offsets
status.storage.topic=_kafka-connect-group-01-statuses

config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3

plugin.path=share/java/,share/confluent-hub-components
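
Save the above as a properties file and start the worker with the connect-distributed script that ships with Apache Kafka / Confluent Platform (the exact paths depend on your installation; the file name here is an assumption):

./bin/connect-distributed ./etc/kafka/connect-ccloud.properties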

Docker (with JSON converters)

In the same folder, create a .env file containing the values for each variable:

CCLOUD_BROKER_HOST=xxxxx.confluent.cloud
CCLOUD_API_KEY=foo
CCLOUD_API_SECRET=zzzz

The following is a complete Docker Compose file that you can use to provision a Kafka Connect worker that connects to Confluent Cloud.

---
version: '3'
services:
  kafka-connect-01:
    image: confluentinc/cp-kafka-connect:5.3.1
    container_name: kafka-connect-01
    ports:
      - 8083:8083
    environment:
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CUB_KAFKA_TIMEOUT: 300
      CONNECT_BOOTSTRAP_SERVERS: "${CCLOUD_BROKER_HOST}:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-01'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-group-01
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'

      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_SASL_MECHANISM: "PLAIN"
      CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";"
      #
      CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";"
      #
      CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";"
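
One thing to be aware of with the JSON converter: by default it embeds the schema inside every message (schemas.enable=true), which inflates the payload considerably. If you want plain JSON payloads, you can switch that off by adding these two environment variables to the worker:

      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: 'false'
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'false'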

Non-Docker (with JSON converters)

bootstrap.servers=<CCLOUD_BROKER_HOST>

security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";

consumer.security.protocol=SASL_SSL
consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";

producer.security.protocol=SASL_SSL
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";

value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter

group.id=connect-cluster

config.storage.topic=_kafka-connect-group-01-configs
offset.storage.topic=_kafka-connect-group-01-offsets
status.storage.topic=_kafka-connect-group-01-statuses

config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3

plugin.path=share/java/,share/confluent-hub-components
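
However you start it, you can confirm the worker is up and see which connector plugins it has found on its plugin.path via the REST API:

curl -s http://localhost:8083/connector-plugins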

nuria commented May 7, 2022:

An important point: on Confluent Cloud most connectors are licensed, and as such they need access to the license topic and the associated ACLs: https://docs.confluent.io/platform/current/connect/license.html

Otherwise you get what look like SASL authentication errors, but which are really the connector failing to access the license stored in the cluster.