Phoenix Spark Example
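This job reads quarterly stock recordings from a Phoenix table named STOCKS, averages them per row in Spark, and upserts the result back into the same table through PhoenixOutputFormat.

The gist does not include the table definition, so the DDL below is an illustrative sketch inferred from the columns the job reads (STOCK_NAME, RECORDING_YEAR, RECORDINGS_QUARTER) and writes (RECORDINGS_AVG); it is executed here through the Phoenix JDBC driver against the same ZooKeeper quorum the job uses.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class CreateStocksTable {
    public static void main(String[] args) throws SQLException {
        // Assumed schema: quarterly values stored as a Phoenix DOUBLE ARRAY,
        // with the average column left for the Spark job to fill in.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:sandbox.hortonworks.com:2181")) {
            conn.createStatement().execute(
                    "CREATE TABLE IF NOT EXISTS STOCKS ("
                    + " STOCK_NAME VARCHAR NOT NULL,"
                    + " RECORDING_YEAR INTEGER NOT NULL,"
                    + " RECORDINGS_QUARTER DOUBLE ARRAY,"
                    + " RECORDINGS_AVG DOUBLE"
                    + " CONSTRAINT pk PRIMARY KEY (STOCK_NAME, RECORDING_YEAR))");
        }
    }
}

The job itself: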
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.phoenix.mapreduce.PhoenixInputFormat;
import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
import org.apache.phoenix.mapreduce.example.writable.StockBean;
import org.apache.phoenix.mapreduce.example.writable.StockWritable;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;
public class PhoenixSparkJob {

    private static final String SPARK_MASTER_URL = "local[*]";
    private static final String ZOOKEEPER_QUORUM_URL = "sandbox.hortonworks.com:2181";

    public static void main(String[] args) throws IOException {
        // Run locally on all available cores with a modest executor heap.
        final SparkConf sparkConf = new SparkConf()
                .setAppName("phoenix-spark")
                .set("spark.executor.memory", "2g")
                .setMaster(SPARK_MASTER_URL);
        final JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // Point the HBase/Phoenix client at the ZooKeeper quorum.
        final Configuration configuration = HBaseConfiguration.create();
        configuration.set(HConstants.ZOOKEEPER_QUORUM, ZOOKEEPER_QUORUM_URL);
        // Phoenix input: read each stock's quarterly recordings.
        PhoenixConfigurationUtil.setInputTableName(configuration, "STOCKS");
        PhoenixConfigurationUtil.setInputClass(configuration, StockWritable.class);
        PhoenixConfigurationUtil.setInputQuery(configuration,
                "SELECT STOCK_NAME, RECORDING_YEAR, RECORDINGS_QUARTER FROM STOCKS");

        // Phoenix output: upsert the computed average back into the same table.
        PhoenixConfigurationUtil.setOutputTableName(configuration, "STOCKS");
        PhoenixConfigurationUtil.setUpsertColumnNames(configuration, "STOCK_NAME,RECORDING_YEAR,RECORDINGS_AVG");
        configuration.setClass(JobContext.OUTPUT_FORMAT_CLASS_ATTR, PhoenixOutputFormat.class, OutputFormat.class);
@SuppressWarnings("unchecked") | |
JavaPairRDD<NullWritable, StockWritable> stocksRDD = jsc.newAPIHadoopRDD( | |
configuration, | |
PhoenixInputFormat.class, | |
NullWritable.class, | |
StockWritable.class); | |
System.out.println(String.format(" the number of records are [%s] ", stocksRDD.count())); | |
        // Average each stock's quarterly recordings and write the updated rows
        // back to Phoenix via the output format configured above.
        stocksRDD.mapToPair(new PairFunction<Tuple2<NullWritable, StockWritable>, NullWritable, StockWritable>() {
            @Override
            public Tuple2<NullWritable, StockWritable> call(Tuple2<NullWritable, StockWritable> tuple) throws Exception {
                final StockWritable stockWritable = tuple._2;
                final StockBean bean = stockWritable.getStockBean();
                final double[] recordings = bean.getRecordings();
                double sum = 0.0;
                for (double recording : recordings) {
                    sum += recording;
                }
                bean.setAverage(sum / recordings.length);
                stockWritable.setStockBean(bean);
                return new Tuple2<NullWritable, StockWritable>(NullWritable.get(), stockWritable);
            }
        }).saveAsNewAPIHadoopDataset(configuration);
        jsc.stop();
    }
}
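StockWritable and StockBean come from org.apache.phoenix.mapreduce.example.writable and are not shown in the gist. Phoenix's input and output formats require the record class to implement DBWritable, so a minimal sketch could look like the following; the field names and array handling are assumptions based on the query above (the original class likely also implements org.apache.hadoop.io.Writable, which is omitted here for brevity).

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class StockWritable implements DBWritable {

    private StockBean stockBean = new StockBean();

    // Populate the bean from one row of the input query's ResultSet.
    @Override
    public void readFields(ResultSet rs) throws SQLException {
        stockBean.setStockName(rs.getString("STOCK_NAME"));
        stockBean.setRecordingYear(rs.getInt("RECORDING_YEAR"));
        // Phoenix surfaces DOUBLE ARRAY columns through java.sql.Array.
        stockBean.setRecordings((double[]) rs.getArray("RECORDINGS_QUARTER").getArray());
    }

    // Bind the bean to the generated UPSERT, in the order given to
    // setUpsertColumnNames: STOCK_NAME, RECORDING_YEAR, RECORDINGS_AVG.
    @Override
    public void write(PreparedStatement ps) throws SQLException {
        ps.setString(1, stockBean.getStockName());
        ps.setInt(2, stockBean.getRecordingYear());
        ps.setDouble(3, stockBean.getAverage());
    }

    public StockBean getStockBean() {
        return stockBean;
    }

    public void setStockBean(StockBean stockBean) {
        this.stockBean = stockBean;
    }
}

// Plain data holder for one stock row (a separate file in practice).
public class StockBean {

    private String stockName;
    private int recordingYear;
    private double[] recordings;
    private double average;

    public String getStockName() { return stockName; }
    public void setStockName(String stockName) { this.stockName = stockName; }

    public int getRecordingYear() { return recordingYear; }
    public void setRecordingYear(int recordingYear) { this.recordingYear = recordingYear; }

    public double[] getRecordings() { return recordings; }
    public void setRecordings(double[] recordings) { this.recordings = recordings; }

    public double getAverage() { return average; }
    public void setAverage(double average) { this.average = average; }
}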