-
-
Save noctarius/7784770 to your computer and use it in GitHub Desktop.
package com.hazelcast.mapred.test; | |
import com.hazelcast.core.CompletableFuture; | |
import com.hazelcast.core.HazelcastInstance; | |
import com.hazelcast.core.IMap; | |
import com.hazelcast.mapred.*; | |
import java.util.List; | |
import java.util.Map; | |
public class AvgExample { | |
public static void main(String[] args) { | |
HazelcastInstance hz = DSLUtils.getHazelcastInstance(); | |
IMap<String, User> map = hz.getMap("default"); | |
JobTracker jobTracker = DSLUtils.getJobTracker(hz); | |
Job<String, User> job = jobTracker.newJob(KeyValueSource.fromMap(map)); | |
CompletableFuture<Map<String, List<Integer>>> future1 = job | |
.mapper(new AvgMapper()).submit(); | |
CompletableFuture<Map<String, List<KeyValuePair<Long, Long>>>> future2 = job | |
.mapper(new AvgMapper()) | |
.combiner(new AvgCombiner()).submit(); | |
CompletableFuture<Map<String, Long>> future3 = job | |
.mapper(new AvgMapper()) | |
.combiner(new AvgCombiner()) | |
.reducer(new AvgReducer()).submit(); | |
CompletableFuture<Long> future4 = job | |
.mapper(new AvgMapper()) | |
.combiner(new AvgCombiner()) | |
.reducer(new AvgReducer()) | |
.submit((value) -> value.values().iterator().next()); | |
} | |
private static class AvgMapper implements Mapper<String, User, String, Integer> { | |
@Override | |
public void map(String key, User value, Context<String, Integer> context) { | |
if (value.age >= 13 && value.age <= 30) { | |
context.emit("age", value.age); | |
} | |
} | |
} | |
private static class AvgCombiner extends Combiner<String, Integer, KeyValuePair<Long, Long>> { | |
private transient long total = 0; | |
private transient long count = 0; | |
@Override | |
public void combine(String key, Integer value) { | |
count++; | |
total += value; | |
} | |
@Override | |
public KeyValuePair<Long, Long> finalizeCombineChunk() { | |
KeyValuePair<Long, Long> pair = new KeyValuePair<Long, Long>(count, total); | |
count = 0; | |
total = 0; | |
return pair; | |
} | |
} | |
private static class AvgReducer implements Reducer<String, KeyValuePair<Long, Long>, Long> { | |
private transient long total = 0; | |
private transient long count = 0; | |
@Override | |
public Long reduce(String key, KeyValuePair<Long, Long> value, boolean finalValue) { | |
count += value.getKey(); | |
total += value.getValue(); | |
return finalValue ? (total / count) : null; | |
} | |
} | |
private static class User { | |
private String username; | |
private int age; | |
} | |
} |
I see. What is the responsibility of JobTracker class and what is DSLUtils?
(please don't take it as criticism of the API, I like it, I'm just trying to find out what might go wrong)
DSLUtils is just a class that does nothing. It is a helper class for my test tools :)
JobTracker for now is just responsible for creating the Job depending on the given data structure but there's a good chance that more stuff will come on this like progress watcher and stuff.
No other comments, opinions? That's sad.
here's a new comment for you to feel less sad.
hmm.. i like it,
here is another Q
Why is there no simple implementation for all these Reducer, Mapper, and Combiner interfaces?
Is it mandatory to copy the client implementation into the server lib? If so, why? Is there any other option?
I was getting the error below when I did not copy the JAR file over to the server:
Caused by: java.lang.ClassNotFoundException: com.hazelcast.example.mapreduce.TokenizerMapper
Yeah I see your point and exactly this is what I don't like so much for now. It also feels wrong to me to make Combiner and Reducer one interface even if they are similar in principle.