Skip to content

Instantly share code, notes, and snippets.

@shiumachi
Created April 27, 2018 07:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shiumachi/83e89c231940d8497007d07ca967efa9 to your computer and use it in GitHub Desktop.
Kafka Kudu Demo (WIP)
#!/bin/sh
# Master-node bootstrap: record a timestamp in the log and mark this host
# as the demo's edge node.
# logging stdout/stderr
set -x
exec >> /root/bootstrap-master-init.log 2>&1
date
# Master node identifier
# NOTE(review): the instance post-create script checks for this flag file
# to decide whether the host it runs on is the edge node.
touch /root/kafka-kudu-demo_edge-node.flag
# Cloudera Director cluster configuration (HOCON).
# AWS credentials/network values come from this external include; the
# ${?VAR} substitutions below fall back to these or to environment variables.
include file("your-aws-info.conf")
## Instance Configurations
INSTANCE_TYPE_CM: t2.xlarge #vCPU 4, RAM 16G
INSTANCE_TYPE_MASTER: t2.large #vCPU 2, RAM 8G
INSTANCE_TYPE_WORKER: r3.xlarge #vCPU 4, RAM 30.5G, SSD 80Gx1
WORKER_NODE_NUM: 3 #Number of Worker Nodes
##
# Cluster name as shown in Cloudera Director.
name: kafka-kudu-demo-cluster
# AWS cloud provider settings (region, subnet, security group, credentials).
provider {
type: aws
accessKeyId: ${?AWS_ACCESS_KEY_ID}
secretAccessKey: ${?AWS_SECRET_ACCESS_KEY}
region: ${?AWS_REGION}
subnetId: ${?AWS_SUBNET_ID}
securityGroupsIds: ${?AWS_SECURITY_GROUP}
}
# SSH account Director uses to reach the instances.
ssh {
username: ${?OS_USERNAME}
privateKey: ${?KEY_PAIR}
}
# Properties shared by every instance template via ${common-instance-properties}.
common-instance-properties {
image: ${?AWS_AMI}
tags {
owner: ${?INSTANCE_OWNER_TAG}
}
}
# Cloudera Manager node and CM-level settings.
cloudera-manager {
instance: ${common-instance-properties} {
type: ${INSTANCE_TYPE_CM}
instanceNamePrefix: ${?INSTANCE_NAME_PREFIX}"-cm"
tags {
application: "Cloudera Manager 5"
}
bootstrapScriptsPaths: [
"common/bootstrap-common.sh"
]
}
# Enable the Cloudera Enterprise trial license; Director itself does not
# install Java (NONE — assumed to be handled by the bootstrap scripts/AMI).
enableEnterpriseTrial: true
javaInstallationStrategy: NONE
# CM 5.14.1 yum repository and its signing key.
repository: "https://archive.cloudera.com/cm5/redhat/7/x86_64/cm/5.14.1/"
repositoryKeyUrl: "https://archive.cloudera.com/cm5/redhat/7/x86_64/cm/RPM-GPG-KEY-cloudera"
configs {
CLOUDERA_MANAGER {
custom_banner_html: "Demo cluster managed by Cloudera Director"
}
NAVIGATORMETASERVER {
# for Navigator Demo
# Short extractor poll period so metadata changes show up quickly in the demo.
navigator_safety_valve: "nav.extractor.poll_period=10"
}
}
}
# CDH 5 + Kafka cluster definition: one master node plus WORKER_NODE_NUM workers.
cluster {
# Product lines to activate; exact versions are pinned by parcelRepositories.
products {
CDH: 5
KAFKA: 3
}
parcelRepositories: [
"http://archive.cloudera.com/cdh5/parcels/5.14.0/",
"https://archive.cloudera.com/kafka/parcels/3.0/"
]
# Services deployed across the role assignments below.
services: [
ZOOKEEPER,
HDFS,
YARN,
SPARK_ON_YARN,
HIVE,
IMPALA,
OOZIE,
HUE,
KUDU,
KAFKA
]
configs {
KAFKA {
# Demo-sized cluster: keep the internal offsets topic at replication factor 1.
"offsets.topic.replication.factor": 1
}
}
# Single master hosting all coordination/metadata roles.
master {
count: 1
instance: ${common-instance-properties} {
type: ${INSTANCE_TYPE_MASTER}
instanceNamePrefix: ${?INSTANCE_NAME_PREFIX}"-master"
tags {
group: master
}
bootstrapScriptsPaths: [
"common/bootstrap-common.sh",
"impala-demo-cluster/bootstrap-master.sh"
]
}
roles {
ZOOKEEPER: [SERVER]
HDFS: [NAMENODE,SECONDARYNAMENODE]
YARN: [RESOURCEMANAGER,JOBHISTORY]
SPARK_ON_YARN: [SPARK_YARN_HISTORY_SERVER]
HIVE: [HIVEMETASTORE,HIVESERVER2]
IMPALA: [STATESTORE,CATALOGSERVER]
OOZIE: [OOZIE_SERVER]
HUE: [HUE_SERVER]
KUDU: [KUDU_MASTER]
KAFKA: [KAFKA_BROKER]
}
configs {
HDFS {
NAMENODE {
dfs_name_dir_list: "/data0/nn"
}
SECONDARYNAMENODE {
fs_checkpoint_dir_list: "/data0/snn"
}
}
YARN {
RESOURCEMANAGER {
yarn_scheduler_maximum_allocation_mb: 8192
yarn_scheduler_maximum_allocation_vcores: 4
}
}
KUDU {
KUDU_MASTER {
# Kudu master WAL and data directories on the local data volume.
fs_wal_dir: "/data0/kudu/masterwal"
fs_data_dirs: "/data0/kudu/master"
}
}
}
}
# Worker nodes carrying the data-plane roles (HDFS/YARN/Impala/Kudu).
worker {
count: ${?WORKER_NODE_NUM}
minCount: ${?WORKER_NODE_NUM}
instance: ${common-instance-properties} {
type: ${INSTANCE_TYPE_WORKER}
instanceNamePrefix: ${?INSTANCE_NAME_PREFIX}"-worker"
tags {
group: worker
}
bootstrapScriptsPaths: [
"common/bootstrap-common.sh"
]
}
roles {
HDFS: [DATANODE]
YARN: [NODEMANAGER]
IMPALA: [IMPALAD]
KUDU: [KUDU_TSERVER]
HIVE: [GATEWAY]
}
configs {
HDFS {
DATANODE {
dfs_data_dir_list: "/data0/dn"
}
}
YARN {
NODEMANAGER {
yarn_nodemanager_resource_memory_mb: 4096
yarn_nodemanager_resource_cpu_vcores: 2
}
}
KUDU {
KUDU_TSERVER {
fs_wal_dir: "/data0/kudu/tabletwal"
fs_data_dirs: "/data0/kudu/tablet"
#memory_limit_hard_bytes: 17179869184 #16GiB
#block_cache_capacity_mb: 2048 #2GiB
}
}
KAFKA {
KAFKA_BROKER {
broker_max_heap_size: 268435456
}
}
IMPALA {
IMPALAD {
impalad_memory_limit: 17179869184 #16GiB
}
}
}
}
# Run once after cluster creation (cluster-level) ...
postCreateScriptsPaths: [
"impala-demo-cluster/postcreate-dataload.sh"
]
# ... and once per instance.
instancePostCreateScriptsPaths: [
"impala-demo-cluster/postcreate-kafka-kudu-demo.sh"
]
}
from kafka import KafkaConsumer
from kafka.client import KafkaClient
import kudu
from kudu.client import Partitioning
import argparse
def init_argumentparser():
    """Build the CLI parser for the Kafka->Kudu demo consumer.

    All options are strings; addresses default to empty and must be
    supplied by the caller.
    """
    parser = argparse.ArgumentParser()
    # Flag/default pairs, registered uniformly below.
    string_options = (
        ('--kudu_master_address', ''),
        ('--kudu_master_port', '7051'),
        ('--kudu_table', 'test_table'),
        ('--kafka_broker_address', ''),
        ('--kafka_broker_port', '9092'),
        ('--kafka_topic', 'test_topic'),
        ('--config', ''),
    )
    for flag, default in string_options:
        parser.add_argument(flag, default=default, type=str)
    return parser
def insert_msg(msg, table, session):
    """Insert a single Kafka message into the Kudu table and flush.

    The message timestamp becomes the row key and the UTF-8-decoded
    payload becomes the value. Flushing per message keeps the demo
    simple at the cost of throughput.
    """
    row_key = msg.timestamp
    row_value = msg.value.decode('utf-8')
    insert_op = table.new_insert({'key': row_key, 'value': row_value})
    session.apply(insert_op)
    session.flush()
    print("key={}, value={}".format(row_key, row_value))
def create_table(kudu_client, kudu_table):
    """Create the demo Kudu table.

    Schema: non-null int64 primary key 'key' plus a non-null,
    lz4-compressed string column 'value', hash-partitioned on the
    key into 3 buckets.
    """
    schema_builder = kudu.schema_builder()
    key_column = schema_builder.add_column('key')
    key_column.type(kudu.int64).nullable(False).primary_key()
    schema_builder.add_column(
        'value', type_=kudu.string, nullable=False, compression='lz4')
    table_schema = schema_builder.build()
    # Spread rows across three tablets by hashing the primary key.
    hash_partitioning = Partitioning().add_hash_partitions(
        column_names=['key'], num_buckets=3)
    kudu_client.create_table(kudu_table, table_schema, hash_partitioning)
