Instantly share code, notes, and snippets.

Embed
What would you like to do?
Hazelcast MapReduce Avg Example
package com.hazelcast.mapred.test;
import com.hazelcast.core.CompletableFuture;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.mapred.*;
import java.util.List;
import java.util.Map;
public class AvgExample {
public static void main(String[] args) {
HazelcastInstance hz = DSLUtils.getHazelcastInstance();
IMap<String, User> map = hz.getMap("default");
JobTracker jobTracker = DSLUtils.getJobTracker(hz);
Job<String, User> job = jobTracker.newJob(KeyValueSource.fromMap(map));
CompletableFuture<Map<String, List<Integer>>> future1 = job
.mapper(new AvgMapper()).submit();
CompletableFuture<Map<String, List<KeyValuePair<Long, Long>>>> future2 = job
.mapper(new AvgMapper())
.combiner(new AvgCombiner()).submit();
CompletableFuture<Map<String, Long>> future3 = job
.mapper(new AvgMapper())
.combiner(new AvgCombiner())
.reducer(new AvgReducer()).submit();
CompletableFuture<Long> future4 = job
.mapper(new AvgMapper())
.combiner(new AvgCombiner())
.reducer(new AvgReducer())
.submit((value) -> value.values().iterator().next());
}
private static class AvgMapper implements Mapper<String, User, String, Integer> {
@Override
public void map(String key, User value, Context<String, Integer> context) {
if (value.age >= 13 && value.age <= 30) {
context.emit("age", value.age);
}
}
}
private static class AvgCombiner extends Combiner<String, Integer, KeyValuePair<Long, Long>> {
private transient long total = 0;
private transient long count = 0;
@Override
public void combine(String key, Integer value) {
count++;
total += value;
}
@Override
public KeyValuePair<Long, Long> finalizeCombineChunk() {
KeyValuePair<Long, Long> pair = new KeyValuePair<Long, Long>(count, total);
count = 0;
total = 0;
return pair;
}
}
private static class AvgReducer implements Reducer<String, KeyValuePair<Long, Long>, Long> {
private transient long total = 0;
private transient long count = 0;
@Override
public Long reduce(String key, KeyValuePair<Long, Long> value, boolean finalValue) {
count += value.getKey();
total += value.getValue();
return finalValue ? (total / count) : null;
}
}
private static class User {
private String username;
private int age;
}
}
@jerrinot

This comment has been minimized.

jerrinot commented Dec 4, 2013

Why Combiner has a finalization method (finalizeCombineChunk), but Reducer is just receiving an argument finalValue?

@noctarius

This comment has been minimized.

Owner

noctarius commented Dec 4, 2013

Because the Combiner can create chunks of data. Combiner runs next to Mapper and must be able to produce multiple chunks of intermediate data for later supported streaming functionality (will not be in the first version).
In addition to that Reducer will have an abstract implementation that splits it up to two methods just as for Combiner so you can choose which one to use (that's my idea for now). One method version could also be interesting for Java 8 and Lambda support.

@jerrinot

This comment has been minimized.

jerrinot commented Dec 4, 2013

ah, it makes sense now! It still doesn't feel right - that boolean flag finalValue + returning sometimes null, sometimes actual value. I can understand the reasoning behind it, but it stil..

I feel both Combiner and Reducer have very similar "mode of operation" - you feed them with input data, they are "accumulating" (reducing) the input and from time to time you want them to flush the result. The difference is that Reduce is flushing the result once per task, but I still believe the API should be similar.

One method interface is handy in Java 8, but it should be solvable with factory method. Something like: Reducers.lambdaReducer()

@noctarius

This comment has been minimized.

Owner

noctarius commented Dec 5, 2013

Yeah I see your point and exactly this is what I don't like so much for now. It also feels wrong to me to make Combiner and Reducer one interface even if they are similar in principle.

@jerrinot

This comment has been minimized.

jerrinot commented Dec 5, 2013

I see. What is the responsibility of JobTracker class and what is DSLUtils?
(please don't take it as criticism of the API, I like it, I'm just trying to find out what might go wrong)-

@noctarius

This comment has been minimized.

Owner

noctarius commented Dec 5, 2013

DSLUtils is just a class that does nothing. It is a helper class for my test tools :)

JobTracker for now is just responsible for creating the Job depending on the given data structure but there's a good chance that more stuff will come on this like progress watcher and stuff.

@noctarius

This comment has been minimized.

Owner

noctarius commented Dec 7, 2013

No other comments, opinions? That's sad.

@mikojava

This comment has been minimized.

mikojava commented Feb 4, 2014

here's a new comment for you to feel less sad.

@psramkumar

This comment has been minimized.

psramkumar commented Jan 26, 2016

hmm.. i like it,
here is another Q
why there is no simple implementation for all these Reducer, Mapper, and Combiner interface.
is it mandatory to copy the Client implementation in Server lib ? and why ? any other option ?

i was getting the below Error if i'm not porting the Jar file
Caused by: java.lang.ClassNotFoundException: com.hazelcast.example.mapreduce.TokenizerMapper

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment