@FelixGV
Created January 24, 2012 18:58
Kafka distributed incremental Hadoop consumer
Add all of these files to kafka/contrib/hadoop-consumer and run them from that directory, since they rely on resources resolved relative to that location.
Start by executing spawn-incremental-distributed-hadoop-consumer.
The scripts are fairly verbose, with usage instructions and progress messages, so it should be easy to figure out what to do from there.
If you have any questions or comments, you can send them to: felix@mate1inc.com
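For reference, a hypothetical end-to-end run might look like the following. The topic name, HDFS directory and brokers file name are made up for illustration, and it is assumed here that spawn-incremental-distributed-hadoop-consumer ultimately takes the same topic and hdfs_dir arguments as the consumer script shown further down:

# One-time setup: seed one offset file per broker listed in brokers.txt
./generate-many-offset-files clicks /user/kafka/clicks brokers.txt
# Every subsequent run imports a new time-stamped bucket of data
./spawn-incremental-distributed-hadoop-consumer clicks /user/kafka/clicks

The list of brokers is a plain text file with one host:port entry per line, for example: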
kafka1.prod:9092
kafka2.prod:9092
kafka3.prod:9092
#!/bin/bash
# Seeds one offset file per Kafka broker so that the incremental Hadoop consumer can start from offset -1.
if [[ ! ("$#" == 3) ]]; then
  echo "Usage: $0 topic hdfs_dir list_of_brokers"
  echo ''
  echo 'topic: The Kafka topic to subscribe to.'
  echo 'hdfs_dir: The HDFS directory to write to. It will contain an offset directory holding the most recent offset file(s), and a hierarchy of nested directories with a new bucket of imported data each time this script is called.'
  echo 'list_of_brokers: A local file containing the list of Kafka brokers to generate offset files for.'
  exit 1
fi

topic=$1
hdfs_dir=$2
list_of_brokers=$3
bucket_name=`date +%Y/%m/%d/%Hh%M/`
generated_property_file='current.properties'

current_offset_file_exists=`hadoop fs -ls ${hdfs_dir}/offset`
if [ -z "$current_offset_file_exists" ]; then
  echo "***************************************************************************************************************************"
  echo "No offset file(s) found, so new one(s) will be generated for the topic '${topic}' starting from offset -1"
  echo "***************************************************************************************************************************"
  if [ -f "$list_of_brokers" ]; then
    printf "File \"$list_of_brokers\" was found\n"
    # Generate one offset file per broker, named after the broker (with ':' replaced by '-port').
    while read server; do
      broker=${server}
      offset_file_name=`echo ${server} | sed -e 's/\:/-port/g'`
      # Expand the shell variables referenced in template.properties into the working property file.
      eval "echo \"$(< template.properties)\"" > ${generated_property_file}
      ./run-class.sh kafka.etl.impl.DataGenerator ${generated_property_file}
      hadoop fs -mv ${hdfs_dir}/offset/1.dat ${hdfs_dir}/offset/${offset_file_name}.dat
    done < "$list_of_brokers"
  else
    printf "File \"$list_of_brokers\" was NOT found\n"
    exit 1
  fi
else
  echo "***************************************************************************************************************************"
  echo "Offset file(s) already found in ${hdfs_dir}/offset so no new one(s) will be created."
  echo "***************************************************************************************************************************"
fi
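A couple of details in the script above are worth spelling out. Each broker's offset file is renamed after the broker itself, with the colon replaced by "-port" (so kafka1.prod:9092 becomes kafka1.prod-port9092.dat). The eval/echo line expands the shell variables referenced in template.properties (the last file in this gist) into the working current.properties file. A minimal sketch of that expansion mechanism, using made-up values:

# Hypothetical values, for illustration only:
topic=clicks
broker=kafka1.prod:9092
hdfs_dir=/user/kafka/clicks
bucket_name=2012/01/24/18h58/
# Re-echoing the template through eval substitutes the ${...} references it contains:
eval "echo \"$(< template.properties)\"" > current.properties
# current.properties now contains lines such as kafka.etl.topic=clicks
# and kafka.server.uri=tcp://kafka1.prod:9092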
#!/bin/bash
# Runs the Kafka ETL Hadoop job once, importing a new time-stamped bucket of data into HDFS,
# and rotates the offset files only if the job reports success.
if [[ ! ("$#" == 2) ]]; then
  echo "Usage: $0 topic hdfs_dir"
  echo ''
  echo 'topic: The Kafka topic to subscribe to.'
  echo 'hdfs_dir: The HDFS directory to write to. It will contain an offset directory holding the most recent offset file(s), and a hierarchy of nested directories with a new bucket of imported data each time this script is called.'
  exit 1
fi

topic=$1
hdfs_dir=$2
bucket_name=`date +%Y/%m/%d/%Hh%M/`
generated_property_file='current.properties'

current_offset_file_exists=`hadoop fs -ls ${hdfs_dir}/offset`
# Expand the shell variables referenced in template.properties into the working property file.
eval "echo \"$(< template.properties)\"" > ${generated_property_file}
if [ -z "$current_offset_file_exists" ]; then
  echo "***************************************************************************************************************************"
  echo "WARNING: Offset file(s) not found. The hadoop job cannot be run."
  echo "To generate the initial offset files, please run the following command:"
  echo "./generate-many-offset-files ${topic} ${hdfs_dir} file_containing_your_list_of_brokers"
  echo "***************************************************************************************************************************"
else
  echo "***************************************************************************************************************************"
  echo "Importing topic '${topic}' to HDFS directory: ${hdfs_dir}/${bucket_name}"
  echo "***************************************************************************************************************************"
  ./run-class.sh kafka.etl.impl.SimpleKafkaETLJob ${generated_property_file}
  # The job writes a _SUCCESS marker into the bucket directory when it completes successfully.
  hadoop_consumer_successful=`hadoop fs -ls ${hdfs_dir}/${bucket_name}_SUCCESS`
  if [ -z "$hadoop_consumer_successful" ]; then
    echo "***************************************************************************************************************************"
    echo "ERROR: The hadoop job does not report a success."
    echo "The original offset file(s) used as the input of this job have been left untouched, so that you can retry later."
    echo "***************************************************************************************************************************"
  else
    echo "***************************************************************************************************************************"
    echo "SUCCESS: The hadoop job reports that it is successful."
    echo "The original offset file(s) used as the input of this job will now be deleted and replaced by those output by this job."
    echo "***************************************************************************************************************************"
    hadoop fs -rm ${hdfs_dir}/offset/*
    old_offset_files_still_exist=`hadoop fs -ls ${hdfs_dir}/offset`
    if [ -z "$old_offset_files_still_exist" ]; then
      echo "***************************************************************************************************************************"
      echo "SUCCESS: The original offset file(s) used as the input of this job have been deleted from the offset directory in HDFS."
      echo "***************************************************************************************************************************"
      # Promote the offset files written by this job so they become the input of the next run.
      hadoop fs -mv ${hdfs_dir}/${bucket_name}offsets* ${hdfs_dir}/offset/
      new_offset_files_present_in_offset_dir=`hadoop fs -ls ${hdfs_dir}/offset`
      if [ -z "$new_offset_files_present_in_offset_dir" ]; then
        echo "***************************************************************************************************************************"
        echo "ERROR: The new offset file(s) were not successfully moved to the offset directory."
        echo "***************************************************************************************************************************"
      else
        echo "***************************************************************************************************************************"
        echo "SUCCESS: The new offset file(s) have been moved to the offset directory."
        echo "***************************************************************************************************************************"
      fi
    else
      echo "***************************************************************************************************************************"
      echo "ERROR: The original offset file(s) used as the input of this job could NOT be deleted from the offset directory."
      echo "***************************************************************************************************************************"
    fi
  fi
fi
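Since each run writes into a new minute-granular bucket (date +%Y/%m/%d/%Hh%M/) and only advances the offset files on success, incremental consumption amounts to running the consumer script periodically. A hypothetical crontab entry, reusing the illustrative topic and paths from above and assuming an install under /opt/kafka/contrib/hadoop-consumer:

# Import new Kafka data into HDFS every 10 minutes:
*/10 * * * * cd /opt/kafka/contrib/hadoop-consumer && ./spawn-incremental-distributed-hadoop-consumer clicks /user/kafka/clicks >> /var/log/kafka-hadoop-consumer.log 2>&1

The last file is the template.properties that both scripts expand: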
# name of test topic
kafka.etl.topic=${topic}
# hdfs location of jars
hdfs.default.classpath.dir=/tmp/kafka/lib
# number of test events to be generated
event.count=0
# hadoop id and group
hadoop.job.ugi=kafka,hadoop
# kafka server uri
kafka.server.uri=tcp://${broker}
# hdfs location of input directory
input=${hdfs_dir}/offset/
# hdfs location of output directory
output=${hdfs_dir}/${bucket_name}
# limit the number of events to be fetched;
# value -1 means no limitation
kafka.request.limit=-1
# kafka parameters
client.buffer.size=1048576
client.so.timeout=60000
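With the hypothetical values from the earlier sketch (topic 'clicks', broker kafka1.prod:9092, HDFS directory /user/kafka/clicks, bucket 2012/01/24/18h58/), the generated current.properties would come out roughly as follows during the offset-seeding loop (comment lines omitted):

kafka.etl.topic=clicks
hdfs.default.classpath.dir=/tmp/kafka/lib
event.count=0
hadoop.job.ugi=kafka,hadoop
kafka.server.uri=tcp://kafka1.prod:9092
input=/user/kafka/clicks/offset/
output=/user/kafka/clicks/2012/01/24/18h58/
kafka.request.limit=-1
client.buffer.size=1048576
client.so.timeout=60000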