Phoenix/Spark demo

Option 1: prebuilt VM

There is a prebuilt CentOS 6.5 VM with the following components installed:

  • HDP 2.3.0.0-1754
  • Spark 1.3.1
  • Ambari service/view for OpenTSDB
  • Ambari service/view for Zeppelin
  • Intraday stock price data from Google finance in both csv and opentsdb format
    • Data for AAPL, GOOG, and HDP for April 2015 has already been imported into both OpenTSDB and the PRICES table in Phoenix
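The CSV side of this data carries one row per quote. A minimal Python sketch of parsing such a row, assuming the field order matches the PRICES table used in this demo (the sample line itself is hypothetical):

```python
import csv
from io import StringIO

# Assumed field order, matching the PRICES table columns
FIELDS = ["SYMBOL", "DATE", "TIME", "OPEN", "HIGH", "LOW", "CLOSE", "VOLUME"]

def parse_price_row(line):
    """Parse one CSV line into a dict keyed by column name."""
    values = next(csv.reader(StringIO(line)))
    return dict(zip(FIELDS, values))

# Hypothetical sample row
row = parse_price_row("AAPL,2015-04-01,09:30,124.82,125.12,124.58,124.90,1200300")
print(row["SYMBOL"], row["VOLUME"])
```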

Steps:

  • Download the .ova file from here and import it into VMware Fusion
  • Once it starts, create a hosts file entry on your local machine for sandbox.hortonworks.com
  • Launch Ambari by clicking here: http://sandbox.hortonworks.com:8080 and log in as admin/admin
  • Start the HBase, OpenTSDB, and (optionally) Zeppelin services
  • Try the phoenix-spark examples by opening an SSH session to the VM (ssh root@sandbox.hortonworks.com) and following the steps here
  • Try the OpenTSDB view and query data using the steps here

Option 2: Manual setup using HDP 2.3.0.0-1754

  • Set up a VM running HDP 2.3/Ambari 2.1 build 233 using this repo file

  • Download Spark 1.3.1

export HDP_VER=`hdp-select status hadoop-client | sed 's/hadoop-client - \(.*\)/\1/'`
echo "export HDP_VER=$HDP_VER" >> ~/.bashrc

wget http://d3kbcqa49mib13.cloudfront.net/spark-1.3.1-bin-hadoop2.6.tgz
tar -xzvf spark-1.3.1-bin-hadoop2.6.tgz
echo "spark.driver.extraJavaOptions -Dhdp.version=$HDP_VER" >> spark-1.3.1-bin-hadoop2.6/conf/spark-defaults.conf
echo "spark.yarn.am.extraJavaOptions -Dhdp.version=$HDP_VER" >> spark-1.3.1-bin-hadoop2.6/conf/spark-defaults.conf
#copy hbase-site.xml
cp /etc/hbase/conf/hbase-site.xml spark-1.3.1-bin-hadoop2.6/conf/
export YARN_CONF_DIR=/etc/hadoop/conf
echo "export YARN_CONF_DIR=$YARN_CONF_DIR" >> ~/.bashrc
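The sed expression above captures everything after `hadoop-client - ` in the `hdp-select` output. The same capture can be sketched in Python (the sample status line is hypothetical):

```python
import re

def hdp_version(line):
    """Mirror the sed expression: capture the text after 'hadoop-client - '."""
    m = re.match(r"hadoop-client - (.*)", line)
    return m.group(1) if m else None

# Hypothetical output of `hdp-select status hadoop-client`
version = hdp_version("hadoop-client - 2.3.0.0-1754")
print(version)
```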

Set up the Phoenix table and import data

  • Turn off HBase maintenance mode
curl -u admin:admin -i -H 'X-Requested-By: ambari' -X PUT -d '{"RequestInfo": {"context" :"Turn off maintenance mode for HBase"}, "Body": {"ServiceInfo": {"maintenance_state": "OFF"}}}' http://localhost:8080/api/v1/clusters/Sandbox/services/HBASE
  • Start HBase
curl -u admin:admin -i -H 'X-Requested-By: ambari' -X PUT -d '{"RequestInfo": {"context" :"Start HBASE via REST"}, "Body": {"ServiceInfo": {"state": "STARTED"}}}' http://localhost:8080/api/v1/clusters/Sandbox/services/HBASE
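Both curl calls above PUT the same JSON shape to the Ambari REST endpoint, differing only in the `ServiceInfo` payload. A minimal Python sketch that builds those request bodies (it does not send anything; the urllib usage in the comment is only an illustration and assumes the sandbox is up):

```python
import json

# Endpoint the two curl calls above target
AMBARI = "http://localhost:8080/api/v1/clusters/Sandbox/services/HBASE"

def ambari_body(context, **service_info):
    """Build the JSON body that the curl calls PUT to Ambari."""
    return json.dumps({
        "RequestInfo": {"context": context},
        "Body": {"ServiceInfo": service_info},
    })

maintenance_off = ambari_body("Turn off maintenance mode for HBase",
                              maintenance_state="OFF")
start_hbase = ambari_body("Start HBASE via REST", state="STARTED")

# To actually send (sandbox must be running), something like:
#   req = urllib.request.Request(AMBARI, data=maintenance_off.encode(),
#                                method="PUT",
#                                headers={"X-Requested-By": "ambari"})
#   urllib.request.urlopen(req)
```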
  • Run the Python script to generate the stock price CSV
cd
/bin/rm -f prices.csv
/bin/rm -f opentsd.input
wget https://raw.githubusercontent.com/abajwa-hw/opentsdb-service/master/scripts/google_intraday.py
python google_intraday.py AAPL > prices.csv
python google_intraday.py GOOG >> prices.csv
python google_intraday.py HDP >> prices.csv
python google_intraday.py ORCL >> prices.csv
python google_intraday.py MSFT >> prices.csv

#check output files
tail prices.csv  opentsd.input

  • Create a SQL file defining the Phoenix table
vi ~/prices.sql
drop table if exists PRICES;

create table PRICES (
 SYMBOL varchar(10),
 DATE   varchar(10),
 TIME varchar(10),
 OPEN varchar(10),
 HIGH varchar(10),
 LOW    varchar(10),
 CLOSE     varchar(10),
 VOLUME varchar(30),
 CONSTRAINT pk PRIMARY KEY (SYMBOL, DATE, TIME)
);
  • Create the Phoenix table and populate it with the CSV data
/usr/hdp/2*/phoenix/bin/psql.py sandbox.hortonworks.com:2181:/hbase-unsecure ~/prices.sql ~/prices.csv
  • Connect to HBase via Phoenix
/usr/hdp/*/phoenix/bin/sqlline.py sandbox.hortonworks.com:2181:/hbase-unsecure

  • Run sample query
select * from prices order by DATE, TIME limit 20;
!q
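The PRICES DDL above keys each row by the composite primary key (SYMBOL, DATE, TIME). A SQLite stand-in (not Phoenix) can sketch what that constraint means; note that Phoenix loads data with UPSERT semantics, so a duplicate key would overwrite rather than error as it does here:

```python
import sqlite3

# SQLite stand-in for the Phoenix PRICES DDL, to illustrate the composite key
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE PRICES (
        SYMBOL TEXT, DATE TEXT, TIME TEXT,
        OPEN TEXT, HIGH TEXT, LOW TEXT, CLOSE TEXT, VOLUME TEXT,
        PRIMARY KEY (SYMBOL, DATE, TIME)
    )
""")
row = ("AAPL", "2015-04-01", "09:30",
       "124.82", "125.12", "124.58", "124.90", "1200300")
conn.execute("INSERT INTO PRICES VALUES (?,?,?,?,?,?,?,?)", row)

# A second row with the same (SYMBOL, DATE, TIME) violates the primary key
try:
    conn.execute("INSERT INTO PRICES VALUES (?,?,?,?,?,?,?,?)", row)
    duplicate_ok = True
except sqlite3.IntegrityError:
    duplicate_ok = False
```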

Try examples from phoenix-spark

  • Start the Spark shell
export SPARK_CLASSPATH=/etc/hbase/conf:/usr/hdp/2.3.0.0-1754/hbase/lib/hbase-protocol.jar

#start spark shell and pass in relevant jars to classpath
/root/spark-1.3.1-bin-hadoop2.6/bin/spark-shell  --driver-memory 512m --executor-memory 512m --conf hdp.version=$HDP_VER --jars \
/usr/hdp/2.3.0.0-1754/phoenix/phoenix-4.4.0.2.3.0.0-1754-client.jar 

  • Load the table as an RDD using a ZooKeeper URL. Other examples are available here
import org.apache.phoenix.spark._ 
import org.apache.spark.rdd.RDD
val sqlCtx = new org.apache.spark.sql.SQLContext(sc)

val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
  "PRICES", Seq("SYMBOL","DATE","TIME", "OPEN","HIGH","LOW","CLOSE","VOLUME"), zkUrl = Some("localhost:2181:/hbase-unsecure")
)
// count rows
rdd.count()

//get first row
rdd.first()

//get fields from first row
val firstSymbol = rdd.first()("SYMBOL").asInstanceOf[String]
val firstVolume = rdd.first()("VOLUME").asInstanceOf[String]

//print top 100 rows
rdd.take(100).foreach(println)


// print all rows where AAPL volume is above 100100
rdd.filter(row => row("SYMBOL") == "AAPL").filter(row => row("VOLUME").asInstanceOf[String].toInt > 100100).foreach(println)

// print all rows where AAPL low is below 130.0
rdd.filter(row => row("SYMBOL") == "AAPL").filter(row => row("LOW").asInstanceOf[String].toDouble < 130.0).foreach(println)

// compute (HIGH + LOW) / 2 for each row
val avg_each_row = rdd.map(row => ( row("HIGH").asInstanceOf[String].toDouble + row("LOW").asInstanceOf[String].toDouble ) / 2.0 )
avg_each_row.foreach(println)
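The RDD transformations above translate directly to plain Python over in-memory rows, which can be handy for sanity-checking expected results before running them in the Spark shell. The sample figures below are hypothetical:

```python
# In-memory stand-in for the Phoenix RDD rows (values are strings,
# matching the VARCHAR columns); sample figures are hypothetical
rows = [
    {"SYMBOL": "AAPL", "DATE": "2015-04-01", "TIME": "09:30",
     "OPEN": "124.82", "HIGH": "125.12", "LOW": "124.58",
     "CLOSE": "124.90", "VOLUME": "1200300"},
    {"SYMBOL": "GOOG", "DATE": "2015-04-01", "TIME": "09:30",
     "OPEN": "542.50", "HIGH": "544.00", "LOW": "540.10",
     "CLOSE": "543.20", "VOLUME": "90500"},
]

# rows where AAPL volume is above 100100 (cf. the RDD filter above)
heavy_aapl = [r for r in rows
              if r["SYMBOL"] == "AAPL" and int(r["VOLUME"]) > 100100]

# (HIGH + LOW) / 2 for each row (cf. avg_each_row above)
midpoints = [(float(r["HIGH"]) + float(r["LOW"])) / 2.0 for r in rows]
print(len(heavy_aapl), midpoints)
```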
