@Sathiyarajan
Forked from Saberko/Sample.java
Created January 3, 2018 18:24
spark hbase integration
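The file below is a single Java class that reads an HBase table ("mytable") into Spark as a JavaPairRDD<ImmutableBytesWritable, Result> via the new-API TableInputFormat, counts the rows, filters them on the "score" column family, and then writes a couple of sample Put rows back through the old-API (mapred) TableOutputFormat with saveAsHadoopDataset. The AppConfigure helper referenced in main() is not included in the gist.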
package utils;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.*;
/**
 * run:
 *   spark-submit \
 *     --class utils.SampleApp \
 *     --master spark://h3:7077 \
 *     target/dmp-SNAPSHOT-1.0.jar \
 *     --run_mode local \
 *     --redis_host w1 \
 *     --duration 300 \
 *     --checkpoint_directory /tmp
 */
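// Note: the imports above target the HBase 1.x-era client API (ProtobufUtil / Base64 under
// org.apache.hadoop.hbase.*), so the HBase MapReduce formats (TableInputFormat / TableOutputFormat)
// must be on the classpath alongside spark-core -- typically hbase-client plus hbase-server on
// HBase 1.x, or hbase-mapreduce on newer releases; exact artifact names depend on the HBase version in use.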
public class SampleApp {

    // Option names match the command-line flags shown in the run example above.
    public static final String APP_DURATION = "duration";
    public static final String REDIS_HOST = "redis_host";
    public static final String LOGS_DIRECTORY = "logs_directory";
    public static final String RUN_MODE = "run_mode";
    public static final String CHECKPOINT_DIRECTORY = "checkpoint_directory";

    public static final Options THE_OPTIONS = createOptions();
    private static Options createOptions() {
        Options options = new Options();
        options.addOption(APP_DURATION, true, "The streaming app's duration");
        options.addOption(REDIS_HOST, true, "The redis host");
        options.addOption(LOGS_DIRECTORY, true, "The directory where logs are written");
        options.addOption(RUN_MODE, true, "The application's running mode");
        options.addOption(CHECKPOINT_DIRECTORY, true, "The checkpoint directory");
        return options;
    }
    public static void main(String[] args) {
        if (args.length == 0) {
            System.err.println("Some parameters are needed");
            System.exit(-1);
        }
        AppConfigure.setFromCommandLineArgs(THE_OPTIONS, args);

        SparkConf conf = new SparkConf().setAppName("Sample App");
        if ("local".equalsIgnoreCase(AppConfigure.getInstance().getRunMode())) {
            conf.setMaster("local[*]");
        }
        JavaSparkContext jsc = new JavaSparkContext(conf);

        Configuration hconf = HBaseConfiguration.create();
        // set Scan to scan specific columns colFam:col
        // Scan scan = new Scan();
        // scan.addFamily(Bytes.toBytes("name"));
        // scan.addColumn(Bytes.toBytes("name"), Bytes.toBytes("n1"));
        try {
            // String table = "mytable";
            // hconf.set(TableInputFormat.INPUT_TABLE, table);
            // ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
            // String scanToString = Base64.encodeBytes(proto.toByteArray());
            // hconf.set(TableInputFormat.SCAN, scanToString);
            hconf.set(TableInputFormat.INPUT_TABLE, "mytable");
            JavaPairRDD<ImmutableBytesWritable, Result> rdd = jsc.newAPIHadoopRDD(hconf, TableInputFormat.class,
                    ImmutableBytesWritable.class, Result.class);
            System.out.println("total records: " + rdd.count());

            // JavaRDD<Result> result = rdd.map(new Function<Tuple2<ImmutableBytesWritable, Result>, Result>() {
            //     public Result call(Tuple2<ImmutableBytesWritable, Result> tuple) throws Exception {
            //         return tuple._2();
            //     }
            // });
            JavaPairRDD<String, String> myrdd = rdd.mapToPair(new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, String>() {
                public Tuple2<String, String> call(Tuple2<ImmutableBytesWritable, Result> tuple) throws Exception {
                    CellScanner cellScanner = tuple._2().cellScanner();
                    // tuple._1() (the ImmutableBytesWritable) is simply the row key
                    String key = "MYKEY--" + Bytes.toString(tuple._1().get());
                    // String key = "MYKEY--" + Bytes.toString(CellUtil.cloneRow(cell));
                    while (cellScanner.advance()) {
                        Cell cell = cellScanner.current();
                        String value = Bytes.toString(CellUtil.cloneValue(cell));
                        // only the first cell of each row is kept
                        return new Tuple2<String, String>(key, value);
                    }
                    // a Result returned by the scan always has at least one cell; fall back to an empty value
                    return new Tuple2<String, String>(key, "");
                }
            });
            JavaPairRDD<ImmutableBytesWritable, Result> score = rdd.filter(new Function<Tuple2<ImmutableBytesWritable, Result>, Boolean>() {
                public Boolean call(Tuple2<ImmutableBytesWritable, Result> tuple) throws Exception {
                    CellScanner cellScanner = tuple._2().cellScanner();
                    while (cellScanner.advance()) {
                        Cell cell = cellScanner.current();
                        String colFam = Bytes.toString(CellUtil.cloneFamily(cell));
                        if (colFam.equalsIgnoreCase("score")) {
                            return true;
                        }
                    }
                    return false;
                }
            });
            System.out.println("score records: " + score.count());

            score.foreach(new VoidFunction<Tuple2<ImmutableBytesWritable, Result>>() {
                public void call(Tuple2<ImmutableBytesWritable, Result> tuple) throws Exception {
                    String key = Bytes.toString(tuple._2().getRow());
                    String value = Bytes.toString(tuple._2().getValue("score".getBytes(), "n1".getBytes()));
                    System.out.println(key + ": \n" + value);
                }
            });
            // rdd.foreach(new VoidFunction<Tuple2<ImmutableBytesWritable, Result>>() {
            //     public void call(Tuple2<ImmutableBytesWritable, Result> tuple) throws Exception {
            //         String key = Bytes.toString(tuple._2().getRow());
            //         String value = Bytes.toString(tuple._2().getValue("name".getBytes(), "n1".getBytes()));
            //         System.out.println(key + ": \n" + value);
            //     }
            // });
        } catch (Exception e) {
            e.printStackTrace();
        }
        // write to HBase
        Configuration writeConf = HBaseConfiguration.create();
        writeConf.set("hbase.zookeeper.property.clientPort", "2181");
        writeConf.set("hbase.zookeeper.quorum", "localhost");
        // exampleClass - a class whose containing jar is used as the job's jar.
        JobConf jobConf = new JobConf(writeConf, SampleApp.class);
        jobConf.setOutputFormat(TableOutputFormat.class);
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, "mytable");

        List<String> rawData = Arrays.asList("one", "another");
        JavaPairRDD<ImmutableBytesWritable, Put> data = jsc.parallelize(rawData).mapToPair(new PairFunction<String, ImmutableBytesWritable, Put>() {
            public Tuple2<ImmutableBytesWritable, Put> call(String s) throws Exception {
                Random random = new Random();
                // row key "abc<random int>", column name:n1 = s
                Put p = new Put(Bytes.toBytes("abc" + random.nextInt()));
                p.addColumn("name".getBytes(), "n1".getBytes(), s.getBytes());
                // the key is ignored by TableOutputFormat; only the Put matters
                return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), p);
            }
        });
        data.saveAsHadoopDataset(jobConf);

        jsc.stop();
    }
}
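As a supplementary sketch (not part of the original file): the commented-out scan-narrowing lines in main() can be made concrete with the classes the file already imports, assuming the HBase 1.x ProtobufUtil/Base64 utilities. The hypothetical helper below is one way to wire it up; "mytable" and the name:n1 family/qualifier are the gist's own placeholders.

    // Hypothetical helper that could be added to SampleApp: serialize a Scan so that
    // TableInputFormat only reads the name:n1 column instead of the whole table.
    private static void configureNarrowedScan(Configuration hconf) throws java.io.IOException {
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("name"), Bytes.toBytes("n1"));    // restrict to name:n1
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);           // Scan -> protobuf
        String scanToString = Base64.encodeBytes(proto.toByteArray()); // protobuf -> Base64 string
        hconf.set(TableInputFormat.INPUT_TABLE, "mytable");
        hconf.set(TableInputFormat.SCAN, scanToString);                // honoured by TableInputFormat
    }

Calling configureNarrowedScan(hconf) before jsc.newAPIHadoopRDD(...) would then limit the resulting RDD to that single column.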