Skip to content

Instantly share code, notes, and snippets.

@parzonka
Forked from MikhailGolubtsov/DistributionTest
Last active July 27, 2016 18:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save parzonka/9ab676de54d4030af61daf25f19f2aba to your computer and use it in GitHub Desktop.
Hazelcast map-reduce example — computing the distribution (value histogram) of randomly generated parameter values.
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.mapreduce.Combiner;
import com.hazelcast.mapreduce.CombinerFactory;
import com.hazelcast.mapreduce.Context;
import com.hazelcast.mapreduce.Job;
import com.hazelcast.mapreduce.JobCompletableFuture;
import com.hazelcast.mapreduce.JobTracker;
import com.hazelcast.mapreduce.KeyValueSource;
import com.hazelcast.mapreduce.Mapper;
import com.hazelcast.mapreduce.Reducer;
import com.hazelcast.mapreduce.ReducerFactory;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@SuppressWarnings("serial")
public class DistributionTest {
private static final int HAZELCAST_INSTANCES = 2;
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
HazelcastInstance inst = Hazelcast.newHazelcastInstance();
for (int i = 1; i < HAZELCAST_INSTANCES; i++) {
Hazelcast.newHazelcastInstance();
}
while (!inst.getPartitionService().isClusterSafe()) {
System.out.println("Waiting until cluster with " + HAZELCAST_INSTANCES + " instances has stabilized...");
Thread.sleep(1000);
}
IMap<String, ParametersValues> map = inst.getMap("parameters");
Map<String, ParametersValues> data = new HashMap<>();
Random random = new Random();
if (map.isEmpty()) {
System.out.println("Preparing data...");
for (int i = 0; i < 5000; i++) {
ParametersValues values = new ParametersValues();
for (int j = 0; j < 100; j++) {
String paramName = "param" + j;
values.setParamValue(paramName, new ParameterValue(random.nextInt(10)));
}
map.put("number" + i, values);
}
map.putAll(data);
}
while (!inst.getPartitionService().isClusterSafe()) {
Thread.sleep(1000);
}
System.out.println("Data is ready");
JobTracker jobTracker = inst.getJobTracker("default");
KeyValueSource<String, ParametersValues> source = KeyValueSource.fromMap(map);
long begin = System.currentTimeMillis();
System.out.println("Starting map reduce");
Job<String, ParametersValues> job = jobTracker.newJob(source);
JobCompletableFuture<Map<String, Distribution>> distrF = job.mapper(filterMapper())
.combiner(distrCombinerFactory()).reducer(distrReducerFactory()).submit();
System.out.println("job is submitted");
Map<String, Distribution> distr = distrF.get(180, TimeUnit.SECONDS);
System.out.println("Distributions are " + distr);
System.out.println("it took " + (System.currentTimeMillis() - begin));
inst.shutdown();
}
public static CombinerFactory<String, ParameterValue, Distribution> distrCombinerFactory() {
return new CombinerFactory<String, ParameterValue, Distribution>() {
public Combiner<ParameterValue, Distribution> newCombiner(String key) {
return new Combiner<ParameterValue, Distribution>() {
private Distribution distr;
public void beginCombine() {
distr = new Distribution();
}
public void combine(ParameterValue value) {
for (Integer v : value.values) {
distr.count(v);
}
}
public Distribution finalizeChunk() {
Distribution result = distr;
beginCombine();
return result;
}
};
}
};
}
public static ReducerFactory<String, Distribution, Distribution> distrReducerFactory() {
return new ReducerFactory<String, Distribution, Distribution>() {
public Reducer<Distribution, Distribution> newReducer(String key) {
return new Reducer<Distribution, Distribution>() {
private Distribution distr = new Distribution();
public void reduce(Distribution other) {
distr.merge(other);
}
public Distribution finalizeReduce() {
return distr;
}
};
}
};
}
public static class ParametersValues implements Serializable {
private Map<String, ParameterValue> values = new HashMap<>();
public void setParamValue(String paramName, ParameterValue value) {
values.put(paramName, value);
}
}
public static class ParameterValue implements Serializable {
private final List<Integer> values;
public ParameterValue(List<Integer> values) {
this.values = values;
}
public ParameterValue(Integer singleVal) {
this.values = Arrays.asList(singleVal);
}
}
public static class Distribution implements Serializable {
private final Map<Integer, Integer> distr = new HashMap<>();
public void count(Integer value) {
if (!distr.containsKey(value)) {
distr.put(value, 0);
}
distr.put(value, distr.get(value) + 1);
}
public String toString() {
return "Distribution [distr=" + distr + "]";
}
public void merge(Distribution other) {
for (Integer otherKey : other.distr.keySet()) {
int thisCount = distr.containsKey(otherKey) ? distr.get(otherKey) : 0;
int otherCount = other.distr.get(otherKey);
distr.put(otherKey, thisCount + otherCount);
}
}
}
public static Mapper<String, ParametersValues, String, ParameterValue> filterMapper() {
return new CustomMapper();
}
public static class CustomMapper implements Mapper<String, ParametersValues, String, ParameterValue>, Serializable {
public void map(String key, ParametersValues values, Context<String, ParameterValue> context) {
for (String paramName : values.values.keySet()) {
ParameterValue parameterValue = values.values.get(paramName);
context.emit(paramName, parameterValue);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment