@jizhang
Last active December 16, 2015 16:49
Simple filter-count Hadoop MapReduce job written in Java vs. Clojure.
(ns uv.raw
  (:require [clojure-hadoop.gen :as gen]
            [clojure-hadoop.imports :as imp])
  (:import [org.apache.hadoop.util Tool]
           [com.hadoop.mapreduce LzoTextInputFormat]))

;; Import the Hadoop classes used below (Path, FileSystem, Text, Job, ...).
(imp/import-conf)
(imp/import-fs)
(imp/import-io)
(imp/import-mapreduce)
(imp/import-mapreduce-lib)

;; Generate the job classes (uv.raw_mapper and uv.raw_reducer are referenced
;; in tool-run) and a main method.
(gen/gen-job-classes)
(gen/gen-main-method)

;; Number of matching lines seen by this map task's JVM.
(def cnt (atom 0))

(defn mapper-map
  [this key ^Text value context]
  ;; Count only; nothing is written per record.
  (let [^String log (.toString value)]
    (when (.contains log "\"site\":\"anjuke\"")
      (swap! cnt inc))))

(defn mapper-cleanup
  [this ^MapContext context]
  ;; Emit one partial count per map task under the constant key 0.
  (.write context (LongWritable. 0) (LongWritable. @cnt)))

(defn reducer-reduce
  [this key values ^ReduceContext context]
  ;; Sum the partial counts from all map tasks.
  (let [sum (reduce + (map (fn [^LongWritable v] (.get v)) values))]
    (.write context key (LongWritable. sum))))

(defn tool-run
  [^Tool this args]
  (let [conf (.getConf this)
        output-path (Path. "/tmp/uv-test")
        job (Job. conf)]
    ;; Remove any previous output so the job can reuse the same path.
    (.delete (FileSystem/get conf) output-path true)
    (doto job
      (.setJarByClass (.getClass this))
      (.setJobName "uv-raw")
      (.setOutputKeyClass LongWritable)
      (.setOutputValueClass LongWritable)
      (.setMapperClass (Class/forName "uv.raw_mapper"))
      (.setReducerClass (Class/forName "uv.raw_reducer"))
      (.setInputFormatClass TextInputFormat)
      (FileInputFormat/setInputPaths "test_logs/soj")
      (.setOutputFormatClass TextOutputFormat)
      (FileOutputFormat/setOutputPath output-path))
    ;; Same exit codes as the Java version: 0 on success, 2 on failure.
    (if (.waitForCompletion job true) 0 2)))
;; Job log for the Clojure version. Kept as line comments because the Clojure
;; reader still parses the body of a (comment ...) form and rejects tokens
;; such as 13/04/26 and 19sec.
;;
;; Average time taken by Map tasks: 19sec
;; Average time taken by Shuffle: 38sec
;; Average time taken by Reduce tasks: 8sec
;; 13/04/26 17:33:38 INFO input.FileInputFormat: Total input paths to process : 96
;; 13/04/26 17:33:38 INFO mapred.JobClient: Running job: job_201304011713_3891
;; 13/04/26 17:33:39 INFO mapred.JobClient: map 0% reduce 0%
;; 13/04/26 17:34:11 INFO mapred.JobClient: map 2% reduce 0%
;; 13/04/26 17:34:14 INFO mapred.JobClient: map 5% reduce 0%
;; 13/04/26 17:34:15 INFO mapred.JobClient: map 10% reduce 0%
;; 13/04/26 17:34:16 INFO mapred.JobClient: map 11% reduce 0%
;; 13/04/26 17:34:17 INFO mapred.JobClient: map 17% reduce 0%
;; 13/04/26 17:34:18 INFO mapred.JobClient: map 35% reduce 0%
;; 13/04/26 17:34:19 INFO mapred.JobClient: map 45% reduce 0%
;; 13/04/26 17:34:25 INFO mapred.JobClient: map 45% reduce 15%
;; 13/04/26 17:34:26 INFO mapred.JobClient: map 46% reduce 15%
;; 13/04/26 17:34:32 INFO mapred.JobClient: map 48% reduce 15%
;; 13/04/26 17:34:33 INFO mapred.JobClient: map 49% reduce 15%
;; 13/04/26 17:34:35 INFO mapred.JobClient: map 53% reduce 15%
;; 13/04/26 17:34:36 INFO mapred.JobClient: map 56% reduce 15%
;; 13/04/26 17:34:37 INFO mapred.JobClient: map 60% reduce 15%
;; 13/04/26 17:34:38 INFO mapred.JobClient: map 64% reduce 15%
;; 13/04/26 17:34:39 INFO mapred.JobClient: map 71% reduce 15%
;; 13/04/26 17:34:40 INFO mapred.JobClient: map 76% reduce 16%
;; 13/04/26 17:34:41 INFO mapred.JobClient: map 77% reduce 16%
;; 13/04/26 17:34:42 INFO mapred.JobClient: map 87% reduce 16%
;; 13/04/26 17:34:43 INFO mapred.JobClient: map 90% reduce 16%
;; 13/04/26 17:34:44 INFO mapred.JobClient: map 93% reduce 16%
;; 13/04/26 17:34:45 INFO mapred.JobClient: map 95% reduce 16%
;; 13/04/26 17:34:48 INFO mapred.JobClient: map 96% reduce 16%
;; 13/04/26 17:34:49 INFO mapred.JobClient: map 97% reduce 25%
;; 13/04/26 17:34:51 INFO mapred.JobClient: map 100% reduce 25%
;; 13/04/26 17:34:55 INFO mapred.JobClient: map 100% reduce 32%
;; 13/04/26 17:34:58 INFO mapred.JobClient: map 100% reduce 66%
;; 13/04/26 17:35:04 INFO mapred.JobClient: map 100% reduce 100%
;; 13/04/26 17:35:09 INFO mapred.JobClient: Job complete: job_201304011713_3891
;; 13/04/26 17:35:09 INFO mapred.JobClient: Counters: 30
;; 13/04/26 17:35:09 INFO mapred.JobClient: Job Counters
;; 13/04/26 17:35:09 INFO mapred.JobClient: Launched reduce tasks=1
;; 13/04/26 17:35:09 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=1727856
;; 13/04/26 17:35:09 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
;; 13/04/26 17:35:09 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
;; 13/04/26 17:35:09 INFO mapred.JobClient: Rack-local map tasks=1
;; 13/04/26 17:35:09 INFO mapred.JobClient: Launched map tasks=89
;; 13/04/26 17:35:09 INFO mapred.JobClient: Data-local map tasks=88
;; 13/04/26 17:35:09 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=47089
;; 13/04/26 17:35:09 INFO mapred.JobClient: File Output Format Counters
;; 13/04/26 17:35:09 INFO mapred.JobClient: Bytes Written=10
;; 13/04/26 17:35:09 INFO mapred.JobClient: FileSystemCounters
;; 13/04/26 17:35:09 INFO mapred.JobClient: FILE_BYTES_READ=488
;; 13/04/26 17:35:09 INFO mapred.JobClient: HDFS_BYTES_READ=4331616258
;; 13/04/26 17:35:09 INFO mapred.JobClient: FILE_BYTES_WRITTEN=1966598
;; 13/04/26 17:35:09 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=10
;; 13/04/26 17:35:09 INFO mapred.JobClient: File Input Format Counters
;; 13/04/26 17:35:09 INFO mapred.JobClient: Bytes Read=4331603306
;; 13/04/26 17:35:09 INFO mapred.JobClient: Map-Reduce Framework
;; 13/04/26 17:35:09 INFO mapred.JobClient: Map output materialized bytes=3168
;; 13/04/26 17:35:09 INFO mapred.JobClient: Map input records=11517118
;; 13/04/26 17:35:09 INFO mapred.JobClient: Reduce shuffle bytes=3132
;; 13/04/26 17:35:09 INFO mapred.JobClient: Spilled Records=176
;; 13/04/26 17:35:09 INFO mapred.JobClient: Map output bytes=1408
;; 13/04/26 17:35:09 INFO mapred.JobClient: CPU time spent (ms)=1145440
;; 13/04/26 17:35:09 INFO mapred.JobClient: Total committed heap usage (bytes)=90363527168
;; 13/04/26 17:35:09 INFO mapred.JobClient: Combine input records=0
;; 13/04/26 17:35:09 INFO mapred.JobClient: SPLIT_RAW_BYTES=12760
;; 13/04/26 17:35:09 INFO mapred.JobClient: Reduce input records=88
;; 13/04/26 17:35:09 INFO mapred.JobClient: Reduce input groups=1
;; 13/04/26 17:35:09 INFO mapred.JobClient: Combine output records=0
;; 13/04/26 17:35:09 INFO mapred.JobClient: Physical memory (bytes) snapshot=50453352448
;; 13/04/26 17:35:09 INFO mapred.JobClient: Reduce output records=1
;; 13/04/26 17:35:09 INFO mapred.JobClient: Virtual memory (bytes) snapshot=129966493696
;; 13/04/26 17:35:09 INFO mapred.JobClient: Map output records=88
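
Both versions count matching lines inside the map task and write a single record from cleanup, which is why the logs report Map input records=11517118 but only Map output records=88. The following is a minimal plain-Java sketch of that pattern, with no Hadoop dependency. The class name `FilterCountSketch`, its method names, and the sample log lines are hypothetical and chosen only for illustration; only the filter string comes from the jobs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of "count in map, emit once in cleanup".
// This is not Hadoop code; the names here are made up for the sketch.
class FilterCountSketch {
    // The substring both jobs filter on.
    static final String NEEDLE = "\"site\":\"anjuke\"";

    // Stands in for one map task: every line only bumps a local counter,
    // and the single return value plays the role of the cleanup() write.
    static long mapTask(List<String> split) {
        long count = 0;
        for (String line : split) {
            if (line.contains(NEEDLE)) {
                count++;
            }
        }
        return count;
    }

    // Stands in for the single reducer: sums the partial counts
    // that all arrive under the same key.
    static long reduce(List<Long> partialCounts) {
        long sum = 0;
        for (long c : partialCounts) {
            sum += c;
        }
        return sum;
    }

    // Runs one map task per split, then reduces the partial counts.
    static long run(List<List<String>> splits) {
        List<Long> partials = new ArrayList<>();
        for (List<String> split : splits) {
            partials.add(mapTask(split));
        }
        return reduce(partials);
    }

    public static void main(String[] args) {
        // Hypothetical input: two splits with three lines in total.
        List<List<String>> splits = Arrays.asList(
            Arrays.asList("{\"site\":\"anjuke\",\"p\":1}", "{\"site\":\"other\"}"),
            Arrays.asList("{\"site\":\"anjuke\",\"p\":2}"));
        System.out.println(run(splits)); // prints 2
    }
}
```

With this shape the shuffle carries one small record per map task instead of one per matching line, so the reducer's work stays tiny regardless of input size.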
package ops.test.uv;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.hadoop.mapreduce.LzoTextInputFormat;

public class App extends Configured implements Tool {

    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, LongWritable> {

        // Number of matching lines seen by this map task.
        private long count = 0;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Count only; nothing is written per record.
            String raw = value.toString();
            if (raw.contains("\"site\":\"anjuke\"")) {
                ++count;
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            // Emit one partial count per map task under the constant key 0.
            context.write(new LongWritable(0), new LongWritable(count));
        }
    }

    public static class MyReducer extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {

        @Override
        protected void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the partial counts from all map tasks.
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(new LongWritable(0), new LongWritable(sum));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        job.setJarByClass(App.class);
        job.setJobName("uv-java");

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.setInputPaths(job, "test_logs/soj");

        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path("/tmp/uv-test"));
        FileOutputFormat.setCompressOutput(job, false);

        // Remove any previous output so the job can reuse the same path.
        FileSystem.get(job.getConfiguration()).delete(FileOutputFormat.getOutputPath(job), true);

        // waitForCompletion submits the job itself, so no separate submit() call is needed.
        return job.waitForCompletion(true) ? 0 : 2;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new App(), args));
    }
}
/*
Average time taken by Map tasks: 7sec
Average time taken by Shuffle: 18sec
Average time taken by Reduce tasks: 4sec
13/04/26 17:23:01 INFO input.FileInputFormat: Total input paths to process : 96
13/04/26 17:23:02 INFO mapred.JobClient: Running job: job_201304011713_3880
13/04/26 17:23:03 INFO mapred.JobClient: map 0% reduce 0%
13/04/26 17:23:25 INFO mapred.JobClient: map 6% reduce 0%
13/04/26 17:23:26 INFO mapred.JobClient: map 22% reduce 0%
13/04/26 17:23:27 INFO mapred.JobClient: map 34% reduce 0%
13/04/26 17:23:28 INFO mapred.JobClient: map 38% reduce 0%
13/04/26 17:23:29 INFO mapred.JobClient: map 45% reduce 0%
13/04/26 17:23:31 INFO mapred.JobClient: map 51% reduce 0%
13/04/26 17:23:32 INFO mapred.JobClient: map 52% reduce 0%
13/04/26 17:23:33 INFO mapred.JobClient: map 55% reduce 0%
13/04/26 17:23:34 INFO mapred.JobClient: map 61% reduce 0%
13/04/26 17:23:35 INFO mapred.JobClient: map 79% reduce 17%
13/04/26 17:23:36 INFO mapred.JobClient: map 85% reduce 17%
13/04/26 17:23:37 INFO mapred.JobClient: map 90% reduce 17%
13/04/26 17:23:38 INFO mapred.JobClient: map 95% reduce 17%
13/04/26 17:23:39 INFO mapred.JobClient: map 100% reduce 17%
13/04/26 17:23:44 INFO mapred.JobClient: map 100% reduce 23%
13/04/26 17:23:50 INFO mapred.JobClient: map 100% reduce 100%
13/04/26 17:23:55 INFO mapred.JobClient: Job complete: job_201304011713_3880
13/04/26 17:23:55 INFO mapred.JobClient: Counters: 29
13/04/26 17:23:55 INFO mapred.JobClient: Job Counters
13/04/26 17:23:55 INFO mapred.JobClient: Launched reduce tasks=1
13/04/26 17:23:55 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=684600
13/04/26 17:23:55 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/04/26 17:23:55 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/04/26 17:23:55 INFO mapred.JobClient: Launched map tasks=88
13/04/26 17:23:55 INFO mapred.JobClient: Data-local map tasks=88
13/04/26 17:23:55 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=23243
13/04/26 17:23:55 INFO mapred.JobClient: File Output Format Counters
13/04/26 17:23:55 INFO mapred.JobClient: Bytes Written=10
13/04/26 17:23:55 INFO mapred.JobClient: FileSystemCounters
13/04/26 17:23:55 INFO mapred.JobClient: FILE_BYTES_READ=489
13/04/26 17:23:55 INFO mapred.JobClient: HDFS_BYTES_READ=4331616258
13/04/26 17:23:55 INFO mapred.JobClient: FILE_BYTES_WRITTEN=1968646
13/04/26 17:23:55 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=10
13/04/26 17:23:55 INFO mapred.JobClient: File Input Format Counters
13/04/26 17:23:55 INFO mapred.JobClient: Bytes Read=4331603306
13/04/26 17:23:55 INFO mapred.JobClient: Map-Reduce Framework
13/04/26 17:23:55 INFO mapred.JobClient: Map output materialized bytes=3168
13/04/26 17:23:55 INFO mapred.JobClient: Map input records=11517118
13/04/26 17:23:55 INFO mapred.JobClient: Reduce shuffle bytes=3132
13/04/26 17:23:55 INFO mapred.JobClient: Spilled Records=176
13/04/26 17:23:55 INFO mapred.JobClient: Map output bytes=1408
13/04/26 17:23:55 INFO mapred.JobClient: CPU time spent (ms)=244000
13/04/26 17:23:55 INFO mapred.JobClient: Total committed heap usage (bytes)=90469695488
13/04/26 17:23:55 INFO mapred.JobClient: Combine input records=0
13/04/26 17:23:55 INFO mapred.JobClient: SPLIT_RAW_BYTES=12760
13/04/26 17:23:55 INFO mapred.JobClient: Reduce input records=88
13/04/26 17:23:55 INFO mapred.JobClient: Reduce input groups=1
13/04/26 17:23:55 INFO mapred.JobClient: Combine output records=0
13/04/26 17:23:55 INFO mapred.JobClient: Physical memory (bytes) snapshot=47529811968
13/04/26 17:23:55 INFO mapred.JobClient: Reduce output records=1
13/04/26 17:23:55 INFO mapred.JobClient: Virtual memory (bytes) snapshot=128776970240
13/04/26 17:23:55 INFO mapred.JobClient: Map output records=88
*/
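
To make the two logs easier to compare, the sketch below divides each Clojure figure by the matching Java figure. All numbers are copied from the logs above; the class name `TimingComparison` and its helpers are hypothetical. Each version was run once, so the ratios are a rough indication rather than a benchmark.

```java
import java.util.Locale;

// Side-by-side ratios of the figures reported in the two job logs.
class TimingComparison {
    // How many times larger the Clojure figure is than the Java one.
    static double ratio(double clojure, double java) {
        return clojure / java;
    }

    // One formatted table row: metric, Clojure value, Java value, ratio.
    static String row(String metric, double clojure, double java) {
        return String.format(Locale.ROOT, "%-24s %10.0f %10.0f %6.2fx",
                metric, clojure, java, ratio(clojure, java));
    }

    public static void main(String[] args) {
        System.out.println(String.format(Locale.ROOT, "%-24s %10s %10s %7s",
                "metric", "clojure", "java", "ratio"));
        System.out.println(row("avg map task (s)", 19, 7));
        System.out.println(row("avg shuffle (s)", 38, 18));
        System.out.println(row("avg reduce task (s)", 8, 4));
        System.out.println(row("CPU time spent (ms)", 1145440, 244000));
        System.out.println(row("SLOTS_MILLIS_MAPS", 1727856, 684600));
        System.out.println(row("SLOTS_MILLIS_REDUCES", 47089, 23243));
    }
}
```

The per-task averages come out roughly 2x to 2.7x higher for the Clojure version, while total CPU time is about 4.7x higher, even though both jobs read the same 4331603306 bytes and produced the same 88 map output records.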