Created
January 20, 2015 20:26
-
-
Save strubell/700e30c45721b0120165 to your computer and use it in GitHub Desktop.
qsubbing using factorie
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object ProcessSlotFillingCorpusParallel { | |
/**
 * Splits the input data files into `numJobs` file lists, writes each list to a
 * temporary `tmp-filelist-<i>` file in the current working directory, and
 * distributes one job per file list through a `QSubExecutor` / `JobDistributor`.
 *
 * The input files come from exactly one of the `dataDirs`, `dataFilesFile` or
 * `dataFiles` options. The temporary file lists are removed before returning,
 * including when writing them or distributing the jobs throws.
 *
 * @param args command-line arguments parsed by `ProcessSlotFillingCorpusParallelOpts`
 */
def main(args: Array[String]): Unit = {
  implicit val random = new scala.util.Random(0)
  val opts = new ProcessSlotFillingCorpusParallelOpts
  opts.parse(args)

  /* Load data files */
  if (opts.dataDirs.wasInvoked && opts.dataFiles.wasInvoked) {
    println("Please specify either a list of data directories or files but not both.")
    System.exit(1)
  }
  assert(opts.dataDirs.wasInvoked || opts.dataFiles.wasInvoked || opts.dataFilesFile.wasInvoked)
  val allDataFileNames = {
    if (opts.dataDirs.wasInvoked)
      opts.dataDirs.value.flatMap(FileUtils.getFileListFromDir(_))
    else if (opts.dataFilesFile.wasInvoked) {
      // One file name per line; close the source once the lines are materialized.
      val source = io.Source.fromFile(opts.dataFilesFile.value)
      try source.getLines().toList finally source.close()
    }
    else opts.dataFiles.value
  }

  // divide all documents into numJobs sets of equal size
  val numJobs = opts.numJobs.value
  val memory = opts.memPerJob.value
  val numCores = opts.numThreads.value
  // NOTE(review): this shuffles with util.Random rather than the seeded `random`
  // declared above, so the split may differ between runs -- confirm this is intended.
  val dividedDocuments = Utilities.cut(util.Random.shuffle(allDataFileNames), numJobs)

  val fnamePrefix = "tmp-filelist-"
  val fnames = (0 until numJobs).map(i => Paths.get(s"$fnamePrefix$i").toAbsolutePath().toString)

  try {
    // write filelists
    dividedDocuments.zipWithIndex.foreach { case (doclist, idx) =>
      val writer = new PrintWriter(fnames(idx))
      // Close the writer even if writing one of the names fails.
      try doclist.foreach(fname => writer.println(fname))
      finally writer.close()
      println(fnames(idx))
    }
    println(s"Distributed ${allDataFileNames.length} data files into ${numJobs} sets of ${dividedDocuments.map(_.length).min}-${dividedDocuments.map(_.length).max} files")

    // Each job receives one of the generated file lists as its dataFilesFile option.
    val docsParam = DistributorParameter[String](opts.dataFilesFile, fnames)
    val qs = new cc.factorie.util.QSubExecutor(memory, "edu.umass.cs.iesl.tackbp2014.process.ProcessSlotFillingCorpus", numCores)
    // presumably copies the shared option values into the per-job options object -- TODO confirm
    val qsOpts = opts.writeInto(new ProcessSlotFillingCorpusOpts)
    qsOpts.dataFilesFile.invoke()
    // NOTE(review): the meaning of the trailing 60 is not visible here -- check JobDistributor.
    val distributor = new cc.factorie.util.JobDistributor(qsOpts, Seq(docsParam), qs.execute, 60)
    val result = distributor.distribute
    println(s"Finished running $result jobs")
  } finally {
    // remove created filelists; deleteIfExists tolerates lists that were never written
    fnames.foreach(fname => Files.deleteIfExists(Paths.get(fname)))
  }
  println("Done")
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The "edu.umass.cs.iesl.tackbp2014.process.ProcessSlotFillingCorpus" parameter is the class you want each job to run. It needs to extend HyperparameterMain, which basically means that instead of "main" it has a function called "evaluateParameters" that returns a double. Since you are not actually optimizing anything, the value it returns is irrelevant.