Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Configuring Kafka Connect with Confluent Cloud

Configuring Kafka Connect with Confluent Cloud

So you want to connect Kafka Connect to Confluent Cloud? Here’s how.

Overview

  1. Configure the worker

  2. Pre-create any topics that your source connector will write to (since auto-topic creation is not enabled on Confluent Cloud)

  3. If you’re using any licensed connectors from Confluent you need to include the Confluent Cloud details in the connector configuration

Configuring the worker

Configuring the worker breaks down into several parts:

  • Specifying the Confluent Cloud broker connection details

  • Optionally, configuring the Kafka Connect Avro converter to connect to Schema Registry in Confluent Cloud

Kafka Connect Worker - configuration items for Confluent Cloud

  • Configure the worker to point to Confluent Cloud: 

    bootstrap.servers=<CCLOUD_BROKER_HOST>
  • Confluent Cloud provides you with redundancy for your data so specify replication factor of 3

    config.storage.replication.factor=3
    offset.storage.replication.factor=3
    status.storage.replication.factor=3
  • This set of four parameters is the necessary security configuration for a client to connect to Confluent Cloud. It’s repeated three times, for the admin client, producer, and consumer in Kafka Connect.

    security.protocol=SASL_SSL
    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";
    
    consumer.security.protocol=SASL_SSL
    consumer.ssl.endpoint.identification.algorithm=https
    consumer.sasl.mechanism=PLAIN
    consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";
    
    producer.security.protocol=SASL_SSL
    producer.ssl.endpoint.identification.algorithm=https
    producer.sasl.mechanism=PLAIN
    producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";

Kafka Connect worker - converter configuration for Schema Registry in Confluent Cloud

This configuration sets the default converter for the Kafka Connect worker when serialising message values to Avro, and defines how to access the Schema Registry in Confluent Cloud.

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>
value.converter.schema.registry.url=https://<SCHEMA_REGISTRY_ENDPOINT>

If you want to use Avro for your message keys then repeat the configuration but with a key prefix:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.basic.auth.credentials.source=USER_INFO
key.converter.schema.registry.basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>
key.converter.schema.registry.url=https://<SCHEMA_REGISTRY_ENDPOINT>

Configuring connectors

If you’re using a licensed Connector from Confluent you need to include in your connector configuration: 

[]
"confluent.topic.bootstrap.servers" : "${CCLOUD_BROKER_HOST}:9092",
"confluent.topic.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";",
"confluent.topic.security.protocol": "SASL_SSL",
"confluent.topic.ssl.endpoint.identification.algorithm": "https",
"confluent.topic.sasl.mechanism": "PLAIN"

Example configuration

Docker (with Avro converters)

(Why should you use Avro? This is why.)

Create a .env file in the same folder that contains the values for each variable

CCLOUD_BROKER_HOST=xxxxx.confluent.cloud
CCLOUD_API_KEY=foo
CCLOUD_API_SECRET=zzzz
CCLOUD_SCHEMA_REGISTRY_URL=https://yyyyyy.aws.confluent.cloud
CCLOUD_SCHEMA_REGISTRY_API_KEY=yyyy
CCLOUD_SCHEMA_REGISTRY_API_SECRET=yyyy

The following is a complete Docker Compose that you can use to provision a Kafka Connect worker, connecting to Confluent Cloud.

---
version: '3'
services:
  kafka-connect-01:
    image: confluentinc/cp-kafka-connect:5.3.1
    container_name: kafka-connect-01
    ports:
      - 8083:8083
    environment:
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CUB_KAFKA_TIMEOUT: 300
      CONNECT_BOOTSTRAP_SERVERS: "${CCLOUD_BROKER_HOST}:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-01'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-group-01
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "${CCLOUD_SCHEMA_REGISTRY_URL}"
      CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "${CCLOUD_SCHEMA_REGISTRY_API_KEY}:${CCLOUD_SCHEMA_REGISTRY_API_SECRET}"
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "${CCLOUD_SCHEMA_REGISTRY_URL}"
      CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "${CCLOUD_SCHEMA_REGISTRY_API_KEY}:${CCLOUD_SCHEMA_REGISTRY_API_SECRET}"
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'

      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_SASL_MECHANISM: "PLAIN"
      CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";"
      #
      CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";"
      #
      CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";"

Non-Docker (with Avro converters)

bootstrap.servers=<CCLOUD_BROKER_HOST>

security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";

consumer.security.protocol=SASL_SSL
consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";

producer.security.protocol=SASL_SSL
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>
value.converter.schema.registry.url=<SR_URL>

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.basic.auth.credentials.source=USER_INFO
key.converter.schema.registry.basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>
key.converter.schema.registry.url=<SR_URL>

group.id=connect-cluster

config.storage.topic=_kafka-connect-group-01-configs
offset.storage.topic=_kafka-connect-group-01-offsets
status.storage.topic=_kafka-connect-group-01-statuses

config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3

plugin.path=share/java/,share/confluent-hub-components

Docker (with JSON converters)

Create a .env file in the same folder that contains the values for each variable

CCLOUD_BROKER_HOST=xxxxx.confluent.cloud
CCLOUD_API_KEY=foo
CCLOUD_API_SECRET=zzzz

The following is a complete Docker Compose that you can use to provision a Kafka Connect worker, connecting to Confluent Cloud.

---
version: '3'
services:
  kafka-connect-01:
    image: confluentinc/cp-kafka-connect:5.3.1
    container_name: kafka-connect-01
    ports:
      - 8083:8083
    environment:
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CUB_KAFKA_TIMEOUT: 300
      CONNECT_BOOTSTRAP_SERVERS: "${CCLOUD_BROKER_HOST}:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-01'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-group-01
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'

      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_SASL_MECHANISM: "PLAIN"
      CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";"
      #
      CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";"
      #
      CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";"

Non-Docker (with JSON converters)

bootstrap.servers=<CCLOUD_BROKER_HOST>

security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";

consumer.security.protocol=SASL_SSL
consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";

producer.security.protocol=SASL_SSL
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";

value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter

group.id=connect-cluster

config.storage.topic=_kafka-connect-group-01-configs
offset.storage.topic=_kafka-connect-group-01-offsets
status.storage.topic=_kafka-connect-group-01-statuses

config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3

plugin.path=share/java/,share/confluent-hub-components
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.