-
-
Save mravi/501fa00b942764eb0dca to your computer and use it in GitHub Desktop.
Phoenix MapReduce Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.phoenix.mapreduce.job; | |
import java.io.IOException; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.hbase.HBaseConfiguration; | |
import org.apache.hadoop.hbase.HConstants; | |
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; | |
import org.apache.hadoop.io.NullWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.Mapper; | |
import org.apache.hadoop.mapreduce.Reducer; | |
import org.apache.phoenix.mapreduce.PhoenixInputFormat; | |
import org.apache.phoenix.mapreduce.PhoenixOutputFormat; | |
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; | |
import org.apache.phoenix.mapreduce.bean.StockAvgBean; | |
import org.apache.phoenix.mapreduce.bean.StockBean; | |
import org.apache.phoenix.mapreduce.writable.StockAverageWritable; | |
import org.apache.phoenix.mapreduce.writable.StockWritable; | |
public class StockAverageCalculationJob { | |
private final static String zkUrl = "zookeeperQuorum:2181"; | |
public static class StockMapper extends Mapper<NullWritable, StockWritable, Text , StockWritable> { | |
private final Text name = new Text (); | |
@Override | |
protected void map(NullWritable key, StockWritable stockWritable, Context context) | |
throws IOException, InterruptedException { | |
final StockBean bean = stockWritable.getStockBean(); | |
final String stockName = bean.getStockName(); | |
name.set(stockName); | |
context.write(name, stockWritable); | |
} | |
} | |
public static class StockReducer extends Reducer<Text, StockWritable, NullWritable , StockAverageWritable> { | |
@Override | |
protected void reduce(Text stockName, Iterable<StockWritable> beans, Context context) | |
throws IOException, InterruptedException { | |
double total = 0.0; | |
int count = 0; | |
for(StockWritable bean : beans) { | |
total += bean.getStockBean().getClose(); | |
count ++; | |
} | |
double avg = total / count ; | |
StockAvgBean stockAvgBean = new StockAvgBean(); | |
stockAvgBean.setStockName(stockName.toString()); | |
stockAvgBean.setAvgPrice(avg); | |
StockAverageWritable stockAggregateWritable = new StockAverageWritable(); | |
stockAggregateWritable.setStockAvgBean(stockAvgBean); | |
context.write(NullWritable.get(), stockAggregateWritable); | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
final Configuration conf = new Configuration(); | |
HBaseConfiguration.addHbaseResources(conf); | |
conf.set(HConstants.ZOOKEEPER_QUORUM, ); | |
final Job job = new Job(conf, "stock-stats"); | |
final String selectQuery = "SELECT STOCK_NAME,RECORDING_DT,OPEN,CLOSE FROM STOCKS "; | |
PhoenixMapReduceUtil.setInput(job, StockWritable.class, "stocks", selectQuery); | |
PhoenixMapReduceUtil.setOutput(job, "STOCK_STATS", "STOCK_NAME,AVG"); | |
job.setMapperClass(StockMapper.class); | |
job.setReducerClass(StockReducer.class); | |
job.setMapOutputKeyClass(Text.class); | |
job.setMapOutputValueClass(StockWritable.class); | |
job.setOutputKeyClass(NullWritable.class); | |
job.setOutputValueClass(StockAverageWritable.class); | |
TableMapReduceUtil.addDependencyJars(job); | |
job.waitForCompletion(true); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.DataInput; | |
import java.io.DataOutput; | |
import java.io.IOException; | |
import java.sql.PreparedStatement; | |
import java.sql.ResultSet; | |
import java.sql.SQLException; | |
import org.apache.hadoop.io.Writable; | |
import org.apache.hadoop.mapreduce.lib.db.DBWritable; | |
import org.apache.phoenix.mapreduce.bean.StockAvgBean; | |
import com.google.common.base.Preconditions; | |
public class StockAverageWritable implements DBWritable,Writable { | |
private StockAvgBean stockAvgBean; | |
@Override | |
public void readFields(DataInput arg0) throws IOException { | |
// NO-OP | |
} | |
@Override | |
public void write(DataOutput arg0) throws IOException { | |
// NO -OP | |
} | |
@Override | |
public void readFields(ResultSet arg0) throws SQLException { | |
// NO-OP | |
} | |
@Override | |
public void write(PreparedStatement pstmt) throws SQLException { | |
Preconditions.checkNotNull(stockAvgBean); | |
pstmt.setString(1, stockAvgBean.getStockName()); | |
pstmt.setDouble(2, stockAvgBean.getAvgPrice()); | |
} | |
public final StockAvgBean getStockAvgBean() { | |
return stockAvgBean; | |
} | |
public final void setStockAvgBean(StockAvgBean stockAvgBean) { | |
this.stockAvgBean = stockAvgBean; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.phoenix.mapreduce.bean; | |
public final class StockAvgBean { | |
private String stockName; | |
private Double avgPrice; | |
public String getStockName() { | |
return stockName; | |
} | |
public void setStockName(String stockName) { | |
this.stockName = stockName; | |
} | |
public Double getAvgPrice() { | |
return avgPrice; | |
} | |
public void setAvgPrice(Double avgPrice) { | |
this.avgPrice = avgPrice; | |
} | |
@Override | |
public int hashCode() { | |
final int prime = 31; | |
int result = 1; | |
result = prime * result + ((avgPrice == null) ? 0 : avgPrice.hashCode()); | |
result = prime * result + ((stockName == null) ? 0 : stockName.hashCode()); | |
return result; | |
} | |
@Override | |
public boolean equals(Object obj) { | |
if (this == obj) return true; | |
if (obj == null) return false; | |
if (getClass() != obj.getClass()) return false; | |
StockAvgBean other = (StockAvgBean)obj; | |
if (stockName == null) { | |
if (other.stockName != null) return false; | |
} else if (!stockName.equals(other.stockName)) return false; | |
return true; | |
} | |
@Override | |
public String toString() { | |
return "StockAvgBean [stockName=" + stockName + ", avgPrice=" + avgPrice + "]"; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.phoenix.mapreduce.bean; | |
import java.util.Date; | |
public final class StockBean { | |
private String stockName; | |
private Date date; | |
private Double open; | |
private Double close; | |
public String getStockName() { | |
return stockName; | |
} | |
public void setStockName(String stockName) { | |
this.stockName = stockName; | |
} | |
public Date getDate() { | |
return date; | |
} | |
public void setDate(Date date) { | |
this.date = date; | |
} | |
public final Double getOpen() { | |
return open; | |
} | |
public final void setOpen(Double open) { | |
this.open = open; | |
} | |
public final Double getClose() { | |
return close; | |
} | |
public final void setClose(Double close) { | |
this.close = close; | |
} | |
@Override | |
public int hashCode() { | |
final int prime = 31; | |
int result = 1; | |
result = prime * result + ((date == null) ? 0 : date.hashCode()); | |
result = prime * result + ((stockName == null) ? 0 : stockName.hashCode()); | |
return result; | |
} | |
@Override | |
public boolean equals(Object obj) { | |
if (this == obj) return true; | |
if (obj == null) return false; | |
if (getClass() != obj.getClass()) return false; | |
StockBean other = (StockBean)obj; | |
if (date == null) { | |
if (other.date != null) return false; | |
} else if (!date.equals(other.date)) return false; | |
if (stockName == null) { | |
if (other.stockName != null) return false; | |
} else if (!stockName.equals(other.stockName)) return false; | |
return true; | |
} | |
@Override | |
public String toString() { | |
return "StockBean [stockName=" + stockName + ", date=" + date + ", open=" + open + ", close=" + close + "]"; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.phoenix.mapreduce.generator; | |
import java.sql.Connection; | |
import java.sql.DriverManager; | |
import java.sql.ResultSet; | |
import java.sql.SQLException; | |
import java.util.Properties; | |
import org.apache.phoenix.util.QueryUtil; | |
public final class StockSampleDataGenerator { | |
protected final static String zkUrl = "zookeeperQuorum:2181"; | |
protected final static String STOCK_TABLE_NAME = "stocks"; | |
protected final static String STOCK_STATS_TABLE_NAME = "stock_stats"; | |
public static void main(String[] args) throws SQLException { | |
StockSampleDataGenerator dataGenerator = new StockSampleDataGenerator(); | |
dataGenerator.doIt (); | |
} | |
private void doIt() throws SQLException { | |
// create tables. | |
createTables(); | |
// insert data. | |
insertData(); | |
selectAverage(); | |
} | |
protected void insertData() throws SQLException { | |
final Properties props = new Properties(); | |
final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(zkUrl), props); | |
conn.createStatement().execute("upsert into stocks values ('AAPL',TO_DATE('2009-01-02','yyyy-MM-dd'),85.88,91.04)"); | |
conn.createStatement().execute("upsert into stocks values ('AAPL',TO_DATE('2008-01-02','yyyy-MM-dd'),199.27,200.26)"); | |
conn.createStatement().execute("upsert into stocks values ('AAPL',TO_DATE('2007-01-03','yyyy-MM-dd'),86.29,86.58)"); | |
conn.createStatement().execute("upsert into stocks values ('CSCO',TO_DATE('2009-01-02','yyyy-MM-dd'),16.41,17.00)"); | |
conn.createStatement().execute("upsert into stocks values ('CSCO',TO_DATE('2008-01-02','yyyy-MM-dd'),27.00,27.30)"); | |
conn.createStatement().execute("upsert into stocks values ('CSCO',TO_DATE('2007-01-03','yyyy-MM-dd'),27.46,27.98)"); | |
conn.createStatement().execute("upsert into stocks values ('CSCO',TO_DATE('2006-01-03','yyyy-MM-dd'),17.21,17.49)"); | |
conn.createStatement().execute("upsert into stocks values ('GOOG',TO_DATE('2009-01-02','yyyy-MM-dd'),308.60,321.82)"); | |
conn.createStatement().execute("upsert into stocks values ('GOOG',TO_DATE('2008-01-02','yyyy-MM-dd'),692.87,697.37)"); | |
conn.createStatement().execute("upsert into stocks values ('GOOG',TO_DATE('2007-01-03','yyyy-MM-dd'),466.00,476.66)"); | |
conn.createStatement().execute("upsert into stocks values ('MSFT',TO_DATE('2009-01-02','yyyy-MM-dd'),19.53,20.40)"); | |
conn.createStatement().execute("upsert into stocks values ('MSFT',TO_DATE('2008-01-02','yyyy-MM-dd'),35.79,35.96)"); | |
conn.createStatement().execute("upsert into stocks values ('MSFT',TO_DATE('2007-01-03','yyyy-MM-dd'),29.91,30.25)"); | |
conn.createStatement().execute("upsert into stocks values ('MSFT',TO_DATE('2006-01-03','yyyy-MM-dd'),26.25,27.00)"); | |
conn.createStatement().execute("upsert into stocks values ('MSFT',TO_DATE('2001-01-02','yyyy-MM-dd'),44.13,45.00)"); | |
conn.createStatement().execute("upsert into stocks values ('MSFT',TO_DATE('2000-01-03','yyyy-MM-dd'),117.37,118.62)"); | |
conn.createStatement().execute("upsert into stocks values ('YHOO',TO_DATE('2009-01-02','yyyy-MM-dd'),12.17,12.85)"); | |
conn.createStatement().execute("upsert into stocks values ('YHOO',TO_DATE('2008-01-02','yyyy-MM-dd'),23.80,24.15)"); | |
conn.createStatement().execute("upsert into stocks values ('YHOO',TO_DATE('2007-01-03','yyyy-MM-dd'),25.85,26.26)"); | |
conn.createStatement().execute("upsert into stocks values ('YHOO',TO_DATE('2006-01-03','yyyy-MM-dd'),39.69,41.22)"); | |
conn.commit(); | |
conn.close(); | |
} | |
public void createTables() throws SQLException { | |
final Properties props = new Properties(); | |
final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(zkUrl), props); | |
final String stocksDDL = String.format("CREATE TABLE IF NOT EXISTS %s " + | |
"(STOCK_NAME VARCHAR NOT NULL ,RECORDING_DT DATE NOT NULL ,OPEN DOUBLE,CLOSE DOUBLE CONSTRAINT pk PRIMARY KEY (STOCK_NAME , RECORDING_DT))\n", STOCK_TABLE_NAME); | |
conn.createStatement().execute(stocksDDL); | |
final String stockStatsDDL = String.format("CREATE TABLE IF NOT EXISTS %s " + | |
"(STOCK_NAME VARCHAR NOT NULL , AVG DOUBLE CONSTRAINT pk PRIMARY KEY (STOCK_NAME))\n", STOCK_STATS_TABLE_NAME); | |
conn.createStatement().execute(stockStatsDDL); | |
} | |
public void selectAverage() throws SQLException { | |
final Properties props = new Properties(); | |
final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(zkUrl), props); | |
final String stocksDDL = String.format("SELECT * from %s\n", STOCK_STATS_TABLE_NAME); | |
final ResultSet rs = conn.createStatement().executeQuery(stocksDDL); | |
while(rs.next()) { | |
final String stockName = rs.getString(1); | |
final Double avgPrice = rs.getDouble(2); | |
System.out.println(String.format(" The average price for stock [%s] is [%s] " , stockName , avgPrice)); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.DataInput; | |
import java.io.DataOutput; | |
import java.io.IOException; | |
import java.sql.PreparedStatement; | |
import java.sql.ResultSet; | |
import java.sql.SQLException; | |
import java.util.Date; | |
import org.apache.hadoop.io.Writable; | |
import org.apache.hadoop.io.WritableUtils; | |
import org.apache.hadoop.mapreduce.lib.db.DBWritable; | |
import org.apache.phoenix.mapreduce.bean.StockBean; | |
import com.google.common.base.Preconditions; | |
public class StockWritable implements DBWritable,Writable { | |
private StockBean stockBean; | |
@Override | |
public void readFields(DataInput input) throws IOException { | |
final String stockName = WritableUtils.readString(input); | |
final Long time = WritableUtils.readVLong(input); | |
final Double open = input.readDouble(); | |
final Double close = input.readDouble(); | |
stockBean = new StockBean(); | |
stockBean.setStockName(stockName); | |
stockBean.setDate(new Date(time)); | |
stockBean.setOpen(open); | |
stockBean.setClose(close); | |
} | |
@Override | |
public void write(DataOutput output) throws IOException { | |
Preconditions.checkNotNull(stockBean); | |
WritableUtils.writeString(output, stockBean.getStockName()); | |
WritableUtils.writeVLong(output, stockBean.getDate().getTime()); | |
output.writeDouble(stockBean.getOpen()); | |
output.writeDouble(stockBean.getClose()); | |
} | |
@Override | |
public void readFields(ResultSet rs) throws SQLException { | |
final String stockName = rs.getString("STOCK_NAME"); | |
final Date date = rs.getDate("RECORDING_DT"); | |
final Double open = rs.getDouble("OPEN"); | |
final Double close = rs.getDouble("CLOSE"); | |
stockBean = new StockBean(); | |
stockBean.setStockName(stockName); | |
stockBean.setDate(date); | |
stockBean.setOpen(open); | |
stockBean.setClose(close); | |
} | |
@Override | |
public void write(PreparedStatement pstmt) throws SQLException { | |
// NO-OP | |
} | |
public final StockBean getStockBean() { | |
return stockBean; | |
} | |
public final void setStockBean(StockBean stockBean) { | |
this.stockBean = stockBean; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment