Skip to content

Instantly share code, notes, and snippets.

@wasade
Last active May 20, 2016 13:42
Show Gist options
  • Save wasade/c3b3e8baa7c16fe3699d4371c8fffb53 to your computer and use it in GitHub Desktop.
Save wasade/c3b3e8baa7c16fe3699d4371c8fffb53 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%load_ext autoreload\n",
"%autoreload 2\n",
"import os\n",
"import ipyparallel as ipp\n",
"\n",
"rc = ipp.Client()\n",
"ar = rc[:].apply_async(os.getpid)\n",
"ar.get_dict()\n",
"\n",
"bv = rc.load_balanced_view()"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from skbio.diversity import block_beta_diversity"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from skbio import TreeNode\n",
"from io import StringIO\n",
"import numpy as np\n",
"tree = TreeNode.read(StringIO(\n",
" '(((((OTU1:0.5,OTU2:0.5):0.5,OTU3:1.0):1.0):0.0,'\n",
" '(OTU4:0.75,(OTU5:0.5,(OTU6:0.5,OTU7:0.5):0.5):'\n",
" '0.5):1.25):0.0)root;'))\n",
"otu_ids = ['OTU1', 'OTU2', 'OTU3', 'OTU4', 'OTU5', 'OTU6', 'OTU7']\n",
"data = np.asarray([[23, 64, 14, 0, 0, 3, 1], # NOTE: the data currently needs to be passed as a NumPy array\n",
" [0, 3, 35, 42, 0, 12, 1],\n",
" [0, 5, 5, 0, 40, 40, 0],\n",
" [44, 35, 9, 0, 1, 0, 0],\n",
" [0, 2, 8, 0, 35, 45, 1],\n",
" [0, 0, 25, 35, 0, 19, 0]], dtype=int)\n",
"ids = list('ABCDEF')"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"6x6 distance matrix\n",
"IDs:\n",
"'A', 'B', 'C', 'D', 'E', 'F'\n",
"Data:\n",
"[[ 0. 0.16666667 0.20689655 0.27586207 0.13793103 0.36666667]\n",
" [ 0.16666667 0. 0.23333333 0.40625 0.16666667 0.21428571]\n",
" [ 0.20689655 0.23333333 0. 0.22222222 0.07407407 0.32142857]\n",
" [ 0.27586207 0.40625 0.22222222 0. 0.27586207 0.5 ]\n",
" [ 0.13793103 0.16666667 0.07407407 0.27586207 0. 0.36666667]\n",
" [ 0.36666667 0.21428571 0.32142857 0.5 0.36666667 0. ]]\n"
]
}
],
"source": [
"# serial execution: no map_f/reduce_f supplied\n",
"print(str(block_beta_diversity(\"unweighted_unifrac\", data, ids, tree=tree, otu_ids=otu_ids, k=2)))"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"parallelinating\n",
"6x6 distance matrix\n",
"IDs:\n",
"'A', 'B', 'C', 'D', 'E', 'F'\n",
"Data:\n",
"[[ 0. 0.16666667 0.20689655 0.27586207 0.13793103 0.36666667]\n",
" [ 0.16666667 0. 0.23333333 0.40625 0.16666667 0.21428571]\n",
" [ 0.20689655 0.23333333 0. 0.22222222 0.07407407 0.32142857]\n",
" [ 0.27586207 0.40625 0.22222222 0. 0.27586207 0.5 ]\n",
" [ 0.13793103 0.16666667 0.07407407 0.27586207 0. 0.36666667]\n",
" [ 0.36666667 0.21428571 0.32142857 0.5 0.36666667 0. ]]\n"
]
}
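],
"source": [
"# parallel execution using ipyparallel, submitting each call through the load-balanced view `bv`\n",
"def map_f(func, kw_gen):\n",
" print(\"parallelinating\")\n",
" return (bv.apply_async(func, **kwargs) for kwargs in kw_gen)\n",
"\n",
"# now with parallel reduction: pairs of partial results are combined by submitting `_reduce` to `bv`\n",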
"def reduce_f(dms):\n",
" from skbio.diversity._block import _reduce\n",
" import itertools\n",
" \n",
" class grow_iter_grow(object):\n",
" \"\"\"a growable iterator\"\"\"\n",
" def __init__(self, starter):\n",
" self._mygen = starter\n",
" self.staged = next(self._mygen)\n",
" \n",
" def _get_next(self):\n",
" try:\n",
" item = next(self._mygen)\n",
" except:\n",
" item = None\n",
" self.staged = item\n",
" \n",
" def __next__(self):\n",
" item = self.staged\n",
" self._get_next()\n",
" return item\n",
" \n",
" def append(self, thing):\n",
" if self.staged is None:\n",
" self.staged = thing\n",
" else:\n",
" self._mygen = itertools.chain(self._mygen, [thing])\n",
" \n",
" def __iter__(self):\n",
" yield next(self)\n",
"\n",
" def empty(self):\n",
" return self.staged is None\n",
" \n",
" dms = grow_iter_grow(dms)\n",
" \n",
" while not dms.empty():\n",
" a = next(dms)\n",
" b = next(dms)\n",
" \n",
" if b is None:\n",
" final = a.get() \n",
" else:\n",
" a = a.get()\n",
" b = b.get()\n",
" dms.append(bv.apply_async(_reduce, [a, b]))\n",
" \n",
" return(final)\n",
" \n",
"print(str(block_beta_diversity(\"unweighted_unifrac\", data, ids, tree=tree, otu_ids=otu_ids, k=4, \n",
" map_f=map_f, reduce_f=reduce_f)))\n"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"parallelinating\n",
"6x6 distance matrix\n",
"IDs:\n",
"'A', 'B', 'C', 'D', 'E', 'F'\n",
"Data:\n",
"[[ 0. 0.16666667 0.20689655 0.27586207 0.13793103 0.36666667]\n",
" [ 0.16666667 0. 0.23333333 0.40625 0.16666667 0.21428571]\n",
" [ 0.20689655 0.23333333 0. 0.22222222 0.07407407 0.32142857]\n",
" [ 0.27586207 0.40625 0.22222222 0. 0.27586207 0.5 ]\n",
" [ 0.13793103 0.16666667 0.07407407 0.27586207 0. 0.36666667]\n",
" [ 0.36666667 0.21428571 0.32142857 0.5 0.36666667 0. ]]\n"
]
}
],
"source": [
"import multiprocessing as mp\n",
"\n",
"# parallel execution using multiprocessing\n",
"def map_f(func, kw_gen):\n",
" print(\"parallelinating\")\n",
" pool = mp.Pool(processes=2)\n",
" \n",
" return (pool.apply_async(func, kwds=kwargs) for kwargs in kw_gen)\n",
"\n",
"# NOTE: only the map step is parallel; the reduction still runs serially in the calling process\n",
"def reduce_f(dms):\n",
" from skbio.diversity._block import _reduce\n",
" return _reduce((dm.get() for dm in dms))\n",
"\n",
"print(str(block_beta_diversity(\"unweighted_unifrac\", data, ids, tree=tree, otu_ids=otu_ids, k=4, \n",
" map_f=map_f, reduce_f=reduce_f)))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.1"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment