Last active
December 28, 2015 10:49
-
-
Save dbtsai/7488981 to your computer and use it in GitHub Desktop.
InMapperCombiner
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class InMapperCombiner < KEY extends WritableComparable, VALUE extends Writable > { | |
private static final int DEFAULT_CAPACITY = 8388608; | |
private static final int DEFAULT_INITIAL_CAPACITY = 65536; | |
private static final float DEFAULT_LOAD_FACTOR = .75F; | |
private final int maxCacheCapacity; | |
private final Map < KEY, VALUE > lruCache; | |
private CombiningFunction < VALUE > combiningFunction; | |
private Mapper.Context context; | |
public InMapperCombiner(int cacheCapacity, CombiningFunction < VALUE > combiningFunction, int initialCapacity, float loadFactor) { | |
this.combiningFunction = combiningFunction; | |
this.maxCacheCapacity = cacheCapacity; | |
lruCache = new LinkedHashMap < KEY, VALUE > (initialCapacity, loadFactor, true) { | |
protected boolean removeEldestEntry(Map.Entry < KEY, VALUE > eldest) { | |
boolean isFull = size() > maxCacheCapacity; | |
if (isFull) { | |
try { | |
// If the cache is full, emit the eldest key value pair to the reducer, and delete them from cache | |
context.write(eldest.getKey(), eldest.getValue()); | |
} catch (IOException ex) { | |
throw new UncheckedIOException(ex); | |
} catch (InterruptedException ex) { | |
throw new UncheckedInterruptedException(ex); | |
} | |
} | |
return isFull; | |
} | |
}; | |
} | |
public InMapperCombiner() { | |
this(DEFAULT_CAPACITY, null, DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR); | |
} | |
public InMapperCombiner(int cacheCapacity) { | |
this(cacheCapacity, null, DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR); | |
} | |
public InMapperCombiner(CombiningFunction < VALUE > combiningFunction) { | |
this(DEFAULT_CAPACITY, combiningFunction, 512, .75F); | |
} | |
public InMapperCombiner(int cacheCapacity, CombiningFunction < VALUE > combiningFunction) { | |
this(cacheCapacity, combiningFunction, 512, .75F); | |
} | |
public void setCombiningFunction(CombiningFunction < VALUE > combiningFunction) { | |
this.combiningFunction = combiningFunction; | |
} | |
public void write(KEY key, VALUE value, Mapper.Context context) throws InterruptedException, IOException { | |
/** | |
* In Hadoop, context.write(key, value) should have a new copy of key and value in context.write(). | |
* This is because lots of time, people use | |
* private final static LongWritable one = new LongWritable(1); | |
* in the mapper class as global variable, and we don't want to reference to it in the LRU cache | |
* which may cause unexpected result. As a result, we will clone the key and value objects. | |
* */ | |
Configuration conf = context.getConfiguration(); | |
key = WritableUtils.clone(key, conf); | |
value = WritableUtils.clone(value, conf); | |
this.context = context; | |
if (combiningFunction != null) { | |
try { | |
if (!lruCache.containsKey(key)) { | |
lruCache.put(key, value); | |
} else { | |
lruCache.put(key, combiningFunction.combine(lruCache.get(key), value)); | |
} | |
} catch (UncheckedIOException ex) { | |
throw new IOException(ex); | |
} catch (UncheckedInterruptedException ex) { | |
throw new InterruptedException(ex.toString()); | |
} | |
} else { | |
context.write(key, value); | |
} | |
} | |
public void flush(Mapper.Context context) throws IOException, InterruptedException { | |
// Emit the key-value pair from the LRU cache. | |
if (!lruCache.isEmpty()) { | |
for (Map.Entry < KEY, VALUE > item: lruCache.entrySet()) { | |
context.write(item.getKey(), item.getValue()); | |
} | |
} | |
lruCache.clear(); | |
} | |
private static class UncheckedIOException extends java.lang.RuntimeException { | |
public UncheckedIOException(Throwable throwable) { | |
super(throwable); | |
} | |
} | |
private static class UncheckedInterruptedException extends java.lang.RuntimeException { | |
public UncheckedInterruptedException(Throwable throwable) { | |
super(throwable); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment