Skip to content

Instantly share code, notes, and snippets.

@kgori
Last active September 24, 2015 14:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kgori/5fd288b2fe0783ca006d to your computer and use it in GitHub Desktop.
Save kgori/5fd288b2fe0783ca006d to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"metadata": {
"name": "",
"signature": "sha256:c328a3539a20823fcf99c91fd3535c8859c303b4e6e0b45fa27f6b98480d2766"
},
"nbformat": 3,
"nbformat_minor": 0,
"worksheets": [
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Running tasks\n",
"This introduces the `zoo.tasks` package. It has two modules: `zoo.tasks.tasks` (`tasks`) and `zoo.tasks.jobhandlers` (`jobhandlers`). \n",
"\n",
"`tasks` is a place for code that runs common tasks, for instance running a phylogenetic analysis and collecting the results. Tasks are written as single functions, but also there are `TaskInterface`s that expose the tasks and also provide functions to fill in the function arguments from simple inputs. So, for example, the `RaxmlTaskInterface` can just be given a list of files, and will coordinate running Raxml on all of them.\n",
"\n",
"`jobhandlers` has code for running tasks either sequentially (`SequentialJobHandler`), in multiple processes (`ProcesspoolJobHandler`) or in multiple threads (`ThreadpoolJobHandler`). These all work the same way -- give them a task function, a list of arguments to pass to the function, and optionally a progress message to print as tasks are completed -- and they work their way through the tasks."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"###Example\n",
"Here we run Raxml on a list of alignment files using the RaxmlTaskInterface"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from zoo import tasks\n",
"import glob\n",
"\n",
"# There are some alignments in this directory\n",
"indir = '/Users/kgori/scratch/simtest4'\n",
"files = glob.glob('{}/*.phy'.format(indir))[:4]\n",
"files"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 1,
"text": [
"['/Users/kgori/scratch/simtest4/class1_1.phy',\n",
" '/Users/kgori/scratch/simtest4/class1_2.phy',\n",
" '/Users/kgori/scratch/simtest4/class1_3.phy',\n",
" '/Users/kgori/scratch/simtest4/class1_4.phy']"
]
}
],
"prompt_number": 1
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Finding Raxml on my system\n",
"The `RaxmlTaskInterface` will only run correctly if it can find Raxml on my system. It has a guess, but it's best to set it explicitly to avoid problems."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Set up the task interface\n",
"interface = tasks.RaxmlTaskInterface()\n",
"\n",
"# See what executable the interface wants to run\n",
"print interface.executable # This is the guess"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"raxmlHPC-AVX\n"
]
}
],
"prompt_number": 2
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"I have this file on my system, so the interface can run it. If I didn't, I would set the executable directly to one I know I have."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Set the executable directly\n",
"interface.set_executable('raxmlHPC-SSE3')\n",
"print interface.executable"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"raxmlHPC-SSE3\n"
]
}
],
"prompt_number": 3
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"###Job handlers\n",
"Now I have set up the task interface, I can set up a job handler to run my tasks.\n"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Set up a job handler to run jobs one at a time\n",
"handler = tasks.SequentialJobHandler()"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 4
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# I need to provide a destination directory for the output, and the model for raxml to run\n",
"outdir = indir + '/raxml_sequential'\n",
"model = 'PROTGAMMAWAG' # this is a raxml model for analysing proteins"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 5
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Generate a list of arguments to pass to the tasks function using `scrape_args`\n",
"args = interface.scrape_args(files, outdir, model)\n",
"print args"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"[('raxmlHPC-SSE3', '/Users/kgori/scratch/simtest4/class1_1.phy', '/Users/kgori/scratch/simtest4/raxml_sequential', 'PROTGAMMAWAG', None, 1, None, False, False), ('raxmlHPC-SSE3', '/Users/kgori/scratch/simtest4/class1_2.phy', '/Users/kgori/scratch/simtest4/raxml_sequential', 'PROTGAMMAWAG', None, 1, None, False, False), ('raxmlHPC-SSE3', '/Users/kgori/scratch/simtest4/class1_3.phy', '/Users/kgori/scratch/simtest4/raxml_sequential', 'PROTGAMMAWAG', None, 1, None, False, False), ('raxmlHPC-SSE3', '/Users/kgori/scratch/simtest4/class1_4.phy', '/Users/kgori/scratch/simtest4/raxml_sequential', 'PROTGAMMAWAG', None, 1, None, False, False)]\n"
]
}
],
"prompt_number": 6
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# I can be more explicit with how I want the jobs to run -- full description is in the docstring\n",
"args = interface.scrape_args(files, outdir, model, partition_files=None, seed=1234, threads=1, parsimony=False, fast_tree=False)\n",
"print interface.scrape_args.__doc__"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n",
" RaxmlTaskInterface.scrape_args(self, filenames, outdir, model, partition_files=None, \n",
" seed=None, threads=1, parsimony=False, fast_tree=False)\n",
" \n",
" Generate a list of arguments for a set of Raxml tasks.\n",
" filenames - list of alignment filenames\n",
" outdir - output directory to write results to\n",
" model - Raxml model name\n",
" partition_files - list of partitioning scheme files for partitioned analysis\n",
" needs to be in corresponding order to filenames\n",
" Can be `None' if no partitioning is wanted\n",
" seed - The random number seed for Raxml\n",
" threads - number of threads for multithreaded Raxml\n",
" parsimony - only do parsimony tree search, then optimise model parameters [True/False]\n",
" fast_tree - do fast ML tree search, then optimise model parameters. [True/False] \n",
" Takes precedence over parsimony if both are set True \n",
" \n"
]
}
],
"prompt_number": 7
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Running the jobs"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%time result = handler(interface.get_task(), args, 'Sequential progress', batchsize=1)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stderr",
"text": [
"Sequential progress: 0/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"INFO:zoo.tasks.jobhandlers:Sequential progress: 0/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"Sequential progress: 1/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"INFO:zoo.tasks.jobhandlers:Sequential progress: 1/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"Sequential progress: 2/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"INFO:zoo.tasks.jobhandlers:Sequential progress: 2/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"Sequential progress: 3/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"INFO:zoo.tasks.jobhandlers:Sequential progress: 3/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"Sequential progress: 4/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"INFO:zoo.tasks.jobhandlers:Sequential progress: 4/4\n"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"CPU times: user 117 ms, sys: 22.7 ms, total: 139 ms\n",
"Wall time: 1min\n"
]
}
],
"prompt_number": 8
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The results are written to the specified output directory. This is where all the Raxml output files are written. The results are parsed by `zoo.wrappers.parsers.RaxmlParser`, and returned to the user. They also get written as `.json` files in a results subdirectory."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Results are a list of dictionaries. The main results are in the top level dictionary\n",
"first_result = result[0]\n",
"print 'Main results: {}\\n'.format(first_result.keys())\n",
"print 'Tree = {}'.format(first_result['ml_tree'])\n",
"print 'Likelihood = {}\\n'.format(first_result['likelihood'])\n",
"\n",
"# More detailed results for each partition are in the result['partitions'][*number*] subdictionaries\n",
"print 'Partitions: {}'.format(first_result['partitions'].keys())\n",
"print 'Detailed per-partition results: {}'.format(first_result['partitions'][0].keys())\n",
"print 'Alpha parameter of Gamma distribution = {}'.format(first_result['partitions'][0]['alpha'])"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"Main results: ['likelihood', 'ml_tree', 'partitions']\n",
"\n",
"Tree = ((Sp4:1.02802374596207179991,((Sp9:0.00308537727336775404,Sp8:0.00741267140653078041):0.25388670641335969735,Sp10:0.37424739517223848395):0.68670897669530217566):0.84935962184789259855,((Sp2:0.74273468875662473909,Sp3:0.78754591931459183574):0.65958155653334138435,(Sp6:0.57636803376972933766,(Sp7:0.58977723519308433708,Sp5:0.41683939670653025189):0.09490086555097941579):0.29381005276578120444):0.21112988928908871245,Sp1:1.27568273584952951438):0.0;\n",
"\n",
"Likelihood = -10612.459871\n",
"\n",
"Partitions: [0]\n",
"Detailed per-partition results: ['alpha', 'model', 'rates', 'name', 'frequencies']\n",
"Alpha parameter of Gamma distribution = 999.874019\n"
]
}
],
"prompt_number": 9
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Parallel execution\n",
"With `JobHandler`s it is easy to run tasks in parallel. For general python code, thread-based parallelism usually offers no speed-up, because python's Global Interpreter Lock (GIL) prevents concurrent access by threads. However, for tasks like running Raxml, which is independent of python, speed-ups are available through either process-based parallelism or thread-based parallelism.\n",
"\n",
"Here we will use thread-based parallelism via a `ThreadpoolJobHandler`, as it will use less memory than using process-based parallelism."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"handler = tasks.ThreadpoolJobHandler(4) # Use 4 threads\n",
"outdir = indir + '/raxml_threaded'\n",
"args = interface.scrape_args(files, outdir, model)"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 10
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%time result = handler(interface.get_task(), args, 'Threaded progress', batchsize=1)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stderr",
"text": [
"Threaded progress (TP:4w:1b): 0/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"INFO:zoo.tasks.jobhandlers:Threaded progress (TP:4w:1b): 0/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"Threaded progress (TP:4w:1b): 1/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"INFO:zoo.tasks.jobhandlers:Threaded progress (TP:4w:1b): 1/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"Threaded progress (TP:4w:1b): 2/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"INFO:zoo.tasks.jobhandlers:Threaded progress (TP:4w:1b): 2/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"Threaded progress (TP:4w:1b): 3/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"INFO:zoo.tasks.jobhandlers:Threaded progress (TP:4w:1b): 3/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"Threaded progress (TP:4w:1b): 4/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"INFO:zoo.tasks.jobhandlers:Threaded progress (TP:4w:1b): 4/4\n"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"CPU times: user 129 ms, sys: 24.1 ms, total: 154 ms\n",
"Wall time: 25.3 s\n"
]
}
],
"prompt_number": 11
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Parallel execution earned us a more than 2x speed-up (limited by length of time to run the single longest job)."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Can also use multiple threads with Raxml. Here we use the job handler to run 4 Raxml instances at once, \n",
"# and each one of these uses 2 threads (8 threads total)\n",
"\n",
"interface.set_executable('raxmlHPC-PTHREADS-SSE3') # Use threaded version of Raxml\n",
"args = interface.scrape_args(files, outdir+'2', model, threads=2) # <-- Ask for 2 threads"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%time result = handler(interface.get_task(), args, 'Threaded progress', batchsize=1)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stderr",
"text": [
"Threaded progress (TP:4w:1b): 0/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"INFO:zoo.tasks.jobhandlers:Threaded progress (TP:4w:1b): 0/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"Threaded progress (TP:4w:1b): 1/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"INFO:zoo.tasks.jobhandlers:Threaded progress (TP:4w:1b): 1/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"Threaded progress (TP:4w:1b): 2/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"INFO:zoo.tasks.jobhandlers:Threaded progress (TP:4w:1b): 2/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"Threaded progress (TP:4w:1b): 3/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"INFO:zoo.tasks.jobhandlers:Threaded progress (TP:4w:1b): 3/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"Threaded progress (TP:4w:1b): 4/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"INFO:zoo.tasks.jobhandlers:Threaded progress (TP:4w:1b): 4/4\n"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"CPU times: user 151 ms, sys: 26.2 ms, total: 177 ms\n",
"Wall time: 18 s\n"
]
}
],
"prompt_number": 13
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##Summary\n",
" interface = tasks.TaskInterface()...\n",
" interface.set_executable(...)\n",
" handler = tasks.JobHandler(...)\n",
" args = interface.scrape_args(files...)\n",
" handler(interface.get_task(), args, message, batchsize)\n",
" \n",
"There is a `batchsize` argument. This is used to package tasks into batches. Each thread will then complete the whole batch of tasks before returning the result and getting another batch. This can be useful when running a large number of fast tasks, to reduce communication overhead. For Raxml / Phyml this is not much of an issue."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Phyml example\n",
"Running Phyml is more or less the same as running Raxml."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"interface = tasks.PhymlTaskInterface()\n",
"interface.set_executable('phyml')"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 14
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"handler = tasks.ThreadpoolJobHandler(4)\n",
"args = interface.scrape_args(files, indir + '/phyml_threaded', 'WAG')\n",
"result = handler(interface.get_task(), args, 'Phyml progress', batchsize=1)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stderr",
"text": [
"Phyml progress (TP:4w:1b): 0/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"INFO:zoo.tasks.jobhandlers:Phyml progress (TP:4w:1b): 0/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"Phyml progress (TP:4w:1b): 1/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"INFO:zoo.tasks.jobhandlers:Phyml progress (TP:4w:1b): 1/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"Phyml progress (TP:4w:1b): 2/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"INFO:zoo.tasks.jobhandlers:Phyml progress (TP:4w:1b): 2/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"Phyml progress (TP:4w:1b): 3/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"INFO:zoo.tasks.jobhandlers:Phyml progress (TP:4w:1b): 3/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"Phyml progress (TP:4w:1b): 4/4\n"
]
},
{
"output_type": "stream",
"stream": "stderr",
"text": [
"INFO:zoo.tasks.jobhandlers:Phyml progress (TP:4w:1b): 4/4\n"
]
}
],
"prompt_number": 16
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"print result"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"[{'likelihood': -10612.34981, 'ml_tree': '(Sp8:0.00741377,Sp9:0.00308377,(Sp10:0.37412735,(Sp4:1.02747973,(Sp1:1.27463949,((Sp6:0.57615719,(Sp5:0.41673536,Sp7:0.58962098)0.873000:0.09485845)0.975000:0.29366640,(Sp2:0.74229106,Sp3:0.78719331)1.000000:0.65906872)0.825000:0.21123053)1.000000:0.84853230)0.999000:0.68623861)0.996000:0.25388287);', 'partitions': {0: {'alpha': 61352.061, 'model': 'WAG', 'rates': None, 'frequencies': None}}}, {'likelihood': -9068.42136, 'ml_tree': '(Sp10:0.24214538,(Sp9:0.00236387,Sp8:0.00000019)0.997000:0.34933525,(Sp4:1.21270582,((Sp6:0.36270274,(Sp7:0.61058996,Sp5:0.47723249)0.982000:0.21973359)0.958000:0.31498213,(Sp1:1.48863085,(Sp2:0.58013577,Sp3:0.75928904)0.999000:0.69604362)0.800000:0.15696477)1.000000:0.96200093)0.992000:0.64629138);', 'partitions': {0: {'alpha': 60144.352, 'model': 'WAG', 'rates': None, 'frequencies': None}}}, {'likelihood': -2957.60421, 'ml_tree': '(Sp8:0.00710949,Sp9:0.00000077,(Sp10:0.43317645,(Sp4:0.87687011,((Sp1:1.53140922,(Sp6:0.27321586,(Sp7:0.60927056,Sp5:0.55084860)0.883000:0.19239719)0.837000:0.35856526)0.000000:0.09194060,(Sp2:0.75279515,Sp3:0.72480859)0.978000:0.74100185)0.992000:1.01165804)0.977000:0.61084893)0.798000:0.14117400);', 'partitions': {0: {'alpha': 109.405, 'model': 'WAG', 'rates': None, 'frequencies': None}}}, {'likelihood': -13698.13565, 'ml_tree': '(Sp8:0.00620049,Sp9:0.00634715,(Sp10:0.28364548,(Sp4:0.84891636,((Sp6:0.39724389,(Sp7:0.63452407,Sp5:0.43812009)0.991000:0.16179101)0.983000:0.28461680,(Sp1:1.46752953,(Sp2:0.72612541,Sp3:0.59766623)1.000000:0.70416175)0.609000:0.06277839)1.000000:1.08021621)1.000000:0.64483438)1.000000:0.33591550);', 'partitions': {0: {'alpha': 60194.762, 'model': 'WAG', 'rates': None, 'frequencies': None}}}]\n"
]
}
],
"prompt_number": 17
},
{
"cell_type": "code",
"collapsed": false,
"input": [],
"language": "python",
"metadata": {},
"outputs": []
}
],
"metadata": {}
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment