Skip to content

Instantly share code, notes, and snippets.

@sankars
Last active April 2, 2019 10:06
Show Gist options
  • Save sankars/5371069 to your computer and use it in GitHub Desktop.
Save sankars/5371069 to your computer and use it in GitHub Desktop.
MapReduce job to process data stored in MySQL.
/* Usage: hadoop jar mysqlmapr.jar org.jm.samples.ComputeEnergyForCustomer -libjars /path/mysql-connector-java-5.1.13-bin.jar */
package org.jm.samples;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * MapReduce job that reads (device_id, kwh) rows from the MySQL table
 * {@code SMS_REPORT_DATA}, sums the kwh values per device id, and inserts one
 * (device_id, kwh1) row per device into the {@code results} table.
 *
 * <p>The job is launched through {@link ToolRunner}, so generic Hadoop options
 * given on the command line (for example {@code -libjars} or {@code -D}) are
 * parsed into the configuration that {@link #run(String[])} hands to the job.
 */
public class ComputeEnergyForCustomer extends Configured implements Tool {

    /** JDBC driver class used when the configuration does not already name one. */
    private static final String DEFAULT_JDBC_DRIVER = "com.mysql.jdbc.Driver";

    /**
     * JDBC URL used when the configuration does not already provide one.
     *
     * NOTE(review): this default embeds a user name and password in source.
     * It is kept so existing invocations behave as before; prefer supplying
     * the URL through the configuration and removing the credentials here.
     */
    private static final String DEFAULT_JDBC_URL =
            "jdbc:mysql://172.27.138.195/sms_dev_db"
                    + "?user=dev_user&password=pass@123";

    /** Row filter shared by the input query and its count query. */
    private static final String INPUT_FILTER =
            "CUSTOMER_ID = 1 AND SC_TIME = 1353951807000";

    /**
     * Query producing the rows fed to the mapper. The ORDER BY covers every
     * selected column so that the LIMIT/OFFSET windows DBInputFormat appends
     * per split partition the result consistently.
     */
    private static final String INPUT_QUERY =
            "select device_id, kwh from SMS_REPORT_DATA where " + INPUT_FILTER
                    + " order by device_id, kwh";

    /**
     * Query returning the number of rows {@link #INPUT_QUERY} yields.
     * DBInputFormat derives the split sizes from this value, so it has to be
     * the real row count.
     */
    private static final String INPUT_COUNT_QUERY =
            "select count(*) from SMS_REPORT_DATA where " + INPUT_FILTER;

    /** Name given to the submitted job. */
    private static final String JOB_NAME = "ComputeEnergyForCustomer";

    /**
     * Command-line entry point. Exits with the status returned by the tool,
     * or with status 1 when launching the tool throws.
     *
     * @param args command-line arguments, including generic Hadoop options
     */
    public static void main(String[] args) {
        int exitCode;
        try {
            exitCode = ToolRunner.run(new ComputeEnergyForCustomer(), args);
        } catch (Exception ex) {
            // Report the full failure on stderr and signal it to the caller
            // through a non-zero exit status.
            ex.printStackTrace();
            exitCode = 1;
        }
        System.exit(exitCode);
    }

    /**
     * Runs the job with the configuration held by this tool.
     *
     * @param arg0 remaining command-line arguments (not used)
     * @return 0 when the job completes successfully, 1 otherwise
     * @throws Exception if the job cannot be configured or submitted
     */
    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = getConf();
        if (conf == null) {
            // No configuration was injected; fall back to a default one.
            conf = new Configuration();
        }
        return submitAndWait(conf) ? 0 : 1;
    }

    /**
     * Runs the job with a default {@link Configuration}.
     *
     * @throws IOException if the job runs but does not complete successfully
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void runJob() throws Exception {
        if (!submitAndWait(new Configuration())) {
            throw new IOException("Job " + JOB_NAME + " failed");
        }
    }

    /**
     * Configures the database input/output on {@code conf}, submits the job
     * and waits for it to finish.
     *
     * @param conf configuration to build the job from; driver class and JDBC
     *             URL already present in it take precedence over the defaults
     * @return the value returned by {@code Job.waitForCompletion(true)}
     * @throws Exception if the job cannot be configured or submitted
     */
    private static boolean submitAndWait(Configuration conf) throws Exception {
        String driverClass =
                conf.get(DBConfiguration.DRIVER_CLASS_PROPERTY, DEFAULT_JDBC_DRIVER);
        String jdbcUrl = conf.get(DBConfiguration.URL_PROPERTY, DEFAULT_JDBC_URL);
        DBConfiguration.configureDB(conf, driverClass, jdbcUrl);

        Job job = new Job(conf, JOB_NAME);
        job.setJarByClass(ComputeEnergyForCustomer.class);

        job.setMapperClass(ComputeEnergyForCustomerMapper.class);
        job.setReducerClass(ComputeEnergyForCustomerReducer.class);

        job.setInputFormatClass(DBInputFormat.class);
        job.setOutputFormatClass(DBOutputFormat.class);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(MyRecord.class);
        job.setOutputValueClass(NullWritable.class);

        DBInputFormat.setInput(job, MyRecord.class, INPUT_QUERY, INPUT_COUNT_QUERY);
        // One argument per output column, in the order MyRecord binds them.
        DBOutputFormat.setOutput(job, "results", "device_id", "kwh1");

        return job.waitForCompletion(true);
    }

    /** Emits each input record as a (device id, kwh) pair. */
    static class ComputeEnergyForCustomerMapper extends
            Mapper<LongWritable, MyRecord, LongWritable, DoubleWritable> {

        @Override
        protected void map(LongWritable key, MyRecord value, Context context)
                throws IOException, InterruptedException {
            context.write(new LongWritable(value.deviceid),
                    new DoubleWritable(value.kwh));
        }
    }

    /** Sums the kwh values of one device id and writes the total as a record. */
    static class ComputeEnergyForCustomerReducer extends
            Reducer<LongWritable, DoubleWritable, MyRecord, NullWritable> {

        @Override
        protected void reduce(LongWritable key, Iterable<DoubleWritable> values,
                Context context) throws IOException, InterruptedException {
            double totalKwh = 0;
            for (DoubleWritable kwh : values) {
                totalKwh += kwh.get();
            }

            MyRecord total = new MyRecord();
            total.deviceid = key.get();
            total.kwh = totalKwh;
            context.write(total, NullWritable.get());
        }
    }

    /**
     * A (device id, kwh) pair that can be read from a JDBC result set, bound
     * to a JDBC prepared statement, and serialized as a Hadoop Writable.
     */
    static class MyRecord implements Writable, DBWritable {
        long deviceid;
        double kwh;

        /** Reads the device id from column 1 and the kwh value from column 2. */
        @Override
        public void readFields(ResultSet arg0) throws SQLException {
            this.deviceid = arg0.getLong(1);
            this.kwh = arg0.getDouble(2);
        }

        /** Binds the device id to parameter 1 and the kwh value to parameter 2. */
        @Override
        public void write(PreparedStatement arg0) throws SQLException {
            arg0.setLong(1, deviceid);
            arg0.setDouble(2, kwh);
        }

        /** Deserializes the device id followed by the kwh value. */
        @Override
        public void readFields(DataInput arg0) throws IOException {
            this.deviceid = arg0.readLong();
            this.kwh = arg0.readDouble();
        }

        /** Serializes the device id followed by the kwh value. */
        @Override
        public void write(DataOutput arg0) throws IOException {
            arg0.writeLong(deviceid);
            arg0.writeDouble(kwh);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment