Skip to content

Instantly share code, notes, and snippets.

@suyash
Created March 20, 2016 18:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Save suyash/8ff0d0c59bd16c8d1ef5 to your computer and use it in GitHub Desktop.
Cassandra to Cassandra over Hadoop MapReduce bindings
### Java ###
*.class
# Package Files #
*.jar
*.war
*.ear
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
### Intellij ###
*.iml
## Directory-based project format:
.idea/
## File-based project format:
*.ipr
*.iws
## Plugin-specific files:
# IntelliJ
/out/
### Gradle ###
.gradle
build/
# Ignore Gradle GUI config
gradle-app.setting
# Gradle wrapper scripts and jar (not tracked in this gist)
gradlew*
gradle/
# logs
wc.out
gradle build
unzip build/distributions/CassandraMapReduce-1.0-SNAPSHOT.zip
./run.sh
group 'in.suyash.CassandraMapReduce'
version '1.0-SNAPSHOT'
apply plugin: 'java'
apply plugin: 'application'
sourceCompatibility = 1.5
repositories {
mavenCentral()
}
dependencies {
compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.7.0'
compile group: 'org.apache.hadoop', name: 'hadoop-auth', version: '2.7.0'
compile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '2.7.0'
compile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-jobclient', version: '2.7.0'
compile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-common', version: '2.7.0'
compile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-shuffle', version: '2.7.0'
compile group: 'org.apache.cassandra', name: 'cassandra-all', version: '2.1.5'
compile group: 'com.datastax.cassandra', name: 'cassandra-driver-core', version: '2.1.5'
}
sourceSets {
main {
java {
srcDir './'
}
}
}
mainClassName = "Main"
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
log4j.rootLogger=INFO,stdout,F
#stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n
# log file
log4j.appender.F=org.apache.log4j.FileAppender
log4j.appender.F.Append=false
log4j.appender.F.layout=org.apache.log4j.PatternLayout
log4j.appender.F.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
# Edit the next line to point to your logs directory
log4j.appender.F.File=wc.out
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import com.datastax.driver.core.Row;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlInputFormat;
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Main extends Configured implements Tool {
private static final Logger logger = LoggerFactory.getLogger(Main.class);
private static final String KEYSPACE = "wordcount";
private static final String COLUMN_FAMILY = "input";
private static final String OUTPUT_COLUMN_FAMILY = "output";
private static final String PRIMARY_KEY = "row_key";
private static class CountMapper extends Mapper<Long, Row, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(Long key, Row row, Context context) throws IOException, InterruptedException {
String value = row.getString("word");
logger.debug("read {}:{}={} from {}", key, "word", value, context.getInputSplit());
word.set(value);
context.write(word, one);
}
}
private static class CountReducer extends Reducer<Text, IntWritable, Map<String, ByteBuffer>, List<ByteBuffer>> {
private Map<String, ByteBuffer> keys;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
keys = new LinkedHashMap<String, ByteBuffer>();
}
@Override
public void reduce(Text word, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
keys.put("word", ByteBufferUtil.bytes(word.toString()));
List<ByteBuffer> variables = new ArrayList<ByteBuffer>();
variables.add(ByteBufferUtil.bytes(String.valueOf(sum)));
context.write(keys, variables);
}
}
public int run(String[] args) throws Exception {
// Job job = new Job(getConf(), "Word Count");
Job job = Job.getInstance(getConf(), "Word Count");
job.setJarByClass(Main.class);
job.setMapperClass(CountMapper.class);
job.setInputFormatClass(CqlInputFormat.class);
CqlConfigHelper.setInputCql(job.getConfiguration(),
"select * from " + COLUMN_FAMILY + " where token(id) > ? and token(id) <= ? allow filtering");
job.setReducerClass(CountReducer.class);
job.setOutputFormatClass(CqlOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Map.class);
job.setOutputValueClass(List.class);
ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
job.getConfiguration().set(PRIMARY_KEY, "word,sum");
String query = "UPDATE " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY + " SET count = ? ";
CqlConfigHelper.setOutputCql(job.getConfiguration(), query);
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");
job.waitForCompletion(true);
return 0;
}
public static void main (String[] args) throws Exception {
ToolRunner.run(new Configuration(), new Main(), args);
System.exit(0);
}
}
cwd=`dirname $0`
CLASSPATH=$cwd:$CLASSPATH
for jar in $cwd/build/distributions/CassandraMapReduce-1.0-SNAPSHOT/lib/*.jar; do
CLASSPATH=$CLASSPATH:$jar
done
for jar in $cwd/build/libs/*.jar; do
CLASSPATH=$CLASSPATH:$jar
done
java -Xmx1G -ea -cp $CLASSPATH Main
rootProject.name = 'CassandraMapReduce'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment