Skip to content

Instantly share code, notes, and snippets.

@PiotrJander
Created November 15, 2016 03:32
Show Gist options
  • Save PiotrJander/114ee18b3b03af8993c1cde22cffd091 to your computer and use it in GitHub Desktop.
Save PiotrJander/114ee18b3b03af8993c1cde22cffd091 to your computer and use it in GitHub Desktop.
/**
 * Initializes this component: stores the output collector and topology
 * context, then instantiates the map job named by the {@code "mapClass"}
 * configuration entry.
 *
 * @param stormConf configuration map; must contain {@code "mapClass"} (the
 *                  name of a class with an accessible no-argument
 *                  constructor, which is cast to {@code Job}) and
 *                  {@code "spoutExecutors"}
 * @param context   topology context, stored in {@code this.context}
 * @param collector output collector, stored in {@code this.collector}
 * @throws RuntimeException if {@code "mapClass"} is missing or null, if the
 *                          named class cannot be loaded or instantiated (the
 *                          underlying reflective exception is attached as the
 *                          cause), or if {@code "spoutExecutors"} is missing
 * @throws ClassCastException if the instantiated class is not a {@code Job}
 */
@Override
public void prepare(Map<String,String> stormConf,
        TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    this.context = context;

    // A single lookup covers both "key absent" and "key mapped to null";
    // the latter would otherwise surface as a bare NPE from Class.forName.
    String mapperClass = stormConf.get("mapClass");
    if (mapperClass == null) {
        throw new RuntimeException("Mapper class is not specified as a config option");
    }

    try {
        // Class.newInstance() is deprecated and leaks checked constructor
        // exceptions; go through the no-arg constructor explicitly instead.
        mapJob = (Job) Class.forName(mapperClass).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
        // Chain the cause so the reason for the failure travels with the
        // exception instead of being printed to stderr and dropped.
        throw new RuntimeException("Unable to instantiate the class " + mapperClass, e);
    }

    if (!stormConf.containsKey("spoutExecutors")) {
        throw new RuntimeException("Mapper class doesn't know how many input spout executors");
    }
    // TODO: determine how many end-of-stream requests are needed
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment