@jblomo, created December 2, 2011 21:31
Exploring Hadoop OutputFormat
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
// Args, Utils, ARGS_CONF, ElephantRecordWritable, LocalPersistence,
// LocalPersistenceFactory, and LocalElephantManager are ElephantDB helpers,
// imported from that project.

public class ElephantOutputFormat implements OutputFormat<IntWritable, ElephantRecordWritable> {
    public static Logger LOG = Logger.getLogger(ElephantOutputFormat.class);

    public RecordWriter<IntWritable, ElephantRecordWritable> getRecordWriter(FileSystem fs, JobConf conf, String string, Progressable progressable) throws IOException {
        return new ElephantRecordWriter(conf, (Args) Utils.getObject(conf, ARGS_CONF), progressable);
    }

    public void checkOutputSpecs(FileSystem fs, JobConf conf) throws IOException {
        Args args = (Args) Utils.getObject(conf, ARGS_CONF);
        fs = Utils.getFS(args.outputDirHdfs, conf);

        // Duplicate speculative attempts would write to the same shards, so
        // fail fast if speculative execution is enabled.
        if (conf.getBoolean("mapred.reduce.tasks.speculative.execution", true)) {
            throw new InvalidJobConfException("Speculative execution should be false");
        }
        if (fs.exists(new Path(args.outputDirHdfs))) {
            throw new InvalidJobConfException("Output dir already exists " + args.outputDirHdfs);
        }
        if (args.updateDirHdfs != null && !fs.exists(new Path(args.updateDirHdfs))) {
            throw new InvalidJobConfException("Shards to update does not exist " + args.updateDirHdfs);
        }
    }
}
public class ElephantRecordWriter implements RecordWriter<IntWritable, ElephantRecordWritable> {
    FileSystem _fs;
    Args _args;
    Map<Integer, LocalPersistence> _lps = new HashMap<Integer, LocalPersistence>();
    Progressable _progressable;
    LocalElephantManager _localManager;

    int _numWritten = 0;
    long _lastCheckpoint = System.currentTimeMillis();

    public ElephantRecordWriter(Configuration conf, Args args, Progressable progressable) throws IOException {
        _fs = Utils.getFS(args.outputDirHdfs, conf);
        _args = args;
        _progressable = progressable;
        _localManager = new LocalElephantManager(_fs, args.spec, args.persistenceOptions, LocalElephantManager.getTmpDirs(conf));
    }

    private String remoteUpdateDirForShard(int shard) {
        if (_args.updateDirHdfs == null) return null;
        else return _args.updateDirHdfs + "/" + shard;
    }

    public void write(IntWritable shard, ElephantRecordWritable record) throws IOException {
        LocalPersistence lp = null;
        LocalPersistenceFactory fact = _args.spec.getLPFactory();
        Map<String, Object> options = _args.persistenceOptions;

        // Open (or reuse) the local persistence for this shard, first pulling
        // the existing shard down from HDFS when running in update mode.
        if (_lps.containsKey(shard.get())) {
            lp = _lps.get(shard.get());
        } else {
            String updateDir = remoteUpdateDirForShard(shard.get());
            String localShard = _localManager.downloadRemoteShard("" + shard.get(), updateDir);
            lp = fact.openPersistenceForAppend(localShard, options);
            _lps.put(shard.get(), lp);
            progress();
        }

        _args.updater.updateElephant(lp, record.key, record.val);

        _numWritten++;
        if (_numWritten % 25000 == 0) {
            long now = System.currentTimeMillis();
            long delta = now - _lastCheckpoint;
            _lastCheckpoint = now;
            LOG.info("Wrote last 25000 records in " + delta + " ms");
            _localManager.progress();
        }
    }

    public void close(Reporter reporter) throws IOException {
        for (Integer shard : _lps.keySet()) {
            String lpDir = _localManager.localTmpDir("" + shard);
            LOG.info("Closing LP for shard " + shard + " at " + lpDir);
            _lps.get(shard).close();
            LOG.info("Closed LP for shard " + shard + " at " + lpDir);
            progress();

            // Replace any existing shard on HDFS with the freshly written local copy.
            String remoteDir = _args.outputDirHdfs + "/" + shard;
            if (_fs.exists(new Path(remoteDir))) {
                LOG.info("Deleting existing shard " + shard + " at " + remoteDir);
                _fs.delete(new Path(remoteDir), true);
                LOG.info("Deleted existing shard " + shard + " at " + remoteDir);
            }
            LOG.info("Copying " + lpDir + " to " + remoteDir);
            _fs.copyFromLocalFile(new Path(lpDir), new Path(remoteDir));
            LOG.info("Copied " + lpDir + " to " + remoteDir);
            progress();
        }
        _localManager.cleanup();
    }

    private void progress() {
        if (_progressable != null) _progressable.progress();
    }
}
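For context, this is roughly how such an OutputFormat gets wired into a job driver. A minimal sketch, assuming the ElephantDB Args class and ARGS_CONF key from above, and a Utils.setObject helper that mirrors the Utils.getObject call in getRecordWriter (the setter and the Args constructor are assumptions, and the driver class and paths are illustrative):

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class ElephantJobDriver {  // illustrative driver, not part of ElephantDB
    public static void main(String[] argv) throws Exception {
        JobConf conf = new JobConf(ElephantJobDriver.class);
        conf.setOutputFormat(ElephantOutputFormat.class);

        // checkOutputSpecs() above rejects jobs with speculative execution
        // enabled, so disable it up front.
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

        Args args = new Args();            // assumed constructor; fields taken from the snippet above
        args.outputDirHdfs = "/data/out";  // illustrative path
        Utils.setObject(conf, ARGS_CONF, args);  // assumed setter paired with Utils.getObject

        // Mapper/reducer emitting <IntWritable shard, ElephantRecordWritable> omitted.
        JobClient.runJob(conf);
    }
}

The remaining snippets show a second, simpler customization: a MultipleTextOutputFormat subclass that splits text output into one directory per key, producing a layout like the tree below.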
|-- output-directory
|   |-- France
|   |   |-- part-00000
|   |   |-- part-00001
|   |   `-- part-00002
...
|   `-- Zimbabwe
|       |-- part-00000
|       |-- part-00001
|       `-- part-00002
package oddjob.hadoop;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
public class MultipleTextOutputFormatByKey extends MultipleTextOutputFormat<Text, Text> {

    /**
     * Use the key as part of the path for the final output file.
     */
    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, String leaf) {
        return new Path(key.toString(), leaf).toString();
    }

    /**
     * When actually writing the data, discard the key since it is already in
     * the file path.
     */
    @Override
    protected Text generateActualKey(Text key, Text value) {
        return null;
    }
}
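Outside of streaming, the same class drops into a regular (old-API) MapReduce driver. A minimal sketch, with an illustrative driver class, job name, and paths, and the mapper/reducer left out:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import oddjob.hadoop.MultipleTextOutputFormatByKey;

public class ByKeyDriver {  // illustrative driver
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(ByKeyDriver.class);
        conf.setJobName("search-frequency-by-country");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        // Each reducer writes its part-XXXXX leaf under a directory named
        // after the key, e.g. output-directory/France/part-00000.
        conf.setOutputFormat(MultipleTextOutputFormatByKey.class);

        FileInputFormat.setInputPaths(conf, new Path("search-logs"));
        FileOutputFormat.setOutputPath(conf, new Path("search-frequency-by-country"));

        // Mapper and reducer classes omitted; the streaming invocation below
        // uses parse-logs.py and count-searches.py instead.
        JobClient.runJob(conf);
    }
}

With Hadoop Streaming, the format is instead selected on the command line via -outputformat: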
hadoop jar hadoop-streaming.jar -libjars CustomOutputFormats.jar \
  -outputformat oddjob.hadoop.MultipleTextOutputFormatByKey \
  -input search-logs -output search-frequency-by-country \
  -mapper parse-logs.py \
  -reducer count-searches.py
For comparison, the default TextOutputFormat (no -outputformat flag) produces a flat layout with one part file per reducer:

|-- output-directory
|   |-- part-00000
|   |-- part-00001
|   |-- part-00002
|   |-- part-00003
|   |-- part-00004
|   `-- part-00005