@sreejithpillai
Created November 24, 2014 15:41
/*
* Copyright 2014 Sreejith Pillai
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.sreejith.loganalyzer.mapreduce;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LogDriver {

    public static void main(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(LogDriver.class);
        job.setJobName("Log Analyzer");

        job.setMapperClass(LogMapper.class);
        job.setPartitionerClass(LogPartitioner.class);
        // The reducer doubles as a combiner to pre-aggregate hourly counts on the map side.
        job.setCombinerClass(LogReducer.class);
        job.setReducerClass(LogReducer.class);
        // LogPartitioner splits keys into two buckets, so two reduce tasks are needed.
        job.setNumReduceTasks(2);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
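The driver discards the boolean returned by waitForCompletion. A common variant (a sketch, not part of the original gist) propagates it as the process exit code so calling scripts can detect a failed job:

    System.exit(job.waitForCompletion(true) ? 0 : 1);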
/*
* Copyright 2014 Sreejith Pillai
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.sreejith.loganalyzer.mapreduce;
import java.io.IOException;
import java.text.ParseException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.sreejith.loganalyzer.parser.ParseLog;
public class LogMapper extends
        Mapper<LongWritable, Text, IntWritable, IntWritable> {

    private static Logger logger = LoggerFactory.getLogger(LogMapper.class);
    private IntWritable hour = new IntWritable();
    private final static IntWritable one = new IntWritable(1);
    private static Pattern logPattern = Pattern
            .compile("([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\]"
                    + " \"([^\"]*)\""
                    + " ([^ ]*) ([^ ]*).*");

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws InterruptedException, IOException {
        logger.info("Mapper started");
        String line = value.toString();
        Matcher matcher = logPattern.matcher(line);
        if (matcher.matches()) {
            String timestamp = matcher.group(4);
            try {
                hour.set(ParseLog.getHour(timestamp));
            } catch (ParseException e) {
                // Skip records whose timestamp cannot be parsed rather than
                // emitting a stale hour value.
                logger.warn("Exception", e);
                return;
            }
            context.write(hour, one);
        }
        logger.info("Mapper completed");
    }
}
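The regular expression above expects lines in the Apache access log format; capture group 4 is the bracketed timestamp that ParseLog.getHour (a helper from this project, not included in this gist) converts into an hour of the day. A minimal standalone sketch of what the pattern captures, using a made-up sample line:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogPatternDemo {
    // Same pattern string as LogMapper; the log line below is illustrative only.
    private static final Pattern LOG_PATTERN = Pattern.compile(
            "([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\]"
            + " \"([^\"]*)\""
            + " ([^ ]*) ([^ ]*).*");

    public static void main(String[] args) {
        String line = "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "
                + "\"GET /apache_pb.gif HTTP/1.0\" 200 2326";
        Matcher matcher = LOG_PATTERN.matcher(line);
        if (matcher.matches()) {
            System.out.println(matcher.group(4)); // 10/Oct/2000:13:55:36 -0700
            System.out.println(matcher.group(6)); // 200 (HTTP status)
        }
    }
}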
/*
* Copyright 2014 Sreejith Pillai
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.sreejith.loganalyzer.mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LogPartitioner extends Partitioner<IntWritable, IntWritable> {

    private static Logger logger = LoggerFactory.getLogger(LogPartitioner.class);

    @Override
    public int getPartition(IntWritable key, IntWritable value, int numReduceTasks) {
        logger.info("Partitioner started");
        int intKey = key.get();
        // Hours 08-18 go to partition 1, all other hours to partition 0,
        // which assumes the job runs with (at least) two reduce tasks.
        if (intKey >= 8 && intKey <= 18) {
            return 1;
        } else {
            return 0;
        }
    }
}
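A quick way to see the routing is to call getPartition directly (the demo class below is illustrative and assumes it sits in the same package as LogPartitioner): hours 8 through 18 land in partition 1 and every other hour in partition 0, matching the two reduce tasks configured in LogDriver.

import org.apache.hadoop.io.IntWritable;

public class LogPartitionerDemo {
    public static void main(String[] args) {
        LogPartitioner partitioner = new LogPartitioner();
        IntWritable one = new IntWritable(1);
        System.out.println(partitioner.getPartition(new IntWritable(9), one, 2));  // 1
        System.out.println(partitioner.getPartition(new IntWritable(18), one, 2)); // 1
        System.out.println(partitioner.getPartition(new IntWritable(3), one, 2));  // 0
    }
}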
/*
* Copyright 2014 Sreejith Pillai
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.sreejith.loganalyzer.mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LogReducer extends
        Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    private static Logger logger = LoggerFactory.getLogger(LogReducer.class);

    @Override
    public void reduce(IntWritable key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        logger.info("Reducer started");
        int sum = 0;
        for (IntWritable value : values) {
            sum = sum + value.get();
        }
        context.write(key, new IntWritable(sum));
        logger.info("Reducer completed");
    }
}
@sudarshanGit

Hi, I am running an application that reads records from HBase and writes them into text files.

I use a combiner and a custom partitioner. I use 41 reducers because I need to create 40 reducer output files that satisfy the condition in my custom partitioner.

Everything works fine, but when I use the combiner the job creates one map output file per region, i.e. per mapper.

For example, I have 40 regions, so 40 mappers are started and 40 map-output files are created. The reducers do not combine all of the map outputs into the final reducer output, which should be 40 reducer output files.

The data in the files is correct, but the number of files has increased.

Any idea how I can get only the reducer output files?
