@Saberko
Created August 9, 2016 03:55
Reading from and writing to HBase with Spark
package learn;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import scala.Tuple3;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
/**
 * Reads data from and writes data to HBase with Spark.
 */
public class SparkHbaseIntegration {
    /**
     * Converts a Tuple3 RDD element into a Tuple2<ImmutableBytesWritable, Put> for HBase output.
     */
    public static final PairFunction<Tuple3<Integer, String, Integer>, ImmutableBytesWritable, Put> CONVERT_FUN =
            new PairFunction<Tuple3<Integer, String, Integer>, ImmutableBytesWritable, Put>() {
                public Tuple2<ImmutableBytesWritable, Put> call(Tuple3<Integer, String, Integer> data) throws Exception {
                    Put put = new Put(Bytes.toBytes(data._1().toString())); // row key
                    put.addColumn("basic".getBytes(), "name".getBytes(), data._2().getBytes());
                    put.addColumn("basic".getBytes(), "age".getBytes(), Bytes.toBytes(data._3()));
                    // TableOutputFormat ignores the key, so an empty ImmutableBytesWritable is sufficient
                    return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
                }
            };
    /**
     * Serializes a Scan into a Base64 string so it can be passed via TableInputFormat.SCAN.
     */
    public static String scanToString(Scan scan) {
        try {
            ClientProtos.Scan s = ProtobufUtil.toScan(scan);
            return Base64.encodeBytes(s.toByteArray());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("Hbase writer").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "localhost");

        // ========== Write to HBase ==========
        JobConf jobConf = new JobConf(conf, SparkHbaseIntegration.class);
        jobConf.setOutputFormat(TableOutputFormat.class);
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, "user");

        List<Tuple3<Integer, String, Integer>> data = Arrays.asList(
                new Tuple3<Integer, String, Integer>(1, "linzhili", 2),
                new Tuple3<Integer, String, Integer>(2, "zhangsichao", 33));
        JavaPairRDD<ImmutableBytesWritable, Put> rddData = sc.parallelize(data).mapToPair(SparkHbaseIntegration.CONVERT_FUN);
        rddData.saveAsHadoopDataset(jobConf);
        // ========== Read from HBase ==========
        conf.set(TableInputFormat.INPUT_TABLE, "user"); // table name
        // A Scan can be used to set filter conditions or to restrict the scan to a specific column family/column:
        /*************************************************
        Scan scan = new Scan();
        scan.setFilter(new SingleColumnValueFilter("basic".getBytes(), "age".getBytes(),
                CompareFilter.CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(18))); // keep rows where age >= 18
        scan.addFamily(Bytes.toBytes("basic"));
        scan.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("age")); // scan only basic:age
        conf.set(TableInputFormat.SCAN, SparkHbaseIntegration.scanToString(scan));
        ***************************************************/
        JavaPairRDD<ImmutableBytesWritable, Result> rdd = sc.newAPIHadoopRDD(conf, TableInputFormat.class,
                ImmutableBytesWritable.class, Result.class);
System.out.println("totla records: " + rdd.count());
rdd.foreach(new VoidFunction<Tuple2<ImmutableBytesWritable, Result>>() {
public void call(Tuple2<ImmutableBytesWritable, Result> tuple2) throws Exception {
Result result = tuple2._2();
String key = Bytes.toString(result.getRow());
String name = Bytes.toString(result.getValue("basic".getBytes(), "name".getBytes()));
int age = Bytes.toInt(result.getValue("basic".getBytes(), "age".getBytes()));
System.out.println("Key: " + key + "\tName: " + name + "\tAge: " + age);
}
});
}
}
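One practical note for the write path: the old-API TableOutputFormat writes into an existing table and does not create it, so the "user" table with the "basic" column family must exist before saveAsHadoopDataset runs. Below is a minimal sketch of creating that table from Java, assuming HBase 1.x (the same API generation as the gist code); the class name CreateUserTable is hypothetical and not part of the original gist.

package learn;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

/** Hypothetical helper: creates the "user" table with the "basic" column family if it is missing. */
public class CreateUserTable {
    public static void main(String[] args) throws Exception {
        // Same ZooKeeper settings as the Spark job above
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "localhost");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName table = TableName.valueOf("user");
            if (!admin.tableExists(table)) {
                HTableDescriptor descriptor = new HTableDescriptor(table);
                descriptor.addFamily(new HColumnDescriptor("basic"));
                admin.createTable(descriptor);
            }
        }
    }
}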