Skip to content

Instantly share code, notes, and snippets.

@thinkerbot
Created January 29, 2014 21:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thinkerbot/8697966 to your computer and use it in GitHub Desktop.
Save thinkerbot/8697966 to your computer and use it in GitHub Desktop.
Kafka round trip perf test
#!/usr/bin/env bash
set -ex
#######################################
# Runs as root
#######################################
cat > /etc/default/locale <<"DOC"
LANG="en_US.UTF-8"
LANGUAGE=
LC_CTYPE="en_US.UTF-8"
LC_NUMERIC="en_US.UTF-8"
LC_TIME="en_US.UTF-8"
LC_COLLATE="en_US.UTF-8"
LC_MONETARY="en_US.UTF-8"
LC_MESSAGES="en_US.UTF-8"
LC_PAPER="en_US.UTF-8"
LC_NAME="en_US.UTF-8"
LC_ADDRESS="en_US.UTF-8"
LC_TELEPHONE="en_US.UTF-8"
LC_MEASUREMENT="en_US.UTF-8"
LC_IDENTIFICATION="en_US.UTF-8"
LC_ALL=
DOC
source /etc/default/locale
#######################################
# Prerequisites
########################################
apt-get update
# Developer tools
apt-get -y install vim
apt-get -y install expect
apt-get -y install git
# Install Kafka
# https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.8+Quick+Start
apt-get -y install openjdk-7-jdk
git clone https://git-wip-us.apache.org/repos/asf/kafka.git
cd kafka
git checkout -b 0.8 remotes/origin/0.8
./sbt update
./sbt package
./sbt assembly-package-dependency
sed -e 's/broker.id=0/broker.id=1/' -e 's/port=9092/port=9091/' -e 's|log.dirs=/tmp/kafka-logs|log.dirs=/tmp/kafka-logs-1|' -e 's/#host.name=localhost/host.name=192.168.0.100/' -e 's|zookeeper.connect=localhost:2181|zookeeper.connect=192.168.0.100:2181|' config/server.properties > config/server1.properties
sed -e 's/broker.id=0/broker.id=2/' -e 's/port=9092/port=9092/' -e 's|log.dirs=/tmp/kafka-logs|log.dirs=/tmp/kafka-logs-2|' -e 's/#host.name=localhost/host.name=192.168.0.100/' -e 's|zookeeper.connect=localhost:2181|zookeeper.connect=192.168.0.100:2181|' config/server.properties > config/server2.properties
sed -e 's/broker.id=0/broker.id=3/' -e 's/port=9092/port=9093/' -e 's|log.dirs=/tmp/kafka-logs|log.dirs=/tmp/kafka-logs-3|' -e 's/#host.name=localhost/host.name=192.168.0.100/' -e 's|zookeeper.connect=localhost:2181|zookeeper.connect=192.168.0.100:2181|' config/server.properties > config/server3.properties
mkdir logs
chown -R vagrant:vagrant .
# ./bin/zookeeper-server-start.sh config/zookeeper.properties &
# ./bin/kafka-server-start.sh config/server1.properties &
# ./bin/kafka-server-start.sh config/server2.properties &
# ./bin/kafka-server-start.sh config/server3.properties &
#
# ./bin/kafka-create-topic.sh --topic example --replica 3 --zookeeper localhost:2181
# ./bin/kafka-console-producer.sh --broker-list 192.168.0.100:9091,192.168.0.100:9092,192.168.0.100:9093 --sync --topic event
# ./bin/kafka-console-consumer.sh --zookeeper 192.168.0.100:2181 --topic event --from-beginning
#!/usr/bin/env ruby
# Demonstrates zeromq-like throughput in async mode. Note n cannot be
# increased reliably -- if n is too big the console producer errors with:
#
# kafka.common.QueueFullException: Event queue is full of unsent messages, could not send event: KeyedMessage(event,null,74600)
#
# This can likely be handled by a different implementation of producer,
# or by getting fancy by adding more brokers.
bindir = File.expand_path("../../vendor/kafka/bin", __FILE__)
producer = IO.popen("#{bindir}/kafka-console-producer.sh --broker-list 192.168.0.100:9091,192.168.0.100:9092,192.168.0.100:9093 --topic event", 'w')
consumer = IO.popen("#{bindir}/kafka-console-consumer.sh --zookeeper 192.168.0.100:2181 --topic event", 'r')
puts "init"
# warmup
Thread.new do
100.times {|i| producer.puts i }
producer.puts "warm"
end
loop do
r, w, e = IO.select([consumer], nil, nil, 1)
break if r.empty?
msg = consumer.gets
break if msg =~ /^warm/
end
puts "warm"
10.times do |j|
sleep 1
n = 10000
start = Time.now
Thread.new do
(n - 1).times {|i| producer.puts i }
producer.puts "stop#{j}"
end
pattern = /^stop#{j}/
count = 0
loop do
msg = consumer.gets
count += 1
break if msg =~ pattern
end
elapsed = Time.now - start
puts "#{elapsed}s (#{count}): #{(n/elapsed).round} msg/s"
end
# -*- mode: ruby -*-
# vi: set ft=ruby :
# Vagrantfile API/syntax version. Don't touch unless you know what you're doing!
VAGRANTFILE_API_VERSION = "2"
Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
# All Vagrant configuration is done here. The most common configuration
# options are documented and commented below. For a complete reference,
# please see the online documentation at vagrantup.com.
# Every Vagrant virtual environment requires a box to build off of.
config.vm.box = "precise64"
# The url from where the 'config.vm.box' box will be fetched if it
# doesn't already exist on the user's system.
config.vm.box_url = "http://files.vagrantup.com/precise64.box"
config.vm.provision :shell, :path => "vm/bootstrap.sh"
config.vm.network "private_network", ip: "192.168.0.100"
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment