@bvpatel
Last active April 11, 2019 06:39
Distributed cache for Java objects in a MapReduce job
package pubmatic.matrix.confluent.utils;
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.log4j.Logger;
import java.io.*;
import java.net.URI;
import java.util.UUID;
/**
 * {@link DistributedObjectCache} is used to store and distribute Java objects across all mappers and reducers.
 * The Hadoop API does not support this out of the box, so this class is a custom wrapper on top of
 * DistributedCache. Objects are serialized on write and deserialized on read, and are stored on HDFS;
 * any object stored in the {@link DistributedObjectCache} must therefore implement
 * the {@link java.io.Serializable} interface.
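 *
 * <p>A minimal usage sketch (the object and key names below are hypothetical; the stored
 * object is assumed to implement {@link java.io.Serializable}):</p>
 * <pre>{@code
 * // Driver: serialize the object to HDFS and register it with the job.
 * String key = DistributedObjectCache.addObjectToCache(job, lookupMap, "lookup_map");
 *
 * // Mapper/reducer setup(): read the object back using its key.
 * HashMap<String, String> lookup = (HashMap<String, String>)
 *         DistributedObjectCache.getObjectFromCache(context, key, HashMap.class);
 * }</pre>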
 *
 * @author Bhavesh Patel
 * @version 1.0
 * @since 2019-05-09
 */
public class DistributedObjectCache {

    private static final Logger LOGGER = Logger.getLogger(DistributedObjectCache.class);

    // Default HDFS directory under which serialized objects are stored.
    private static final String DEFAULT_OBJECT_PATH = "/raw/distributed_objects";
    // Default prefix for generated object keys.
    private static final String DEFAULT_OBJECT_PREFIX = "distributed_object";
    // Default file extension for object keys.
    private static final String DEFAULT_OBJECT_EXTENSION = ".ser";
    /**
     * Stores an object into the {@link DistributedObjectCache} and returns its object key.
     * The object to be stored must implement the {@link java.io.Serializable} interface.
     *
     * @param job    The Hadoop job which is used to store and distribute the object across all mappers and reducers
     * @param object The object to store in the {@link DistributedObjectCache}
     * @return The object key mapped to the object, used to retrieve it from the {@link DistributedObjectCache}
     */
    public static String addObjectToCache(Job job, Object object) {
        return addObjectInCache(job, writeObjectToHDFS(job, DEFAULT_OBJECT_PATH + "/" + generateObjectKey(), object));
    }
    /**
     * Stores an object into the {@link DistributedObjectCache} under the given key and returns the object key.
     * The object to be stored must implement the {@link java.io.Serializable} interface.
     *
     * @param job       The Hadoop job which is used to store and distribute the object across all mappers and reducers
     * @param object    The object to store in the {@link DistributedObjectCache}
     * @param objectKey The object key with which the specified object is to be associated
     * @return The object key mapped to the object, used to retrieve it from the {@link DistributedObjectCache}
     */
    public static String addObjectToCache(Job job, Object object, String objectKey) {
        return addObjectInCache(job, writeObjectToHDFS(job, DEFAULT_OBJECT_PATH + "/" + getObjectKey(objectKey), object));
    }
    /**
     * Stores an object into the {@link DistributedObjectCache} under the given key and HDFS path,
     * and returns the object key. The object to be stored must implement the
     * {@link java.io.Serializable} interface.
     *
     * @param job       The Hadoop job which is used to store and distribute the object across all mappers and reducers
     * @param object    The object to store in the {@link DistributedObjectCache}
     * @param objectKey The object key with which the specified object is to be associated
     * @param hdfsPath  The HDFS directory used to store objects
     * @return The object key mapped to the object, used to retrieve it from the {@link DistributedObjectCache}
     */
    public static String addObjectToCache(Job job, Object object, String objectKey, String hdfsPath) {
        return addObjectInCache(job, writeObjectToHDFS(job, hdfsPath + "/" + getObjectKey(objectKey), object));
    }
    /**
     * Returns the object to which the specified object key is mapped, or null
     * if the {@link DistributedObjectCache} contains no mapping for the object key.
     *
     * @param context     The Hadoop job context, i.e. a mapper or reducer context
     * @param objectKey   The object key with which the specified object is associated
     * @param objectClass The class of the object
     * @return The object to which the specified object key is mapped, or null
     *         if the {@link DistributedObjectCache} contains no mapping for the object key
     */
    public static Object getObjectFromCache(JobContext context, String objectKey, Class objectClass) {
        Object object = loadObjectFromHDFS(context, getObjectPathFromCache(context, objectKey));
        return objectClass.cast(object);
    }
    /**
     * Returns the object path to which the specified object key is mapped from the Hadoop context
     * (mapper or reducer context), or null if the job context contains no mapping for the object key.
     *
     * @param context   The Hadoop MapReduce job context
     * @param objectKey The object key with which the specified object path is associated
     * @return The object path to which the specified object key is mapped, or null
     *         if the Hadoop MapReduce job context contains no mapping for the object key
     */
    private static String getObjectPathFromCache(JobContext context, String objectKey) {
        try {
            LOGGER.info("Getting object path from cache objects for DistributedObjectCache for Object key: " + objectKey);
            if (context == null || objectKey == null) {
                LOGGER.warn("Job context or Object key is null. " +
                        "Both are mandatory parameters for getting an object from the cache");
                LOGGER.warn("Object key: " + objectKey);
                return null;
            }
            URI[] cacheObjects = context.getCacheFiles();
            if (cacheObjects != null && cacheObjects.length > 0) {
                for (URI cacheObjectPath : cacheObjects) {
                    LOGGER.info("Objects from cache: " + cacheObjectPath);
                    if (cacheObjectPath.toString().contains(objectKey)) {
                        LOGGER.info("Successfully found object path from cache objects for DistributedObjectCache. Object path: " + cacheObjectPath);
                        return cacheObjectPath.toString();
                    }
                }
            }
        } catch (IOException ex) {
            LOGGER.error("Exception occurred while getting object from cache. ", ex);
            throw new DistributedObjectCacheNotFound("Unable to find object key in DistributedObjectCache", ex);
        }
        return null;
    }
    /**
     * Returns the object to which the specified object key is mapped, reading the localized copy
     * of the cache file from the task's working directory, or null if the
     * {@link DistributedObjectCache} contains no mapping for the object key.
     *
     * @param context   The Hadoop job context, i.e. a mapper or reducer context
     * @param objectKey The object key with which the specified object is associated
     * @return The object to which the specified object key is mapped, or null
     *         if the {@link DistributedObjectCache} contains no mapping for the object key
     */
    private static Object loadObjectFromCache(JobContext context, String objectKey) {
        LOGGER.info("Loading Object from local cache for DistributedObjectCache with Object key: " + objectKey);
        if (context == null || objectKey == null) {
            LOGGER.warn("Job context or Object key is null. " +
                    "Both are mandatory parameters for getting an object from the cache");
            LOGGER.warn("Object key: " + objectKey);
            return null;
        }
        ObjectInputStream ois = null;
        FileInputStream fis = null;
        try {
            fis = new FileInputStream(FilenameUtils.getName(objectKey));
            ois = new ObjectInputStream(fis);
            Object object = ois.readObject();
            LOGGER.info("Successfully loaded Object from local cache for DistributedObjectCache with Object key: " + objectKey);
            return object;
        } catch (IOException ex) {
            LOGGER.error("Exception occurred while getting object from cache. ", ex);
            throw new DistributedObjectCacheNotFound("Unable to find object key in DistributedObjectCache", ex);
        } catch (ClassNotFoundException e) {
            LOGGER.error("Exception occurred while getting object from cache. ", e);
            throw new DistributedObjectCacheNotFound("Unable to find object key in DistributedObjectCache", e);
        } finally {
            try {
                // Close the wrapping stream first; it also closes the underlying file stream.
                if (ois != null)
                    ois.close();
                else if (fis != null)
                    fis.close();
            } catch (IOException e) {
                LOGGER.error("Exception occurred while closing the cache input stream", e);
            }
        }
    }
    /**
     * Returns the object to which the specified object key is mapped, reading it directly
     * from HDFS, or null if the {@link DistributedObjectCache} contains no mapping for the object key.
     *
     * @param context   The Hadoop job context, i.e. a mapper or reducer context
     * @param objectKey The object key with which the specified object is associated
     * @return The object to which the specified object key is mapped, or null
     *         if the {@link DistributedObjectCache} contains no mapping for the object key
     */
    private static Object loadObjectFromHDFS(JobContext context, String objectKey) {
        LOGGER.info("Loading Object from HDFS for DistributedObjectCache with Object key: " + objectKey);
        if (context == null || objectKey == null) {
            LOGGER.warn("Job context or Object key is null. " +
                    "Both are mandatory parameters for getting an object from the cache");
            LOGGER.warn("Object key: " + objectKey);
            return null;
        }
        ObjectInputStream ois = null;
        try {
            FileSystem fileSystem = FileSystem.get(context.getConfiguration());
            ois = new ObjectInputStream(fileSystem.open(new Path(objectKey)));
            Object object = ois.readObject();
            LOGGER.info("Successfully loaded Object from HDFS for DistributedObjectCache with Object key: " + objectKey);
            return object;
        } catch (IOException ex) {
            LOGGER.error("Exception occurred while getting object from cache. ", ex);
            throw new DistributedObjectCacheNotFound("Unable to find object key in DistributedObjectCache", ex);
        } catch (ClassNotFoundException e) {
            LOGGER.error("Exception occurred while getting object from cache. ", e);
            throw new DistributedObjectCacheNotFound("Unable to find object key in DistributedObjectCache", e);
        } finally {
            try {
                if (ois != null)
                    ois.close();
            } catch (IOException e) {
                LOGGER.error("Exception occurred while closing the cache input stream", e);
            }
        }
    }
    /**
     * Stores an object on HDFS. The object is serialized and written to the given path;
     * it must implement the {@link java.io.Serializable} interface.
     *
     * @param job       The Hadoop MapReduce job
     * @param objectKey The object key with which the specified object is to be associated
     * @param object    The object to store in the {@link DistributedObjectCache}
     * @return The object key with which the specified object is associated
     */
    private static String writeObjectToHDFS(Job job, String objectKey, Object object) {
        LOGGER.info("Writing Object into HDFS for DistributedObjectCache with Object key: " + objectKey);
        if (job == null || objectKey == null || object == null) {
            LOGGER.warn("Job configuration, Object key or Object is null. Please provide valid inputs");
            LOGGER.warn("Object key: " + objectKey + " , Object: " + object);
            return null;
        }
        OutputStream out = null;
        ObjectOutputStream oos = null;
        try {
            FileSystem fs = FileSystem.get(URI.create(objectKey), job.getConfiguration());
            out = fs.create(new Path(objectKey));
            oos = new ObjectOutputStream(out);
            oos.writeObject(object);
            LOGGER.info("Successfully wrote Object: " + objectKey + " into HDFS");
            return objectKey;
        } catch (IOException ioe) {
            LOGGER.error("Unable to add object into DistributedObjectCache.", ioe);
            throw new DistributedObjectCacheNotFound("Unable to write object into DistributedObjectCache", ioe);
        } finally {
            try {
                // Flush and close the object stream first; it also flushes and closes
                // the underlying HDFS output stream.
                if (oos != null) {
                    oos.flush();
                    oos.close();
                } else if (out != null) {
                    out.close();
                }
            } catch (IOException ioe) {
                LOGGER.warn("Unable to close output stream while adding DistributedObjectCache", ioe);
            }
        }
    }
    /**
     * Adds the object file path to the Hadoop MapReduce job cache.
     *
     * @param job       The Hadoop MapReduce job
     * @param objectKey Unique object key
     * @return The object key used to retrieve the object from the cache, or null if objectKey is null
     */
    private static String addObjectInCache(Job job, String objectKey) {
        if (objectKey != null)
            job.addCacheFile(new Path(objectKey).toUri());
        return FilenameUtils.getName(objectKey);
    }
    /**
     * Checks whether the object key is valid. If the object key is null, a new unique object key is created.
     *
     * @param objectKey The object key
     * @return A valid, unique object key
     */
    private static String getObjectKey(String objectKey) {
        if (objectKey == null)
            return generateObjectKey();
        return isObjectNameExtension(objectKey) ? objectKey : objectKey + DEFAULT_OBJECT_EXTENSION;
    }
    /**
     * Checks whether the object key has a file extension.
     *
     * @param objectKey The object key to check for an extension
     * @return true if the object key has an extension, false otherwise
     */
    private static boolean isObjectNameExtension(String objectKey) {
        String extension = FilenameUtils.getExtension(objectKey);
        return extension != null && !extension.trim().isEmpty();
    }
    /**
     * Generates a unique object key with the default object key prefix and extension.
     *
     * @return The generated unique object key
     */
    private static String generateObjectKey() {
        return DEFAULT_OBJECT_PREFIX + UUID.randomUUID().toString() + DEFAULT_OBJECT_EXTENSION;
    }
}
/**
 * {@link DistributedObjectCacheNotFound} is thrown when an object is not found in, or cannot be
 * read from, the {@link DistributedObjectCache}.
 *
 * @author Bhavesh Patel
 * @version 1.0
 * @since 2019-05-09
 */
class DistributedObjectCacheNotFound extends RuntimeException {

    public DistributedObjectCacheNotFound(Exception ex) {
        super(ex);
    }

    public DistributedObjectCacheNotFound(String message, Exception ex) {
        super(message, ex);
    }
}
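
Below is a minimal, hypothetical end-to-end sketch of how the cache is intended to be used from a driver and a mapper. The DistributedObjectCacheExample class, the LookupMapper, the "country_lookup" key, and the "lookup.object.key" configuration property are illustrative assumptions, not part of the gist; storing a HashMap works because java.util.HashMap implements Serializable.

// Hypothetical usage sketch for DistributedObjectCache; class and key names are illustrative.
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import pubmatic.matrix.confluent.utils.DistributedObjectCache;

public class DistributedObjectCacheExample {

    public static class LookupMapper extends Mapper<LongWritable, Text, Text, Text> {
        private HashMap<String, String> lookup;

        @Override
        @SuppressWarnings("unchecked")
        protected void setup(Context context) {
            // Read back the object key the driver stored in the job configuration.
            String key = context.getConfiguration().get("lookup.object.key");
            lookup = (HashMap<String, String>)
                    DistributedObjectCache.getObjectFromCache(context, key, HashMap.class);
        }

        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            // Enrich each input record using the distributed lookup table.
            String enriched = lookup.getOrDefault(line.toString(), "UNKNOWN");
            context.write(line, new Text(enriched));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "distributed-object-cache-example");
        job.setJarByClass(DistributedObjectCacheExample.class);

        // Driver side: serialize the lookup table to HDFS and register it with the job.
        // HashMap implements Serializable, which the cache requires.
        HashMap<String, String> lookup = new HashMap<>();
        lookup.put("IN", "India");
        lookup.put("US", "United States");
        String key = DistributedObjectCache.addObjectToCache(job, lookup, "country_lookup");

        // Hand the returned key to the tasks through the job configuration.
        job.getConfiguration().set("lookup.object.key", key);

        job.setMapperClass(LookupMapper.class);
        // Input/output paths and formats would be configured here.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}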