beta-kafka docs

API Reference

REST API Authentication

REST API requests must be authenticated. This authentication applies only when interacting with the Kafka REST API directly; you do not need the token to access the Kafka nodes themselves.

If you are using Enterprise DC/OS, follow these instructions to create a service account and an authentication token. You can then configure your service to automatically refresh the authentication token when it expires. To get started more quickly, you can also get the authentication token without a service account, but you will need to manually refresh the token.

If you are using open source DC/OS, follow these instructions to pass your authentication token to the DC/OS endpoint.

Once you have the authentication token, you can store it in an environment variable and reference it in your REST API calls:

$ export auth_token=uSeR_t0k3n

The curl examples in this document assume that an auth token has been stored in an environment variable named auth_token.

If you are using Enterprise DC/OS, the security mode of your installation may also require the --cacert flag when making REST calls. Refer to Obtaining and passing the DC/OS certificate in cURL requests for information on how to use the --cacert flag. If your security mode is disabled, do not use the --cacert flag.
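
For example (a sketch, assuming the cluster CA certificate has been saved locally as dcos-ca.crt):

$ curl --cacert dcos-ca.crt -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/plan"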

For ongoing maintenance of the Kafka cluster itself, the Kafka service exposes an HTTP API whose structure is designed to roughly match the tools provided by the Kafka distribution, such as bin/kafka-topics.sh.

The examples here provide equivalent commands using both the DC/OS CLI (with the kafka CLI module installed) and curl. These examples assume a service named kafka (the default), and the curl examples assume a DC/OS cluster path of <dcos_url>. Replace these with appropriate values as needed.

The dcos kafka CLI commands have a --name argument, allowing the user to specify which Kafka instance to query. The value defaults to kafka, so it's technically redundant to specify --name=kafka in these examples.
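
For example, to list the brokers of a Kafka instance installed under the name kafka-dev:

$ dcos kafka --name=kafka-dev broker list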

Connection Information

Kafka comes with many useful tools of its own that often require either ZooKeeper connection information or the list of broker endpoints. This information can be retrieved in an easily consumable format from the endpoints API:

$ curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/endpoints/broker"
GET /service/kafka/v1/endpoints/broker HTTP/1.1

{
  "vips": [
    "broker.kafka.l4lb.thisdcos.directory:9092"
  ],
  "address": [
    "10.0.0.35:1028",
    "10.0.1.249:1030"
  ],
  "dns": [
    "kafka-0-broker.kafka.autoip.dcos.thisdcos.directory:1028",
    "kafka-1-broker.kafka.autoip.dcos.thisdcos.directory:1030"
  ]
}

The same information can be retrieved through the DC/OS CLI:

$ dcos kafka endpoints broker
{
  "vips": [
    "broker.kafka.l4lb.thisdcos.directory:9092"
  ],
  "address": [
    "10.0.0.35:1028",
    "10.0.1.249:1030"
  ],
  "dns": [
    "kafka-0-broker.kafka.autoip.dcos.thisdcos.directory:1028",
    "kafka-1-broker.kafka.autoip.dcos.thisdcos.directory:1030"
  ]
}

Broker Operations

Add Broker

Increase the BROKER_COUNT value via Marathon. Roll the change out as you would any other configuration update.
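
For example, the BROKER_COUNT environment variable in the scheduler's Marathon app definition might be updated as follows (a sketch showing only the relevant field):

    "env": {
        "BROKER_COUNT": "4"
    }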

List All Brokers

$ dcos kafka --name=kafka broker list
{
    "brokers": [
        "0",
        "1",
        "2"
    ]
}


$ curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/brokers"
GET /service/kafka/v1/brokers HTTP/1.1

{
    "brokers": [
        "0",
        "1",
        "2"
    ]
}

Restart Single Broker

Restarts the broker in-place.

$ dcos kafka --name=kafka broker restart 0
[
    "broker-0__9c426c50-1087-475c-aa36-cd00d24ccebb"
]


$ curl -X PUT -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/brokers/0"
PUT /service/kafka/v1/brokers/0 HTTP/1.1

[
    "broker-0__9c426c50-1087-475c-aa36-cd00d24ccebb"
]

Replace Single Broker

Restarts the broker and replaces its existing resource/volume allocations. The new broker instance may also be placed on a different machine.

$ dcos kafka --name=kafka broker replace 0
[
    "broker-0__9c426c50-1087-475c-aa36-cd00d24ccebb"
]


$ curl -X PUT -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/brokers/0?replace=true"
PUT /service/kafka/v1/brokers/0?replace=true HTTP/1.1

[
    "broker-0__9c426c50-1087-475c-aa36-cd00d24ccebb"
]

Topic Operations

These operations mirror what is available with bin/kafka-topics.sh.

List Topics

$ dcos kafka --name=kafka topic list
[
    "topic1",
    "topic0"
]


$ curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics"
GET /service/kafka/v1/topics HTTP/1.1

[
    "topic1",
    "topic0"
]

Describe Topic

$ dcos kafka --name=kafka topic describe topic1
{
    "partitions": [
        {
            "0": {
                "controller_epoch": 1,
                "isr": [
                    0,
                    1,
                    2
                ],
                "leader": 0,
                "leader_epoch": 0,
                "version": 1
            }
        },
        {
            "1": {
                "controller_epoch": 1,
                "isr": [
                    1,
                    2,
                    0
                ],
                "leader": 1,
                "leader_epoch": 0,
                "version": 1
            }
        },
        {
            "2": {
                "controller_epoch": 1,
                "isr": [
                    2,
                    0,
                    1
                ],
                "leader": 2,
                "leader_epoch": 0,
                "version": 1
            }
        }
    ]
}


$ curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1"
GET /service/kafka/v1/topics/topic1 HTTP/1.1

{
    "partitions": [
        {
            "0": {
                "controller_epoch": 1,
                "isr": [
                    0,
                    1,
                    2
                ],
                "leader": 0,
                "leader_epoch": 0,
                "version": 1
            }
        },
        {
            "1": {
                "controller_epoch": 1,
                "isr": [
                    1,
                    2,
                    0
                ],
                "leader": 1,
                "leader_epoch": 0,
                "version": 1
            }
        },
        {
            "2": {
                "controller_epoch": 1,
                "isr": [
                    2,
                    0,
                    1
                ],
                "leader": 2,
                "leader_epoch": 0,
                "version": 1
            }
        }
    ]
}

Create Topic

$ dcos kafka --name=kafka topic create topic1 --partitions=3 --replication=3
{
    "message": "Output: Created topic "topic1".n"
}


$ curl -X POST -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics?name=topic1&partitions=3&replication=3"
POST /service/kafka/v1/topics?replication=3&name=topic1&partitions=3 HTTP/1.1

{
    "message": "Output: Created topic "topic1".n"
}

View Topic Offsets

There is an optional --time parameter which may be set to either "first", "last", or a timestamp in milliseconds as described in the Kafka documentation.

$ dcos kafka --name=kafka topic offsets topic1 --time=last
[
    {
        "2": "334"
    },
    {
        "1": "333"
    },
    {
        "0": "333"
    }
]


$ curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1/offsets?time=last"
GET /service/kafka/v1/topics/topic1/offsets?time=last HTTP/1.1

[
    {
        "2": "334"
    },
    {
        "1": "333"
    },
    {
        "0": "333"
    }
]

Alter Topic Partition Count

$ dcos kafka --name=kafka topic partitions topic1 2
{
    "message": "Output: WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affectednAdding partitions succeeded!n"
}


$ curl -X PUT -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1?operation=partitions&partitions=2"
PUT /service/kafka/v1/topics/topic1?operation=partitions&partitions=2 HTTP/1.1

{
    "message": "Output: WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affectednAdding partitions succeeded!n"
}

Run Producer Test on Topic

$ dcos kafka --name=kafka topic producer_test topic1 10

{
    "message": "10 records sent, 70.422535 records/sec (0.07 MB/sec), 24.20 ms avg latency, 133.00 ms max latency, 13 ms 50th, 133 ms 95th, 133 ms 99th, 133 ms 99.9th.n"
}


$ curl -X PUT -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1?operation=producer-test&messages=10"
PUT /service/kafka/v1/topics/topic1?operation=producer-test&messages=10 HTTP/1.1

{
    "message": "10 records sent, 70.422535 records/sec (0.07 MB/sec), 24.20 ms avg latency, 133.00 ms max latency, 13 ms 50th, 133 ms 95th, 133 ms 99th, 133 ms 99.9th.n"
}

Runs the equivalent of the following command from the machine running the Kafka Scheduler:

kafka-producer-perf-test.sh
    --topic <topic>
    --num-records <messages>
    --throughput 100000
    --record-size 1024
    --producer-props bootstrap.servers=<current broker endpoints>

Delete Topic

$ dcos kafka --name=kafka topic delete topic1

{
    "message": "Topic topic1 is marked for deletion.nNote: This will have no impact if delete.topic.enable is not set to true.n"
}


$ curl -X DELETE -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1"
DELETE /service/kafka/v1/topics/topic1 HTTP/1.1

{
    "message": "Topic topic1 is marked for deletion.nNote: This will have no impact if delete.topic.enable is not set to true.n"
}

Note the warning in the output of the commands above. You can set the indicated delete.topic.enable value through a configuration update.
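
For example, delete.topic.enable can be set at install time with an options.json like this (a sketch following the kafka options format shown later in this document):

{
    "kafka": {
        "delete_topic_enable": true
    }
}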

List Under Replicated Partitions

$ dcos kafka --name=kafka topic under_replicated_partitions

{
    "message": ""
}


$ curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/under_replicated_partitions"
GET /service/kafka/v1/topics/under_replicated_partitions HTTP/1.1

{
    "message": ""
}

List Unavailable Partitions

$ dcos kafka --name=kafka topic unavailable_partitions

{
    "message": ""
}


$ curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/unavailable_partitions"
GET /service/kafka/v1/topics/unavailable_partitions HTTP/1.1

{
    "message": ""
}

Service Status

Send a GET request to the /v1/state/properties/suppressed endpoint to learn if Kafka is in a suppressed state and not receiving offers. If a service does not need offers, Mesos can "suppress" it so that other services are not starved for resources.

You can use this request to troubleshoot: if you think Kafka should be receiving resource offers, but is not, you can use this API call to see if Kafka is suppressed. You will receive a response of true or false.

curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/state/properties/suppressed"

Config History

These operations relate to viewing the service's configuration history.

List Configuration IDs

$ dcos kafka --name=kafka config list

[
    "319ebe89-42e2-40e2-9169-8568e2421023",
    "294235f2-8504-4194-b43d-664443f2132b"
]


$ curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/configurations"
GET /service/kafka/v1/configurations HTTP/1.1

[
    "319ebe89-42e2-40e2-9169-8568e2421023",
    "294235f2-8504-4194-b43d-664443f2132b"
]

Describe Configuration

This configuration shows a default per-broker memory allocation of 2048 (configured via the BROKER_MEM parameter):

$ dcos kafka --name=kafka config describe 319ebe89-42e2-40e2-9169-8568e2421023

{
    "brokerConfiguration": {
        "containerHookUri": "https://s3.amazonaws.com/downloads.mesosphere.io/kafka/assets/0.2.5-0.9.0.1/container-hook-0.2.5.tgz",
        "cpus": 1,
        "disk": 5000,
        "diskType": "ROOT",
        "javaUri": "https://s3.amazonaws.com/downloads.mesosphere.io/kafka/assets/0.2.5-0.9.0.1/jre-8u72-linux-x64.tar.gz",
        "kafkaUri": "https://s3.amazonaws.com/downloads.mesosphere.io/kafka/assets/0.2.5-0.9.0.1/kafka_2.10-0.9.0.1.tgz",
        "mem": 2048, // <<--
        "overriderUri": "https://s3.amazonaws.com/downloads.mesosphere.io/kafka/assets/0.2.5-0.9.0.1/overrider.zip"
    },
    "kafkaConfiguration": {
        [...]
    },
    "serviceConfiguration": {
        "count": 3,
        "name": "kafka",
        "phaseStrategy": "INSTALL",
        "placementStrategy": "NODE",
        "principal": "kafka-principal",
        "role": "kafka-role",
        "user": ""
    }
}


$ curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/configurations/319ebe89-42e2-40e2-9169-8568e2421023"
GET /service/kafka/v1/configurations/319ebe89-42e2-40e2-9169-8568e2421023 HTTP/1.1

{
    "brokerConfiguration": {
        "containerHookUri": "https://s3.amazonaws.com/downloads.mesosphere.io/kafka/assets/0.2.5-0.9.0.1/container-hook-0.2.5.tgz",
        "cpus": 1,
        "disk": 5000,
        "diskType": "ROOT",
        "javaUri": "https://s3.amazonaws.com/downloads.mesosphere.io/kafka/assets/0.2.5-0.9.0.1/jre-8u72-linux-x64.tar.gz",
        "kafkaUri": "https://s3.amazonaws.com/downloads.mesosphere.io/kafka/assets/0.2.5-0.9.0.1/kafka_2.10-0.9.0.1.tgz",
        "mem": 2048, // <<--
        "overriderUri": "https://s3.amazonaws.com/downloads.mesosphere.io/kafka/assets/0.2.5-0.9.0.1/overrider.zip"
    },
    "kafkaConfiguration": {
        [...]
    },
    "serviceConfiguration": {
        "count": 3,
        "name": "kafka",
        "phaseStrategy": "INSTALL",
        "placementStrategy": "NODE",
        "principal": "kafka-principal",
        "role": "kafka-role",
        "user": ""
    }
}

Describe Target Configuration

The target configuration, meanwhile, shows an increase of configured per-broker memory from 2048 to 4096 (again, configured as BROKER_MEM):

$ dcos kafka --name=kafka config target

{
    "brokerConfiguration": {
        "containerHookUri": "https://s3.amazonaws.com/downloads.mesosphere.io/kafka/assets/0.2.5-0.9.0.1/container-hook-0.2.5.tgz",
        "cpus": 1,
        "disk": 5000,
        "diskType": "ROOT",
        "javaUri": "https://s3.amazonaws.com/downloads.mesosphere.io/kafka/assets/0.2.5-0.9.0.1/jre-8u72-linux-x64.tar.gz",
        "kafkaUri": "https://s3.amazonaws.com/downloads.mesosphere.io/kafka/assets/0.2.5-0.9.0.1/kafka_2.10-0.9.0.1.tgz",
        "mem": 4096, // <<--
        "overriderUri": "https://s3.amazonaws.com/downloads.mesosphere.io/kafka/assets/0.2.5-0.9.0.1/overrider.zip"
    },
    "kafkaConfiguration": {
        [...]
    },
    "serviceConfiguration": {
        "count": 3,
        "name": "kafka",
        "phaseStrategy": "INSTALL",
        "placementStrategy": "NODE",
        "principal": "kafka-principal",
        "role": "kafka-role",
        "user": ""
    }
}


$ curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/configurations/target"
GET /service/kafka/v1/configurations/target HTTP/1.1

{
    "brokerConfiguration": {
        "containerHookUri": "https://s3.amazonaws.com/downloads.mesosphere.io/kafka/assets/0.2.5-0.9.0.1/container-hook-0.2.5.tgz",
        "cpus": 1,
        "disk": 5000,
        "diskType": "ROOT",
        "javaUri": "https://s3.amazonaws.com/downloads.mesosphere.io/kafka/assets/0.2.5-0.9.0.1/jre-8u72-linux-x64.tar.gz",
        "kafkaUri": "https://s3.amazonaws.com/downloads.mesosphere.io/kafka/assets/0.2.5-0.9.0.1/kafka_2.10-0.9.0.1.tgz",
        "mem": 4096, // <<--
        "overriderUri": "https://s3.amazonaws.com/downloads.mesosphere.io/kafka/assets/0.2.5-0.9.0.1/overrider.zip"
    },
    "kafkaConfiguration": {
        [...]
    },
    "serviceConfiguration": {
        "count": 3,
        "name": "kafka",
        "phaseStrategy": "INSTALL",
        "placementStrategy": "NODE",
        "principal": "kafka-principal",
        "role": "kafka-role",
        "user": ""
    }
}

Config Updates

These options relate to viewing and controlling rollouts and configuration updates.

View Plan Status

Displays all Phases and Steps in the service Plan. If a rollout is currently in progress, this returns a 503 HTTP code with response content otherwise unchanged.

$ dcos kafka --name=kafka plan

{
  "phases": [
    {
      "id": "1915bcad-1235-400f-8406-4ac7555a7d34",
      "name": "Reconciliation",
      "steps": [
        {
          "id": "9854a67d-7803-46d0-b278-402785fe3199",
          "status": "COMPLETE",
          "name": "Reconciliation",
          "message": "Reconciliation complete"
        }
      ],
      "status": "COMPLETE"
    },
    {
      "id": "3e72c258-1ead-465f-871e-2a305d29124c",
      "name": "Update to: 329ef254-7331-48dc-a476-8a0e45752871",
      "steps": [
        {
          "id": "ebf4cb02-1011-452a-897a-8c4083188bb2",
          "status": "COMPLETE",
          "name": "broker-0",
          "message": "Broker-0 is COMPLETE"
        },
        {
          "id": "ff9e74a7-04fd-45b7-b44c-00467aaacd5b",
          "status": "IN_PROGRESS",
          "name": "broker-1",
          "message": "Broker-1 is IN_PROGRESS"
        },
        {
          "id": "a2ba3969-cb18-4a05-abd0-4186afe0f840",
          "status": "PENDING",
          "name": "broker-2",
          "message": "Broker-2 is PENDING"
        }
      ],
      "status": "IN_PROGRESS"
    }
  ],
  "errors": [],
  "status": "IN_PROGRESS"
}


$ curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/plan"
GET /service/kafka/v1/plan HTTP/1.1

{
  "phases": [
    {
      "id": "1915bcad-1235-400f-8406-4ac7555a7d34",
      "name": "Reconciliation",
      "steps": [
        {
          "id": "9854a67d-7803-46d0-b278-402785fe3199",
          "status": "COMPLETE",
          "name": "Reconciliation",
          "message": "Reconciliation complete"
        }
      ],
      "status": "COMPLETE"
    },
    {
      "id": "3e72c258-1ead-465f-871e-2a305d29124c",
      "name": "Update to: 329ef254-7331-48dc-a476-8a0e45752871",
      "steps": [
        {
          "id": "ebf4cb02-1011-452a-897a-8c4083188bb2",
          "status": "COMPLETE",
          "name": "broker-0",
          "message": "Broker-0 is COMPLETE"
        },
        {
          "id": "ff9e74a7-04fd-45b7-b44c-00467aaacd5b",
          "status": "COMPLETE",
          "name": "broker-1",
          "message": "Broker-1 is COMPLETE"
        },
        {
          "id": "a2ba3969-cb18-4a05-abd0-4186afe0f840",
          "status": "COMPLETE",
          "name": "broker-2",
          "message": "Broker-2 is COMPLETE"
        }
      ],
      "status": "COMPLETE"
    }
  ],
  "errors": [],
  "status": "COMPLETE"
}

Upgrade Interaction

These operations are only applicable when PHASE_STRATEGY is set to STAGE; they have no effect when it is set to INSTALL. See the Changing Configuration at Runtime part of the Configuring section for more information.

Continue

$ dcos kafka --name=kafka continue
$ curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/plan/continue"

Interrupt

$ dcos kafka --name=kafka interrupt
$ curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/plan/interrupt"

Configure

Changing Configuration at Runtime

You can customize your cluster in-place when it is up and running.

The Kafka scheduler runs as a Marathon process and can be reconfigured by changing values from the DC/OS web interface. These are the general steps to follow:

  1. Go to the Services tab of the DC/OS web interface.
  2. Click the name of the Kafka service to be updated.
  3. Within the Kafka instance details view, click the menu in the upper right, then choose Edit.
  4. In the dialog that appears, click the Environment tab and update any field(s) to their desired value(s). For example, to increase the number of Brokers, edit the value for BROKER_COUNT. Do not edit the value for FRAMEWORK_NAME or BROKER_DISK.
  5. Choose a DEPLOY_STRATEGY: serial, serial-canary, parallel-canary, or parallel. See the SDK Developer guide for more information on deployment plan strategies.
  6. Click REVIEW & RUN to apply any changes and cleanly reload the Kafka scheduler. The Kafka cluster itself will persist across the change.

Configuration Update REST API

Make the REST request below to view the current deployment plan. See the REST API Authentication part of the REST API Reference section for information on how this request must be authenticated.

curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/plan"

{
  "phases" : [
    {
      "id" : "b6180a4e-b25f-4307-8855-0b37d671fd46",
      "name" : "Deployment",
      "steps" : [
        {
          "id" : "258f19a4-d6bc-4ff1-8685-f314924884a1",
          "status" : "COMPLETE",
          "name" : "kafka-0:[broker]",
          "message" : "com.mesosphere.sdk.scheduler.plan.DeploymentStep: 'kafka-0:[broker] [258f19a4-d6bc-4ff1-8685-f314924884a1]' has status: 'COMPLETE'."
        },
        {
          "id" : "e59fb2a9-22e2-4900-89e3-bda24041639f",
          "status" : "COMPLETE",
          "name" : "kafka-1:[broker]",
          "message" : "com.mesosphere.sdk.scheduler.plan.DeploymentStep: 'kafka-1:[broker] [e59fb2a9-22e2-4900-89e3-bda24041639f]' has status: 'COMPLETE'."
        },
        {
          "id" : "0b5a5048-fd3a-4b2c-a9b5-746045176d29",
          "status" : "COMPLETE",
          "name" : "kafka-2:[broker]",
          "message" : "com.mesosphere.sdk.scheduler.plan.DeploymentStep: 'kafka-2:[broker] [0b5a5048-fd3a-4b2c-a9b5-746045176d29]' has status: 'COMPLETE'."
        }
      ],
      "status" : "COMPLETE"
    }
  ],
  "errors" : [ ],
  "status" : "COMPLETE"
}

Note: After a configuration update, you may see an error from Mesos-DNS; this will go away 10 seconds after the update.

Enter the continue command to execute the first step:

curl -X PUT -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/plan?cmd=continue"
PUT /service/kafka/v1/plan?cmd=continue HTTP/1.1

{
    "Result": "Received cmd: continue"
}

After you execute the continue operation, the plan will look like this:

curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/plan"
GET /service/kafka/v1/plan HTTP/1.1

{
  "phases": [
    {
      "id": "9f8927de-d0df-4f72-bd0d-55e3f2c3ab21",
      "name": "Reconciliation",
      "steps": [
        {
          "id": "2d137273-249b-455e-a65c-3c83228890b3",
          "status": "COMPLETE",
          "name": "Reconciliation",
          "message": "Reconciliation complete"
        }
      ],
      "status": "COMPLETE"
    },
    {
      "id": "a7742963-f7e1-4640-8bd0-2fb28dc04045",
      "name": "Update to: 6092e4ec-8ffb-49eb-807b-877a85ef8859",
      "steps": [
        {
          "id": "b4453fb0-b4cc-4996-a05c-762673f75e6d",
          "status": "IN_PROGRESS",
          "name": "broker-0",
          "message": "Broker-0 is IN_PROGRESS"
        },
        {
          "id": "b8a8de9f-8758-4d0f-b785-0a38751a2c94",
          "status": "WAITING",
          "name": "broker-1",
          "message": "Broker-1 is WAITING"
        },
        {
          "id": "49e85522-1bcf-4edb-9456-712e8a537dbc",
          "status": "PENDING",
          "name": "broker-2",
          "message": "Broker-2 is PENDING"
        }
      ],
      "status": "IN_PROGRESS"
    }
  ],
  "errors": [],
  "status": "IN_PROGRESS"
}   

If you enter continue a second time, the rest of the plan will be executed without further interruption. If you want to interrupt a configuration update that is in progress, enter the interrupt command:

curl -X PUT -H "Authorization: token=$auth_token"  "<dcos_url>/service/kafka/v1/plan?cmd=interrupt"
PUT /service/kafka/v1/plan?cmd=interrupt HTTP/1.1

{
    "Result": "Received cmd: interrupt"
}

Note: The interrupt command cannot stop a step that is already IN_PROGRESS, but it will halt the rollout before any subsequent steps begin.

Configuration Options

The following describes the most commonly used features of the Kafka service and how to configure them via the DC/OS CLI and from the DC/OS web interface. View the default config.json in DC/OS Universe to see all possible configuration options.

Service Name

The name of this Kafka instance in DC/OS. This is an option that cannot be changed once the Kafka cluster is started: it can only be configured via the DC/OS CLI --options flag when the Kafka instance is created.

  • In DC/OS CLI options.json: name: string (default: kafka)
  • DC/OS web interface: The service name cannot be changed after the cluster has started.
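
For example, a custom service name can be set at install time via an options.json passed with --options (a sketch; kafka-dev is an illustrative name):

{
    "service": {
        "name": "kafka-dev"
    }
}

$ dcos package install --options=options.json kafka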

Broker Count

Configure the number of brokers running in a given Kafka cluster. The default count at installation is three brokers. This number may be increased, but not decreased, after installation.

  • In DC/OS CLI options.json: broker-count: integer (default: 3)
  • DC/OS web interface: BROKER_COUNT: integer
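
For example, an options.json that installs five brokers might look like this (a sketch using the brokers.count format from the sample options files later in this document):

{
    "brokers": {
        "count": 5
    }
}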

Broker Port

Configure the port number that the brokers listen on. If the port is set to a particular value, this will be the port used by all brokers. The default port is 9092. Note that this requires that placement-strategy be set to NODE to take effect, since having every broker listening on the same port requires that they be placed on different hosts. Setting the port to 0 indicates that each Broker should have a random port in the 9092-10092 range.

  • In DC/OS CLI options.json: broker-port: integer (default: 9092)
  • DC/OS web interface: BROKER_PORT: integer

Configure Broker Placement Strategy

ANY allows brokers to be placed on any node with sufficient resources, while NODE ensures that all brokers within a given Kafka cluster are never colocated on the same node. This is an option that cannot be changed once the Kafka cluster is started: it can only be configured via the DC/OS CLI --options flag when the Kafka instance is created.

  • In DC/OS CLI options.json: placement-strategy: ANY or NODE (default: ANY)
  • DC/OS web interface: PLACEMENT_STRATEGY: ANY or NODE
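
For example (a sketch matching the custom installation sample later in this document):

{
    "service": {
        "placement_strategy": "NODE"
    }
}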

Configure Kafka Broker Properties

Kafka Brokers are configured through settings in a server.properties file deployed with each Broker. The settings here can be specified at installation time or during a post-deployment configuration update. They are set in the DC/OS Universe's config.json as options such as:

"log_retention_hours": {
    "title": "log.retention.hours",
    "description": "Override log.retention.hours: The number of hours to keep a log file before deleting it (in hours), tertiary to log.retention.ms property",
    "type": "integer",
    "default": 168
},

The defaults can be overridden at install time by specifying an options.json file with a format like this:

{
    "kafka": {
        "log_retention_hours": 100
    }
}

These same values are also represented as environment variables for the scheduler in the form KAFKA_OVERRIDE_LOG_RETENTION_HOURS and may be modified through the DC/OS web interface and deployed during a rolling upgrade as described in Changing Configuration at Runtime above.

Disk Type

The types of disk that can be used for storing broker data are ROOT (default) and MOUNT. The type of disk may only be specified at install time.

  • ROOT: Broker data is stored on the same volume as the agent work directory. Broker tasks will use the configured amount of disk space.
  • MOUNT: Broker data will be stored on a dedicated volume attached to the agent. Dedicated MOUNT volumes have performance advantages and a disk error on these MOUNT volumes will be correctly reported to Kafka.

Configure Kafka service to use dedicated disk volumes:

  • DC/OS cli options.json:
    {
        "brokers": {
            "disk_type": "MOUNT"
        }
    }
  • DC/OS web interface: Set the environment variable DISK_TYPE: MOUNT

When the MOUNT disk type is configured, the scheduler selects a disk on an agent whose capacity is equal to or greater than the configured disk value.

JVM Heap Size

The Kafka service allows you to configure the JVM heap size for the broker JVM process. To configure it:

  • DC/OS cli options.json:
    {
        "brokers": {
            "heap": {
                "size": 2000
            }
        }
    }
  • DC/OS web interface: Set the environment variable BROKER_HEAP_MB: 2000

Note: The total memory allocated for the Mesos task is specified by the BROKER_MEM configuration parameter. The value of BROKER_HEAP_MB should not be greater than the value of BROKER_MEM. If BROKER_MEM is greater than BROKER_HEAP_MB, the Linux operating system will use the difference (BROKER_MEM - BROKER_HEAP_MB) for the page cache.
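
For example, with BROKER_MEM set to 2048 and BROKER_HEAP_MB set to 2000, roughly 48 MB is left for the page cache.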

Alternate ZooKeeper

By default the Kafka framework uses the ZooKeeper ensemble made available on the Mesos masters of a DC/OS cluster. You can configure an alternate ZooKeeper at install time. To configure it:

  • DC/OS CLI options.json:
    {
        "kafka": {
            "kafka_zookeeper_uri": "zookeeper.marathon.autoip.dcos.thisdcos.directory:2181"
        }
    }

This configuration option cannot be changed after installation.

Recovery and Health Checks

You can enable automated replacement of brokers and configure the circumstances under which they are replaced.

Enable Broker Replacement

To enable automated replacement:

  • DC/OS CLI options.json:
    {
        "enable_replacement":{
            "description":"Enable automated replacement of Brokers. WARNING: May cause data loss. See documentation.",
            "type":"boolean",
            "default":false
        }
    }
  • DC/OS web interface: Set the environment variable ENABLE_REPLACEMENT: true to enable replacement.

Warning: The replacement mechanism is unaware of whether the broker it is destructively replacing had the latest copy of data. Depending on your replication policy and the degree and duration of the permanent failures, you may lose data.

The following configuration options control the circumstances under which a broker is replaced.

Minimum Grace Period

Configure the minimum amount of time before a broker should be replaced:

  • DC/OS CLI options.json:
    {   
        "recover_in_place_grace_period_secs":{
            "description":"The minimum amount of time (in minutes) which must pass before a Broker may be destructively replaced.",
            "type":"number",
            "default":1200
        }
    }
  • DC/OS web interface: Set the environment variable RECOVERY_GRACE_PERIOD_SEC: 1200

Minimum Delay Between Replacements

Configure the minimum amount of time between broker replacements.

    {
        "min_delay_between_recovers_secs":{
            "description":"The minimum amount of time (in seconds) which must pass between destructive replacements of Brokers.",
            "type":"number",
            "default":600
        }
    }
  • DC/OS web interface: Set the environment variable REPLACE_DELAY_SEC: 600

The following configurations control the health checks that determine when a broker has failed:

Enable Health Check

Enable health checks on brokers:

    {
        "enable_health_check":{
            "description":"Enable automated detection of Broker failures which did not result in a Broker process exit.",
            "type":"boolean",
            "default":true
        }
    }
  • DC/OS web interface: Set the environment variable ENABLE_BROKER_HEALTH_CHECK: true

Health Check Delay

Set the amount of time before the health check begins:

    {
        "health_check_delay_sec":{
            "description":"The period of time (in seconds) waited before the health-check begins execution.",
            "type":"number",
            "default":15
        }
    }
  • DC/OS web interface: Set the environment variable BROKER_HEALTH_CHECK_DELAY_SEC: 15

Health Check Interval

Set the interval between health checks:

    {
        "health_check_interval_sec":{
            "description":"The period of time (in seconds) between health-check executions.",
            "type":"number",
            "default":10
        }
    }
  • DC/OS web interface: Set the environment variable BROKER_HEALTH_CHECK_INTERVAL_SEC: 10

Health Check Timeout

Set the time a health check can take to complete before it is considered a failed check:

    {
        "health_check_timeout_sec":{
            "description":"The duration (in seconds) allowed for a health-check to complete before it is considered a failure.",
            "type":"number",
            "default":20
        }
    }
  • DC/OS web interface: Set the environment variable BROKER_HEALTH_CHECK_TIMEOUT_SEC: 20

Health Check Grace Period

Set the amount of time after the delay before health check failures count toward the maximum number of consecutive failures:

    {
        "health_check_grace_period_sec":{
            "description":"The period of time after the delay (in seconds) before health-check failures count towards the maximum consecutive failures.",
            "type":"number",
            "default":10
        }
    }
  • DC/OS web interface: Set the environment variable BROKER_HEALTH_CHECK_GRACE_SEC: 10

Maximum Consecutive Health Check Failures

    {
        "health_check_max_consecutive_failures":{
            "description":"The the number of consecutive failures which cause a Broker process to exit.",
            "type":"number",
            "default":3
        }
    }
  • DC/OS web interface: Set the environment variable BROKER_HEALTH_CHECK_MAX_FAILURES: 3

Connecting Clients

The only supported client library is the official Kafka Java library, i.e., org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer. Use other client libraries at your own risk.

Kafka Client API Compatibility

  1. The Kafka client protocol is versioned and the cluster supports multiple versions.
  2. Kafka is backwards compatible: newer versions of Kafka always continue to support older versions of the protocol. This means that older clients continue to work with newer versions of Kafka.
  3. Clients are not forward compatible: there is no effort to have newer versions of the client support older versions of the Kafka protocol. This means that newer clients are not compatible with older versions of Kafka.

Using the DC/OS CLI

The following command can be executed from the CLI in order to retrieve a set of brokers to connect to:

dcos kafka --name=<name> endpoints broker

Using the REST API

REST API requests must be authenticated. See the REST API Authentication part of the REST API Reference for more information.

The following curl example demonstrates how to retrieve a set of brokers to connect to using the REST API.

curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/endpoints/broker"

User token authentication

DC/OS Enterprise Edition comes with support for user ACLs. To interact with the Kafka REST API you must first retrieve an auth token from the auth HTTP endpoint, then provide this token in subsequent requests.

First, we retrieve uSeR_t0k3n with our user credentials and store the token as an environment variable:

curl --data '{"uid":"username", "password":"password"}' -H "Content-Type:application/json" "<dcos_url>/acs/api/v1/auth/login"

{
  "token": "uSeR_t0k3n"
}

export auth_token=uSeR_t0k3n

Then, use this token to authenticate requests to the Kafka Service:

curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/endpoints/broker"

You do not need the token to access the Kafka brokers themselves.

Connection Info Response

The response, for both the CLI and the REST API, is as follows:

{
  "address": [
    "10.0.0.49:1025",
    "10.0.2.253:1025",
    "10.0.1.27:1025"
  ],
  "dns": [
    "kafka-2-broker.kafka.autoip.dcos.thisdcos.directory:1025",
    "kafka-0-broker.kafka.autoip.dcos.thisdcos.directory:1025",
    "kafka-1-broker.kafka.autoip.dcos.thisdcos.directory:1025"
  ],
  "vips": [
    "broker.kafka.l4lb.thisdcos.directory:9092"
  ]
}

These lists contain valid brokers that the client can use to connect to the Kafka cluster. For availability reasons, it is best to specify multiple brokers in the client's configuration. Alternatively, use the VIP to address any one of the Kafka brokers in the cluster. Learn more about load balancing and VIPs in DC/OS.
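
For example, a client could point at the VIP instead of individual broker addresses (a sketch in the style of the Java snippets below):

producerConfig.put("bootstrap.servers", "broker.kafka.l4lb.thisdcos.directory:9092");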

Configuring the Kafka Client Library

Adding the Kafka Client Library to Your Application

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.9.0.1</version>
</dependency>

The above is the correct dependency for the Kafka Client Library to use with the DC/OS Kafka service. After adding this dependency to your project, you should have access to the correct binary dependencies to interface with the Kafka Cluster.

Connecting the Kafka Client Library

The code snippet below demonstrates how to connect a Kafka Producer to the cluster and perform a loop of simple insertions.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.util.HashMap;
import java.util.Map;

Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put("bootstrap.servers", "10.0.0.211:9843,10.0.0.217:10056,10.0.0.214:9689");
// optional:
producerConfig.put("metadata.fetch.timeout.ms": "3000");
producerConfig.put("request.timeout.ms", "3000");
// ... other options: http://kafka.apache.org/documentation.html#producerconfigs
ByteArraySerializer serializer = new ByteArraySerializer();
KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(producerConfig, serializer, serializer);

byte[] message = new byte[1024];
for (int i = 0; i < message.length; ++i) {
  if (i % 2 == 0) {
    message[i] = 'x';
  } else {
    message[i] = 'o';
  }
}
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("test-topic", message);
while (true) {
  kafkaProducer.send(record).get();
  Thread.sleep(1000);
}

The code snippet below demonstrates how to connect a Kafka Consumer to the cluster and perform simple retrievals.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put("bootstrap.servers", "10.0.0.211:9843,10.0.0.217:10056,10.0.0.214:9689");
// optional:
consumerConfig.put("group.id", "test-client-consumer")
// ... other options: http://kafka.apache.org/documentation.html#consumerconfigs
ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);

List<String> topics = new ArrayList<>();
topics.add("test-topic");
kafkaConsumer.subscribe(topics);
while (true) {
  ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(1000);
  int bytes = 0;
  for (ConsumerRecord<byte[], byte[]> record : records) {
    if (record.key() != null) {
      bytes += record.key().length;
    }
    bytes += record.value().length;
  }
  System.out.println(String.format("Got %d messages (%d bytes)", records.count(), bytes));
}
kafkaConsumer.close();

Configuring the Kafka Test Scripts

The following code connects to a DC/OS-hosted Kafka instance using bin/kafka-console-producer.sh and bin/kafka-console-consumer.sh as an example:

dcos kafka endpoints broker
{
  "address": [
    "10.0.0.49:1025",
    "10.0.2.253:1025",
    "10.0.1.27:1025"
  ],
  "dns": [
    "kafka-2-broker.kafka.autoip.dcos.thisdcos.directory:1025",
    "kafka-0-broker.kafka.autoip.dcos.thisdcos.directory:1025",
    "kafka-1-broker.kafka.autoip.dcos.thisdcos.directory:1025"
  ],
  "vips": [
    "broker.kafka.l4lb.thisdcos.directory:9092"
  ]
}

dcos node ssh --master-proxy --leader

core@ip-10-0-6-153 ~ docker run -it mesosphere/kafka-client

root@7d0aed75e582:/bin# echo "Hello, World." | ./kafka-console-producer.sh --broker-list 10.0.0.49:1025,10.0.2.253:1025,10.0.1.27:1025 --topic topic1

root@7d0aed75e582:/bin# ./kafka-console-consumer.sh --zookeeper master.mesos:2181/kafka --topic topic1 --from-beginning
Hello, World.

DC/OS Apache Kafka is an automated service that makes it easy to deploy and manage Apache Kafka on Mesosphere DC/OS, eliminating nearly all of the complexity traditionally associated with managing a Kafka cluster. Apache Kafka is a distributed, high-throughput publish-subscribe messaging system with strong ordering guarantees. Kafka clusters are highly available, fault tolerant, and very durable. For more information on Apache Kafka, see the Apache Kafka documentation.

DC/OS Kafka gives you direct access to the Kafka API so that existing producers and consumers can interoperate. You can configure and install DC/OS Kafka in moments. Multiple Kafka clusters can be installed on DC/OS and managed independently, so you can offer Kafka as a managed service to your organization.

Benefits

DC/OS Kafka offers the following benefits of a semi-managed service:

  • Easy installation
  • Multiple Kafka clusters
  • Elastic scaling of brokers
  • Replication for high availability
  • Kafka cluster and broker monitoring

Features

DC/OS Kafka provides the following features:

  • Single-command installation for rapid provisioning
  • Multiple clusters for multiple tenancy with DC/OS
  • High availability runtime configuration and software updates
  • Storage volumes for enhanced data durability, known as Mesos Dynamic Reservations and Persistent Volumes
  • Integration with syslog-compatible logging services for diagnostics and troubleshooting
  • Integration with statsd-compatible metrics services for capacity and performance monitoring

Related Services

Install and Customize

Kafka is available in the DC/OS Universe and can be installed by using either the web interface or the DC/OS CLI.

Prerequisites

  • Depending on your security mode in Enterprise DC/OS, you may need to provision a service account before installing Kafka. Only someone with superuser permission can create the service account.
    • In strict security mode, a service account is required.
    • In permissive security mode, a service account is optional.
    • In disabled security mode, a service account is not required.
  • Your cluster must have at least three private nodes.

Default Installation

To start a basic test cluster with three brokers, run the following command on the DC/OS CLI. Enterprise DC/OS users must follow additional instructions; see the Enterprise DC/OS documentation for more information about installing Kafka on Enterprise DC/OS.

$ dcos package install kafka

This command creates a new Kafka cluster with the default name kafka. Two clusters cannot share the same name, so installing additional clusters beyond the default cluster requires customizing the name at install time for each additional instance.

All dcos kafka CLI commands have a --name argument allowing the user to specify which Kafka instance to query. If you do not specify a service name, the CLI assumes the default value, kafka. The default value for --name can be customized via the DC/OS CLI configuration; alternatively, specify the name explicitly on each command:

$ dcos kafka --name kafka-dev <cmd>

Note: Alternatively, you can install Kafka from the DC/OS web interface. If you install Kafka from the web interface, you must install the Kafka DC/OS CLI subcommands separately. From the DC/OS CLI, enter:

dcos package install kafka --cli

Minimal Installation

For development purposes, you may wish to install Kafka on a local DC/OS cluster. For this, you can use dcos-vagrant.

To start a minimal cluster with a single broker, create a JSON options file named sample-kafka-minimal.json:

{
    "brokers": {
        "count": 1,
        "mem": 512,
        "disk": 1000
    }
}

The command below creates a cluster using sample-kafka-minimal.json:

$ dcos package install --options=sample-kafka-minimal.json kafka

Custom Installation

Customize the defaults by creating a JSON file. Then, pass it to dcos package install using the --options parameter.

Sample JSON options file named sample-kafka-custom.json:

{
    "service": {
        "name": "sample-kafka-custom",
        "placement_strategy": "NODE"
    },
    "brokers": {
        "count": 10
    },
    "kafka": {
        "delete_topic_enable": true,
        "log_retention_hours": 128
    }
}

The command below creates a cluster using sample-kafka-custom.json:

$ dcos package install --options=sample-kafka-custom.json kafka

See Configuration Options for a list of fields that can be customized via an options JSON file when the Kafka cluster is created.

Multiple Kafka cluster installation

Installing multiple Kafka clusters is identical to installing Kafka clusters with custom configurations as described above. The only requirement on the operator is that a unique name is specified for each installation. For example:

$ cat kafka1.json
{
    "service": {
        "name": "kafka1"
    }
}

$ dcos package install kafka --options=kafka1.json

Limitations

Configurations

The "disk" configuration value is denominated in MB. We recommend you set the configuration value log_retention_bytes to a value smaller than the indicated "disk" configuration. See the Configuring section for instructions for customizing these values.

Managing Configurations Outside of the Service

The Kafka service's core responsibility is to deploy and maintain the deployment of a Kafka cluster whose configuration has been specified. In order to do this, the service assumes that it has ownership of broker configuration. If an end-user makes modifications to individual brokers through out-of-band configuration operations, the service will almost certainly override those modifications at a later time. If a broker crashes, it will be restarted with the configuration known to the scheduler, not one modified out-of-band. If a configuration update is initiated, all out-of-band modifications will be overwritten during the rolling update.

Brokers

The number of deployable brokers is constrained by two factors. First, brokers have specified required resources, so brokers may not be placed if the DC/OS cluster lacks the requisite resources. Second, the specified "PLACEMENT_STRATEGY" environment variable may affect how many brokers can be created in a Kafka cluster. By default the value is "ANY," so brokers are placed anywhere and are only constrained by the resources of the cluster. A second option is "NODE." In this case only one broker may be placed on a given DC/OS agent.

Security

The security features introduced in Apache Kafka 0.9 are not supported at this time.

Overlay networks

When Kafka is deployed on the dcos overlay network, the configuration cannot be updated to use the host network.

Managing

Add a Broker

Increase the BROKER_COUNT value via the DC/OS web interface as in any other configuration update.

Upgrade Software

  1. In the DC/OS web interface, destroy the Kafka scheduler to be updated.

  2. Verify that you no longer see it in the DC/OS web interface.

  3. Optional: Create a JSON options file with any custom configuration, such as a non-default DEPLOY_STRATEGY.

    {
        "env": {
            "DEPLOY_STRATEGY": "parallel-canary"
        }
    }
    
  4. Install the latest version of Kafka:

    $ dcos package install kafka --options=options.json
    

Quick Start

  1. Install a Kafka cluster. If you are using open source DC/OS, install a Kafka cluster with the following command from the DC/OS CLI. If you are using Enterprise DC/OS, you may need to follow additional instructions. See the Install and Customize section for more information.

     dcos package install kafka
    
  2. Create a new topic.

     dcos kafka topic create topic1
    
  3. Find Zookeeper and broker endpoint information.

     dcos kafka endpoints zookeeper
     master.mesos:2181/dcos-service-kafka
    
     dcos kafka endpoints broker
     {
       "address": [
         "10.0.3.226:1000",
         "10.0.3.98:1000",
         "10.0.0.120:1000"
       ],
       "dns": [
         "kafka-2-broker.kafka.autoip.dcos.thisdcos.directory:1000",
         "kafka-0-broker.kafka.autoip.dcos.thisdcos.directory:1000",
         "kafka-1-broker.kafka.autoip.dcos.thisdcos.directory:1000"
       ],
       "vips": [
         "broker.kafka.l4lb.thisdcos.directory:9092"
       ]
     }
    
  4. Produce and consume data.

     dcos node ssh --master-proxy --leader
    
     core@ip-10-0-6-153 ~ docker run -it mesosphere/kafka-client
    
     root@7d0aed75e582:/bin# echo "Hello, World." | ./kafka-console-producer.sh --broker-list kafka-0-broker.kafka.autoip.dcos.thisdcos.directory:1000,kafka-1-broker.kafka.autoip.dcos.thisdcos.directory:1000,kafka-2-broker.kafka.autoip.dcos.thisdcos.directory:1000 --topic topic1
    
     root@7d0aed75e582:/bin# ./kafka-console-consumer.sh --zookeeper master.mesos:2181/dcos-service-kafka --topic topic1 --from-beginning
     Hello, World.
    

See also Connecting clients.

Troubleshooting

The Kafka service will be listed as "Unhealthy" when it detects any underreplicated partitions. This error condition usually indicates a malfunctioning broker. Use the dcos kafka topic under_replicated_partitions and dcos kafka topic describe <topic-name> commands to find the problem broker and determine what actions are required.

Possible repair actions include dcos kafka broker restart <broker-id> and dcos kafka broker replace <broker-id>. The replace operation is destructive and will irrevocably lose all data associated with the broker. The restart operation is not destructive and indicates an attempt to restart a broker process.

Configuration Update Errors

The errors entry in the response below indicates the change needed to create a valid configuration:

$ curl -H "Authorization: token=$AUTH_TOKEN" "$DCOS_URI/service/kafka/v1/plan"
GET /service/kafka/v1/plan HTTP/1.1

{
    "phases": [
        {
            "id": "c26bec40-3290-4501-b3da-945d0abef55f",
            "name": "Reconciliation",
            "steps": [
                {
                    "id": "e56d2e4a-e05b-42ad-b4a0-d74b68d206af",
                    "message": "Reconciliation complete",
                    "name": "Reconciliation",
                    "status": "COMPLETE"
                }
            ],
            "status": "COMPLETE"
        },
        {
            "id": "226a780e-132f-4fea-b584-7712b07cf357",
            "name": "Update to: 72cecf77-dbc5-4ae6-8f91-c88702b9a6a8",
            "steps": [
                {
                    "id": "d4e72ee8-4608-423a-9566-1632ff0ab211",
                    "message": "Broker-0 is COMPLETE",
                    "name": "broker-0",
                    "status": "COMPLETE"
                },
                {
                    "id": "3ea30deb-9660-42f1-ad23-bd418d718999",
                    "message": "Broker-1 is COMPLETE",
                    "name": "broker-1",
                    "status": "COMPLETE"
                },
                {
                    "id": "4da21440-de73-4772-9c85-877f2677e62a",
                    "message": "Broker-2 is COMPLETE",
                    "name": "broker-2",
                    "status": "COMPLETE"
                }
            ],
            "status": "COMPLETE"
        }
    ],
    
    "errors": [
        "Validation error on field \"BROKER_COUNT\": Decreasing this value (from 3 to 2) is not supported."
    ],
    "status": "Error"
}

Replacing a Permanently Failed Server

If a machine has permanently failed, manual intervention is required to replace the broker or brokers that resided on that machine. Because DC/OS Kafka uses persistent volumes, the service continuously attempts to relaunch brokers on the machine where their data has been persisted. In the case where a machine has permanently failed, use the Kafka CLI to replace the brokers.

In the example below, the broker with id 0 will be replaced on new machine as long as cluster resources are sufficient to satisfy the service’s placement constraints and resource requirements.

$ dcos kafka broker replace 0

Uninstall

Run the following command from the DC/OS CLI to uninstall Kafka. Alternatively, you can uninstall Kafka from the DC/OS web interface; see the DC/OS documentation for more information about uninstalling DC/OS services.

Replace <name> with the name of the Kafka instance to be uninstalled.

$ dcos package uninstall --app-id=<name> kafka

Then, use the framework cleaner script to remove your Kafka instance from ZooKeeper and to destroy all data associated with it. The script requires several arguments, whose values are derived from your service name:

  • framework-role is <name>-role.
  • framework-principal is <name>-principal.
  • zk_path is dcos-service-<name>.
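
For example, for the default service name kafka, the values would be kafka-role, kafka-principal, and dcos-service-kafka. A sketch of a cleaner invocation from the leading master follows (the janitor image and flag names here are assumptions; consult the framework cleaner documentation for the exact command):

$ dcos node ssh --master-proxy --leader
$ docker run mesosphere/janitor /janitor.py -r kafka-role -p kafka-principal -z dcos-service-kafka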

Version Policy

The DC/OS Kafka Service is engineered and tested to work with a specific release of Apache Kafka, the base technology. We select stable versions of the base technology in order to promote customer success. We have selected the latest stable version of Apache Kafka for new releases.
