I show two examples for exporting data into HBase tables and for loading data from HBase. In these examples I use the Apache Spark 2.x and HBase 1.x APIs.
To load data into HBase we use either the foreachPartition or the saveAsNewAPIHadoopDataset action.
In the first option we create a connection object inside a foreachPartition action and push the data either through a Table object or through the BufferedMutator interface. The latter is preferred for large amounts of data, since the BufferedMutator sends records in batches.
For the Scala example I also use the try-with-resources pattern from https://medium.com/@dkomanov/scala-try-with-resources-735baad0fd7d
Scala Example
def optionOne(sparkSession: SparkSession, tableName: String, rdd: RDD[MyRecord]): Unit = {
  rdd.foreachPartition(iterator => {
    // resources should be closed properly, so I use the solution described in
    // https://medium.com/@dkomanov/scala-try-with-resources-735baad0fd7d
    withResources(ConnectionFactory.createConnection(HBaseConfiguration.create()))(connection => {
      withResources(connection.getBufferedMutator(TableName.valueOf(tableName)))(mutator => {
        iterator.foreach(record => {
          val put = new Put(Bytes.toBytes(record.key))
          put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("v"), Bytes.toBytes(record.myValue))
          mutator.mutate(put)
        })
      })
    })
  })
}
Java Example
public static void optionOne(SparkSession sparkSession, String tableName, JavaRDD<MyRecord> rdd) {
    rdd.foreachPartition(iterator -> {
        try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
             // option 1.1 is to use Table table = connection.getTable(TableName.valueOf(tableName));
             BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf(tableName))) {
            while (iterator.hasNext()) {
                MyRecord record = iterator.next();
                Put put = new Put(Bytes.toBytes(record.getKey()));
                put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("v"), Bytes.toBytes(record.getMyValue()));
                mutator.mutate(put);
                // table.put(put);
            }
        }
    });
}
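By default the BufferedMutator flushes its write buffer whenever it fills up (the size comes from the hbase.client.write.buffer setting). If you want more control over the batch size, you can pass a BufferedMutatorParams when creating the mutator. The following is a minimal Scala sketch of that idea; the 8 MB value and the params name are only illustrations, not recommendations:
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.BufferedMutatorParams
// minimal sketch: create the mutator with an explicit write buffer size (8 MB is only an example)
val params = new BufferedMutatorParams(TableName.valueOf(tableName))
  .writeBufferSize(8 * 1024 * 1024)
withResources(connection.getBufferedMutator(params))(mutator => {
  // ... same mutate loop as in the Scala example above
})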
In the second option Spark uses the TableOutputFormat class to load the data into HBase. This is my preferred option, since connection handling and data loading are hidden. The first step is to map the records to a PairRDD[ImmutableBytesWritable, Put], and then to call the saveAsNewAPIHadoopDataset action on it.
Scala
def optionTwo(sparkSession: SparkSession, tableName: String, rdd: RDD[MyRecord]): Unit = {
  val config = new Configuration
  config.set(TableOutputFormat.OUTPUT_TABLE, tableName)
  val jobConfig = Job.getInstance(config)
  jobConfig.setOutputFormatClass(classOf[TableOutputFormat[_]])
  rdd.map(record => {
    val put = new Put(Bytes.toBytes(record.key))
    put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("v"), Bytes.toBytes(record.myValue))
    (new ImmutableBytesWritable(put.getRow()), put)
  }).saveAsNewAPIHadoopDataset(jobConfig.getConfiguration)
}
Java
public static void optionTwo(SparkSession sparkSession, String tableName, JavaRDD<MyRecord> rdd) throws IOException {
    Configuration config = new Configuration();
    config.set(TableOutputFormat.OUTPUT_TABLE, tableName);
    Job jobConfig = Job.getInstance(config);
    jobConfig.setOutputFormatClass(TableOutputFormat.class);
    rdd.mapToPair(record -> {
        Put put = new Put(Bytes.toBytes(record.getKey()));
        put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("v"), Bytes.toBytes(record.getMyValue()));
        return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(put.getRow()), put);
    }).saveAsNewAPIHadoopDataset(jobConfig.getConfiguration());
}
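Since TableOutputFormat writes Mutation objects, the same saveAsNewAPIHadoopDataset path can also be used to delete rows. Below is a minimal Scala sketch under that assumption; deleteKeys is a hypothetical helper and jobConfig is assumed to be built exactly as in optionTwo:
import org.apache.hadoop.hbase.client.Delete
// minimal sketch: reuse the optionTwo job configuration to write Deletes instead of Puts
def deleteKeys(keys: RDD[String], jobConfig: Job): Unit = {
  keys.map(key => {
    val delete = new Delete(Bytes.toBytes(key))
    (new ImmutableBytesWritable(delete.getRow), delete)
  }).saveAsNewAPIHadoopDataset(jobConfig.getConfiguration)
}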
Here is a complete example with a main method. In this example we put records into an HBase table with both options.
Scala:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import scala.util.control.NonFatal
/**
* this object shows a simple pattern for ingesting data into HBase
*/
object ExportIntoHBase {
/**
* copied from https://medium.com/@dkomanov/scala-try-with-resources-735baad0fd7d
*
* @param r
* @param f
* @tparam T
* @tparam V
* @return
*/
def withResources[T <: AutoCloseable, V](r: => T)(f: T => V): V = {
val resource: T = r
require(resource != null, "resource is null")
var exception: Throwable = null
try {
f(resource)
} catch {
case NonFatal(e) =>
exception = e
throw e
} finally {
closeAndAddSuppressed(exception, resource)
}
}
/**
* https://medium.com/@dkomanov/scala-try-with-resources-735baad0fd7d
*
* @param e
* @param resource
*/
def closeAndAddSuppressed(e: Throwable,
resource: AutoCloseable): Unit = {
if (e != null) {
try {
resource.close()
} catch {
case NonFatal(suppressed) =>
e.addSuppressed(suppressed)
}
} else {
resource.close()
}
}
/**
*
* @param key
* @param myValue
*/
case class MyRecord(key: String, myValue: String)
/**
* https://medium.com/@dkomanov/scala-try-with-resources-735baad0fd7d
*
* @param sparkSession
* @param tableName
* @param rdd
*/
def optionOne(sparkSession: SparkSession, tableName: String, rdd: RDD[MyRecord]): Unit = {
rdd.foreachPartition(iterator => {
// initialize for a partition
// basic pattern:
// var connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
// var table = connection.getTable(TableName.valueOf(tableName))
// iterator.foreach(pair => table.put(pair._2))
// since resources should be closed properly I used the solution described in
// https://medium.com/@dkomanov/scala-try-with-resources-735baad0fd7d
withResources(ConnectionFactory.createConnection(HBaseConfiguration.create()))(
connection => {
//option 1.1 Table
//withResources(connection.getTable(TableName.valueOf(tableName)))(
// table => {
// iterator.foreach(pair => table.put(pair._2))
// }
//)
//option 1.2 BufferedMutator preferred one
withResources(connection.getBufferedMutator(TableName.valueOf(tableName)))(
mutator => {
iterator.foreach(record => {
val put = new Put(Bytes.toBytes(record.key))
put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("v"), Bytes.toBytes(record.myValue))
mutator.mutate(put)
}
)
}
)
}
)
}
)
}
/**
* preferred one
*
* @param sparkSession
* @param tableName
* @param rdd
*/
def optionTwo(sparkSession: SparkSession, tableName: String, rdd: RDD[MyRecord]): Unit = {
val config = new Configuration
config.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val jobConfig = Job.getInstance(config)
jobConfig.setOutputFormatClass(classOf[TableOutputFormat[_]])
rdd.map(record => {
val put = new Put(Bytes.toBytes(record.key))
put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("v"), Bytes.toBytes(record.myValue))
(new ImmutableBytesWritable(put.getRow()), put)
}).saveAsNewAPIHadoopDataset(jobConfig.getConfiguration)
}
/**
*
* @param args
*/
def main(args: Array[String]): Unit = {
// init spark session
val spark = SparkSession.builder().appName("Spark HBase export")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.ui.showConsoleProgress", "false").getOrCreate()
// ingesting data into hbase
val tableName = "testTable"
val array1 = Array(new MyRecord("key_1", "value_1"), new MyRecord("key_2", "value_2"))
val array2 = Array(new MyRecord("key_3", "value_3"), new MyRecord("key_4", "value_4"))
val rdd1 = spark.sparkContext.parallelize(array1, 1)
val rdd2 = spark.sparkContext.parallelize(array2, 1)
optionOne(spark, tableName, rdd1)
optionTwo(spark, tableName, rdd2)
}
}
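To sanity-check the ingest you can read one of the rows back with a simple Get. The following is a minimal sketch, assuming the testTable name and the c/v column used above, and reusing the withResources helper from the listing:
import org.apache.hadoop.hbase.client.Get
// minimal sketch: read back one of the rows written by the example above
withResources(ConnectionFactory.createConnection(HBaseConfiguration.create()))(connection => {
  withResources(connection.getTable(TableName.valueOf("testTable")))(table => {
    val result = table.get(new Get(Bytes.toBytes("key_1")))
    val value = Bytes.toString(result.getValue(Bytes.toBytes("c"), Bytes.toBytes("v")))
    println(s"key_1 -> $value") // expected: value_1
  })
})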
Java:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
public class ExportIntoHBaseJava {
/**
* POJO
*/
public static class MyRecord implements Serializable {
private String key;
private String myValue;
public MyRecord(String key, String myValue) {
this.key = key;
this.myValue = myValue;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getMyValue() {
return myValue;
}
public void setMyValue(String myValue) {
this.myValue = myValue;
}
}
public static void optionOne(SparkSession sparkSession, String tableName, JavaRDD<MyRecord> rdd) {
rdd.foreachPartition(iterator -> {
try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
//option 1.1 is to use Table table = connection.getTable(TableName.valueOf(tableName));
BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf(tableName))) {
while (iterator.hasNext()) {
MyRecord record = iterator.next();
Put put = new Put(Bytes.toBytes(record.getKey()));
put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("v"), Bytes.toBytes(record.getMyValue()));
mutator.mutate(put);
//table.put(put);
}
}
}
);
}
/**
* @param sparkSession
* @param tableName
* @param rdd
* @throws IOException
*/
public static void optionTwo(SparkSession sparkSession, String tableName, JavaRDD<MyRecord> rdd) throws IOException {
Configuration config = new Configuration();
config.set(TableOutputFormat.OUTPUT_TABLE, tableName);
Job jobConfig = Job.getInstance(config);
jobConfig.setOutputFormatClass(TableOutputFormat.class);
rdd.mapToPair(record -> {
Put put = new Put(Bytes.toBytes(record.getKey()));
put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("v"), Bytes.toBytes(record.getMyValue()));
return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(put.getRow()), put);
}).saveAsNewAPIHadoopDataset(jobConfig.getConfiguration());
}
public static void main(String[] args) throws IOException {
// init spark session
SparkSession spark = SparkSession.builder().appName("Spark HBase export")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.ui.showConsoleProgress", "false").getOrCreate();
String tableName = "testTable";
JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<MyRecord> rdd1 = javaSparkContext.parallelize(Arrays.asList(new MyRecord("key_1", "value_1"), new MyRecord("key_2", "value_2")), 1);
JavaRDD<MyRecord> rdd2 = javaSparkContext.parallelize(Arrays.asList(new MyRecord("key_3", "value_3"), new MyRecord("key_4", "value_4")), 1);
optionOne(spark, tableName, rdd1);
optionTwo(spark, tableName, rdd2);
}
}
In the next examples we load data from HBase with Apache Spark.
To query HBase you can use one of these two input format classes:
import org.apache.hadoop.hbase.mapreduce.{MultiTableInputFormat, TableInputFormat}
MultiTableInputFormat can be used to read data from multiple tables or to run several different scans against a single table (e.g. if an HBase table uses salting):
def createResultRDD(sc: SparkContext, scans: Array[Scan]): RDD[Result] = {
  val conf = createConfig()
  val scanDefinitions = scans.map(s => convertScanToString(s))
  conf.setStrings(MultiTableInputFormat.SCANS, scanDefinitions: _*)
  val rdd = sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
  rdd.map(pair => pair._2)
}
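As noted above, MultiTableInputFormat also helps when a single table is salted: you can build one Scan per salt prefix and pass all of them to createResultRDD. A minimal sketch follows; saltedScans and saltBuckets are hypothetical names, and it assumes at most 255 single-byte salt prefixes (your salting scheme may differ):
// minimal sketch: one Scan per single-byte salt prefix of a salted table
def saltedScans(tableNameString: String, saltBuckets: Int): Array[Scan] = {
  (0 until saltBuckets).map(salt => {
    val scan = new Scan(Array(salt.toByte), Array((salt + 1).toByte)) // rows starting with this prefix
    scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, TableName.valueOf(tableNameString).getName)
    scan
  }).toArray
}
// usage: createResultRDD(sc, saltedScans("testTable", 16))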
TableInputFormat can be used to read data from a single table:
def createResultRDD(sc: SparkContext, tableNameString: String, scan: Scan): RDD[Result] = {
  val conf = createConfig()
  val scanString = convertScanToString(scan)
  conf.set(TableInputFormat.INPUT_TABLE, tableNameString)
  conf.set(TableInputFormat.SCAN, scanString)
  val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
  rdd.map(pair => pair._2)
}
Since the scan definitions are passed as string properties, the Scan objects have to be serialized; this can be done as shown below:
def convertScanToString(scan: Scan): String = {
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray())
}
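Because the Scan is serialized into the job configuration, any tuning has to happen on the Scan object before it is converted. For large batch reads it is common to raise the scanner caching and disable block caching; the sketch below only illustrates the idea, and the concrete values are examples rather than recommendations:
// minimal sketch: typical Scan settings for a large batch read (values are examples only)
val conf = createConfig()
val scan = new Scan()
scan.addFamily(Bytes.toBytes("c"))
scan.setCaching(500)       // rows fetched per RPC; the right value depends on row size
scan.setCacheBlocks(false) // a one-off full scan should not evict hot data from the block cache
conf.set(TableInputFormat.SCAN, convertScanToString(scan))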
Below is the complete example for Scala and Java, each with a main method:
Scala:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{MultiTableInputFormat, TableInputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
/**
* this object shows a simple pattern for querying HBase with Apache Spark
*/
object ExportFromHBase {
val logger = LoggerFactory
.getLogger(this.getClass.getName)
val SCAN_DURATION: Long = 1000 * 60 * 15 // 15 minutes
/**
* create RDD for different scans
* @param sc
* @param scans
* @return
*/
def createResultRDD(sc: SparkContext, scans: Array[Scan]): RDD[Result] = {
val conf = createConfig()
val scanDefinitions = scans.map(s => {
convertScanToString(s)
})
conf.setStrings(MultiTableInputFormat.SCANS, scanDefinitions: _*)
val rdd = sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
rdd.map(pair => pair._2)
}
/**
* single table
*
* @param sc
* @param tableNameString
* @param scan
* @return
*/
def createResultRDD(sc: SparkContext, tableNameString: String, scan: Scan): RDD[Result] = {
val conf = createConfig()
val scanString = convertScanToString(scan)
conf.set(TableInputFormat.INPUT_TABLE, tableNameString)
conf.set(TableInputFormat.SCAN, scanString )
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
rdd.map(pair => pair._2)
}
/**
* create config for a hbase queries
* @return
*/
def createConfig(): Configuration = {
val conf = HBaseConfiguration.create()
// when using Spark with HBase, scans are usually large
// this results in timeout exceptions, since the default 1 minute timeout is too small
val timeoutMillis: String = SCAN_DURATION.toString
conf.set("hbase.client.scanner.timeout.period", timeoutMillis)
conf.set("hbase.rpc.timeout", timeoutMillis)
conf
}
/**
* convert scan to string
* @param scan
* @return
*/
def convertScanToString(scan: Scan): String = {
val proto = ProtobufUtil.toScan(scan)
Base64.encodeBytes(proto.toByteArray())
}
/**
*
* @param args
*/
def main(args: Array[String]): Unit = {
// init spark session
val spark = SparkSession.builder().appName("Spark HBase export")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.ui.showConsoleProgress", "false").getOrCreate()
import spark.implicits._
val tableName_1 = "testTable"
val tableName_2 = "testTable"
// case 1 multi tables
val startRow_1 = Bytes.toBytes("key_1")
val stopRow_2 = Bytes.toBytes("key_3") // exclusive
val startRow_3 = Bytes.toBytes("key_3")
val stopRow_4 = Bytes.toBytes("key_5") // exclusive
val columnFamily = Bytes.toBytes("c")
val scan_1 = new Scan(startRow_1, stopRow_2)
scan_1.addFamily(columnFamily)
scan_1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, TableName.valueOf(tableName_1).getName)
val scan_2 = new Scan(startRow_3, stopRow_4)
scan_2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, TableName.valueOf(tableName_2).getName)
// map to RDD[result]
val rddMultiTable = createResultRDD(spark.sparkContext, Array(scan_1, scan_2))
val count_1 = rddMultiTable.count()
logger.info(s"Scan 1 ${count_1}")
// now you can further process RDD e.g. map to dataset
val rddSingleTable = createResultRDD(spark.sparkContext, tableName_1, scan_1)
val count_2 = rddSingleTable.count()
logger.info(s"Scan 2 ${count_2}")
// ...
}
}
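As the comment in main suggests, the RDD[Result] can be mapped into something more convenient, for example back into a case class and then into a Dataset. A minimal sketch follows; toRecordDataset is a hypothetical helper, and it assumes the same MyRecord case class and c/v column as in the ingestion examples:
// minimal sketch: map RDD[Result] to a case class and then to a Dataset
case class MyRecord(key: String, myValue: String)
def toRecordDataset(spark: SparkSession, rdd: RDD[Result]) = {
  import spark.implicits._
  rdd.map(result => MyRecord(
    Bytes.toString(result.getRow),
    Bytes.toString(result.getValue(Bytes.toBytes("c"), Bytes.toBytes("v")))
  )).toDS()
}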
Java:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import java.io.IOException;
public class ExportFromHBaseJava {
public static Logger logger = LoggerFactory
.getLogger(ExportFromHBaseJava.class);
public static long SCAN_DURATION = 1000 * 60 * 15; // 15 minutes
/**
* create RDD for different scans
* @param sc
* @param scans
* @return
*/
public static JavaRDD<Result> createResultRDD(JavaSparkContext sc, Scan[] scans) throws IOException{
Configuration conf = createConfig();
String[] scanDefinitions = new String[scans.length];
for(int i = 0; i < scanDefinitions.length; i++){
scanDefinitions[i] = convertScanToString(scans[i]);
}
conf.setStrings(MultiTableInputFormat.SCANS, scanDefinitions);
JavaPairRDD<ImmutableBytesWritable, Result> rdd = sc.newAPIHadoopRDD(conf, MultiTableInputFormat.class, ImmutableBytesWritable.class, Result.class);
return rdd.map(pair -> pair._2());
}
/**
* single table
*
* @param sc
* @param tableNameString
* @param scan
* @return
*/
public static JavaRDD<Result> createResultRDD(JavaSparkContext sc, String tableNameString, Scan scan) throws IOException {
Configuration conf = createConfig();
String scanString = convertScanToString(scan);
conf.set(TableInputFormat.INPUT_TABLE, tableNameString);
conf.set(TableInputFormat.SCAN, scanString );
JavaPairRDD<ImmutableBytesWritable, Result> rdd = sc.newAPIHadoopRDD(conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
return rdd.map(pair -> pair._2());
}
/**
* create config for a hbase queries
* @return
*/
public static Configuration createConfig() {
Configuration conf = HBaseConfiguration.create();
// when using Spark with HBase, scans are usually large
// this results in timeout exceptions, since the default 1 minute timeout is too small
String timeoutMillis = String.valueOf(SCAN_DURATION);
conf.set("hbase.client.scanner.timeout.period", timeoutMillis);
conf.set("hbase.rpc.timeout", timeoutMillis);
return conf;
}
/**
* convert scan to string
* @param scan
* @return
*/
private static String convertScanToString(Scan scan) throws IOException {
ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
return Base64.encodeBytes(proto.toByteArray());
}
public static void main(String[] args) throws IOException {
// init spark session
SparkSession spark = SparkSession.builder().appName("Spark HBase export")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.ui.showConsoleProgress", "false").getOrCreate();
String tableName_1 = "testTable";
String tableName_2 = "testTable";
// case 1 multi tables
byte[] startRow_1 = Bytes.toBytes("key_1");
byte[] stopRow_2 = Bytes.toBytes("key_3"); // exclusive
byte[] startRow_3 = Bytes.toBytes("key_3");
byte[] stopRow_4 = Bytes.toBytes("key_5"); // exclusive
byte[] columnFamily = Bytes.toBytes("c");
Scan scan_1 = new Scan(startRow_1, stopRow_2);
scan_1.addFamily(columnFamily);
scan_1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, TableName.valueOf(tableName_1).getName());
Scan scan_2 = new Scan(startRow_3, stopRow_4);
scan_2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, TableName.valueOf(tableName_2).getName());
// map to RDD[result]
JavaRDD<Result> rddMultiTable = createResultRDD(JavaSparkContext.fromSparkContext(spark.sparkContext()),
new Scan[]{scan_1, scan_2});
long count_1 = rddMultiTable.count();
logger.info("Scan 1: " + count_1);
// now you can further process RDD e.g. map to dataset
JavaRDD<Result> rddSingleTable = createResultRDD(JavaSparkContext.fromSparkContext(spark.sparkContext()), tableName_1, scan_1);
long count_2 = rddSingleTable.count();
logger.info("Scan 2: " + count_2);
// ...
}
}