def create_kafka_topic(kafka_client, kafka_topic):
    """Register `kafka_topic` with the Kafka client.

    NOTE(review): add_topic only records interest in the topic's
    metadata on the client side; actual creation presumably relies on
    broker-side auto-creation — confirm against the broker config.
    """
    kafka_client.add_topic(kafka_topic)
def main():
    """Entry point: ensure the Kudu table and Kafka topic exist, then
    consume messages from Kafka forever, inserting each one into Kudu.
    """
    argumentparser = init_argumentparser()
    # BUG FIX: the original wrapped parse_args() in vars(), producing a plain
    # dict, then used attribute access (args.kudu_master_address, ...), which
    # raises AttributeError. Keep the argparse.Namespace instead.
    args = argumentparser.parse_args()
    kudu_master_address = args.kudu_master_address
    # The CLI parses the port as a string; kudu.connect's port defaults to an
    # int (7051), so convert — TODO confirm against kudu-python 1.2.0.
    kudu_master_port = int(args.kudu_master_port)
    kudu_table = args.kudu_table
    kafka_broker_address = args.kafka_broker_address
    kafka_broker_port = args.kafka_broker_port
    kafka_topic = args.kafka_topic
    kudu_client = kudu.connect(host=kudu_master_address, port=kudu_master_port)
    # Create the table only if it does not exist yet.
    tables = kudu_client.list_tables()
    if kudu_table not in tables:
        create_table(kudu_client, kudu_table)
    # Open a table
    table = kudu_client.table(kudu_table)
    # Create a new session so that we can apply write operations
    session = kudu_client.new_session()
    kafka_bootstrap_servers = ':'.join([kafka_broker_address, str(kafka_broker_port)])
    kafka_client = KafkaClient(bootstrap_servers=kafka_bootstrap_servers)
    # Create a topic if the cluster metadata does not know it yet.
    topics = kafka_client.cluster.topics()
    if kafka_topic not in topics:
        create_kafka_topic(kafka_client, kafka_topic)
    # Listen to the topic as a consumer; blocks forever inserting messages.
    consumer = KafkaConsumer(kafka_topic, bootstrap_servers=kafka_bootstrap_servers)
    for msg in consumer:
        insert_msg(msg, table, session)


if __name__ == '__main__':
    main()
#!/bin/sh
#
# Post Creation Script for Kafka Kudu Demo
#
# Installs Python 3.6, a dedicated virtualenv, and the Kafka/Kudu Python
# clients — but only on the host carrying the edge-node flag file dropped
# by the master bootstrap script.
#
# logging stdout/stderr
set -x
exec >> /root/instance-postcreate-base 2>&1
date
# exit this script if the target host doesn't have a flag file
if [ ! -e /root/kafka-kudu-demo_edge-node.flag ]; then
  echo "INFO: This host is not an edge node. Retry to run in another host ..."
  exit 0
fi
# install EPEL repository
yum install -y https://centos7.iuscommunity.org/ius-release.rpm
# install setuptools, pip, virtualenv, virtualenvwrapper
easy_install setuptools
easy_install pip
pip install virtualenv virtualenvwrapper
# BUG FIX: 'source' is a bashism; this script runs under /bin/sh, so use
# the POSIX '.' builtin instead.
. /usr/bin/virtualenvwrapper.sh
# install python 3.6
yum -y install python36 python36-devel
# create virtualenv
mkvirtualenv -p /usr/bin/python3.6 kafka-kudu-demo
# install Kafka python client
pip install kafka-python
# install Kudu repository
# BUG FIX: guard the cd — if it failed, wget would drop the repo file into
# the current directory and the kudu packages below would not resolve.
cd /etc/yum.repos.d || exit 1
wget http://archive.cloudera.com/kudu/redhat/7/x86_64/kudu/cloudera-kudu.repo
# install kudu client headers plus the toolchain needed to build kudu-python
# (kudu-client-devel was previously listed in two consecutive yum commands;
# install it once here)
yum -y install kudu-client-devel gcc gcc-c++
# install kudu-python
# The latest version of Kudu Client 1.7.0 doesn't support 1.6.0 (Cloudera 5.14),
# so please specify the previous version 1.2.0
# install Cython before kudu-python. Do not merge into one command. kudu-python cannot resolve dependency.
pip install Cython
pip install kudu-python==1.2.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment