
@mravi
Created December 6, 2014 02:55
Phoenix Spark Example
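The job below reads the STOCKS table through PhoenixInputFormat, computes the average of each stock's quarterly recordings on the Spark side, and upserts the result back into STOCKS through PhoenixOutputFormat.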
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.phoenix.mapreduce.PhoenixInputFormat;
import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
import org.apache.phoenix.mapreduce.example.writable.StockBean;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.mapreduce.example.writable.StockWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
public class PhoenixSparkJob {

    private static final String SPARK_MASTER_URL = "local[*]";
    private static final String ZOOKEEPER_QUORUM_URL = "sandbox.hortonworks.com:2181";

    public static void main(String[] args) throws IOException {
        // Run locally against the sandbox ZooKeeper quorum.
        final SparkConf sparkConf = new SparkConf()
                .setAppName("phoenix-spark")
                .set("spark.executor.memory", "2g")
                .setMaster(SPARK_MASTER_URL);
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // Phoenix reads from and writes back to the STOCKS table through the same Hadoop Configuration.
        final Configuration configuration = HBaseConfiguration.create();
        configuration.set(HConstants.ZOOKEEPER_QUORUM, ZOOKEEPER_QUORUM_URL);
        PhoenixConfigurationUtil.setInputTableName(configuration, "STOCKS");
        PhoenixConfigurationUtil.setOutputTableName(configuration, "STOCKS");
        PhoenixConfigurationUtil.setInputQuery(configuration, "SELECT STOCK_NAME,RECORDING_YEAR,RECORDINGS_QUARTER FROM STOCKS");
        PhoenixConfigurationUtil.setInputClass(configuration, StockWritable.class);
        PhoenixConfigurationUtil.setUpsertColumnNames(configuration, "STOCK_NAME,RECORDING_YEAR,RECORDINGS_AVG");
        configuration.setClass(JobContext.OUTPUT_FORMAT_CLASS_ATTR, PhoenixOutputFormat.class, OutputFormat.class);
@SuppressWarnings("unchecked")
JavaPairRDD<NullWritable, StockWritable> stocksRDD = jsc.newAPIHadoopRDD(
configuration,
PhoenixInputFormat.class,
NullWritable.class,
StockWritable.class);
System.out.println(String.format(" the number of records are [%s] ", stocksRDD.count()));
        // Compute the average of the quarterly recordings for each stock and
        // upsert the result back into STOCKS through PhoenixOutputFormat.
        stocksRDD.mapToPair(new PairFunction<Tuple2<NullWritable, StockWritable>, NullWritable, StockWritable>() {
            @Override
            public Tuple2<NullWritable, StockWritable> call(Tuple2<NullWritable, StockWritable> tuple) throws Exception {
                final StockWritable stockWritable = tuple._2;
                final StockBean bean = stockWritable.getStockBean();
                double[] recordings = bean.getRecordings();
                double sum = 0.0;
                for (double recording : recordings) {
                    sum += recording;
                }
                double avg = sum / recordings.length;
                bean.setAverage(avg);
                stockWritable.setStockBean(bean);
                return new Tuple2<NullWritable, StockWritable>(NullWritable.get(), stockWritable);
            }
        }).saveAsNewAPIHadoopDataset(configuration);

        jsc.stop();
    }
}
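
StockBean and StockWritable are imported from org.apache.phoenix.mapreduce.example.writable but are not part of Phoenix itself and are not included in this gist. A minimal sketch of what they might look like is below, assuming RECORDINGS_QUARTER is a DOUBLE ARRAY column in STOCKS; the class shapes, column types, and serialization details are illustrative only and would need to match the real schema.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

// Plain value holder for one row of the STOCKS table (sketch).
class StockBean {
    private String stockName;
    private int recordingYear;
    private double[] recordings;
    private double average;

    public String getStockName() { return stockName; }
    public void setStockName(String stockName) { this.stockName = stockName; }
    public int getRecordingYear() { return recordingYear; }
    public void setRecordingYear(int recordingYear) { this.recordingYear = recordingYear; }
    public double[] getRecordings() { return recordings; }
    public void setRecordings(double[] recordings) { this.recordings = recordings; }
    public double getAverage() { return average; }
    public void setAverage(double average) { this.average = average; }
}

// PhoenixInputFormat/PhoenixOutputFormat expect the value class to implement
// DBWritable (JDBC read/write); Writable is implemented here as well for
// Hadoop serialization.
public class StockWritable implements Writable, DBWritable {

    private StockBean bean = new StockBean();

    public StockBean getStockBean() { return bean; }
    public void setStockBean(StockBean bean) { this.bean = bean; }

    // Populated by PhoenixInputFormat from each row of the SELECT query.
    @Override
    public void readFields(ResultSet rs) throws SQLException {
        bean.setStockName(rs.getString("STOCK_NAME"));
        bean.setRecordingYear(rs.getInt("RECORDING_YEAR"));
        // Assumes RECORDINGS_QUARTER is a Phoenix DOUBLE ARRAY column.
        bean.setRecordings((double[]) rs.getArray("RECORDINGS_QUARTER").getArray());
    }

    // Consumed by PhoenixOutputFormat; parameter order must match the
    // upsert column list STOCK_NAME, RECORDING_YEAR, RECORDINGS_AVG.
    @Override
    public void write(PreparedStatement ps) throws SQLException {
        ps.setString(1, bean.getStockName());
        ps.setInt(2, bean.getRecordingYear());
        ps.setDouble(3, bean.getAverage());
    }

    // Writable methods satisfy Hadoop's serialization contract.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(bean.getStockName());
        out.writeInt(bean.getRecordingYear());
        double[] recordings = bean.getRecordings() == null ? new double[0] : bean.getRecordings();
        out.writeInt(recordings.length);
        for (double recording : recordings) {
            out.writeDouble(recording);
        }
        out.writeDouble(bean.getAverage());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        bean = new StockBean();
        bean.setStockName(in.readUTF());
        bean.setRecordingYear(in.readInt());
        double[] recordings = new double[in.readInt()];
        for (int i = 0; i < recordings.length; i++) {
            recordings[i] = in.readDouble();
        }
        bean.setRecordings(recordings);
        bean.setAverage(in.readDouble());
    }
}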