package com.hazelcast.mapred.test;

import com.hazelcast.core.CompletableFuture;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.mapred.*;

import java.util.List;
import java.util.Map;

public class AvgExample {

    public static void main(String[] args) {
        HazelcastInstance hz = DSLUtils.getHazelcastInstance();
        IMap<String, User> map = hz.getMap("default");

        JobTracker jobTracker = DSLUtils.getJobTracker(hz);
        Job<String, User> job = jobTracker.newJob(KeyValueSource.fromMap(map));

        CompletableFuture<Map<String, List<Integer>>> future1 = job
                .mapper(new AvgMapper()).submit();

        CompletableFuture<Map<String, List<KeyValuePair<Long, Long>>>> future2 = job
                .mapper(new AvgMapper())
                .combiner(new AvgCombiner()).submit();

        CompletableFuture<Map<String, Long>> future3 = job
                .mapper(new AvgMapper())
                .combiner(new AvgCombiner())
                .reducer(new AvgReducer()).submit();

        CompletableFuture<Long> future4 = job
                .mapper(new AvgMapper())
                .combiner(new AvgCombiner())
                .reducer(new AvgReducer())
                .submit((value) -> value.values().iterator().next());
    }

    private static class AvgMapper implements Mapper<String, User, String, Integer> {
        @Override
        public void map(String key, User value, Context<String, Integer> context) {
            if (value.age >= 13 && value.age <= 30) {
                context.emit("age", value.age);
            }
        }
    }

    private static class AvgCombiner extends Combiner<String, Integer, KeyValuePair<Long, Long>> {
        private transient long total = 0;
        private transient long count = 0;

        @Override
        public void combine(String key, Integer value) {
            count++;
            total += value;
        }

        @Override
        public KeyValuePair<Long, Long> finalizeCombineChunk() {
            KeyValuePair<Long, Long> pair = new KeyValuePair<Long, Long>(count, total);
            count = 0;
            total = 0;
            return pair;
        }
    }

    private static class AvgReducer implements Reducer<String, KeyValuePair<Long, Long>, Long> {
        private transient long total = 0;
        private transient long count = 0;

        @Override
        public Long reduce(String key, KeyValuePair<Long, Long> value, boolean finalValue) {
            count += value.getKey();
            total += value.getValue();
            return finalValue ? (total / count) : null;
        }
    }

    private static class User {
        private String username;
        private int age;
    }
}
Because the Combiner can create chunks of data. The Combiner runs next to the Mapper and must be able to produce multiple chunks of intermediate data for streaming functionality that will be supported later (it will not be in the first version).
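That chunking behaviour can be seen in miniature with a self-contained sketch. The class below is a plain stand-in for the gist's `AvgCombiner` (using a `long[]` instead of `KeyValuePair`), not the real Hazelcast class; the point is that `finalizeCombineChunk()` emits one chunk and resets the state, so a single Combiner instance can produce many chunks over its lifetime:

```java
public class CombinerChunkSketch {

    // Stand-in for the gist's Combiner: combine() accumulates, and
    // finalizeCombineChunk() emits one {count, total} chunk and resets
    // the accumulator so the next chunk starts fresh.
    static class AvgCombiner {
        private long total;
        private long count;

        void combine(String key, int value) {
            count++;
            total += value;
        }

        long[] finalizeCombineChunk() {
            long[] chunk = {count, total};
            count = 0;
            total = 0;
            return chunk;
        }
    }

    public static void main(String[] args) {
        AvgCombiner c = new AvgCombiner();
        c.combine("age", 20);
        c.combine("age", 30);
        long[] chunk1 = c.finalizeCombineChunk(); // {2, 50}
        c.combine("age", 25);
        long[] chunk2 = c.finalizeCombineChunk(); // {1, 25} -- state was reset
        System.out.println(chunk1[1] + " " + chunk2[1]);
    }
}
```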
In addition to that, Reducer will have an abstract implementation that splits it up into two methods, just as for Combiner, so you can choose which one to use (that's my idea for now). A one-method version could also be interesting for Java 8 and lambda support.
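A sketch of what that two-method abstract implementation might look like, based only on the comment above: the `Reducer` interface mirrors the gist, but `AbstractReducer` and its `accumulate()`/`finalizeReduce()` split are assumptions, not a published API. It hides the `finalValue`/`null` protocol from subclasses:

```java
public class ReducerSketch {

    // Minimal stand-in for the gist's one-method Reducer interface.
    interface Reducer<K, V, R> {
        R reduce(K key, V value, boolean finalValue);
    }

    // Hypothetical two-method base class: subclasses only accumulate
    // and finalize, mirroring Combiner's combine()/finalizeCombineChunk().
    abstract static class AbstractReducer<K, V, R> implements Reducer<K, V, R> {
        protected abstract void accumulate(K key, V value);
        protected abstract R finalizeReduce();

        @Override
        public final R reduce(K key, V value, boolean finalValue) {
            accumulate(key, value);
            return finalValue ? finalizeReduce() : null;
        }
    }

    // Same averaging logic as the gist's AvgReducer, written in the
    // two-method style; long[] = {count, total} stands in for KeyValuePair.
    static class AvgReducer extends AbstractReducer<String, long[], Long> {
        private long total;
        private long count;

        @Override
        protected void accumulate(String key, long[] pair) {
            count += pair[0];
            total += pair[1];
        }

        @Override
        protected Long finalizeReduce() {
            return total / count;
        }
    }

    public static void main(String[] args) {
        AvgReducer r = new AvgReducer();
        r.reduce("age", new long[]{2, 40}, false);        // returns null
        Long avg = r.reduce("age", new long[]{3, 60}, true); // 100 / 5 = 20
        System.out.println(avg);
    }
}
```

The subclass never sees the `finalValue` flag or returns `null` itself, which is exactly the awkwardness the commenters discuss below.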
ah, it makes sense now! It still doesn't feel right, though - that boolean finalValue flag plus returning sometimes null, sometimes the actual value. I can understand the reasoning behind it, but it still bothers me.
I feel both Combiner and Reducer have a very similar "mode of operation" - you feed them input data, they "accumulate" (reduce) the input, and from time to time you want them to flush the result. The difference is that the Reducer flushes its result once per task, but I still believe the API should be similar.
A one-method interface is handy in Java 8, but it should be solvable with a factory method, something like: Reducers.lambdaReducer()
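One way that factory-method idea could look, sketched under the assumption that `Reducers.lambdaReducer()` takes a binary operator and an identity value (the name comes from the comment above; the signature is invented for illustration):

```java
import java.util.function.BinaryOperator;

public class LambdaReducerSketch {

    // Minimal stand-in for the gist's Reducer interface.
    interface Reducer<K, V, R> {
        R reduce(K key, V value, boolean finalValue);
    }

    // Hypothetical factory class suggested in the comment; it adapts a
    // simple lambda to the three-argument reduce() protocol, so callers
    // never deal with finalValue or null themselves.
    static class Reducers {
        static <K, V> Reducer<K, V, V> lambdaReducer(BinaryOperator<V> op, V identity) {
            return new Reducer<K, V, V>() {
                private V acc = identity;

                @Override
                public V reduce(K key, V value, boolean finalValue) {
                    acc = op.apply(acc, value);
                    return finalValue ? acc : null;
                }
            };
        }
    }

    public static void main(String[] args) {
        Reducer<String, Long, Long> sum = Reducers.lambdaReducer(Long::sum, 0L);
        sum.reduce("age", 10L, false);                 // accumulates, returns null
        System.out.println(sum.reduce("age", 32L, true)); // prints 42
    }
}
```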
Yeah, I see your point, and that is exactly the part I don't like so much for now. It also feels wrong to me to make Combiner and Reducer one interface, even if they are similar in principle.
I see. What is the responsibility of the JobTracker class, and what is DSLUtils?
(please don't take it as criticism of the API, I like it, I'm just trying to find out what might go wrong)
DSLUtils is just a class that does nothing. It is a helper class for my test tools :)
JobTracker, for now, is just responsible for creating the Job for the given data structure, but there's a good chance more functionality will be added to it, like progress watching and such.
No other comments, opinions? That's sad.
here's a new comment for you to feel less sad.
hmm.. I like it.
Here is another question:
why is there no simple implementation for all these Reducer, Mapper, and Combiner interfaces?
Is it mandatory to copy the client implementation into the server lib? And why? Is there any other option?
I was getting the error below when I did not deploy the jar file:
Caused by: java.lang.ClassNotFoundException: com.hazelcast.example.mapreduce.TokenizerMapper
Why does Combiner have a finalization method (finalizeCombineChunk), while Reducer just receives a finalValue argument?