Phoenix/Spark demo

Option 1: prebuilt VM

There is a prebuilt CentOS 6.5 VM with the following components installed:

  • HDP
  • Spark 1.3.1
  • Ambari service/view for OpenTSDB
  • Ambari service/view for Zeppelin
  • Intraday stock price data from Google Finance in both CSV and OpenTSDB formats
    • Data for AAPL, GOOG, HDP for April 2015 has already been imported into both OpenTSDB and the PRICES table in Phoenix


  • Download the .ova file from here and import it into VMware Fusion
  • Once it starts, create a hosts file entry on your local machine for
  • Launch Ambari by clicking here and log in as admin/admin
  • Start the HBASE, OpenTSDB and (optionally) Zeppelin services
  • Try the examples from phoenix-spark by opening an SSH session to the VM and following the steps here
  • Try the OpenTSDB view and query data using the steps here

Option 2: Manual setup using HDP

  • Set up a VM running HDP 2.3/Ambari 2.1 build 233 using this repo file

  • Download Spark 1.3.1

export HDP_VER=`hdp-select status hadoop-client | sed 's/hadoop-client - \(.*\)/\1/'`
echo "export HDP_VER=$HDP_VER" >> ~/.bashrc

tar -xzvf spark-1.3.1-bin-hadoop2.6.tgz
#pass the HDP stack version to both the driver and the YARN application master
echo "spark.driver.extraJavaOptions -Dhdp.version=$HDP_VER" >> spark-1.3.1-bin-hadoop2.6/conf/spark-defaults.conf
echo "spark.yarn.am.extraJavaOptions -Dhdp.version=$HDP_VER" >> spark-1.3.1-bin-hadoop2.6/conf/spark-defaults.conf
#copy hbase-site.xml
cp /etc/hbase/conf/hbase-site.xml spark-1.3.1-bin-hadoop2.6/conf/
export YARN_CONF_DIR=/etc/hadoop/conf
echo "export YARN_CONF_DIR=$YARN_CONF_DIR" >> ~/.bashrc
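The sed one-liner above strips the `hadoop-client - ` prefix from the `hdp-select` output, leaving just the stack version. A quick python mirror of that extraction, using an assumed sample of the command's output (your VM's version string will differ):

```python
import re

# Assumed sample output of `hdp-select status hadoop-client`
sample = "hadoop-client - 2.3.0.0-2557"

# Same capture the sed expression performs: everything after "hadoop-client - "
hdp_ver = re.match(r"hadoop-client - (.*)", sample).group(1)
print(hdp_ver)  # the value that gets exported as HDP_VER
```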

Set up the Phoenix table and import data

  • Turn off HBase maintenance mode
curl -u admin:admin -i -H 'X-Requested-By: ambari' -X PUT -d '{"RequestInfo": {"context" :"Turn off maintenance mode for HBase"}, "Body": {"ServiceInfo": {"maintenance_state": "OFF"}}}' http://localhost:8080/api/v1/clusters/Sandbox/services/HBASE
  • Start HBASE
curl -u admin:admin -i -H 'X-Requested-By: ambari' -X PUT -d '{"RequestInfo": {"context" :"Start HBASE via REST"}, "Body": {"ServiceInfo": {"state": "STARTED"}}}' http://localhost:8080/api/v1/clusters/Sandbox/services/HBASE
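Both curl calls above are plain Ambari REST PUTs against the same HBASE service endpoint, differing only in the ServiceInfo body. As a sketch, here is the same request built (but not sent) with python's stdlib urllib; the URL and payloads are taken from the curl commands:

```python
import json
from urllib.request import Request

AMBARI_HBASE = "http://localhost:8080/api/v1/clusters/Sandbox/services/HBASE"

def ambari_put(context, service_info):
    """Build the PUT request the curl commands above send to Ambari."""
    body = json.dumps({
        "RequestInfo": {"context": context},
        "Body": {"ServiceInfo": service_info},
    }).encode("utf-8")
    req = Request(AMBARI_HBASE, data=body, method="PUT")
    req.add_header("X-Requested-By", "ambari")
    return req

# Same two operations as the curl commands: maintenance off, then start
maint_off = ambari_put("Turn off maintenance mode for HBase", {"maintenance_state": "OFF"})
start = ambari_put("Start HBASE via REST", {"state": "STARTED"})
# urllib.request.urlopen(...) would send them (add admin:admin basic auth first)
```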
  • Run the python script to generate the stock price CSV and OpenTSDB input files
/bin/rm -f prices.csv
/bin/rm -f opentsd.input
python AAPL > prices.csv
python GOOG >> prices.csv
python HDP >> prices.csv
python ORCL >> prices.csv
python MSFT >> prices.csv

#check output files
tail prices.csv  opentsd.input
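The generated prices.csv feeds the PRICES table defined below, so each row should carry eight comma-separated fields. A small sanity check in python, with an assumed sample row (real Google Finance values will differ):

```python
import csv
import io

# Assumed sample line; column order matches the PRICES table below:
# SYMBOL, DATE, TIME, OPEN, HIGH, LOW, CLOSE, VOLUME
sample = "AAPL,2015-04-01,09:30,124.82,125.12,124.58,124.90,100500"

row = next(csv.reader(io.StringIO(sample)))
assert len(row) == 8, "each prices.csv row should have 8 fields"
print(row[0], row[7])  # symbol and volume
```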

  • Create a SQL file that defines the phoenix table
vi ~/prices.sql
drop table if exists PRICES;

create table PRICES (
 SYMBOL varchar(10),
 DATE   varchar(10),
 TIME varchar(10),
 OPEN varchar(10),
 HIGH varchar(10),
 LOW    varchar(10),
 CLOSE     varchar(10),
 VOLUME varchar(30),
 CONSTRAINT PK PRIMARY KEY (SYMBOL, DATE, TIME)
);
  • Create the phoenix table and populate it with the CSV data
/usr/hdp/2*/phoenix/bin/psql.py ~/prices.sql ~/prices.csv
  • Connect to HBase via the Phoenix sqlline client, e.g. /usr/hdp/current/phoenix-client/bin/sqlline.py localhost:2181:/hbase-unsecure

  • Run a sample query
select * from prices order by DATE, TIME limit 20;
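The same ORDER BY/LIMIT query shape can be tried off-cluster. As a sketch, here it is run against an in-memory sqlite stand-in for the Phoenix PRICES table (the sample rows are assumptions, not data from the VM):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""create table PRICES (
  SYMBOL text, DATE text, TIME text,
  OPEN text, HIGH text, LOW text, CLOSE text, VOLUME text)""")

# Assumed sample rows, in the PRICES column order
rows = [
    ("GOOG", "2015-04-02", "09:30", "542.00", "548.39", "541.50", "547.00", "120000"),
    ("AAPL", "2015-04-01", "16:00", "125.03", "125.12", "124.58", "124.90", "100500"),
    ("AAPL", "2015-04-01", "09:30", "124.82", "125.00", "124.60", "124.95", "98000"),
]
conn.executemany("insert into PRICES values (?,?,?,?,?,?,?,?)", rows)

# Same sample query as above: earliest date/time first
for row in conn.execute("select * from PRICES order by DATE, TIME limit 20"):
    print(row)
```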

Try examples from phoenix-spark

  • Start spark shell
export SPARK_CLASSPATH=/etc/hbase/conf:/usr/hdp/

#start spark shell and pass in relevant jars to classpath
/root/spark-1.3.1-bin-hadoop2.6/bin/spark-shell  --driver-memory 512m --executor-memory 512m --conf hdp.version=$HDP_VER --jars \

  • Load the table as an RDD, using a Zookeeper URL. Other examples are available here
import org.apache.phoenix.spark._ 
import org.apache.spark.rdd.RDD
val sqlCtx = new org.apache.spark.sql.SQLContext(sc)

val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
  "PRICES", Seq("SYMBOL","DATE","TIME","OPEN","HIGH","LOW","CLOSE","VOLUME"), zkUrl = Some("localhost:2181:/hbase-unsecure")
)
// count rows
rdd.count()

// get first row
rdd.first()

//get fields from first row
val firstSymbol = rdd.first()("SYMBOL").asInstanceOf[String]
val firstVolume = rdd.first()("VOLUME").asInstanceOf[String]

// print top 100 rows
rdd.take(100).foreach(println)

// print all rows where AAPL volume is above 100100
rdd.filter(row => row("SYMBOL") == "AAPL").filter(row => row("VOLUME").asInstanceOf[String].toInt > 100100).foreach(println)

// print all rows where AAPL low is below 130.0
rdd.filter(row => row("SYMBOL") == "AAPL").filter(row => row("LOW").asInstanceOf[String].toDouble < 130.0).foreach(println)

// average of HIGH and LOW for each row
val avg_each_row = rdd.map(row => ( row("HIGH").asInstanceOf[String].toDouble + row("LOW").asInstanceOf[String].toDouble ) / 2.0 )
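The map above computes the midpoint of each row's HIGH and LOW. The same computation in plain python over assumed sample rows (the RDD rows are Map[String, AnyRef], so values arrive as strings and need the same casts):

```python
# Assumed sample rows mirroring the RDD's Map[String, AnyRef] records
rows = [
    {"SYMBOL": "AAPL", "HIGH": "125.12", "LOW": "124.58"},
    {"SYMBOL": "GOOG", "HIGH": "548.39", "LOW": "545.50"},
]

# Same per-row midpoint the Scala map computes
avg_each_row = [(float(r["HIGH"]) + float(r["LOW"])) / 2.0 for r in rows]
print(avg_each_row)
```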
