Last active
March 2, 2019 21:46
-
-
Save dgadiraju/e21dd86b2f9cf6a8f1f3132785d9d0da to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
name := "HBaseDemo"

version := "0.1"

scalaVersion := "2.11.12"

val hadoopVersion = "2.7.0"
val hbaseVersion = "1.1.8"

libraryDependencies ++= Seq(
  "com.typesafe" % "config" % "1.3.2",
  "org.apache.hadoop" % "hadoop-common" % hadoopVersion,
  "org.apache.hadoop" % "hadoop-client" % hadoopVersion,
  "org.apache.hbase" % "hbase-client" % hbaseVersion,
  "org.apache.hbase" % "hbase-server" % hbaseVersion,
  "org.apache.hbase" % "hbase-protocol" % hbaseVersion,
  "org.apache.hbase" % "hbase-common" % hbaseVersion,
  // %% appends the Scala binary suffix derived from scalaVersion (here _2.11).
  "org.apache.spark" %% "spark-sql" % "2.3.0"
)

// Merge rules used when building a fat jar with `sbt assembly`.
assemblyMergeStrategy in assembly := {
  // ServiceLoader registration files must be merged, not dropped: discarding them breaks
  // runtime lookups such as Hadoop FileSystem schemes or Spark data source short names.
  // This case must come before the generic META-INF discard below.
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.filterDistinctLines
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.startsWith("META-INF") => MergeStrategy.discard
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
  case PathList("org", "apache", xs @ _*) => MergeStrategy.first
  case "about.html" => MergeStrategy.rename
  // Typesafe Config defaults from every jar have to be concatenated to stay loadable.
  case "reference.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}

// NOTE(review): the Spark entry point in this project appears to be NYSELoadSpark;
// confirm an object named NYSELoad exists, otherwise the assembly manifest points nowhere.
mainClass in assembly := Some("NYSELoad")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
dev.zookeeper.quorum = localhost
dev.zookeeper.port = 2181
dev.execution.mode = local
prod.zookeeper.quorum = nn01.itversity.com,nn02.itversity.com,rm01.itversity.com
prod.zookeeper.port = 2181
prod.execution.mode = yarn-client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} | |
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table} | |
import org.apache.hadoop.hbase.util.Bytes | |
import com.typesafe.config.{Config, ConfigFactory} | |
import org.apache.hadoop.conf.Configuration | |
import org.apache.spark.sql.{Row, SparkSession} | |
/**
 * Spark job that loads NYSE CSV records into the HBase table `nyse:stock_data_wide`.
 *
 * Usage: `NYSELoadSpark <env> <inputPath>`.
 * - `env` selects a section of the Typesafe config, such as `dev` or `prod`. That section
 *   must define `zookeeper.quorum`, `zookeeper.port` and `execution.mode`.
 * - `inputPath` is read with `spark.read.csv`, so every column arrives as a string.
 *
 * Originally created by itversity on 27/08/18.
 */
object NYSELoadSpark {

  /** HBase table (namespace `nyse`) that receives the records. */
  private val TargetTable = "nyse:stock_data_wide"

  /** Column family that holds every cell written by [[buildPutList]]. */
  private val ColumnFamily = "sd"

  /**
   * Pairs of (qualifier suffix, CSV column index) for the value cells written per record,
   * listed in the order the cells are added to the Put.
   * NOTE(review): op/hp/lp/cp/v presumably mean open/high/low/close/volume; confirm.
   */
  private val StockColumns: Seq[(String, Int)] =
    Seq("op" -> 2, "hp" -> 3, "lp" -> 4, "cp" -> 5, "v" -> 6)

  /** Number of Puts sent per `Table.put` call, instead of one RPC per record. */
  private val PutBatchSize = 1000

