@mcascallares
Last active October 13, 2023 14:23
kafka-connect-offset-sandbox

CSE Day

Compile Kafka 3.6

jenv global 11

git clone https://github.com/apache/kafka.git

cd kafka

git checkout 3.6.0

./gradlew releaseTarGz

Configure and start Kafka (KRaft mode)

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

bin/kafka-server-start.sh config/kraft/server.properties

Configure and start Kafka Connect (distributed mode)

bin/connect-distributed.sh config/connect-distributed.properties

Produce some sample data

kafka-topics --bootstrap-server localhost:9092 \
	--create \
	--topic foo \
	--partitions 1 \
	--replication-factor 1

kafka-producer-perf-test  \
    --producer-props bootstrap.servers=localhost:9092 \
    --topic foo \
    --record-size 1024 \
    --throughput 10000 \
    --num-records 1000000
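
At the configured throughput, the run takes roughly num-records / throughput seconds to complete; a quick local sanity check:

```shell
# Expected duration of the perf-test run: 1,000,000 records at 10,000 records/s
echo $((1000000 / 10000))   # seconds → 100
```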

kafka-console-consumer --bootstrap-server localhost:9092 \
	--topic foo

Configure a MirrorMaker Source Connector

Let's replicate data from the topic foo into another topic within the same cluster.

curl -s -XPOST -H 'Content-Type: application/json' -H 'Accept: application/json' http://localhost:8083/connectors -d '
{
    "name": "my-connector",
    "config": {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "source.cluster.bootstrap.servers": "localhost:9092",
        "target.cluster.bootstrap.servers": "localhost:9092",
        "source.cluster.alias": "source",
        "topics": "foo",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "schemas.enable": false
    }
}' | jq .

Retrieve offsets with the new API

Request

curl -s -XGET http://localhost:8083/connectors/my-connector/offsets | jq .

Response

{
  "offsets": [
    {
      "partition": {
        "cluster": "my-source",
        "partition": 0,
        "topic": "foo"
      },
      "offset": {
        "offset": 933997
      }
    }
  ]
}
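
The source partition and current offset can be pulled out of that response with jq; a local replay of the response shape (cluster alias per the connector config):

```shell
# Sample GET /connectors/my-connector/offsets response, per the output above
RESPONSE='{"offsets":[{"partition":{"cluster":"source","partition":0,"topic":"foo"},"offset":{"offset":933997}}]}'

# Extract the topic and the current offset of the first source partition
echo "$RESPONSE" | jq -r '.offsets[0].partition.topic'   # foo
echo "$RESPONSE" | jq -r '.offsets[0].offset.offset'     # 933997
```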

Delete offsets

Request

curl -s -XDELETE http://localhost:8083/connectors/my-connector/offsets | jq .

Response

{
  "error_code": 400,
  "message": "Connectors must be in the STOPPED state before their offsets can be modified. This can be done for the specified connector by issuing a 'PUT' request to the '/connectors/my-connector/stop' endpoint"
}

New target state: STOPPED

Background: target states today

Kafka Connect currently supports two "target states" for a connector: RUNNING (the default) and PAUSED. The target state can be controlled through the REST API: PUT /connectors/{connector}/resume sets it to RUNNING, and PUT /connectors/{connector}/pause sets it to PAUSED. When a connector is paused, its tasks continue to exist on the cluster, and many of the resources allocated for them remain allocated: Kafka and Kafka Connect resources such as clients, converters, and transformations, as well as connector-specific resources such as database connections, file descriptors, and memory buffers. This can lead to confusing and sometimes suboptimal behavior when users pause connectors but notice that resources (especially database connections) are still allocated. In addition, connectors and tasks are allocated across the cluster without regard to their target state during rebalances. This can skew resource usage (such as network traffic and CPU utilization) across the cluster; in the worst case, all paused tasks are allocated to one set of workers, and all running tasks to a disjoint set of workers.

A new STOPPED state

A new target state will be added for connectors: STOPPED. The semantics for a connector that becomes stopped will be:

- The connector config remains present in the config topic of the cluster (if running in distributed mode), unmodified
- The connector config remains visible in the REST API
- All tasks for the connector are shut down completely. If running in distributed mode, a set of empty tasks is published to the config topic for the connector
- If running in distributed mode, as a result of the empty set of task configs being published to the config topic, a rebalance will be triggered, and no tasks for the connector will be allocated across the cluster
- As a further result of that rebalance, any information on the connector provided by the REST API will show it with an empty set of tasks
- The Connector, if running, will be shut down (by invoking Connector::stop and deallocating all Kafka Connect- and Kafka-related resources allocated for it by Kafka Connect)
- The Connector will still appear in the status portion of the REST API, with a state of STOPPED. This will take place even if the connector was in the FAILED state before the request to stop it, or if it failed during shutdown in response to a request to stop
- If running in distributed mode, the Connector will still be assigned to a worker during each rebalance
- The Connector will not be started (by invoking Connector::start), and it will not be able to generate new task configurations (by invoking ConnectorContext::requestTaskReconfiguration)

curl -s -XPUT http://localhost:8083/connectors/my-connector/stop | jq .
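
Once stopped, GET /connectors/my-connector/status reports the connector with state STOPPED and an empty task list. A local check against a sample status payload (the shape is the standard Connect status response; the worker_id value here is illustrative):

```shell
# Sample GET /connectors/my-connector/status response after stopping
STATUS='{"name":"my-connector","connector":{"state":"STOPPED","worker_id":"127.0.0.1:8083"},"tasks":[],"type":"source"}'

echo "$STATUS" | jq -r '.connector.state'   # STOPPED
echo "$STATUS" | jq '.tasks | length'       # 0
```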

Now that it's stopped, let's delete the offsets

curl -s -XDELETE http://localhost:8083/connectors/my-connector/offsets | jq .

Response

{
  "message": "The offsets for this connector have been reset successfully"
}

From the KIP documentation (KIP-875)

A source offset will only be considered successfully deleted if the Connect worker is able to emit a tombstone to the offsets topic for its partition, and then read to the end of the offsets topic. A request to reset offsets for a source connector will only be considered successful if the worker is able to delete all known offsets for that connector, on both the worker's global offsets topic and (if one is used) the connector's dedicated offsets topic.
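
Under the hood, each record in the offsets topic is keyed by the connector name plus the source partition map, serialized as a JSON array by the worker's internal converter; a tombstone is that key with a null value. A local sketch of the key layout (an internal detail of the Connect runtime; verify against your own offsets topic):

```shell
# Offsets-topic record key for my-connector's single source partition
jq -nc '["my-connector", {"cluster": "source", "partition": 0, "topic": "foo"}]'
# → ["my-connector",{"cluster":"source","partition":0,"topic":"foo"}]
```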

Let's validate offsets are gone!

curl -s -XGET http://localhost:8083/connectors/my-connector/offsets | jq .

Response

{
  "offsets": []
}

Let's restart the connector

curl -s -XPUT http://localhost:8083/connectors/my-connector/resume | jq .

Let's check the offsets

curl -s -XGET http://localhost:8083/connectors/my-connector/offsets | jq .

Response

{
  "offsets": [
    {
      "partition": {
        "cluster": "source",
        "partition": 0,
        "topic": "foo"
      },
      "offset": {
        "offset": 740680
      }
    }
  ]
}

Let's wait some time and check the offsets again; the response:

{
  "offsets": [
    {
      "partition": {
        "cluster": "source",
        "partition": 0,
        "topic": "foo"
      },
      "offset": {
        "offset": 1453260
      }
    }
  ]
}
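
The two snapshots show the connector making progress after the resume; the delta can be computed locally from the captured responses:

```shell
# Offsets from the two polls above
BEFORE=740680
AFTER=1453260

# Records replicated between the two polls
echo $((AFTER - BEFORE))   # → 712580
```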

Cherry-picking an offset

kafka-topics --bootstrap-server localhost:9092 --create --topic foo2 --partitions 1 --replication-factor 1

kafka-console-producer --bootstrap-server localhost:9092 --topic foo2
>1
>2
>3
>4
>5
>6
>7
>8
>9
>^C
curl -s -XPOST -H 'Content-Type: application/json' -H 'Accept: application/json' http://localhost:8083/connectors -d '
{
    "name": "my-connector2",
    "config": {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "source.cluster.bootstrap.servers": "localhost:9092",
        "target.cluster.bootstrap.servers": "localhost:9092",
        "source.cluster.alias": "source",
        "topics": "foo2",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "schemas.enable": false
    }
}' | jq .
curl -s -XGET http://localhost:8083/connectors/my-connector2/offsets | jq .

Response

{
  "offsets": [
    {
      "partition": {
        "cluster": "source",
        "partition": 0,
        "topic": "foo2"
      },
      "offset": {
        "offset": 7
      }
    }
  ]
}
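
Rather than hand-writing the PATCH body, it can be derived from the GET response by rewriting the offset value; a local jq sketch using the response above:

```shell
# GET /connectors/my-connector2/offsets response, captured above
RESPONSE='{"offsets":[{"partition":{"cluster":"source","partition":0,"topic":"foo2"},"offset":{"offset":7}}]}'

# Rewrite the offset to 3 to produce the PATCH payload
echo "$RESPONSE" | jq '.offsets[0].offset.offset = 3'
```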
curl -s -XPUT http://localhost:8083/connectors/my-connector2/stop | jq .

curl -s -XPATCH -H 'Content-Type: application/json' -H 'Accept: application/json' http://localhost:8083/connectors/my-connector2/offsets -d '
{
  "offsets": [
    {
      "partition": {
        "cluster": "source",
        "partition": 0,
        "topic": "foo2"
      },
      "offset": {
        "offset": 3
      }
    }
  ]
}' | jq .

Response

{
  "message": "The offsets for this connector have been altered successfully"
}

curl -s -XPUT http://localhost:8083/connectors/my-connector2/resume | jq .

Configure a Sink Connector (Shell Sink)

Find more information about the connector in its project repository. After installing it:

curl -s -XGET http://localhost:8083/connector-plugins | jq .
[
  {
    "class": "uk.co.threefi.connect.shell.ShellSinkConnector",
    "type": "sink",
    "version": "5.1.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "3.6.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "3.6.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "3.6.0"
  }
]

Let's create the connector

curl -s -XPOST -H 'Content-Type: application/json' -H 'Accept: application/json' http://localhost:8083/connectors -d '
{
    "name": "my-sink-connector",
    "config": {
        "connector.class": "uk.co.threefi.connect.shell.ShellSinkConnector",
        "tasks.max": "1",
        "shell.command": "echo \"${value}\" >> /tmp/foo2",
        "topics": "foo2",
        "schemas.enable": false
    }
}' | jq .
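
The sink runs the configured shell command once per record, substituting ${value} with the record value; locally, the effect on the output file is equivalent to (using a temp file instead of /tmp/foo2):

```shell
# Simulate the sink appending each record value to the output file
OUT=$(mktemp)
for value in 1 2 3; do
    echo "${value}" >> "$OUT"
done
cat "$OUT"    # one record value per line
rm -f "$OUT"
```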

Let's update the offset. Note that the payload is different than for a source connector!

curl -s -XPUT http://localhost:8083/connectors/my-sink-connector/stop | jq .

curl -s -XPATCH -H 'Content-Type: application/json' -H 'Accept: application/json' http://localhost:8083/connectors/my-sink-connector/offsets -d '
{
  "offsets": [
    {
      "partition": {
        "kafka_topic": "foo2",
        "kafka_partition": 0
      },
      "offset": {
        "kafka_offset": 0
      }
    }
  ]
}' | jq .

Check the return message!

{
  "message": "The Connect framework-managed offsets for this connector have been altered successfully. However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."
}
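
Unlike source connectors, whose partition and offset maps are connector-defined, sink offsets always use the Kafka coordinates kafka_topic, kafka_partition, and kafka_offset, because sink offsets are ordinary consumer-group offsets. A local check of the payload keys used above:

```shell
SINK_PAYLOAD='{"offsets":[{"partition":{"kafka_topic":"foo2","kafka_partition":0},"offset":{"kafka_offset":0}}]}'

# The partition map for a sink connector carries real Kafka coordinates
echo "$SINK_PAYLOAD" | jq -r '.offsets[0].partition | keys | join(",")'   # kafka_partition,kafka_topic
```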