Skip to content

Instantly share code, notes, and snippets.

@louisje
Created October 12, 2015 00:42
Show Gist options
  • Save louisje/08e3f39a1e7b01b8576a to your computer and use it in GitHub Desktop.
Save louisje/08e3f39a1e7b01b8576a to your computer and use it in GitHub Desktop.
package acloudsvc.hadoop.examples;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Logger;
/**
 * MapReduce job that groups story ids by the hash that the external {@code blockhash}
 * executable reports for each story's thumbnail image.
 *
 * <p>Every input line has the form {@code "<storyId> <fileName>"}. The mapper copies
 * {@code /user/<user>/thumbnails/<fileName>} from the job's file system to local disk,
 * runs {@code blockhash} on the copy and emits {@code (hash, storyId)}. The reducer joins
 * all story ids that share a hash into one space-separated value.
 *
 * <p>Usage: {@code ImageHasher [generic options] [userName]}; input is read from
 * {@code input} and results are written to {@code output}.
 */
public class ImageHasher {

    /** Configuration key used to ship the user name from the driver to the tasks. */
    static final String USER_NAME_KEY = "imagehasher.user.name";

    /** Path of the external hashing executable on the node running the map task. */
    static final String BLOCKHASH_BINARY = "/home/hadoop/blockhash";

    /**
     * Default user name. The driver overrides it with the first non-generic command-line
     * argument; tasks only fall back to it when {@link #USER_NAME_KEY} is not set.
     */
    static String userName = "hadoop";

    /** Maps {@code "<storyId> <fileName>"} lines to {@code (hash, storyId)} pairs. */
    public static class ThumbnailMapper extends Mapper<LongWritable, Text, Text, Text> {
        static final Logger logger = Logger.getLogger(ThumbnailMapper.class);

        /**
         * Hashes the thumbnail named on one input line and emits {@code (hash, storyId)}.
         *
         * <p>Lines without a file name, and images for which {@code blockhash} prints no
         * usable output, are skipped without emitting anything.
         *
         * @param key0    byte offset of the line (unused)
         * @param value0  input line, {@code "<storyId> <fileName>"}
         * @param context task context; also supplies the job configuration
         * @throws IOException          if copying the image or running the executable fails
         * @throws InterruptedException if interrupted while waiting for the executable
         */
        @Override
        protected void map(LongWritable key0, Text value0,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String row = value0.toString();
            String[] split = row.split(" ", 2);
            // Guard against lines that carry no file name instead of crashing the task.
            if (split.length < 2 || split[1].isEmpty()) {
                logger.warn("Skipping malformed input line: \"" + row + "\"");
                return;
            }
            String storyId = split[0];
            String fileName = split[1];
            logger.info("(" + storyId + ", " + fileName + ")");

            // Use the job configuration: it carries the generic options and the user name
            // chosen in the driver, neither of which a fresh Configuration would have.
            Configuration conf = context.getConfiguration();
            String user = conf.get(USER_NAME_KEY, userName);
            String hdfsPath = "/user/" + user + "/thumbnails/" + fileName;
            String localPath = "/home/" + user + "/hadoop/tmp/" + fileName;
            Path src = new Path(hdfsPath);
            Path dest = new Path(localPath);

            FileSystem fs = FileSystem.get(conf);
            String blockhashOutput;
            try {
                fs.copyToLocalFile(src, dest);
                blockhashOutput = runBlockhash(localPath);
            } finally {
                removeLocalCopy(conf, dest);
            }

            if (blockhashOutput.isEmpty()) {
                return;
            }
            // NOTE(review): the second space-separated field is treated as the hash;
            // confirm this matches the output format of the installed blockhash build.
            String[] splited = blockhashOutput.split(" ", 2);
            if (splited.length < 2) {
                return;
            }
            String hashCode = splited[1];
            if (hashCode.isEmpty()) {
                return;
            }
            System.out.println("map => (" + hashCode + ", " + storyId + ")");
            context.write(new Text(hashCode), new Text(storyId));
        }

        /**
         * Runs the external executable on {@code localPath} and returns its trimmed
         * standard output.
         *
         * <p>Standard output is drained before waiting for the process to exit, and the
         * child's standard error is inherited by this JVM, so the child cannot block on a
         * full pipe.
         */
        private static String runBlockhash(String localPath)
                throws IOException, InterruptedException {
            ProcessBuilder builder = new ProcessBuilder(BLOCKHASH_BINARY, localPath);
            builder.redirectError(ProcessBuilder.Redirect.INHERIT);
            Process process = builder.start();
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try {
                IOUtils.copy(process.getInputStream(), baos);
                int exitCode = process.waitFor();
                if (exitCode != 0) {
                    logger.warn(BLOCKHASH_BINARY + " exited with status " + exitCode
                            + " for " + localPath);
                }
            } finally {
                // Releases the process streams; harmless once the process has exited.
                process.destroy();
            }
            return baos.toString("UTF-8").trim();
        }

        /** Deletes the temporary local copy; a failure is logged, never propagated. */
        private static void removeLocalCopy(Configuration conf, Path dest) {
            try {
                FileSystem.getLocal(conf).delete(dest, false);
            } catch (IOException e) {
                logger.warn("Could not delete temporary file " + dest, e);
            }
        }
    }

    /**
     * Joins all story ids that share a hash into one value, each id followed by a single
     * space. Also safe to use as a combiner: values that are already merged lists are
     * trimmed before being appended, so the separators stay single spaces.
     */
    public static class HashReducer extends Reducer<Text, Text, Text, Text> {

        /**
         * Emits {@code (hashCode, "<id> <id> ... ")} for one hash.
         *
         * @param hashCode image hash shared by all values
         * @param storyIds story ids (or partially merged id lists) for that hash
         * @param context  task context the merged pair is written to
         */
        @Override
        protected void reduce(Text hashCode, Iterable<Text> storyIds,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringBuilder merged = new StringBuilder();
            for (Text storyId : storyIds) {
                String ids = storyId.toString().trim();
                if (!ids.isEmpty()) {
                    merged.append(ids).append(' ');
                }
            }
            System.out.println("reduce => (" + hashCode.toString() + ", " + merged + ")");
            context.write(hashCode, new Text(merged.toString()));
        }
    }

    /**
     * Configures and runs the job, then exits with status 0 on success and 1 on failure.
     *
     * @param args generic Hadoop options, optionally followed by the user name
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] remains = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (remains.length > 0) {
            userName = remains[0];
        }
        // Tasks run in other JVMs and never see the static field, so pass it via the conf.
        // This must happen before Job.getInstance, which takes a copy of the configuration.
        conf.set(USER_NAME_KEY, userName);

        Job job = Job.getInstance(conf);
        job.setJarByClass(ImageHasher.class);
        job.setMapperClass(ThumbnailMapper.class);
        job.setCombinerClass(HashReducer.class);
        job.setReducerClass(HashReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("input"));
        FileOutputFormat.setOutputPath(job, new Path("output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment