Created
December 4, 2013 09:29
-
-
Save noctarius/7784770 to your computer and use it in GitHub Desktop.
Hazelcast MapReduce Avg Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.hazelcast.mapred.test; | |
import com.hazelcast.core.CompletableFuture; | |
import com.hazelcast.core.HazelcastInstance; | |
import com.hazelcast.core.IMap; | |
import com.hazelcast.mapred.*; | |
import java.util.List; | |
import java.util.Map; | |
public class AvgExample { | |
public static void main(String[] args) { | |
HazelcastInstance hz = DSLUtils.getHazelcastInstance(); | |
IMap<String, User> map = hz.getMap("default"); | |
JobTracker jobTracker = DSLUtils.getJobTracker(hz); | |
Job<String, User> job = jobTracker.newJob(KeyValueSource.fromMap(map)); | |
CompletableFuture<Map<String, List<Integer>>> future1 = job | |
.mapper(new AvgMapper()).submit(); | |
CompletableFuture<Map<String, List<KeyValuePair<Long, Long>>>> future2 = job | |
.mapper(new AvgMapper()) | |
.combiner(new AvgCombiner()).submit(); | |
CompletableFuture<Map<String, Long>> future3 = job | |
.mapper(new AvgMapper()) | |
.combiner(new AvgCombiner()) | |
.reducer(new AvgReducer()).submit(); | |
CompletableFuture<Long> future4 = job | |
.mapper(new AvgMapper()) | |
.combiner(new AvgCombiner()) | |
.reducer(new AvgReducer()) | |
.submit((value) -> value.values().iterator().next()); | |
} | |
private static class AvgMapper implements Mapper<String, User, String, Integer> { | |
@Override | |
public void map(String key, User value, Context<String, Integer> context) { | |
if (value.age >= 13 && value.age <= 30) { | |
context.emit("age", value.age); | |
} | |
} | |
} | |
private static class AvgCombiner extends Combiner<String, Integer, KeyValuePair<Long, Long>> { | |
private transient long total = 0; | |
private transient long count = 0; | |
@Override | |
public void combine(String key, Integer value) { | |
count++; | |
total += value; | |
} | |
@Override | |
public KeyValuePair<Long, Long> finalizeCombineChunk() { | |
KeyValuePair<Long, Long> pair = new KeyValuePair<Long, Long>(count, total); | |
count = 0; | |
total = 0; | |
return pair; | |
} | |
} | |
private static class AvgReducer implements Reducer<String, KeyValuePair<Long, Long>, Long> { | |
private transient long total = 0; | |
private transient long count = 0; | |
@Override | |
public Long reduce(String key, KeyValuePair<Long, Long> value, boolean finalValue) { | |
count += value.getKey(); | |
total += value.getValue(); | |
return finalValue ? (total / count) : null; | |
} | |
} | |
private static class User { | |
private String username; | |
private int age; | |
} | |
} |
Hmm, I like it.
Here is another question:
Why is there no simple default implementation for the Reducer, Mapper, and Combiner interfaces?
Is it mandatory to copy the client implementation into the server library? If so, why? Is there any other option?
I was getting the error below when I did not port the JAR file:
Caused by: java.lang.ClassNotFoundException: com.hazelcast.example.mapreduce.TokenizerMapper
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Here's a new comment for you to feel less sad.