Skip to content

Instantly share code, notes, and snippets.

@strubell
Created January 20, 2015 20:26
Show Gist options
  • Save strubell/700e30c45721b0120165 to your computer and use it in GitHub Desktop.
Save strubell/700e30c45721b0120165 to your computer and use it in GitHub Desktop.
qsubbing using factorie
object ProcessSlotFillingCorpusParallel {
/**
 * Splits the input data files into `opts.numJobs` shuffled sets and writes each set to a
 * temporary file list (`tmp-filelist-<i>`, resolved against the working directory).
 * It then runs `edu.umass.cs.iesl.tackbp2014.process.ProcessSlotFillingCorpus` over those
 * lists through factorie's `QSubExecutor` / `JobDistributor`. The temporary lists are
 * deleted after the distributor returns.
 *
 * Input files come from `opts.dataDirs`, `opts.dataFilesFile`, or `opts.dataFiles`, checked
 * in that order. `dataDirs` and `dataFiles` may not both be given. The process exits with
 * status 1 when the input options are invalid.
 *
 * @param args command-line arguments, parsed into a ProcessSlotFillingCorpusParallelOpts
 */
def main(args: Array[String]): Unit = {
  // Fixed seed so the shuffle below, and therefore the file-to-job assignment, is reproducible.
  implicit val random = new scala.util.Random(0)
  val opts = new ProcessSlotFillingCorpusParallelOpts
  opts.parse(args)

  /* Load data files */
  if (opts.dataDirs.wasInvoked && opts.dataFiles.wasInvoked) {
    println("Please specify either a list of data directories or files but not both.")
    System.exit(1)
  }
  // Explicit check rather than `assert`: assertions can be elided at compile time, which
  // would silently fall through to the default value of opts.dataFiles.
  if (!(opts.dataDirs.wasInvoked || opts.dataFiles.wasInvoked || opts.dataFilesFile.wasInvoked)) {
    println("Please specify data directories, data files, or a file listing data files.")
    System.exit(1)
  }
  // Precedence when several sources are given: dataDirs, then dataFilesFile, then dataFiles.
  val allDataFileNames = {
    if (opts.dataDirs.wasInvoked)
      opts.dataDirs.value.flatMap(FileUtils.getFileListFromDir(_))
    else if (opts.dataFilesFile.wasInvoked) {
      // Close the file handle once all lines have been materialized.
      val source = scala.io.Source.fromFile(opts.dataFilesFile.value)
      try source.getLines.toList finally source.close()
    } else opts.dataFiles.value
  }

  // divide all documents into numJobs sets (intended to be of roughly equal size)
  val numJobs = opts.numJobs.value
  // NOTE(review): the unit of memPerJob is whatever QSubExecutor expects; confirm there.
  val memory = opts.memPerJob.value
  val numCores = opts.numThreads.value
  // Shuffle with the seeded `random` above. The global util.Random would be unseeded.
  val dividedDocuments = Utilities.cut(random.shuffle(allDataFileNames), numJobs)

  // Write one file list per set. The lists are handed to the jobs as dataFilesFile values below.
  val fnamePrefix = "tmp-filelist-"
  val fnames = (0 until numJobs).map(i => Paths.get(s"$fnamePrefix$i").toAbsolutePath().toString)
  dividedDocuments.zipWithIndex.foreach { case (doclist, idx) =>
    val writer = new PrintWriter(fnames(idx))
    try doclist.foreach(fname => writer.println(fname)) finally writer.close()
    println(fnames(idx))
  }

  // Compute the set sizes once, and avoid .min/.max throwing on an empty collection.
  val setSizes = dividedDocuments.map(_.length)
  val sizeRange = if (setSizes.isEmpty) "0" else s"${setSizes.min}-${setSizes.max}"
  println(s"Distributed ${allDataFileNames.length} data files into ${numJobs} sets of $sizeRange files")

  // Presumably the distributor runs one job per entry of `fnames`, setting dataFilesFile to it.
  val docsParam = DistributorParameter[String](opts.dataFilesFile, fnames)
  val qs = new cc.factorie.util.QSubExecutor(memory, "edu.umass.cs.iesl.tackbp2014.process.ProcessSlotFillingCorpus", numCores)
  val qsOpts = opts.writeInto(new ProcessSlotFillingCorpusOpts)
  // Mark dataFilesFile as invoked, presumably so it is emitted on each job's command line; verify.
  qsOpts.dataFilesFile.invoke()
  // NOTE(review): 60 is presumably the polling interval in seconds; confirm against JobDistributor.
  val distributor = new cc.factorie.util.JobDistributor(qsOpts, Seq(docsParam), qs.execute, 60)
  val result = distributor.distribute
  println(s"Finished running $result jobs")

  // Remove the temporary file lists. deleteIfExists tolerates a list that was never written.
  fnames.foreach(fname => Files.deleteIfExists(Paths.get(fname)))
  println("Done")
}
@strubell
Copy link
Author

The "edu.umass.cs.iesl.tackbp2014.process.ProcessSlotFillingCorpus" parameter is the class you want each job to run. It needs to extend HyperparameterMain, which basically means that instead of `main` it has a function called `evaluateParameters` that returns a Double. Since you are not actually optimizing hyperparameters, the value it returns is irrelevant.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment