Skip to content

Instantly share code, notes, and snippets.

@mravi
Last active August 29, 2015 14:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mravi/501fa00b942764eb0dca to your computer and use it in GitHub Desktop.
Save mravi/501fa00b942764eb0dca to your computer and use it in GitHub Desktop.
Phoenix MapReduce Example
package org.apache.phoenix.mapreduce.job;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.phoenix.mapreduce.PhoenixInputFormat;
import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
import org.apache.phoenix.mapreduce.bean.StockAvgBean;
import org.apache.phoenix.mapreduce.bean.StockBean;
import org.apache.phoenix.mapreduce.writable.StockAverageWritable;
import org.apache.phoenix.mapreduce.writable.StockWritable;
public class StockAverageCalculationJob {
private final static String zkUrl = "zookeeperQuorum:2181";
public static class StockMapper extends Mapper<NullWritable, StockWritable, Text , StockWritable> {
private final Text name = new Text ();
@Override
protected void map(NullWritable key, StockWritable stockWritable, Context context)
throws IOException, InterruptedException {
final StockBean bean = stockWritable.getStockBean();
final String stockName = bean.getStockName();
name.set(stockName);
context.write(name, stockWritable);
}
}
public static class StockReducer extends Reducer<Text, StockWritable, NullWritable , StockAverageWritable> {
@Override
protected void reduce(Text stockName, Iterable<StockWritable> beans, Context context)
throws IOException, InterruptedException {
double total = 0.0;
int count = 0;
for(StockWritable bean : beans) {
total += bean.getStockBean().getClose();
count ++;
}
double avg = total / count ;
StockAvgBean stockAvgBean = new StockAvgBean();
stockAvgBean.setStockName(stockName.toString());
stockAvgBean.setAvgPrice(avg);
StockAverageWritable stockAggregateWritable = new StockAverageWritable();
stockAggregateWritable.setStockAvgBean(stockAvgBean);
context.write(NullWritable.get(), stockAggregateWritable);
}
}
public static void main(String[] args) throws Exception {
final Configuration conf = new Configuration();
HBaseConfiguration.addHbaseResources(conf);
conf.set(HConstants.ZOOKEEPER_QUORUM, );
final Job job = new Job(conf, "stock-stats");
final String selectQuery = "SELECT STOCK_NAME,RECORDING_DT,OPEN,CLOSE FROM STOCKS ";
PhoenixMapReduceUtil.setInput(job, StockWritable.class, "stocks", selectQuery);
PhoenixMapReduceUtil.setOutput(job, "STOCK_STATS", "STOCK_NAME,AVG");
job.setMapperClass(StockMapper.class);
job.setReducerClass(StockReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(StockWritable.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(StockAverageWritable.class);
TableMapReduceUtil.addDependencyJars(job);
job.waitForCompletion(true);
}
}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.mapreduce.bean.StockAvgBean;
import com.google.common.base.Preconditions;
/**
 * Carrier for a {@link StockAvgBean} that can bind the bean's fields to a
 * {@link PreparedStatement}. The Hadoop wire-format methods and the {@link ResultSet} reader
 * are deliberately empty.
 */
public class StockAverageWritable implements DBWritable, Writable {

    /** The payload; must be set before {@link #write(PreparedStatement)} is called. */
    private StockAvgBean stockAvgBean;

    /** @return the wrapped bean, or {@code null} if none has been set */
    public final StockAvgBean getStockAvgBean() {
        return this.stockAvgBean;
    }

    /** @param stockAvgBean the bean to wrap */
    public final void setStockAvgBean(StockAvgBean stockAvgBean) {
        this.stockAvgBean = stockAvgBean;
    }

    /**
     * Binds the stock name to parameter 1 and the average price to parameter 2.
     *
     * @param pstmt the statement to populate
     * @throws NullPointerException if no bean has been set
     */
    @Override
    public void write(PreparedStatement pstmt) throws SQLException {
        final StockAvgBean payload = Preconditions.checkNotNull(stockAvgBean);
        final String stockName = payload.getStockName();
        pstmt.setString(1, stockName);
        pstmt.setDouble(2, payload.getAvgPrice());
    }

    /** Does nothing: no state is populated from a {@link ResultSet}. */
    @Override
    public void readFields(ResultSet arg0) throws SQLException {
        // intentionally empty
    }

    /** Does nothing: no state is read from the binary stream. */
    @Override
    public void readFields(DataInput arg0) throws IOException {
        // intentionally empty
    }

    /** Does nothing: no state is written to the binary stream. */
    @Override
    public void write(DataOutput arg0) throws IOException {
        // intentionally empty
    }
}
package org.apache.phoenix.mapreduce.bean;
public final class StockAvgBean {
private String stockName;
private Double avgPrice;
public String getStockName() {
return stockName;
}
public void setStockName(String stockName) {
this.stockName = stockName;
}
public Double getAvgPrice() {
return avgPrice;
}
public void setAvgPrice(Double avgPrice) {
this.avgPrice = avgPrice;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((avgPrice == null) ? 0 : avgPrice.hashCode());
result = prime * result + ((stockName == null) ? 0 : stockName.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
StockAvgBean other = (StockAvgBean)obj;
if (stockName == null) {
if (other.stockName != null) return false;
} else if (!stockName.equals(other.stockName)) return false;
return true;
}
@Override
public String toString() {
return "StockAvgBean [stockName=" + stockName + ", avgPrice=" + avgPrice + "]";
}
}
package org.apache.phoenix.mapreduce.bean;
import java.util.Date;
public final class StockBean {
private String stockName;
private Date date;
private Double open;
private Double close;
public String getStockName() {
return stockName;
}
public void setStockName(String stockName) {
this.stockName = stockName;
}
public Date getDate() {
return date;
}
public void setDate(Date date) {
this.date = date;
}
public final Double getOpen() {
return open;
}
public final void setOpen(Double open) {
this.open = open;
}
public final Double getClose() {
return close;
}
public final void setClose(Double close) {
this.close = close;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((date == null) ? 0 : date.hashCode());
result = prime * result + ((stockName == null) ? 0 : stockName.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
StockBean other = (StockBean)obj;
if (date == null) {
if (other.date != null) return false;
} else if (!date.equals(other.date)) return false;
if (stockName == null) {
if (other.stockName != null) return false;
} else if (!stockName.equals(other.stockName)) return false;
return true;
}
@Override
public String toString() {
return "StockBean [stockName=" + stockName + ", date=" + date + ", open=" + open + ", close=" + close + "]";
}
}
package org.apache.phoenix.mapreduce.generator;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import org.apache.phoenix.util.QueryUtil;
public final class StockSampleDataGenerator {
protected final static String zkUrl = "zookeeperQuorum:2181";
protected final static String STOCK_TABLE_NAME = "stocks";
protected final static String STOCK_STATS_TABLE_NAME = "stock_stats";
public static void main(String[] args) throws SQLException {
StockSampleDataGenerator dataGenerator = new StockSampleDataGenerator();
dataGenerator.doIt ();
}
private void doIt() throws SQLException {
// create tables.
createTables();
// insert data.
insertData();
selectAverage();
}
protected void insertData() throws SQLException {
final Properties props = new Properties();
final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(zkUrl), props);
conn.createStatement().execute("upsert into stocks values ('AAPL',TO_DATE('2009-01-02','yyyy-MM-dd'),85.88,91.04)");
conn.createStatement().execute("upsert into stocks values ('AAPL',TO_DATE('2008-01-02','yyyy-MM-dd'),199.27,200.26)");
conn.createStatement().execute("upsert into stocks values ('AAPL',TO_DATE('2007-01-03','yyyy-MM-dd'),86.29,86.58)");
conn.createStatement().execute("upsert into stocks values ('CSCO',TO_DATE('2009-01-02','yyyy-MM-dd'),16.41,17.00)");
conn.createStatement().execute("upsert into stocks values ('CSCO',TO_DATE('2008-01-02','yyyy-MM-dd'),27.00,27.30)");
conn.createStatement().execute("upsert into stocks values ('CSCO',TO_DATE('2007-01-03','yyyy-MM-dd'),27.46,27.98)");
conn.createStatement().execute("upsert into stocks values ('CSCO',TO_DATE('2006-01-03','yyyy-MM-dd'),17.21,17.49)");
conn.createStatement().execute("upsert into stocks values ('GOOG',TO_DATE('2009-01-02','yyyy-MM-dd'),308.60,321.82)");
conn.createStatement().execute("upsert into stocks values ('GOOG',TO_DATE('2008-01-02','yyyy-MM-dd'),692.87,697.37)");
conn.createStatement().execute("upsert into stocks values ('GOOG',TO_DATE('2007-01-03','yyyy-MM-dd'),466.00,476.66)");
conn.createStatement().execute("upsert into stocks values ('MSFT',TO_DATE('2009-01-02','yyyy-MM-dd'),19.53,20.40)");
conn.createStatement().execute("upsert into stocks values ('MSFT',TO_DATE('2008-01-02','yyyy-MM-dd'),35.79,35.96)");
conn.createStatement().execute("upsert into stocks values ('MSFT',TO_DATE('2007-01-03','yyyy-MM-dd'),29.91,30.25)");
conn.createStatement().execute("upsert into stocks values ('MSFT',TO_DATE('2006-01-03','yyyy-MM-dd'),26.25,27.00)");
conn.createStatement().execute("upsert into stocks values ('MSFT',TO_DATE('2001-01-02','yyyy-MM-dd'),44.13,45.00)");
conn.createStatement().execute("upsert into stocks values ('MSFT',TO_DATE('2000-01-03','yyyy-MM-dd'),117.37,118.62)");
conn.createStatement().execute("upsert into stocks values ('YHOO',TO_DATE('2009-01-02','yyyy-MM-dd'),12.17,12.85)");
conn.createStatement().execute("upsert into stocks values ('YHOO',TO_DATE('2008-01-02','yyyy-MM-dd'),23.80,24.15)");
conn.createStatement().execute("upsert into stocks values ('YHOO',TO_DATE('2007-01-03','yyyy-MM-dd'),25.85,26.26)");
conn.createStatement().execute("upsert into stocks values ('YHOO',TO_DATE('2006-01-03','yyyy-MM-dd'),39.69,41.22)");
conn.commit();
conn.close();
}
public void createTables() throws SQLException {
final Properties props = new Properties();
final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(zkUrl), props);
final String stocksDDL = String.format("CREATE TABLE IF NOT EXISTS %s " +
"(STOCK_NAME VARCHAR NOT NULL ,RECORDING_DT DATE NOT NULL ,OPEN DOUBLE,CLOSE DOUBLE CONSTRAINT pk PRIMARY KEY (STOCK_NAME , RECORDING_DT))\n", STOCK_TABLE_NAME);
conn.createStatement().execute(stocksDDL);
final String stockStatsDDL = String.format("CREATE TABLE IF NOT EXISTS %s " +
"(STOCK_NAME VARCHAR NOT NULL , AVG DOUBLE CONSTRAINT pk PRIMARY KEY (STOCK_NAME))\n", STOCK_STATS_TABLE_NAME);
conn.createStatement().execute(stockStatsDDL);
}
public void selectAverage() throws SQLException {
final Properties props = new Properties();
final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(zkUrl), props);
final String stocksDDL = String.format("SELECT * from %s\n", STOCK_STATS_TABLE_NAME);
final ResultSet rs = conn.createStatement().executeQuery(stocksDDL);
while(rs.next()) {
final String stockName = rs.getString(1);
final Double avgPrice = rs.getDouble(2);
System.out.println(String.format(" The average price for stock [%s] is [%s] " , stockName , avgPrice));
}
}
}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.mapreduce.bean.StockBean;
import com.google.common.base.Preconditions;
/**
 * Carrier for a {@link StockBean} that can be populated from a {@link ResultSet} and
 * serialized to / deserialized from a binary stream. Writing to a {@link PreparedStatement}
 * is deliberately a no-op.
 */
public class StockWritable implements DBWritable, Writable {

    /** The payload; replaced wholesale by either {@code readFields} variant. */
    private StockBean stockBean;

    /** @return the wrapped bean, or {@code null} if none has been set or read */
    public final StockBean getStockBean() {
        return this.stockBean;
    }

    /** @param stockBean the bean to wrap */
    public final void setStockBean(StockBean stockBean) {
        this.stockBean = stockBean;
    }

    /**
     * Serializes the bean as: name (string), date as epoch millis (vlong), open (double),
     * close (double).
     *
     * @param output the stream to write to
     * @throws NullPointerException if no bean has been set
     */
    @Override
    public void write(DataOutput output) throws IOException {
        final StockBean payload = Preconditions.checkNotNull(stockBean);
        WritableUtils.writeString(output, payload.getStockName());
        final long epochMillis = payload.getDate().getTime();
        WritableUtils.writeVLong(output, epochMillis);
        output.writeDouble(payload.getOpen());
        output.writeDouble(payload.getClose());
    }

    /**
     * Rebuilds the bean from a stream, reading the fields in the same order
     * {@link #write(DataOutput)} emits them.
     *
     * @param input the stream to read from
     */
    @Override
    public void readFields(DataInput input) throws IOException {
        final StockBean restored = new StockBean();
        restored.setStockName(WritableUtils.readString(input));
        restored.setDate(new Date(WritableUtils.readVLong(input)));
        restored.setOpen(input.readDouble());
        restored.setClose(input.readDouble());
        // Publish only after every field was read successfully.
        this.stockBean = restored;
    }

    /**
     * Builds the bean from the current row, using the columns {@code STOCK_NAME},
     * {@code RECORDING_DT}, {@code OPEN} and {@code CLOSE}.
     *
     * @param rs the result set positioned on the row to read
     */
    @Override
    public void readFields(ResultSet rs) throws SQLException {
        final StockBean fromRow = new StockBean();
        fromRow.setStockName(rs.getString("STOCK_NAME"));
        fromRow.setDate(rs.getDate("RECORDING_DT"));
        fromRow.setOpen(rs.getDouble("OPEN"));
        fromRow.setClose(rs.getDouble("CLOSE"));
        // Publish only after every column was read successfully.
        this.stockBean = fromRow;
    }

    /** Does nothing: nothing is bound to the statement. */
    @Override
    public void write(PreparedStatement pstmt) throws SQLException {
        // intentionally empty
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment