Last active
April 2, 2019 10:06
-
-
Save sankars/5371069 to your computer and use it in GitHub Desktop.
MapReduce job to process data stored in MySQL.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* hadoop jar mysqlmapr.jar org.jm.samples.ComputeEnergyForCustomer -libjars /path/mysql-connector-java-5.1.13-bin.jar */
package org.jm.samples;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ComputeEnergyForCustomer extends Configured implements Tool { | |
/** | |
* @param args | |
*/ | |
public static void main(String[] args) { | |
// TODO Auto-generated method stub | |
try { | |
int exitCode = ToolRunner.run(new ComputeEnergyForCustomer(), args); | |
System.exit(exitCode); | |
} catch (Exception ex) { | |
System.out.println(ex.getMessage()); | |
} | |
} | |
@Override | |
public int run(String[] arg0) throws Exception { | |
runJob(); | |
return 0; | |
} | |
public static void runJob() throws Exception { | |
try { | |
Configuration conf = new Configuration(); | |
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", | |
"jdbc:mysql://172.27.138.195/sms_dev_db" | |
+ "?user=dev_user&password=pass@123"); | |
Job job = new Job(conf, "ComputeEnergyForCustomer"); | |
job.setJarByClass(ComputeEnergyForCustomer.class); | |
job.setMapperClass(ComputeEnergyForCustomerMapper.class); | |
job.setReducerClass(ComputeEnergyForCustomerReducer.class); | |
job.setInputFormatClass(DBInputFormat.class); | |
job.setOutputFormatClass(DBOutputFormat.class); | |
job.setMapOutputKeyClass(LongWritable.class); | |
job.setMapOutputValueClass(DoubleWritable.class); | |
job.setOutputKeyClass(MyRecord.class); | |
job.setOutputValueClass(NullWritable.class); | |
DBInputFormat.setInput(job, MyRecord.class, | |
"select device_id,kwh from SMS_REPORT_DATA Where CUSTOMER_ID =1 AND SC_TIME=1353951807000","select 1"); | |
DBOutputFormat.setOutput(job, "results", "device_id,kwh1"); | |
job.waitForCompletion(true); | |
} catch (Exception e) { | |
// TODO Auto-generated catch block | |
e.printStackTrace(); | |
} | |
} | |
static class ComputeEnergyForCustomerMapper extends | |
Mapper<LongWritable, MyRecord, LongWritable, DoubleWritable> { | |
@Override | |
protected void map(LongWritable key, MyRecord value, Context context) | |
throws IOException, InterruptedException { | |
LongWritable deviceid = new LongWritable(value.deviceid); | |
DoubleWritable kwh = new DoubleWritable(value.kwh); | |
context.write(deviceid, kwh); | |
} | |
} | |
static class ComputeEnergyForCustomerReducer extends | |
Reducer<LongWritable, DoubleWritable, MyRecord, NullWritable> { | |
@Override | |
protected void reduce(LongWritable key, Iterable<DoubleWritable> arg1, | |
Context context) throws IOException, InterruptedException { | |
double sumActualKWHForDay = 0; | |
for (DoubleWritable actualKWH : arg1) { | |
sumActualKWHForDay = sumActualKWHForDay + actualKWH.get(); | |
} | |
MyRecord rec = new MyRecord(); | |
rec.deviceid = key.get(); | |
rec.kwh = sumActualKWHForDay; | |
context.write(rec, NullWritable.get()); | |
} | |
} | |
static class MyRecord implements Writable, DBWritable { | |
long deviceid; | |
double kwh; | |
@Override | |
public void readFields(ResultSet arg0) throws SQLException { | |
this.deviceid = arg0.getLong(1); | |
this.kwh = arg0.getDouble(2); | |
} | |
@Override | |
public void write(PreparedStatement arg0) throws SQLException { | |
arg0.setLong(1, deviceid); | |
arg0.setDouble(2, kwh); | |
} | |
@Override | |
public void readFields(DataInput arg0) throws IOException { | |
this.deviceid = arg0.readLong(); | |
this.kwh = arg0.readDouble(); | |
} | |
@Override | |
public void write(DataOutput arg0) throws IOException { | |
arg0.writeLong(deviceid); | |
arg0.writeDouble(kwh); | |
} | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment