Skip to content

Instantly share code, notes, and snippets.

@dgadiraju
Last active March 2, 2019 21:46
Show Gist options
  • Save dgadiraju/e21dd86b2f9cf6a8f1f3132785d9d0da to your computer and use it in GitHub Desktop.
Save dgadiraju/e21dd86b2f9cf6a8f1f3132785d9d0da to your computer and use it in GitHub Desktop.
// sbt build definition for the HBaseDemo project.

// Versions used by more than one artifact are declared once so they stay in step.
val hadoopVersion = "2.7.0"
val hbaseVersion = "1.1.8"

name := "HBaseDemo"
version := "0.1"
scalaVersion := "2.11.12"

// Config loading, Hadoop and HBase client libraries, and Spark SQL for Scala 2.11.
libraryDependencies ++= Seq(
  "com.typesafe" % "config" % "1.3.2",
  "org.apache.hadoop" % "hadoop-common" % hadoopVersion,
  "org.apache.hadoop" % "hadoop-client" % hadoopVersion,
  "org.apache.hbase" % "hbase-client" % hbaseVersion,
  "org.apache.hbase" % "hbase-server" % hbaseVersion,
  "org.apache.hbase" % "hbase-protocol" % hbaseVersion,
  "org.apache.hbase" % "hbase-common" % hbaseVersion,
  "org.apache.spark" % "spark-sql_2.11" % "2.3.0"
)

