Exploring Hadoop OutputFormat
public class ElephantOutputFormat implements OutputFormat<IntWritable, ElephantRecordWritable> {
    public static Logger LOG = Logger.getLogger(ElephantOutputFormat.class);

    public RecordWriter<IntWritable, ElephantRecordWritable> getRecordWriter(FileSystem fs, JobConf conf, String string, Progressable progressable) throws IOException {
        return new ElephantRecordWriter(conf, (Args) Utils.getObject(conf, ARGS_CONF), progressable);
    }

    public void checkOutputSpecs(FileSystem fs, JobConf conf) throws IOException {
        Args args = (Args) Utils.getObject(conf, ARGS_CONF);
        fs = Utils.getFS(args.outputDirHdfs, conf);
        if (conf.getBoolean("mapred.reduce.tasks.speculative.execution", true)) {
            throw new InvalidJobConfException("Speculative execution should be false");
        }
        if (fs.exists(new Path(args.outputDirHdfs))) {
            throw new InvalidJobConfException("Output dir already exists " + args.outputDirHdfs);
        }
        if (args.updateDirHdfs != null && !fs.exists(new Path(args.updateDirHdfs))) {
            throw new InvalidJobConfException("Shards to update does not exist " + args.updateDirHdfs);
        }
    }
}
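For context, here is a rough sketch of how a driver might wire this OutputFormat into a job using the old mapred API. Apart from ElephantOutputFormat and ElephantRecordWritable, everything in it (the class name, how Args reaches the configuration) is an assumption rather than something shown in the gist; the point is simply that checkOutputSpecs above demands speculative reduces be disabled and the output directory be absent.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.JobConf;

public class ElephantJobSetup {
    public static JobConf configure(JobConf conf) {
        // checkOutputSpecs rejects the job unless speculative reduces are disabled
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

        conf.setOutputFormat(ElephantOutputFormat.class);
        conf.setOutputKeyClass(IntWritable.class);               // shard number
        conf.setOutputValueClass(ElephantRecordWritable.class);  // record to append

        // The Args object (outputDirHdfs, spec, persistenceOptions, updater, ...)
        // is assumed to be serialized into the conf under ARGS_CONF by a helper
        // that the gist does not show.
        return conf;
    }
}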
public class ElephantRecordWriter implements RecordWriter<IntWritable, ElephantRecordWritable> {
    public static Logger LOG = Logger.getLogger(ElephantRecordWriter.class);

    FileSystem _fs;
    Args _args;
    Map<Integer, LocalPersistence> _lps = new HashMap<Integer, LocalPersistence>();
    Progressable _progressable;
    LocalElephantManager _localManager;

    int _numWritten = 0;
    long _lastCheckpoint = System.currentTimeMillis();

    public ElephantRecordWriter(Configuration conf, Args args, Progressable progressable) throws IOException {
        _fs = Utils.getFS(args.outputDirHdfs, conf);
        _args = args;
        _progressable = progressable;
        _localManager = new LocalElephantManager(_fs, args.spec, args.persistenceOptions, LocalElephantManager.getTmpDirs(conf));
    }

    private String remoteUpdateDirForShard(int shard) {
        if (_args.updateDirHdfs == null) return null;
        else return _args.updateDirHdfs + "/" + shard;
    }

    public void write(IntWritable shard, ElephantRecordWritable record) throws IOException {
        LocalPersistence lp = null;
        LocalPersistenceFactory fact = _args.spec.getLPFactory();
        Map<String, Object> options = _args.persistenceOptions;

        if (_lps.containsKey(shard.get())) {
            lp = _lps.get(shard.get());
        } else {
            // First record for this shard: download the existing shard (if updating)
            // and open a local persistence for appends.
            String updateDir = remoteUpdateDirForShard(shard.get());
            String localShard = _localManager.downloadRemoteShard("" + shard.get(), updateDir);

            lp = fact.openPersistenceForAppend(localShard, options);
            _lps.put(shard.get(), lp);
            progress();
        }

        _args.updater.updateElephant(lp, record.key, record.val);

        _numWritten++;
        if (_numWritten % 25000 == 0) {
            long now = System.currentTimeMillis();
            long delta = now - _lastCheckpoint;
            _lastCheckpoint = now;
            LOG.info("Wrote last 25000 records in " + delta + " ms");
            _localManager.progress();
        }
    }

    public void close(Reporter reporter) throws IOException {
        // Close each local shard and copy it up to its final location on HDFS.
        for (Integer shard : _lps.keySet()) {
            String lpDir = _localManager.localTmpDir("" + shard);

            LOG.info("Closing LP for shard " + shard + " at " + lpDir);
            _lps.get(shard).close();
            LOG.info("Closed LP for shard " + shard + " at " + lpDir);
            progress();

            String remoteDir = _args.outputDirHdfs + "/" + shard;
            if (_fs.exists(new Path(remoteDir))) {
                LOG.info("Deleting existing shard " + shard + " at " + remoteDir);
                _fs.delete(new Path(remoteDir), true);
                LOG.info("Deleted existing shard " + shard + " at " + remoteDir);
            }

            LOG.info("Copying " + lpDir + " to " + remoteDir);
            _fs.copyFromLocalFile(new Path(lpDir), new Path(remoteDir));
            LOG.info("Copied " + lpDir + " to " + remoteDir);
            progress();
        }
        _localManager.cleanup();
    }

    private void progress() {
        if (_progressable != null) _progressable.progress();
    }
}
|-- output-directory
|   |-- France
|   |   |-- part-00000
|   |   |-- part-00001
|   |   `-- part-00002
...
|   `-- Zimbabwe
|       |-- part-00000
|       |-- part-00001
|       `-- part-00002
package oddjob.hadoop;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class MultipleTextOutputFormatByKey extends MultipleTextOutputFormat<Text, Text> {

    /**
     * Use the key as part of the path for the final output file.
     */
    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, String leaf) {
        return new Path(key.toString(), leaf).toString();
    }

    /**
     * When actually writing the data, discard the key since it is already in
     * the file path.
     */
    @Override
    protected Text generateActualKey(Text key, Text value) {
        return null;
    }
}
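The same output format can also be wired up from a plain Java driver instead of Hadoop Streaming. A minimal sketch follows, assuming the old mapred API and reusing the input and output paths from the Streaming invocation shown next; the driver class name is made up for illustration.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

public class SearchFrequencyDriver {
    public static JobConf configure(JobConf conf) {
        // One subdirectory per key under the output directory, as in the tree above
        conf.setOutputFormat(MultipleTextOutputFormatByKey.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(conf, new Path("search-logs"));
        FileOutputFormat.setOutputPath(conf, new Path("search-frequency-by-country"));
        return conf;
    }
}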
hadoop jar hadoop-streaming.jar -libjars CustomOutputFormats.jar \
    -outputformat oddjob.hadoop.MultipleTextOutputFormatByKey \
    -input search-logs -output search-frequency-by-country \
    -mapper parse-logs.py \
    -reducer count-searches.py
|-- output-directory
|   |-- part-00000
|   |-- part-00001
|   |-- part-00002
|   |-- part-00003
|   |-- part-00004
|   `-- part-00005