|
import java.io.IOException; |
|
import java.nio.ByteBuffer; |
|
import java.util.ArrayList; |
|
import java.util.LinkedHashMap; |
|
import java.util.List; |
|
import java.util.Map; |
|
|
|
import com.datastax.driver.core.Row; |
|
import org.apache.cassandra.hadoop.ConfigHelper; |
|
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper; |
|
import org.apache.cassandra.hadoop.cql3.CqlInputFormat; |
|
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat; |
|
import org.apache.cassandra.utils.ByteBufferUtil; |
|
import org.apache.hadoop.conf.Configuration; |
|
import org.apache.hadoop.conf.Configured; |
|
import org.apache.hadoop.io.IntWritable; |
|
import org.apache.hadoop.io.Text; |
|
import org.apache.hadoop.mapreduce.Job; |
|
import org.apache.hadoop.mapreduce.Mapper; |
|
import org.apache.hadoop.mapreduce.Reducer; |
|
import org.apache.hadoop.util.Tool; |
|
import org.apache.hadoop.util.ToolRunner; |
|
import org.slf4j.Logger; |
|
import org.slf4j.LoggerFactory; |
|
|
|
|
|
public class Main extends Configured implements Tool { |
|
|
|
private static final Logger logger = LoggerFactory.getLogger(Main.class); |
|
|
|
private static final String KEYSPACE = "wordcount"; |
|
private static final String COLUMN_FAMILY = "input"; |
|
|
|
private static final String OUTPUT_COLUMN_FAMILY = "output"; |
|
private static final String PRIMARY_KEY = "row_key"; |
|
|
|
private static class CountMapper extends Mapper<Long, Row, Text, IntWritable> { |
|
|
|
private final static IntWritable one = new IntWritable(1); |
|
private Text word = new Text(); |
|
|
|
@Override |
|
protected void map(Long key, Row row, Context context) throws IOException, InterruptedException { |
|
String value = row.getString("word"); |
|
logger.debug("read {}:{}={} from {}", key, "word", value, context.getInputSplit()); |
|
word.set(value); |
|
context.write(word, one); |
|
} |
|
} |
|
|
|
private static class CountReducer extends Reducer<Text, IntWritable, Map<String, ByteBuffer>, List<ByteBuffer>> { |
|
|
|
private Map<String, ByteBuffer> keys; |
|
|
|
@Override |
|
protected void setup(Context context) throws IOException, InterruptedException { |
|
keys = new LinkedHashMap<String, ByteBuffer>(); |
|
} |
|
|
|
@Override |
|
public void reduce(Text word, Iterable<IntWritable> values, Context context) |
|
throws IOException, InterruptedException { |
|
int sum = 0; |
|
for (IntWritable val : values) { |
|
sum += val.get(); |
|
} |
|
keys.put("word", ByteBufferUtil.bytes(word.toString())); |
|
List<ByteBuffer> variables = new ArrayList<ByteBuffer>(); |
|
variables.add(ByteBufferUtil.bytes(String.valueOf(sum))); |
|
context.write(keys, variables); |
|
} |
|
} |
|
|
|
public int run(String[] args) throws Exception { |
|
// Job job = new Job(getConf(), "Word Count"); |
|
Job job = Job.getInstance(getConf(), "Word Count"); |
|
job.setJarByClass(Main.class); |
|
|
|
job.setMapperClass(CountMapper.class); |
|
job.setInputFormatClass(CqlInputFormat.class); |
|
CqlConfigHelper.setInputCql(job.getConfiguration(), |
|
"select * from " + COLUMN_FAMILY + " where token(id) > ? and token(id) <= ? allow filtering"); |
|
|
|
job.setReducerClass(CountReducer.class); |
|
job.setOutputFormatClass(CqlOutputFormat.class); |
|
|
|
job.setMapOutputKeyClass(Text.class); |
|
job.setMapOutputValueClass(IntWritable.class); |
|
job.setOutputKeyClass(Map.class); |
|
job.setOutputValueClass(List.class); |
|
|
|
ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost"); |
|
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY); |
|
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner"); |
|
|
|
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY); |
|
job.getConfiguration().set(PRIMARY_KEY, "word,sum"); |
|
String query = "UPDATE " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY + " SET count = ? "; |
|
CqlConfigHelper.setOutputCql(job.getConfiguration(), query); |
|
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost"); |
|
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner"); |
|
|
|
CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3"); |
|
|
|
job.waitForCompletion(true); |
|
return 0; |
|
} |
|
|
|
public static void main (String[] args) throws Exception { |
|
ToolRunner.run(new Configuration(), new Main(), args); |
|
System.exit(0); |
|
} |
|
} |