@mravi
Created December 6, 2014 02:51
Phoenix MR Example
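
The job below reads STOCK_NAME, RECORDING_YEAR, and RECORDINGS_QUARTER from a Phoenix table named STOCKS and writes STOCK_NAME, RECORDING_YEAR, and RECORDINGS_AVG back to the same table. The gist does not include the table DDL, so the following is a minimal sketch of a schema that matches those column names; the data types, the primary key, and the CreateStocksTable class itself are assumptions, not part of the original example.

package org.apache.phoenix.example.setup;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

/**
 * Hypothetical helper that creates the STOCKS table the MapReduce job expects.
 * Column names come from the queries in StockComputationJob; the types and
 * primary key below are assumptions.
 */
public class CreateStocksTable {

    public static void main(String[] args) throws Exception {
        // Phoenix JDBC URL pointing at the same ZooKeeper quorum used by the job.
        final String jdbcUrl = "jdbc:phoenix:sandbox.hortonworks.com:2181";
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             Statement stmt = conn.createStatement()) {
            stmt.execute(
                "CREATE TABLE IF NOT EXISTS STOCKS ("
                + " STOCK_NAME VARCHAR NOT NULL,"
                + " RECORDING_YEAR INTEGER NOT NULL,"
                + " RECORDINGS_QUARTER DOUBLE ARRAY,"   // quarterly recordings read by the mapper
                + " RECORDINGS_AVG DOUBLE"              // average written back by the job
                + " CONSTRAINT PK PRIMARY KEY (STOCK_NAME, RECORDING_YEAR))");
            conn.commit();
        }
    }
}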
package org.apache.phoenix.example.bean;

import java.util.Arrays;

/**
 * Plain value object holding one stock record: the stock name, the recording
 * year, the per-quarter recordings, and the computed average.
 */
public final class StockBean {

    private String stockName;
    private Integer year;
    private double[] recordings;
    private double average;

    public String getStockName() {
        return stockName;
    }

    public void setStockName(String stockName) {
        this.stockName = stockName;
    }

    public Integer getYear() {
        return year;
    }

    public void setYear(Integer year) {
        this.year = year;
    }

    public double[] getRecordings() {
        return recordings;
    }

    public void setRecordings(double[] recordings) {
        this.recordings = recordings;
    }

    public double getAverage() {
        return average;
    }

    public void setAverage(double average) {
        this.average = average;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        long temp;
        temp = Double.doubleToLongBits(average);
        result = prime * result + (int) (temp ^ (temp >>> 32));
        result = prime * result + Arrays.hashCode(recordings);
        result = prime * result + ((stockName == null) ? 0 : stockName.hashCode());
        result = prime * result + ((year == null) ? 0 : year.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null) return false;
        if (getClass() != obj.getClass()) return false;
        StockBean other = (StockBean) obj;
        if (Double.doubleToLongBits(average) != Double.doubleToLongBits(other.average)) return false;
        if (!Arrays.equals(recordings, other.recordings)) return false;
        if (stockName == null) {
            if (other.stockName != null) return false;
        } else if (!stockName.equals(other.stockName)) return false;
        if (year == null) {
            if (other.year != null) return false;
        } else if (!year.equals(other.year)) return false;
        return true;
    }
}
package org.apache.phoenix.example.job;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.phoenix.example.bean.StockBean;
import org.apache.phoenix.example.writable.StockWritable;
import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;

/**
 * Map-only job that reads stock recordings from the Phoenix STOCKS table,
 * computes the yearly average per stock, and writes it back through
 * PhoenixOutputFormat.
 */
public class StockComputationJob {

    protected static final String zkUrl = "sandbox.hortonworks.com:2181";

    public static class StockMapper extends Mapper<NullWritable, StockWritable, NullWritable, StockWritable> {

        @Override
        protected void map(NullWritable key, StockWritable stockWritable, Context context)
                throws IOException, InterruptedException {
            final StockBean bean = stockWritable.getStockBean();
            double[] recordings = bean.getRecordings();
            if (recordings.length == 0) {
                return;
            }
            // Average the quarterly recordings and emit the same bean with the average set.
            double sum = 0.0;
            for (double recording : recordings) {
                sum += recording;
            }
            double avg = sum / recordings.length;
            bean.setAverage(avg);
            stockWritable.setStockBean(bean);
            context.write(NullWritable.get(), stockWritable);
        }
    }

    public static void main(String[] args) throws Exception {
        final Configuration conf = new Configuration();
        HBaseConfiguration.addHbaseResources(conf);
        conf.set(HConstants.ZOOKEEPER_QUORUM, zkUrl);

        final Job job = Job.getInstance(conf, "stock-stats-job");

        // Input: select the columns the mapper needs; output: upsert name, year, and average.
        final String selectQuery = "SELECT STOCK_NAME,RECORDING_YEAR,RECORDINGS_QUARTER FROM STOCKS ";
        PhoenixMapReduceUtil.setInput(job, StockWritable.class, "STOCKS", selectQuery);
        PhoenixMapReduceUtil.setOutput(job, "STOCKS", "STOCK_NAME,RECORDING_YEAR,RECORDINGS_AVG");

        job.setMapperClass(StockMapper.class);
        job.setOutputFormatClass(PhoenixOutputFormat.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(StockWritable.class);

        TableMapReduceUtil.addDependencyJars(job);
        job.waitForCompletion(true);
    }
}
package org.apache.phoenix.example.writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Array;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.example.bean.StockBean;

import com.google.common.base.Preconditions;

/**
 * Adapter between the Phoenix input/output formats and StockBean: reads a row
 * from the SELECT query into a bean and writes the bean back through the
 * UPSERT statement generated for the configured output columns.
 */
public class StockWritable implements DBWritable, Writable {

    private StockBean stockBean;

    @Override
    public void readFields(DataInput input) throws IOException {
        // No-op: with a map-only job the Hadoop Writable serialization path is not exercised.
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // No-op: see readFields(DataInput).
    }

    @Override
    public void readFields(ResultSet rs) throws SQLException {
        final String stockName = rs.getString("STOCK_NAME");
        final int year = rs.getInt("RECORDING_YEAR");
        // Phoenix returns a primitive double[] for a DOUBLE ARRAY column.
        final Array recordingsArray = rs.getArray("RECORDINGS_QUARTER");
        final double[] recordings = (double[]) recordingsArray.getArray();
        stockBean = new StockBean();
        stockBean.setStockName(stockName);
        stockBean.setYear(year);
        stockBean.setRecordings(recordings);
    }

    @Override
    public void write(PreparedStatement pstmt) throws SQLException {
        Preconditions.checkNotNull(stockBean);
        // Parameter order must match the output columns: STOCK_NAME, RECORDING_YEAR, RECORDINGS_AVG.
        pstmt.setString(1, stockBean.getStockName());
        pstmt.setInt(2, stockBean.getYear());
        pstmt.setDouble(3, stockBean.getAverage());
    }

    public final StockBean getStockBean() {
        return stockBean;
    }

    public final void setStockBean(StockBean stockBean) {
        this.stockBean = stockBean;
    }
}
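
For completeness, here is a minimal sketch of how sample data might be loaded and the result checked after the job runs. The table and column names match the job above; the LoadAndVerifyStocks class, the sample values, and the JDBC URL reuse are assumptions for illustration only.

package org.apache.phoenix.example.setup;

import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Hypothetical helper that upserts one sample row into STOCKS and, after the
 * MapReduce job has run, prints the computed RECORDINGS_AVG.
 */
public class LoadAndVerifyStocks {

    public static void main(String[] args) throws Exception {
        final String jdbcUrl = "jdbc:phoenix:sandbox.hortonworks.com:2181";
        try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
            // Phoenix array columns are bound through java.sql.Array instances created from the connection.
            Array quarters = conn.createArrayOf("DOUBLE", new Double[] { 10.0, 20.0, 30.0, 40.0 });
            try (PreparedStatement upsert = conn.prepareStatement(
                    "UPSERT INTO STOCKS (STOCK_NAME, RECORDING_YEAR, RECORDINGS_QUARTER) VALUES (?, ?, ?)")) {
                upsert.setString(1, "AAPL");
                upsert.setInt(2, 2014);
                upsert.setArray(3, quarters);
                upsert.executeUpdate();
            }
            conn.commit();

            // After StockComputationJob has completed, the average should be visible in RECORDINGS_AVG.
            try (PreparedStatement query = conn.prepareStatement(
                    "SELECT RECORDINGS_AVG FROM STOCKS WHERE STOCK_NAME = ? AND RECORDING_YEAR = ?")) {
                query.setString(1, "AAPL");
                query.setInt(2, 2014);
                try (ResultSet rs = query.executeQuery()) {
                    if (rs.next()) {
                        System.out.println("RECORDINGS_AVG = " + rs.getDouble(1));
                    }
                }
            }
        }
    }
}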