// Rules applied by the assembly task to entries whose path matches; cases are
// tried top to bottom, so the final wildcard only sees paths nothing else matched.
assemblyMergeStrategy in assembly := {
  case path if path.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case path if path.startsWith("META-INF") => MergeStrategy.discard
  case PathList("javax", "servlet", _*) => MergeStrategy.first
  case PathList("org", "apache", _*) => MergeStrategy.first
  case "about.html" => MergeStrategy.rename
  case "reference.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}

// NOTE(review): the Spark loader object in this project is named NYSELoadSpark and
// the launch command passes --class NYSELoadSpark; confirm that "NYSELoad" is the
// intended entry point for the assembled jar.
mainClass in assembly := Some("NYSELoad")
# Per-environment settings. NYSELoadSpark takes the environment name as its first
# argument and reads the matching group via ConfigFactory.load.getConfig(<env>).
#
#   <env>.zookeeper.quorum -> copied into hbase.zookeeper.quorum
#   <env>.zookeeper.port   -> copied into hbase.zookeeper.property.clientPort
#   <env>.execution.mode   -> passed to SparkSession.builder.master(...)

# "dev": the only environment for which the loader does not add the
# zookeeper.znode.parent / hbase.cluster.distributed overrides.
dev.zookeeper.quorum = localhost
dev.zookeeper.port = 2181
dev.execution.mode = local

# "prod": comma-separated ZooKeeper hosts.
# NOTE(review): "yarn-client" is a legacy Spark master string; confirm that the
# Spark version in use still accepts it.
prod.zookeeper.quorum = nn01.itversity.com,nn02.itversity.com,rm01.itversity.com
prod.zookeeper.port = 2181
prod.execution.mode = yarn-client
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.{Row, SparkSession}
/**
* Created by itversity on 27/08/18.
*
* Reads CSV input with Spark and writes every record into the HBase table
* `nyse:stock_data_wide`, opening one HBase connection per Spark partition.
*
* Usage: NYSELoadSpark <environment> <input-path>
*/
object NYSELoadSpark {
  // Local import: only the batching code in `main` needs the `.asJava` conversion.
  import scala.collection.JavaConverters._

  /** Destination table, written as namespace:table. */
  private val TargetTable = "nyse:stock_data_wide"

  /** Column family that receives every cell written by this job. */
  private val ColumnFamily: Array[Byte] = Bytes.toBytes("sd")

  /**
   * Qualifier suffix paired with the index of the CSV field stored under it.
   * Presumably open/high/low/close price and volume -- confirm against the input layout.
   */
  private val ValueColumns: Seq[(String, Int)] =
    Seq("op" -> 2, "hp" -> 3, "lp" -> 4, "cp" -> 5, "v" -> 6)

  /** Number of puts sent to HBase per `Table.put` call. */
  private val PutBatchSize = 1000

  /**
   * Opens an HBase connection configured for the given environment.
   *
   * @param conf environment-scoped configuration; must define `zookeeper.quorum`
   *             and `zookeeper.port`
   * @param env  environment name; any value other than "dev" additionally sets
   *             `zookeeper.znode.parent` and `hbase.cluster.distributed`
   * @return a new connection; the caller is responsible for closing it
   */
  def getHbaseConnection(conf: Config, env: String): Connection = {
    val hbaseConfig: Configuration = HBaseConfiguration.create()
    hbaseConfig.set("hbase.zookeeper.quorum", conf.getString("zookeeper.quorum"))
    hbaseConfig.set("hbase.zookeeper.property.clientPort", conf.getString("zookeeper.port"))
    if (env != "dev") {
      hbaseConfig.set("zookeeper.znode.parent", "/hbase-unsecure")
      hbaseConfig.set("hbase.cluster.distributed", "true")
    }
    ConnectionFactory.createConnection(hbaseConfig)
  }

  /**
   * Builds the HBase `Put` for a single input record.
   *
   * The row key is the first six characters of field 1, a comma, then field 0
   * (presumably a yyyyMM month prefix and the ticker symbol -- confirm). Fields 2
   * to 6 are stored in the "sd" family under qualifiers "<field 1>,<suffix>".
   *
   * @param table      unused; retained so existing callers keep compiling
   * @param nyseRecord record whose fields 1 to 6 are read as strings
   * @return the populated `Put`
   */
  def buildPutList(table: Table, nyseRecord: Row): Put = {
    val tradeDate = nyseRecord.getString(1)
    val put = new Put(Bytes.toBytes(tradeDate.substring(0, 6) + "," + nyseRecord.get(0)))
    ValueColumns.foreach { case (suffix, fieldIndex) =>
      put.addColumn(
        ColumnFamily,
        Bytes.toBytes(tradeDate + "," + suffix),
        Bytes.toBytes(nyseRecord.getString(fieldIndex)))
    }
    put
  }

  /**
   * Entry point.
   *
   * @param args `args(0)` is the environment name used to select the configuration
   *             group; `args(1)` is the CSV input path handed to Spark
   */
  def main(args: Array[String]): Unit = {
    // Fail with a usage message rather than an ArrayIndexOutOfBoundsException.
    require(args.length >= 2, "Usage: NYSELoadSpark <environment> <input-path>")
    val env = args(0)
    val inputPath = args(1)
    val conf = ConfigFactory.load.getConfig(env)

    val spark = SparkSession.
      builder.
      master(conf.getString("execution.mode")).
      appName("NYSE Load using Spark").
      getOrCreate()

    try {
      val nyseData = spark.read.csv(inputPath)
      nyseData.foreachPartition((records: Iterator[Row]) => {
        val connection = getHbaseConnection(conf, env)
        // The nested try/finally blocks release the table and the connection even
        // when a put fails part-way through the partition.
        try {
          val table = connection.getTable(TableName.valueOf(TargetTable))
          try {
            // Send puts in batches instead of issuing one call per record.
            records.grouped(PutBatchSize).foreach { batch =>
              table.put(batch.map(record => buildPutList(table, record)).asJava)
            }
          } finally {
            table.close()
          }
        } finally {
          connection.close()
        }
      })
    } finally {
      spark.stop()
    }
  }
}
# Launch the loader against the "prod" settings, reading input from /public/nyse.
# The application also passes its own master (taken from <env>.execution.mode)
# to SparkSession.builder.

# Join every jar under /external_jars with commas for --jars. printf emits one
# "path," per file, so a path containing spaces stays intact; the trailing
# comma is then stripped.
EXTERNAL_JARS=$(printf '%s,' /external_jars/*.jar)
EXTERNAL_JARS=${EXTERNAL_JARS%,}

spark-submit \
--class NYSELoadSpark \
--master yarn \
--conf spark.ui.port=4926 \
--jars "$EXTERNAL_JARS" \
hbasedemo_2.11-0.1.jar prod /public/nyse
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment