Kafka on Vagrant

# prov.ansible/kafka_play.yml -- invoked by the Vagrantfile's ansible provisioner below
---
- hosts: all
  sudo: true
  vars:
    java_home: /usr/lib/jdk1.8.0_11
  roles:
    - java
    - role: kafka
      local_kafka: true
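The Vagrantfile at the end of this gist runs this play through its ansible provisioner, but it can also be run by hand. A minimal sketch, assuming a hypothetical hosts.ini inventory; zoos has to be supplied because the server.properties template iterates over it, and extra vars override the local_kafka role parameter:

# hypothetical manual run; Vagrant normally drives this
ansible-playbook -i hosts.ini prov.ansible/kafka_play.yml \
  --extra-vars '{"local_kafka": false, "zoos": ["zoo1", "zoo2", "zoo3"]}'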

# roles/kafka/tasks/main.yml -- tasks for the kafka role
- name: check for existing kafka
  stat:
    path: '{{kafka_home}}/bin/kafka-topics.sh'
  ignore_errors: true
  register: kafka_exe

- name: upload local kafka tar
  when: local_kafka
  copy:
    src: kafka_{{kafka_scala_ver}}-{{kafka_ver}}.tgz
    dest: /tmp/kafka.tar.gz

- name: fetch kafka bits
  when: not local_kafka
  get_url:
    dest: /tmp/kafka.tar.gz
    url: '{{kafka_url}}'
    sha256sum: '{{kafka_sha256}}'

- name: unpack kafka
  when: not kafka_exe.stat.exists
  unarchive:
    copy: no
    creates: '{{kafka_home}}/bin/kafka-topics.sh'
    src: /tmp/kafka.tar.gz
    # the tar contains the version qualified directory
    dest: /opt

- name: remove kafka.tar
  file: path=/tmp/kafka.tar.gz state=absent

- name: symlink to versionless name
  file:
    src: '{{kafka_home}}'
    dest: '{{kafka_home|regex_replace("_"+kafka_scala_ver+".*", "")}}'
    owner: '{{kafka_user}}'
    state: link

- name: chown kafka dir
  command: chown -R {{kafka_user}} {{kafka_home}}

- name: create log directory
  file: path={{kafka_log_dir}} owner={{kafka_user}} state=directory

- name: create server.properties
  template:
    src: server.properties.j2
    dest: '{{kafka_home}}/config/server.properties'

- name: create init conf
  template:
    src: etc_init.conf.j2
    dest: /etc/init/kafka.conf

- name: create the zk chroot
  # we might not be the first runner
  ignore_errors: true
  shell: |
    # grab JAVA_HOME
    for i in /etc/profile.d/*.sh; do . $i; done
    set -x
    # firing up a kafka utility will create the logs directory
    # and perhaps even logs in it; but we need the owner != root
    log_chmod() {
      chown -R {{kafka_user}} {{kafka_home}}/
    }
    trap log_chmod EXIT
    export `sed -ne '/zookeeper.connect=/{
    s#^[^=]*=\([^/]*\)\(/.*\)#ZKHOST=\1 ZKCHROOT=\2#p
    }' {{kafka_home}}/config/server.properties`
    ZKCMD={{kafka_home}}/bin/zookeeper-shell.sh
    if $ZKCMD $ZKHOST ls2 $ZKCHROOT | grep -q 'ctime'; then
      exit 0
    fi
    # regrettably, it is unclear what user:group should be
    $ZKCMD $ZKHOST create $ZKCHROOT 0:0
    echo "RC=$?"
description "Kafka Daemon"
env JAVA_HOME={{java_home}}
export JAVA_HOME
start on runlevel [2345]
stop on runlevel [!2345]
respawn
console log
setuid {{kafka_user}}
chdir {{kafka_home}}
exec bin/kafka-server-start.sh config/server.properties
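Once that template lands as /etc/init/kafka.conf, upstart on the trusty boxes owns the broker process. A few commands for poking at it, assuming stock upstart behaviour where 'console log' goes to /var/log/upstart/<job>.log:

sudo start kafka        # likewise: stop, restart, status
tail -f /var/log/upstart/kafka.log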

# roles/kafka/templates/server.properties.j2

# The id of the broker. This must be set to a unique integer for each broker.
broker.id={{ansible_hostname | regex_replace("^[^0-9]*", "")}}
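# e.g. on the "kafka2" box the regex strips the leading non-digits, so this renders as broker.id=2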
port=9092
host.name={{ansible_hostname}}
# Hostname the broker will advertise to producers and consumers. If
# not set, it uses the value for "host.name" if configured.
# Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
## advertised.host.name=<hostname routable by clients>
############################# Log Basics #############################
# A comma-separated list of directories under which to store log files
log.dirs={{kafka_log_dir}}
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we
# only fsync() to sync the OS cache lazily. The following
# configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush
# data after a period of time or every N messages (or both). This can
# be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
log.flush.interval.messages=1000
# The maximum amount of time a message can sit in a log before we force a flush
log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log
# segments. The policy can be set to delete segments after a period of
# time, or after a given size has accumulated. A segment will be
# deleted whenever *either* of these criteria are met. Deletion always
# happens from the end of the log.
# The minimum age of a log file to be eligible for deletion
# 168 = 24 * 7
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the
# log as long as the remaining segments don't drop below
# log.retention.bytes.
# log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a
# new log segment will be created.
log.segment.bytes=536870912
# The interval at which log segments are checked to see if they can be
# deleted according to the retention policies
log.retention.check.interval.ms=60000
# By default the log cleaner is disabled and the log retention policy
# will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and
# individual logs can then be marked for log compaction.
log.cleaner.enable=false
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details). This
# is a comma-separated list of host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". You
# can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect={% for z in zoos %}{% if not loop.first %},{% endif %}{{z}}:2181{% endfor %}/kafka
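# with the zoo hosts defined in the Vagrantfile below, this renders as:
#   zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181/kafka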
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000

# roles/kafka/defaults/main.yml -- tunables for the kafka role
---
kafka_ver: '0.8.1.1'
kafka_scala_ver: '2.10'
kafka_url: http://download.nextag.com/apache/kafka/{{kafka_ver}}/kafka_{{kafka_scala_ver}}-{{kafka_ver}}.tgz
kafka_sha256: '2532af3dbd71d2f2f95f71abff5b7505690bd1f15c7063f8cbaa603b45ee4e86'
kafka_user: ubuntu
# this path is used so much, let's shorten the typing
kafka_home: /opt/kafka_{{kafka_scala_ver}}-{{kafka_ver}}
# be careful, this is Kafka's *data* directory (log.dirs), not where it writes its own logs
kafka_log_dir: /var/lib/kafka
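When local_kafka is true, the 'upload local kafka tar' task finds the tarball via the copy module's files/ search path. A sketch of staging it ahead of time; the roles/kafka/files path is assumed from the usual role layout, and the URL and checksum are the ones pinned above:

# hypothetical staging of the tarball for local_kafka: true
cd prov.ansible/roles/kafka/files
curl -LO 'http://download.nextag.com/apache/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz'
sha256sum kafka_2.10-0.8.1.1.tgz   # compare against kafka_sha256 above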

# Vagrantfile
# -*- mode: ruby -*-
# vi: set ft=ruby :

# Vagrantfile API/syntax version. Don't touch unless you know what you're doing!
VAGRANTFILE_API_VERSION = '2'

Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
  config.vm.box = 'trusty-server-cloudimg-amd64-vagrant-disk1'
  config.vm.box_url = 'http://cloud-images.ubuntu.com/vagrant/trusty/current/trusty-server-cloudimg-amd64-vagrant-disk1.box'

  zoo_host_map = {
    'zoo1' => '192.168.56.111',
    'zoo2' => '192.168.56.112',
    'zoo3' => '192.168.56.113',
  }
  kafka_host_map = {
    'kafka1' => '192.168.56.121',
    'kafka2' => '192.168.56.122',
    'kafka3' => '192.168.56.123',
  }

  $zoo_hosts_file = zoo_host_map.map do |h, ip|
    "#{ip} #{h}"
  end.join("\n")

  $zoo_update_hosts = <<ZOO
if grep -q zoo-update-hosts /etc/hosts; then
  exit 0
fi
cat >>/etc/hosts<<H
#-zoo-update-hosts
#{$zoo_hosts_file}
H
ZOO

  $kafka_hosts_file = kafka_host_map.map do |h, ip|
    "#{ip} #{h}"
  end.join("\n")

  $kafka_update_hosts = <<KAFKA
if grep -q kafka-update-hosts /etc/hosts; then
  exit 0
fi
cat >>/etc/hosts<<H
#-kafka-update-hosts
#{$kafka_hosts_file}
#{$zoo_hosts_file}
H
KAFKA

  zoo_host_map.keys.each do |hn|
    config.vm.define hn do |c|
      c.vm.hostname = hn
      c.vm.network :private_network, ip: zoo_host_map[c.vm.hostname]
      c.vm.provider :virtualbox do |vb|
        vb.customize ['modifyvm', :id, '--memory', '512']
      end
      c.vm.provision :shell, :inline => $zoo_update_hosts
      c.vm.provision :ansible do |a|
        a.verbose = 'vvvv'
        a.playbook = 'prov.ansible/zoo_play.yml'
        a.extra_vars = {'hostname' => hn}
      end
    end
  end

  kafka_host_map.keys.each do |hn|
    config.vm.define hn do |c|
      c.vm.hostname = hn
      c.vm.network :private_network, ip: kafka_host_map[c.vm.hostname]
      c.vm.provider :virtualbox do |vb|
        # this must be > 1G b/c kafka init scripts use Xmx1G
        vb.customize ['modifyvm', :id, '--memory', '1536']
      end
      c.vm.provision :shell, :inline => $kafka_update_hosts
      c.vm.provision :ansible do |a|
        a.verbose = 'vvvv'
        a.playbook = 'prov.ansible/kafka_play.yml'
        a.extra_vars = {
          'hostname' => hn,
          'zoos' => zoo_host_map.keys,
        }
      end
    end
  end
end
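A rough bring-up, zookeepers first so the kafka play can create its chroot, followed by a smoke test against the versionless /opt/kafka symlink the role creates; these commands are a sketch rather than part of the gist:

vagrant up zoo1 zoo2 zoo3
vagrant up kafka1 kafka2 kafka3
# create a replicated topic through the zk chroot to confirm the brokers registered
vagrant ssh kafka1 -c '/opt/kafka/bin/kafka-topics.sh --zookeeper zoo1:2181/kafka --create --topic smoke --partitions 2 --replication-factor 3'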
mdaniel commented Aug 6, 2014

This doesn't include the zookeeper role in order to prevent this gist from being huge, but I'm sure there is a separate gist for provisioning those machines.
