Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
How to use a pool of processes to speed up analysis in Python
{
"metadata": {
"name": "",
"signature": "sha256:a7fcf068a9e2c463f700485ac9a4fa64f79122a2ed4302a9c0e3f79d4d90cbf1"
},
"nbformat": 3,
"nbformat_minor": 0,
"worksheets": [
{
"cells": [
{
"cell_type": "code",
"collapsed": false,
"input": [
"!date\n",
"from __future__ import print_function\n",
"import time"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"Wed Aug 27 19:16:08 JST 2014\r\n"
]
}
],
"prompt_number": 1
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## How to use a pool of processes to speed up analysis in Python"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Scripts written in Python always execute as a single process. This is fine for short analyses, but soon becomes a hindrance when running analyses that take more than a minute to finish and need to be done for a couple thousand iterations. \n",
"\n",
"If you have a multi-core processor, and have multiple indepedent samples that will be processed by the same fuction, one way to speed up the work is to parallelize execution. Python has a built-in module called `multiprocessing` that can be used to easily refactor code that was originally set-up as a `for` loop. \n",
"\n",
"The `multiprocessing` module is an interface to spawn new processes using subprocesses instead of threads to sidestep the Python global intepreter lock. In the `multprocessing` module, we will use the `Pool` class to instantiate a pool of worker processes."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from multiprocessing import Pool"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 2
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here we define a function that contains the code for analyzing the data. In this example, The function will compute the %GC of a sequence. To set up the function, I used the input parameter to pass it a sequence to be analyzed. The function returns the %GC of the input sequence as a floating point number."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"import numpy as np\n",
"\n",
"def gc(sequence):\n",
" return np.sum((1 for b in sequence if b in ['G', 'C'])) / float(len(sequence))"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 3
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Without thinking about parallel execution, we can get the same values by using a `for` loop. Given 100 samples, I wrote a `for` loop that iterates through a list of the samples and calls the function during each iteration."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from Bio import SeqIO\n",
"samples = [str(rec.seq) for rec in SeqIO.parse('cds.all.fa', 'fasta') if len(rec.seq) > 1]"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 4
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"start_time = time.time() # Time the execution\n",
"gcs = []\n",
"for sample in samples:\n",
" percent_gc = gc(sample)\n",
" gcs.append(percent_gc)\n",
"print('Executed in {0} seconds'.format(time.time() - start_time))\n",
"print('-'*80)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"Executed in 6.11677885056 seconds\n",
"--------------------------------------------------------------------------------\n"
]
}
],
"prompt_number": 5
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To refactor the code for parallel execution, first you need a new `Pool` object. \n",
"\n",
"Here, I specified `processes=2` such that the pool accomodates up to 2 workers simulataneously. This number depends on how many processes a machine can run at the same time. For example: I have a Mac mini with a quad-core processor that can execute 2 processes per core, which means I can up to 8 (4\\*2) processes at a time.\n",
"\n",
"To check the maximum number of processes for your machine, you can run the following statement:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"import multiprocessing\n",
"multiprocessing.cpu_count()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 6,
"text": [
"8"
]
}
],
"prompt_number": 6
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"After creating the `Pool` object, write a `for` loop that maps the function to its input. \n",
"\n",
"The statement `pool.imap_unordered()` is funcationally similar to `map()` in that it applies a transforming function unto a each element of a list. However, `pool.imap_unordered()` is a generator that does not guarantee that the output sequence follows the input sequence. "
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"if __name__ == '__main__':\n",
" start_time = time.time() # Time the execution\n",
" gcs = []\n",
" pool = Pool(processes=2)\n",
" for _ in pool.imap_unordered(gc, samples): \n",
" gcs.append(_)\n",
" pool.close()\n",
" print('Executed in {0} seconds'.format(time.time() - start_time))\n",
" print('-'*80)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"Executed in 5.38215899467 seconds\n",
"--------------------------------------------------------------------------------\n"
]
}
],
"prompt_number": 7
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"When execution time is short, splitting the work into multiple processes doesn't help because of the time it takes just to initialize the set-up. \n",
"\n",
"The time saved by running processes in parellel becomes more evident as your code runs longer for each sample, have more samples to be processed, and have more computing cores available."
]
}
],
"metadata": {}
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment