Last active
March 16, 2019 06:10
-
-
Save saisgit/774b219e1a93d3f929f1a4c829d26804 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

import org.apache.spark.sql.SparkSession
object readHbaseTableAsDF extends Serializable {

  /** One employee record materialized from an HBase row.
   *
   * @param empID the HBase row key
   * @param name  value of column `metadata:name` (may be null if the cell is absent)
   * @param city  value of column `metadata:city` (may be null if the cell is absent)
   */
  final case class EmpRow(empID: String, name: String, city: String)

  /** Converts a single HBase scan [[Result]] into an [[EmpRow]].
   *
   * The row key becomes the employee ID; `name` and `city` are read from the
   * `metadata` column family. `Bytes.toString(null)` returns null, so missing
   * cells yield null fields rather than throwing.
   */
  // Fixed: original used procedure-like syntax `def parseRow(...):EmpRow {`,
  // which is a compile error — a method with a declared result type needs `=`.
  def parseRow(result: Result): EmpRow = {
    val rowKey = Bytes.toString(result.getRow())
    val cfDataBytes = Bytes.toBytes("metadata")
    val name = Bytes.toString(result.getValue(cfDataBytes, Bytes.toBytes("name")))
    val city = Bytes.toString(result.getValue(cfDataBytes, Bytes.toBytes("city")))
    EmpRow(rowKey, name, city)
  }

  /** Entry point: scans the HBase table `emp` through Spark's
   * `newAPIHadoopRDD`, parses each row into an [[EmpRow]], and shows the
   * resulting DataFrame. Assumes a ZooKeeper quorum on localhost.
   */
  // Fixed: original `def main(...):Unit{` was also missing `=`.
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Consumer").getOrCreate()

    val hconf = HBaseConfiguration.create()
    hconf.set("hbase.zookeeper.quorum", "localhost")
    hconf.set(TableInputFormat.INPUT_TABLE, "emp")

    // Fixed: `classof` is not a Scala identifier — the literal is `classOf[T]`.
    val hbaseRDD = spark.sparkContext.newAPIHadoopRDD(
      hconf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

    import spark.implicits._
    // Each element is (ImmutableBytesWritable, Result); only the Result is needed.
    val resultRDD = hbaseRDD.map(_._2)
    val empRDD = resultRDD.map(parseRow)
    val empDF = empRDD.toDF
    empDF.show()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment