Kafka Connect blog post

Introduction

Kafka has made significant strides in moving data around the largest organizations in the world. Message-based architectures simplify moving data between systems and responding to change in real time. Once you have a message-based architecture in place, you still have the last-mile problem of moving data between your topics and the source or target systems in your organization.

Kafka Connect was introduced in 0.9.0.0 to solve this last-mile issue. It provides a fault-tolerant, distributed framework for building and scheduling reusable connectors, and it takes care of managing schema data, distributing tasks across the cluster, and rescheduling tasks on failure. Before Kafka Connect, everything was up to you: deployment, schema management, monitoring, and handling task failure all fell to the developer or operations team.

The API for building a connector makes development a snap. There are a few things that you as the developer need to handle.

For this blog post we are going to walk through creating a Twitter source. Our connector will read from Twitter in real time and persist the tweets to a Kafka topic using the Kafka Connect framework. The source for this project is available on GitHub.

TLDR: The absolute most important parts of building a connector are the configuration, the schema, and handling exceptions.

Quickstart

A Maven archetype has been created to generate a skeleton connector implementation for you. This will stub out all of the classes you will need for a SourceConnector and SinkConnector. The following command lines will generate a connector for Kafka 0.10.0.0; replace 0.10.0.0 with a later version as needed. I plan on keeping this archetype up to date with the latest Kafka releases.

mvn archetype:generate -DarchetypeGroupId=io.confluent.maven.archetypes -DarchetypeArtifactId=kafka-connect-quickstart \
    -DarchetypeVersion=0.10.0.0 \
    -Dpackage=io.confluent.examples.kafka.connect.twitter \
    -DgroupId=io.confluent.examples \
    -DartifactId=kafka-connect-twitter \
    -DpackageName=io.confluent.examples.kafka.connect.twitter \
    -Dversion=1.0-SNAPSHOT

or interactively

mvn archetype:generate -DarchetypeGroupId=io.confluent.maven.archetypes -DarchetypeArtifactId=kafka-connect-quickstart \
    -DarchetypeVersion=0.10.0.0

Now that we have a skeleton of a connector to start with, let's start digging in.

Configuration

One of the most important features of Kafka Connect is configuration validation. Your SourceConnector or SinkConnector must implement a config() method, which is used to document and verify the configuration for your connector. This allows tools like Confluent Control Center to pull in the configuration details and render a page for editing.
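
A minimal sketch of what this can look like in the connector class, assuming the ConfigDef lives in a conf() method on TwitterSourceConnectorConfig as shown later in this post:

  @Override
  public ConfigDef config() {
    // Return the full definition so the framework can validate settings and
    // tools like Control Center can render a configuration page.
    return TwitterSourceConnectorConfig.conf();
  }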

There are a few configuration entries that are required to connect to Twitter. For example, you must configure the consumerKey, consumerSecret, accessToken, accessTokenSecret, and of course the searches you are interested in. In the form of a properties file for Kafka Connect, the required configuration looks like this:

Properties

name=TwitterSourceConnector
tasks.max=1
connector.class=io.confluent.kafka.connect.twitter.TwitterSourceConnector
twitter.oauth.consumerKey=<insert your value>
twitter.oauth.consumerSecret=<insert your value>
twitter.oauth.accessToken=<insert your value>
twitter.oauth.accessTokenSecret=<insert your value>
filter.keywords=olympics,san jose,kafka
kafka.status.topic=twitter
kafka.delete.topic=asdf
process.deletes=false

This is a pretty verbose properties file. How are we going to verify all of this configuration information? That part is pretty easy: Kafka has this capability built in. First we must define what the configuration looks like. In the Twitter source connector, this is defined in TwitterSourceConnectorConfig.

Before we walk through TwitterSourceConnectorConfig, let's chat about the ConfigDef and AbstractConfig classes. These two classes go hand in hand. ConfigDef is used to define what the configuration looks like, and AbstractConfig is used to validate the configuration. Defining the configuration required to parse the properties snippet above looks like this.

conf() method

  ...
  public static final String KAFKA_STATUS_TOPIC_CONF = "kafka.status.topic";
  public static final String KAFKA_STATUS_TOPIC_DOC = "Kafka topic to write the statuses to.";
  public static final String KAFKA_DELETE_TOPIC_CONF = "kafka.delete.topic";
  public static final String KAFKA_DELETE_TOPIC_DOC = "Kafka topic to write delete events to.";
  public static final String PROCESS_DELETES_CONF = "process.deletes";
  public static final String PROCESS_DELETES_DOC = "Should this connector process deletes.";
  
  public static ConfigDef conf() {
    return new ConfigDef()
        .define(TWITTER_DEBUG_CONF, Type.BOOLEAN, false, Importance.LOW, TWITTER_DEBUG_DOC)
        .define(TWITTER_OAUTH_CONSUMER_KEY_CONF, Type.PASSWORD, Importance.HIGH, TWITTER_OAUTH_CONSUMER_KEY_DOC)
        .define(TWITTER_OAUTH_SECRET_KEY_CONF, Type.PASSWORD, Importance.HIGH, TWITTER_OAUTH_SECRET_KEY_DOC)
        .define(TWITTER_OAUTH_ACCESS_TOKEN_CONF, Type.PASSWORD, Importance.HIGH, TWITTER_OAUTH_ACCESS_TOKEN_DOC)
        .define(TWITTER_OAUTH_ACCESS_TOKEN_SECRET_CONF, Type.PASSWORD, Importance.HIGH, TWITTER_OAUTH_ACCESS_TOKEN_SECRET_DOC)
        .define(FILTER_KEYWORDS_CONF, Type.LIST, Importance.HIGH, FILTER_KEYWORDS_DOC)
        .define(KAFKA_STATUS_TOPIC_CONF, Type.STRING, Importance.HIGH, KAFKA_STATUS_TOPIC_DOC)
        .define(KAFKA_DELETE_TOPIC_CONF, Type.STRING, Importance.HIGH, KAFKA_DELETE_TOPIC_DOC)
        .define(PROCESS_DELETES_CONF, Type.BOOLEAN, Importance.HIGH, PROCESS_DELETES_DOC)
        ;
  }

The constants above are a styling preference borrowed from config-related code in Apache Kafka. They make it much easier to reference the proper key in unit tests; for example, you can set the consumer key using TwitterSourceConnectorConfig.TWITTER_OAUTH_CONSUMER_KEY_CONF. Secrets such as passwords, API keys, and the combination to your luggage should be defined as ConfigDef.Type.PASSWORD. This ensures that they are never logged.

During startup you will see the config dumped to the logs. Notice there are a few properties that have [hidden] as the value.

INFO TwitterSourceConnectorConfig values: 
	twitter.oauth.accessTokenSecret = [hidden]
	process.deletes = false
	filter.keywords = [olympics, san jose, kafka]
	kafka.status.topic = twitter
	kafka.delete.topic = asdf
	twitter.oauth.consumerSecret = [hidden]
	twitter.oauth.accessToken = [hidden]
	twitter.oauth.consumerKey = [hidden]
	twitter.debug = false 

To handle validation of the configuration we place this definition in a class that extends AbstractConfig.
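
A stripped-down sketch of what that class can look like (the constructor and getters in the real TwitterSourceConnectorConfig may differ slightly):

public class TwitterSourceConnectorConfig extends AbstractConfig {

  public TwitterSourceConnectorConfig(Map<String, String> settings) {
    // AbstractConfig parses the settings against the ConfigDef and throws a
    // ConfigException if anything is missing or invalid.
    super(conf(), settings);
  }

  // conf() is the ConfigDef shown above.

  // Typed accessors keep string keys out of the rest of the connector.
  public String kafkaStatusTopic() {
    return getString(KAFKA_STATUS_TOPIC_CONF);
  }

  public boolean processDeletes() {
    return getBoolean(PROCESS_DELETES_CONF);
  }
}

With the ConfigDef defined and the AbstractConfig subclass in place, validating the connector's configuration is as simple as this.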

  @Override
  public void start(Map<String, String> map) {
    this.config = new TwitterSourceConnectorConfig(map);
    this.settings = map;
  }

A missing value or a value that does not validate will give you a nice error message like this:

ERROR Error while starting connector TwitterSourceConnector (org.apache.kafka.connect.runtime.WorkerConnector:109)
org.apache.kafka.common.config.ConfigException: Missing required configuration "twitter.oauth.accessTokenSecret" which has no default value.
	at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:421)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:55)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
	at io.confluent.kafka.connect.twitter.TwitterSourceConnectorConfig.<init>(TwitterSourceConnectorConfig.java:38)
	at io.confluent.kafka.connect.twitter.TwitterSourceConnectorConfig.<init>(TwitterSourceConnectorConfig.java:42)
	at io.confluent.kafka.connect.twitter.TwitterSourceConnector.start(TwitterSourceConnector.java:29)
    ...

Schema

The Schema interface is one of the most important components of Kafka Connect. It allows you to define what the data looks like without having to worry about the way the data is stored. Out of the box, Kafka ships with support for storing data in a JSON format. Using the Confluent Schema Registry, you can be much more efficient with space and CPU by storing the data with Apache AVRO. Enabling AVRO support on the Confluent Platform is a trivial four-line config change.

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://confluent:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://confluent:8081

That's it and you can achieve significant space and CPU savings.

Schema Types

You have all of the standard types available to you: ARRAY, BOOLEAN, BYTES, FLOAT32, FLOAT64, INT16, INT32, INT64, INT8, MAP, STRING, and STRUCT are available via Schema.Type. There is also support for the logical types Date, Decimal, Time, and Timestamp. Defining a solid schema is a really important part of building a connector. Using descriptive field names and strong types delivers the most value to your users.
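
As a quick illustration (a hypothetical schema, not one from the Twitter connector), the logical types sit right alongside the primitive types when building a struct:

    Schema exampleSchema = SchemaBuilder.struct()
        .name("io.confluent.examples.kafka.connect.ExampleValue")
        .field("name", Schema.STRING_SCHEMA)          // primitive STRING
        .field("count", Schema.INT32_SCHEMA)          // primitive INT32
        .field("createdAt", Timestamp.SCHEMA)         // logical Timestamp, stored as INT64
        .field("amount", Decimal.schema(2))           // logical Decimal with a scale of 2
        .build();

Timestamp and Decimal here are the logical types from org.apache.kafka.connect.data, not the java.sql classes.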

Structs are similar to a JSON object or an AVRO record. They have fields, with descriptions, that store values of a specific type. Structs are a very powerful way to describe your data in a portable way, and your goal should be to define a Struct schema that is very descriptive. Source connectors define schemas that are consumed later by streaming applications and other connectors. A great example is streaming data from a database to HDFS: the schema is defined by the JDBC source connector, and the data is written as AVRO or Parquet files to HDFS by the HDFS connector. The schema data drives the structure of the files and manages the Hive metastore. All of this is powered by the metadata in the schema.

Naming your structs

Ensuring that your struct schema has a fully qualified, unique name is very important. Using the AVRO converter makes this much more of an issue if you decide to use the avro-maven-plugin, which is not happy when you have multiple schemas with the same name or schemas that do not have a namespace. I'd even consider allowing users to define the schema names or namespace prefixes in configuration.

Bad

    Schema statusSchemaKey = SchemaBuilder.struct()
        .name("Key")
        .field("Id", Schema.OPTIONAL_INT64_SCHEMA)
        .build();

Good

    Schema statusSchemaKey = SchemaBuilder.struct()
        .name("io.confluent.examples.kafka.connect.twitter.StatusKey")
        .doc("Key for a twitter status.")
        .field("Id", Schema.OPTIONAL_INT64_SCHEMA)
        .build();

The good version will generate the following schema with the AVRO converter.

{
  "type": "record",
  "name": "StatusKey",
  "namespace": "io.confluent.examples.kafka.connect.twitter",
  "fields": [
    {
      "name": "Id",
      "type": [
        "null",
        "long"
      ]
    }
  ],
  "connect.doc": "Key for a twitter status.",
  "connect.name": "io.confluent.examples.kafka.connect.twitter.StatusKey"
}

Keys are important

You'll need to define a key schema and a value schema. We'll chat about the value schema shortly. The key is one of the inputs Kafka uses to determine which partition to place your data in. Keys are also used by compacted topics to keep track of the latest version. As a personal preference, I use a struct for the key even if it has a single field. This allows me to add fields to the key schema without breaking the compatibility of existing data. For example, what if my connector keys data off of the hostname now, and later I decide that I want to key off of the hostname and the source port as well? Using a struct from the beginning allows me to easily update my key without breaking compatibility with the data that is already in Kafka topics.
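
To make the hostname example concrete, here is a sketch of that evolution (hypothetical schemas, not part of the Twitter connector). The new field is optional, so records written with the old key schema stay compatible:

    // Version 1: key off of the hostname only.
    Schema keySchemaV1 = SchemaBuilder.struct()
        .name("io.confluent.examples.kafka.connect.HostKey")
        .field("hostname", Schema.STRING_SCHEMA)
        .build();

    // Later: add the source port as an optional field without breaking existing data.
    Schema keySchemaV2 = SchemaBuilder.struct()
        .name("io.confluent.examples.kafka.connect.HostKey")
        .field("hostname", Schema.STRING_SCHEMA)
        .field("sourcePort", Schema.OPTIONAL_INT32_SCHEMA)
        .build();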

The Twitter plugin was easy to define a key for. Each status update to Twitter has an INT64 status id, so I defined the key with this status id in mind. This one was an easy pick.

  static final Schema statusSchemaKey;

  static {
    statusSchemaKey = SchemaBuilder.struct()
        .name("io.confluent.examples.kafka.connect.twitter.StatusKey")
        .doc("Key for a twitter status.")
        .field("Id", Schema.OPTIONAL_INT64_SCHEMA)
        .build();
  }      

It's not always going to be an easy pick. You want to choose a key that has as even a distribution as possible. Following the Twitter example, if I used userID as the key I would have all of a user's tweets in a single partition. This could be valuable because it would give me a direct timeline of the user's tweets with a strong ordering guarantee. The drawback to this approach is that some users tweet more than others: take my account jcustenborder vs my friend gwenshap, and you're looking at 55 tweets vs 16,000 tweets. This distribution could lead to an imbalance between partitions. This should be one of your design decisions. (Seriously Gwen? 160B * 16,000 == 2MB of tweets)

If you are extracting data from a database, the primary key of the table makes a nice natural key. Assuming we are looking at a user profile table, each change to the table represents a save to a user profile. Representing these changes in order makes a lot of sense: you would not want the middle of three updates to be the last one written to a search index, for example.

This is a spot where understanding the use cases for your connector's data can be very important. If, for example, you are interested in processing a user's Twitter data in exact order, the status id is the wrong key. You would want to use the user's id as the key to ensure that the user's data always lands in the same partition, or manually assign the partition when you create the SourceRecord.

Values are the bulk of your data

This is where the bulk of the data is represented in a connector. It is really important to convert your data to the proper schema and Java types. For example, the Timestamp logical type maps to java.util.Date in Java. Convert the data in the connector and store it strongly typed. This benefits the consumers of your plugin, and the framework can assist by storing the data in the most optimized way possible. I created the connect-utils project to help with this conversion; its Parser class is a configurable way to handle a lot of this type conversion.
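
For example, a field declared with the Timestamp logical type should be populated with a java.util.Date rather than the raw string or epoch value from the source system (a hypothetical schema for illustration):

    Schema valueSchema = SchemaBuilder.struct()
        .name("io.confluent.examples.kafka.connect.ExampleValue")
        .field("createdAt", Timestamp.SCHEMA)
        .build();

    // The connector converts the raw value to java.util.Date before storing it.
    Struct valueStruct = new Struct(valueSchema)
        .put("createdAt", new java.util.Date(1472933033154L));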

The Twitter example is a very complex one because I want to express as much of the status and user objects as possible. The schema is defined in the StatusConverter class. It is made up of a userSchema, placeSchema, geoLocationSchema, and the parent statusSchema. In AVRO form this translates to a 700-line schema.

Connectors

Hopefully you are still here; that was a wall of text. Connectors are why you are here. So far we have gone over how to define your configuration and your schema. Now we are getting to the fun part of actually doing something with our data. There are two types of connectors: source and sink.

Source connectors are for extracting data from a source system into Kafka. They convert data from the source into a Schema and a SourceRecord. For example, this could be reading a database with JDBC and converting each row to a SourceRecord.

Sink connectors are for loading data from Kafka into a target system. They convert a Schema and a SinkRecord into a data type that is specific to the target system. For example, in a JDBC sink connector you would convert each SinkRecord into a write against the target table.

Exceptions

Handling the exceptions that come from your source or target system is important. You need a solid understanding of which exceptions are recoverable and which should stop the connector. If you are throwing exceptions, try to be as descriptive as possible. It is a win when someone can read your exception message, or the log messages leading up to the exception, and figure out the fix without reading the code. Clearly explaining the issue means less implementation time for the user and fewer issues filed against your project.

Hard exceptions - Oh man we're busted

For example, say your connector is authenticating to a system with a username and password. If you receive an exception stating that the authentication failed, this is typically not something that can be corrected by retrying. Continuing to attempt to connect could even cause the account to be locked out. Throwing a runtime exception like ConnectException will cause the connector to enter a failed state, and with proper monitoring this will alert the operators that there is an issue.

Another unrecoverable error might be something configuration related. Maybe the configuration calls for a file on the local filesystem, but the file does not exist on disk. If this file is required, this is an unrecoverable error that should stop the connector.

Retriable Exceptions - This might work...

Some exceptions you encounter with your source or target system may be recoverable. For example, if you are writing to a web service and it returns a 500, this might be a momentary problem with the service, and retrying later might be successful. In cases like this, throwing a RetriableException will cause the Kafka Connect framework to wait and retry the same call again.

Connector Startup

Both SourceConnector and SinkConnector inherit from Connector, so the startup procedures are the same.

Task Configuration

When the start(Map<String, String>) method of the TwitterSourceConnector is called, the values defined in our properties file are passed in as a Map<String, String>. In the example below we create a new instance of TwitterSourceConnectorConfig, which validates the configuration. If there are any problems with the configuration, an exception is thrown and the rest of the method is never executed. Once we pass validation, we stash our Map<String, String> in a member variable. For other systems this might be the time to perform other one-time operations, like setting up an extract or running a configuration procedure; these are things that you only want to run once per connector.

  Map<String, String> settings;

  @Override
  public void start(Map<String, String> map) {
    this.config = new TwitterSourceConnectorConfig(map);
    this.settings = map;
  }

The next piece of the lifecycle is to create the task configs. The List<Map<String, String>> taskConfigs(int) method generates the configurations for the tasks that will be distributed across the cluster; this is how work is divided up. In our Twitter example the easiest unit of work to distribute is our search queries. In the taskConfigs implementation of the Twitter plugin we compare the maximum number of tasks requested against the number of queries in the config, then evenly distribute the queries across the task config maps (a sketch of that distribution follows the simpler examples below). It can also be as simple as returning a list with the original settings. For example:

  @Override
  public List<Map<String, String>> taskConfigs(int maxTasks) {
    List<Map<String, String>> taskConfigs = new ArrayList<>();
    taskConfigs.add(this.settings);
    return taskConfigs;
  }

If we wanted 5 copies of our task running we could do this.

  @Override
  public List<Map<String, String>> taskConfigs(int maxTasks) {
    if (maxTasks < 5) throw new IllegalStateException("Not enough tasks!");
    List<Map<String, String>> taskConfigs = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
      taskConfigs.add(this.settings);
    }
    return taskConfigs;
  }
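
For reference, a hedged sketch of the keyword distribution described above, using ConnectorUtils.groupPartitions from org.apache.kafka.connect.util (the real TwitterSourceConnector may divide the work differently):

  @Override
  public List<Map<String, String>> taskConfigs(int maxTasks) {
    List<String> keywords = this.config.getList(TwitterSourceConnectorConfig.FILTER_KEYWORDS_CONF);
    int groups = Math.min(keywords.size(), maxTasks);
    List<Map<String, String>> taskConfigs = new ArrayList<>(groups);

    // Split the keywords into roughly even groups, one group per task.
    for (List<String> group : ConnectorUtils.groupPartitions(keywords, groups)) {
      Map<String, String> taskSettings = new HashMap<>(this.settings);
      taskSettings.put(TwitterSourceConnectorConfig.FILTER_KEYWORDS_CONF, String.join(",", group));
      taskConfigs.add(taskSettings);
    }
    return taskConfigs;
  }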

Task Startup

The start(Map<String, String>) method of the task is called next, with one of the configuration maps returned by taskConfigs(int). This is where the task parses its configuration and initializes any clients, connections, or state it needs before it begins processing.
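
A minimal sketch of a task's start method, loosely based on the Twitter source task (the real TwitterSourceTask also sets up the Twitter4j stream described later in this post, and the exact queue type here is an assumption):

  @Override
  public void start(Map<String, String> map) {
    // Validate the task's slice of the configuration, just like the connector did.
    this.config = new TwitterSourceConnectorConfig(map);
    // Set up any state the task needs before polling starts; for this connector
    // that includes the queue the Twitter4j listener writes into.
    this.messageQueue = new ConcurrentLinkedQueue<>();
  }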

Source Connectors

Writing a source connector is pretty straightforward. You need a solid understanding of the system you are communicating with, and an understanding of how the data will be used. Tasks like removing or masking fields should be left to later in the stack or to a streaming job. The best decision for the users of your connector is to convert all data to its strongly typed equivalent. No one wants to write a streaming job that reads strings and converts them to integers, etc.

SourceConnector

Your source connector must implement a SourceConnector class. This class has a few responsibilities. First, it defines the configuration. Second, it declares the SourceTask implementation that performs all of the heavy lifting. Finally, the SourceConnector is responsible for setting up the task configs that will be distributed throughout the Kafka Connect cluster.

SourceTask

The SourceTask implementation of the connector performs all of the heavy lifting when extracting data from your source system. Twitter4j is a great library that allows you to easily stream data from Twitter in real time through its streaming API. Because of this asynchronous, push-based model, our connector is a little different than most. In the kafka-connect-twitter plugin we receive Status events from Twitter4j, convert them to SourceRecords, and place them on a queue. The SourceTask implementation reads from this queue and hands a list of records over to the Kafka Connect framework.

Converting the Twitter Status messages is handled by the StatusConverter class. This class maps the Status interface of Twitter4j to a Kafka Connect Struct. The convert method takes a Twitter4j Status and a Struct as input and maps all of the data.

Let's walk through this. The Twitter4j streaming API uses a StatusListener interface to receive status updates as they arrive, and TwitterSourceTask implements this interface. When a status update is received by Twitter4j, the following method is called.

  @Override
  public void onStatus(Status status) {
    try {
      Struct keyStruct = new Struct(StatusConverter.statusSchemaKey);
      Struct valueStruct = new Struct(StatusConverter.statusSchema);

      StatusConverter.convertKey(status, keyStruct);
      StatusConverter.convert(status, valueStruct);

      Map<String, ?> sourcePartition = ImmutableMap.of();
      Map<String, ?> sourceOffset = ImmutableMap.of();

      SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, this.config.kafkaStatusTopic(), StatusConverter.statusSchemaKey, keyStruct, StatusConverter.statusSchema, valueStruct);
      this.messageQueue.add(record);
    } catch (Exception ex) {
      if (log.isErrorEnabled()) {
        log.error("Exception thrown", ex);
      }
    }
  }

The first thing we do is create our Structs.

      Struct keyStruct = new Struct(StatusConverter.statusSchemaKey);
      Struct valueStruct = new Struct(StatusConverter.statusSchema);

Next we convert the data from the Twitter4j object model to a Kafka Connect struct representation.

      StatusConverter.convertKey(status, keyStruct);
      StatusConverter.convert(status, valueStruct);

Now that we have a key and value for our SourceRecord, we can satisfy the rest of the data requirements. Twitter4j is a streaming API that does not allow us to go back in time, so for this connector tracking the source offset and the source partition does not make sense. This is why both of these are empty maps. That is not the case for most connectors, so we'll spend some time going over these topics a little later in this post.

      Map<String, ?> sourcePartition = ImmutableMap.of();
      Map<String, ?> sourceOffset = ImmutableMap.of();

Manually assigning the partition for this data does not make much sense given that we have a good key to work off of. The status id gives us a pretty even distribution across partitions, so I've chosen the constructor of SourceRecord that does not specify the partition. Since this callback happens on a different thread, it's not easy to hand the data directly to the poll method. Instead I place it in a queue that I read from in the poll method.

      SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, this.config.kafkaStatusTopic(), StatusConverter.statusSchemaKey, keyStruct, StatusConverter.statusSchema, valueStruct);
      this.messageQueue.add(record);

The actual implementation of the poll method for TwitterSourceTask is quite simple. We check the message queue for available messages and pull them into a List, which is returned to the framework. The List<SourceRecord> returned by the poll() method is processed by the Kafka Connect framework, and within milliseconds the data is available in the destination topic.
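
A hedged sketch of that poll() implementation (the actual TwitterSourceTask may batch and block differently):

  @Override
  public List<SourceRecord> poll() throws InterruptedException {
    List<SourceRecord> records = new ArrayList<>();

    // Drain whatever the Twitter4j listener has queued since the last call.
    SourceRecord record;
    while ((record = this.messageQueue.poll()) != null) {
      records.add(record);
    }

    if (records.isEmpty()) {
      // Nothing available yet; back off briefly instead of busy looping.
      Thread.sleep(100);
    }
    return records;
  }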

Source Partitions

The general method of scaling Kafka is to increase the partition count and spread the work across more brokers. Kafka Connect applies a similar philosophy: source partitions are a way for the developer to break up the work across multiple tasks. For example, if your connector is reading data from a table with an incrementing primary key, you could use primaryKey % 10 to define the partition. Each of your tasks could then query with SELECT * FROM TABLE WHERE primaryKey % 10 = 0, primaryKey % 10 = 1, and so on. For this example we would use <TABLENAME, 0> as our partition.

Map<String, ?> sourcePartition = ImmutableMap.of("UserActivity", 1);

Implementing source partitions in your connector is not required.

Source Offsets

Source offsets are a way for you to handle restarting your connector. The offset could be a position in a file or the timestamp of the last poll against the database. For example, the JDBC source connector uses a timestamp column to determine what has changed since the last poll. This allows it to issue a simple query that says, in effect, "give me all of the records that have changed since this timestamp." If the connector is stopped, the last offset (a timestamp for the JDBC connector) that was returned from the poll method will be the offset available in offset storage.

SourceTask exposes the SourceTaskContext as this.context. This allows you to access the OffsetStorageReader for the current connector by calling one of the methods under this.context.offsetStorageReader().

Let's work with the database example using a timestamp as an offset. Say we are processing partition 4 of a table called UserActivity. During the poll method we would set the partition and offset to look like this:

      Map<String, ?> sourcePartition = ImmutableMap.of("UserActivity", 4);
      Map<String, ?> sourceOffset = ImmutableMap.of("timestamp", 1472933033154L);

During the startup of our SourceTask we can retrieve the last offset written by passing in the source partition.

      Map<String, ?> sourceOffset = this.context.offsetStorageReader().offset(ImmutableMap.of("UserActivity", 4));

From here we have the last offset we wrote to Kafka, which allows us to start our task and resume from where we left off. This is very powerful. Let's say the instance running Kafka Connect has a hardware issue and fails. The Kafka Connect cluster will detect that the tasks have failed and restart them, and each of these tasks is able to determine the last source offset processed and resume from there.
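
To make the restart concrete, here is a hedged sketch of reading that stored offset back during task startup, continuing the hypothetical UserActivity example:

      Map<String, Object> offset = this.context.offsetStorageReader()
          .offset(ImmutableMap.of("UserActivity", 4));

      // A null offset means this partition has never been processed before.
      long lastTimestamp = 0L;
      if (offset != null && offset.get("timestamp") != null) {
        lastTimestamp = (Long) offset.get("timestamp");
      }
      // Resume the SELECT ... WHERE updated > lastTimestamp query from here.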

Sink Connectors

A sink connector is your pathway for loading data into your target system. Handling data types and data structure is a snap given that we have already gone over the schema capabilities built into the Kafka Connect framework. Now our task is to take the key and value data given to us in each SinkRecord and persist it in our target system. Implementing a sink connector is as simple as implementing two classes: SinkConnector and SinkTask.
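
As a preview of the task side, here is a hedged sketch of a SinkTask put() method for a hypothetical target system (writeToTargetSystem is a made-up helper, not part of any real connector):

  @Override
  public void put(Collection<SinkRecord> records) {
    for (SinkRecord record : records) {
      Struct value = (Struct) record.value();
      // Map the Connect Struct to whatever write the target system expects.
      // For a JDBC sink this would be adding the row to a batched INSERT or UPDATE.
      writeToTargetSystem(record.topic(), value);
    }
  }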

SinkConnector

The sink connector must implement a SinkConnector class. The setup and startup are the same as for a SourceConnector; the Connector Startup section of this post describes this in detail.

SinkTask