  /**
   * Opens an HBase connection configured from the given environment's config section.
   *
   * @param conf environment config section; must define `zookeeper.quorum` and `zookeeper.port`
   * @param env  environment name; any value other than "dev" is treated as a distributed cluster
   * @return a new connection, which the caller must close
   */
  def getHbaseConnection(conf: Config, env: String): Connection = {
    val hbaseConfig: Configuration = HBaseConfiguration.create()
    hbaseConfig.set("hbase.zookeeper.quorum", conf.getString("zookeeper.quorum"))
    hbaseConfig.set("hbase.zookeeper.property.clientPort", conf.getString("zookeeper.port"))
    if (env != "dev") {
      // Non-dev environments use the /hbase-unsecure znode parent and a distributed cluster.
      hbaseConfig.set("zookeeper.znode.parent", "/hbase-unsecure")
      hbaseConfig.set("hbase.cluster.distributed", "true")
    }
    ConnectionFactory.createConnection(hbaseConfig)
  }

  /**
   * Builds the Put for one CSV record.
   *
   * Row key: the first 6 characters of column 1, a comma, then column 0.
   * Cells: family "sd", qualifiers "<col1>,op", "<col1>,hp", "<col1>,lp", "<col1>,cp" and
   * "<col1>,v", holding columns 2 to 6 respectively.
   * NOTE(review): column 0 is presumably the ticker and column 1 a yyyyMMdd trade date; confirm.
   *
   * @param table      unused; kept only so existing callers keep compiling
   * @param nyseRecord one CSV row whose columns are strings
   * @return the Put for the row (a single Put, despite the method name)
   * @throws IllegalArgumentException if column 1 is null or shorter than 6 characters
   */
  def buildPutList(table: Table, nyseRecord: Row): Put = {
    val tradeDate = nyseRecord.getString(1)
    require(tradeDate != null && tradeDate.length >= 6,
      s"Expected column 1 to hold at least 6 characters, got record: $nyseRecord")
    val put = new Put(Bytes.toBytes(s"${tradeDate.substring(0, 6)},${nyseRecord.get(0)}"))
    val family = Bytes.toBytes(ColumnFamily)
    StockColumns.foreach { case (suffix, index) =>
      put.addColumn(family,
        Bytes.toBytes(s"$tradeDate,$suffix"),
        Bytes.toBytes(nyseRecord.getString(index)))
    }
    put
  }

  /**
   * Entry point: reads the CSV at `args(1)` and writes every record to HBase.
   *
   * @param args `args(0)` is the config environment name, `args(1)` is the input path
   */
  def main(args: Array[String]): Unit = {
    require(args.length >= 2, "Usage: NYSELoadSpark <env> <inputPath>")
    val env = args(0)
    val conf = ConfigFactory.load.getConfig(env)
    // NOTE(review): setting master in code overrides spark-submit's --master. Spark 2.x
    // documents "yarn" (with a deploy mode) rather than "yarn-client"; confirm the prod value.
    val spark = SparkSession.
      builder.
      master(conf.getString("execution.mode")).
      appName("NYSE Load using Spark").
      getOrCreate()
    try {
      val nyseData = spark.read.csv(args(1))
      nyseData.foreachPartition((records: Iterator[Row]) => writePartition(records, conf, env))
    } finally {
      spark.stop()
    }
  }

  /**
   * Writes one partition's records to HBase in batches.
   *
   * The table and connection are closed even if a put fails. Empty partitions do not open
   * a connection at all.
   */
  private def writePartition(records: Iterator[Row], conf: Config, env: String): Unit = {
    if (records.hasNext) {
      val connection = getHbaseConnection(conf, env)
      try {
        val table = connection.getTable(TableName.valueOf(TargetTable))
        try {
          records.grouped(PutBatchSize).foreach { batch =>
            val puts = new java.util.ArrayList[Put](batch.size)
            batch.foreach(record => puts.add(buildPutList(table, record)))
            table.put(puts)
          }
        } finally {
          table.close()
        }
      } finally {
        connection.close()
      }
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Join every jar in /external_jars into the comma-separated list that --jars expects.
EXTERNAL_JARS=$(echo /external_jars/*.jar | tr ' ' ',')

# NOTE(review): NYSELoadSpark sets the master from its config (execution.mode),
# which appears to override the --master value passed here.
spark-submit \
  --class NYSELoadSpark \
  --master yarn \
  --conf spark.ui.port=4926 \
  --jars "$EXTERNAL_JARS" \
  hbasedemo_2.11-0.1.jar prod /public/nyse
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment