Created January 24, 2012 18:58
Kafka distributed incremental Hadoop consumer
Add all of the files to kafka/contrib/hadoop-consumer and run them from that directory, since they rely on resources that are resolved via paths relative to that location.
You should start by executing spawn-incremental-distributed-hadoop-consumer.
The scripts are fairly verbose, with usage instructions and explanatory output, so it should be easy to figure out what to do from there. A typical first run and subsequent runs are sketched below.
If you have any questions or comments, you can send them to: felix@mate1inc.com
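
For reference, here is what a typical workflow looks like. This is a hedged sketch: the topic name, HDFS path, broker-list file name, and the consumer script's file name (incremental-hadoop-consumer) are illustrative placeholders, not part of this gist.

  # one-time: seed the offset directory with one offset file per broker
  ./generate-many-offset-files my_topic /data/kafka/my_topic brokers.txt
  # on every subsequent run: import everything published since the last run into a new time bucket
  ./incremental-hadoop-consumer my_topic /data/kafka/my_topic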
The list of Kafka brokers (one host:port per line):
kafka1.prod:9092
kafka2.prod:9092
kafka3.prod:9092
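
The generation script below derives a per-broker offset file name from each host:port pair by replacing the colon with "-port" (see the sed call in the script), so this list produces the following files under ${hdfs_dir}/offset/:

  kafka1.prod-port9092.dat
  kafka2.prod-port9092.dat
  kafka3.prod-port9092.dat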
generate-many-offset-files:
#!/bin/bash
banner="***************************************************************************************************************************"
if [[ "$#" -ne 3 ]]; then
  echo "Usage: $0 topic hdfs_dir list_of_brokers"
  echo ''
  echo 'topic: The Kafka topic to subscribe to.'
  echo 'hdfs_dir: The HDFS directory to write to. It will contain an offset directory holding the most recent offset files, plus a hierarchy of nested directories with a new bucket of imported data each time the consumer script is called.'
  echo 'list_of_brokers: A local file containing the list of Kafka brokers to generate offset files for.'
  exit 1
fi
topic=$1
hdfs_dir=$2
list_of_brokers=$3
bucket_name=$(date +%Y/%m/%d/%Hh%M/)
generated_property_file='current.properties'
# An empty `hadoop fs -ls` output means the offset directory is empty or absent.
current_offset_file_exists=$(hadoop fs -ls "${hdfs_dir}/offset")
if [ -z "$current_offset_file_exists" ]; then
  echo "$banner"
  echo "No offset file(s) found, so new one(s) will be generated for the topic '${topic}' starting from offset -1"
  echo "$banner"
  if [ -f "$list_of_brokers" ]; then
    printf 'File "%s" was found\n' "$list_of_brokers"
    while read -r server; do
      broker=${server}
      # Turn host:port into a per-broker file name, e.g. kafka1.prod:9092 -> kafka1.prod-port9092
      offset_file_name=$(echo "${server}" | sed -e 's/:/-port/g')
      # Render template.properties, expanding ${topic}, ${broker}, etc.
      eval "echo \"$(< template.properties)\"" > "${generated_property_file}"
      # DataGenerator writes an initial offset file named 1.dat for this broker
      ./run-class.sh kafka.etl.impl.DataGenerator "${generated_property_file}"
      hadoop fs -mv "${hdfs_dir}/offset/1.dat" "${hdfs_dir}/offset/${offset_file_name}.dat"
    done < "$list_of_brokers"
  else
    printf 'File "%s" was NOT found\n' "$list_of_brokers"
    exit 1
  fi
else
  echo "$banner"
  echo "Offset file(s) already found in ${hdfs_dir}/offset so no new one(s) will be created."
  echo "$banner"
fi
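
A note on the templating one-liner used above: eval "echo \"$(< template.properties)\"" reads the whole template into a double-quoted string and lets the shell expand its ${...} placeholders against the script's current variables. A minimal standalone sketch of the same trick (the file names and values here are made up):

  #!/bin/bash
  topic=my_topic
  broker=kafka1.prod:9092
  # template.txt contains the line: kafka.etl.topic=${topic}
  eval "echo \"$(< template.txt)\"" > rendered.txt
  # rendered.txt now contains:      kafka.etl.topic=my_topic

Because eval re-interprets the template as shell input, backticks, $(...) and other shell syntax in it would also be executed, so this only works safely with trusted templates.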
The incremental Hadoop consumer script (run once per import):
#!/bin/bash
banner="***************************************************************************************************************************"
if [[ "$#" -ne 2 ]]; then
  echo "Usage: $0 topic hdfs_dir"
  echo ''
  echo 'topic: The Kafka topic to subscribe to.'
  echo 'hdfs_dir: The HDFS directory to write to. It will contain an offset directory holding the most recent offset files, plus a hierarchy of nested directories with a new bucket of imported data each time this script is called.'
  exit 1
fi
topic=$1
hdfs_dir=$2
bucket_name=$(date +%Y/%m/%d/%Hh%M/)
generated_property_file='current.properties'
current_offset_file_exists=$(hadoop fs -ls "${hdfs_dir}/offset")
# Render template.properties, expanding ${topic}, ${hdfs_dir} and ${bucket_name}.
# (${broker} is unset here; the ETL job reads broker info from the offset files.)
eval "echo \"$(< template.properties)\"" > "${generated_property_file}"
if [ -z "$current_offset_file_exists" ]; then
  echo "$banner"
  echo "WARNING: Offset file(s) not found. The hadoop job cannot be run."
  echo "To generate the initial offset files, please run the following command:"
  echo "./generate-many-offset-files ${topic} ${hdfs_dir} file_containing_your_list_of_brokers"
  echo "$banner"
else
  echo "$banner"
  echo "Importing topic '${topic}' to HDFS directory: ${hdfs_dir}/${bucket_name}"
  echo "$banner"
  ./run-class.sh kafka.etl.impl.SimpleKafkaETLJob "${generated_property_file}"
  # A successful job leaves a _SUCCESS marker in the output bucket.
  hadoop_consumer_successful=$(hadoop fs -ls "${hdfs_dir}/${bucket_name}_SUCCESS")
  if [ -z "$hadoop_consumer_successful" ]; then
    echo "$banner"
    echo "ERROR: The hadoop job did not report success."
    echo "The original offset file(s) used as input for this job have been left untouched, so you can retry later."
    echo "$banner"
  else
    echo "$banner"
    echo "SUCCESS: The hadoop job reports success."
    echo "The original offset file(s) used as input for this job will now be deleted and replaced by those produced by this job."
    echo "$banner"
    hadoop fs -rm "${hdfs_dir}/offset/*"
    old_offset_files_still_exist=$(hadoop fs -ls "${hdfs_dir}/offset")
    if [ -z "$old_offset_files_still_exist" ]; then
      echo "$banner"
      echo "SUCCESS: The original offset file(s) used as input for this job have been deleted from the offset directory in HDFS."
      echo "$banner"
      # Promote the offset files produced by this run into the offset directory.
      hadoop fs -mv "${hdfs_dir}/${bucket_name}offsets*" "${hdfs_dir}/offset/"
      new_offset_files_present_in_offset_dir=$(hadoop fs -ls "${hdfs_dir}/offset")
      if [ -z "$new_offset_files_present_in_offset_dir" ]; then
        echo "$banner"
        echo "ERROR: The new offset file(s) were not successfully moved to the offset directory."
        echo "$banner"
      else
        echo "$banner"
        echo "SUCCESS: The new offset file(s) have been moved to the offset directory."
        echo "$banner"
      fi
    else
      echo "$banner"
      echo "ERROR: The original offset file(s) used as input for this job could NOT be deleted from the offset directory."
      echo "$banner"
    fi
  fi
fi
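
Since every invocation writes into a fresh %Y/%m/%d/%Hh%M/ bucket, this script is meant to be run on a schedule. A hedged example crontab entry (the install path, script name and arguments are assumptions, not from this gist):

  # import my_topic from Kafka into HDFS every 15 minutes
  */15 * * * * cd /opt/kafka/contrib/hadoop-consumer && ./incremental-hadoop-consumer my_topic /data/kafka/my_topic >> /var/log/kafka-hadoop-consumer.log 2>&1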
template.properties:
# Kafka topic to consume (substituted in by the calling script)
kafka.etl.topic=${topic}
# HDFS location of the Kafka jars
hdfs.default.classpath.dir=/tmp/kafka/lib
# number of test events to be generated (used by DataGenerator)
event.count=0
# hadoop user and group
hadoop.job.ugi=kafka,hadoop
# Kafka server URI (used by DataGenerator when seeding the initial offset file)
kafka.server.uri=tcp://${broker}
# HDFS location of the input directory (the offset files)
input=${hdfs_dir}/offset/
# HDFS location of the output directory (this run's time bucket)
output=${hdfs_dir}/${bucket_name}
# limit on the number of events to fetch; -1 means no limit
kafka.request.limit=-1
# Kafka client parameters
client.buffer.size=1048576
client.so.timeout=60000
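
After substitution by either script, the generated current.properties would read something like this (comment lines omitted; the topic, broker and paths are illustrative values, with the bucket name coming from date +%Y/%m/%d/%Hh%M/):

  kafka.etl.topic=my_topic
  hdfs.default.classpath.dir=/tmp/kafka/lib
  event.count=0
  hadoop.job.ugi=kafka,hadoop
  kafka.server.uri=tcp://kafka1.prod:9092
  input=/data/kafka/my_topic/offset/
  output=/data/kafka/my_topic/2012/01/24/18h58/
  kafka.request.limit=-1
  client.buffer.size=1048576
  client.so.timeout=60000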