Skip to content

Instantly share code, notes, and snippets.

@juliantaylor
Last active August 29, 2015 13:59
Show Gist options
  • Save juliantaylor/10942132 to your computer and use it in GitHub Desktop.
Save juliantaylor/10942132 to your computer and use it in GitHub Desktop.
blocked threaded numpy
Display the source blob
Display the rendered blob
Raw
{
"metadata": {
"name": ""
},
"nbformat": 3,
"nbformat_minor": 0,
"worksheets": [
{
"cells": [
{
"cell_type": "code",
"collapsed": false,
"input": [
"import numpy as np\n",
"import numexpr\n",
"import sys\n",
"import ast"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 1
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"class GetVars(ast.NodeVisitor):\n",
"    \"\"\"Collect the identifiers of all ``ast.Name`` nodes in a parsed tree.\n",
"\n",
"    Every name is recorded whatever its role: operands such as ``a`` but\n",
"    also callees and modules such as ``abs`` or the ``np`` of ``np.sin(a)``.\n",
"    The tree is only inspected, never rewritten, so this is a NodeVisitor\n",
"    rather than a NodeTransformer. After ``visit(tree)`` the collected\n",
"    identifiers are in ``self.vars`` (a set).\n",
"    \"\"\"\n",
"\n",
"    def __init__(self):\n",
"        self.vars = set()\n",
"\n",
"    def visit_Name(self, node):\n",
"        # a Name node has no child expressions, so there is nothing to descend into\n",
"        self.vars.add(node.id)"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 2
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"def blocked(op, local_dict=None, block_bytes=2 * 64 * 1024):\n",
"    \"\"\"Evaluate the element-wise expression ``op`` in cache-sized blocks.\n",
"\n",
"    The array operands are viewed as flat 1-d arrays and cut into blocks of\n",
"    about ``block_bytes`` bytes; ``op`` is evaluated one block at a time and\n",
"    every partial result is written into a preallocated output array, so the\n",
"    temporaries created while evaluating ``op`` stay block-sized.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    op : str\n",
"        Python expression, e.g. ``'a**2 + b**2 + a*b * 2'``.\n",
"    local_dict : dict, optional\n",
"        Operand namespace; defaults to the caller's locals. Names of ``op``\n",
"        bound here to ndarrays with ndim > 0 are blocked, other values bound\n",
"        here (scalars, functions, ...) are passed whole, and names not bound\n",
"        here are resolved from the caller's globals.\n",
"    block_bytes : int, optional\n",
"        Approximate block size in bytes (default 128 KiB).\n",
"\n",
"    Returns\n",
"    -------\n",
"    ndarray\n",
"        Result with the operands' shape and the dtype ``op`` produces. If\n",
"        ``op`` references no array operand it is evaluated once and its\n",
"        value is returned unchanged.\n",
"\n",
"    Raises\n",
"    ------\n",
"    ValueError\n",
"        If the array operands do not all have the same shape.\n",
"    \"\"\"\n",
"    call_frame = sys._getframe(1)\n",
"    if local_dict is None:\n",
"        local_dict = call_frame.f_locals\n",
"    global_dict = call_frame.f_globals\n",
"\n",
"    tree = ast.parse(op, mode='eval')\n",
"    collector = GetVars()\n",
"    collector.visit(tree)\n",
"    code = compile(tree, '<string>', 'eval')\n",
"\n",
"    # Sorted so the reference operand is deterministic (set order is not).\n",
"    # Names absent from local_dict (np, abs, ...) are left for eval to find\n",
"    # in global_dict or the builtins instead of raising KeyError here.\n",
"    names = sorted(name for name in collector.vars if name in local_dict)\n",
"    operands = dict((name, local_dict[name]) for name in names)\n",
"    arrays = [name for name in names\n",
"              if isinstance(operands[name], np.ndarray) and operands[name].ndim > 0]\n",
"    if not arrays:\n",
"        return eval(code, global_dict, operands)\n",
"\n",
"    shape = operands[arrays[0]].shape\n",
"    for name in arrays[1:]:\n",
"        if operands[name].shape != shape:\n",
"            raise ValueError('blocked: operands %r and %r have different shapes'\n",
"                             % (arrays[0], name))\n",
"\n",
"    # Flat views make every block a range of elements whatever the ndim;\n",
"    # // keeps the step an int (plain / yields a float on Python 3).\n",
"    flat = dict((name, operands[name].reshape(-1)) for name in arrays)\n",
"    size = flat[arrays[0]].size\n",
"    step = max(1, block_bytes // max(flat[name].itemsize for name in arrays))\n",
"\n",
"    def evaluate(start, stop):\n",
"        # namespace for one block: array operands sliced, the rest passed whole\n",
"        scope = dict(operands)\n",
"        for name in arrays:\n",
"            scope[name] = flat[name][start:stop]\n",
"        return eval(code, global_dict, scope)\n",
"\n",
"    # Probe with empty blocks to learn the dtype op really produces instead of\n",
"    # assuming it is the dtype of one of the operands.\n",
"    out = np.empty(size, dtype=np.asarray(evaluate(0, 0)).dtype)\n",
"    for start in range(0, size, step):\n",
"        stop = min(size, start + step)\n",
"        out[start:stop] = evaluate(start, stop)\n",
"    return out.reshape(shape)"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 22
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"def small(operation, r, full, global_dict=None, block_bytes=2 * 64 * 1024):\n",
"    \"\"\"Worker task: evaluate ``operation`` into ``r`` in cache-sized blocks.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    operation : code object or str\n",
"        Expression to evaluate (``blocked_thread`` passes a compiled one).\n",
"    r : ndarray\n",
"        Output, written in place. Should be 1-d: blocks index its first axis\n",
"        while the block size is counted in elements.\n",
"    full : dict\n",
"        Operand namespace. ndarrays with ndim > 0 are sliced like ``r``;\n",
"        any other value is passed through unchanged.\n",
"    global_dict : dict, optional\n",
"        Globals for the evaluation, so names such as ``np`` resolve. When\n",
"        None an empty namespace is used and only builtins are available.\n",
"    block_bytes : int, optional\n",
"        Approximate block size in bytes (default 128 KiB).\n",
"    \"\"\"\n",
"    if global_dict is None:\n",
"        global_dict = {}\n",
"    # // keeps the step an int (plain / yields a float on Python 3)\n",
"    step = max(1, block_bytes // r.itemsize)\n",
"    for start in range(0, r.size, step):\n",
"        stop = min(r.size, start + step)\n",
"        scope = dict(full)\n",
"        for name, value in full.items():\n",
"            if isinstance(value, np.ndarray) and value.ndim > 0:\n",
"                scope[name] = value[start:stop]\n",
"        r[start:stop] = eval(operation, global_dict, scope)\n",
"\n",
"def blocked_thread(op, local_dict=None, pool=None):\n",
"    \"\"\"Evaluate ``op`` block-wise like ``blocked``, split across a thread pool.\n",
"\n",
"    The flattened operands are divided into at most one contiguous chunk per\n",
"    pool worker; each chunk is handed to ``small`` and written directly into\n",
"    its slice of the shared output array.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    op : str\n",
"        Python expression, e.g. ``'a**2 + b**2 + a*b * 2'``.\n",
"    local_dict : dict, optional\n",
"        Operand namespace; defaults to the caller's locals. Names of ``op``\n",
"        bound here to ndarrays with ndim > 0 are split, other values bound\n",
"        here are passed whole, and names not bound here are resolved from\n",
"        the caller's globals.\n",
"    pool : multiprocessing.pool.ThreadPool, optional\n",
"        Pool to run on. When None a temporary ThreadPool is created and\n",
"        closed again before returning.\n",
"\n",
"    Returns\n",
"    -------\n",
"    ndarray\n",
"        Result with the operands' shape and the dtype ``op`` produces. If\n",
"        ``op`` references no array operand it is evaluated once, in the\n",
"        calling thread, and its value is returned unchanged.\n",
"\n",
"    Raises\n",
"    ------\n",
"    ValueError\n",
"        If the array operands do not all have the same shape. Exceptions\n",
"        raised by a worker are re-raised here.\n",
"    \"\"\"\n",
"    call_frame = sys._getframe(1)\n",
"    if local_dict is None:\n",
"        local_dict = call_frame.f_locals\n",
"    global_dict = call_frame.f_globals\n",
"\n",
"    tree = ast.parse(op, mode='eval')\n",
"    collector = GetVars()\n",
"    collector.visit(tree)\n",
"    code = compile(tree, '<string>', 'eval')\n",
"\n",
"    # Sorted so the reference operand is deterministic; names absent from\n",
"    # local_dict (np, abs, ...) are resolved by eval from global_dict.\n",
"    names = sorted(name for name in collector.vars if name in local_dict)\n",
"    operands = dict((name, local_dict[name]) for name in names)\n",
"    arrays = [name for name in names\n",
"              if isinstance(operands[name], np.ndarray) and operands[name].ndim > 0]\n",
"    if not arrays:\n",
"        return eval(code, global_dict, operands)\n",
"\n",
"    shape = operands[arrays[0]].shape\n",
"    for name in arrays[1:]:\n",
"        if operands[name].shape != shape:\n",
"            raise ValueError('blocked_thread: operands %r and %r have different shapes'\n",
"                             % (arrays[0], name))\n",
"\n",
"    # flat views make every chunk a range of elements whatever the ndim\n",
"    flat = dict((name, operands[name].reshape(-1)) for name in arrays)\n",
"    size = flat[arrays[0]].size\n",
"\n",
"    def chunk_scope(start, stop):\n",
"        # namespace for one chunk: array operands sliced, the rest passed whole\n",
"        scope = dict(operands)\n",
"        for name in arrays:\n",
"            scope[name] = flat[name][start:stop]\n",
"        return scope\n",
"\n",
"    # Probe with empty chunks to learn the dtype op really produces.\n",
"    probe = eval(code, global_dict, chunk_scope(0, 0))\n",
"    out = np.empty(size, dtype=np.asarray(probe).dtype)\n",
"\n",
"    own_pool = pool is None\n",
"    if own_pool:\n",
"        from multiprocessing.pool import ThreadPool\n",
"        pool = ThreadPool()\n",
"    try:\n",
"        # NOTE: the worker count is read from the pool's private _processes\n",
"        # attribute; a single chunk is used if it is missing.\n",
"        workers = getattr(pool, '_processes', None) or 1\n",
"        # ceiling division: at most `workers` chunks and never a zero step,\n",
"        # even when size < workers\n",
"        chunk = max(1, -(-size // workers))\n",
"        pending = []\n",
"        for start in range(0, size, chunk):\n",
"            stop = min(size, start + chunk)\n",
"            pending.append(pool.apply_async(\n",
"                small, (code, out[start:stop], chunk_scope(start, stop), global_dict)))\n",
"        for result in pending:\n",
"            result.get()  # re-raises any exception from the worker\n",
"    finally:\n",
"        if own_pool:\n",
"            pool.close()\n",
"            pool.join()\n",
"    return out.reshape(shape)\n"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 23
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"a = np.arange(1e7)\n",
"b = np.arange(1e7)\n",
"\n",
"def test(a, b):\n",
"    # blocked() reads a and b from this function's local variables\n",
"    return blocked(\"a**2 + b**2 + a*b * 2\")\n",
"\n",
"result = test(a, b)\n",
"# the blocked result must match a direct numpy evaluation\n",
"np.testing.assert_allclose(result, a**2 + b**2 + a*b * 2)\n",
"print(result)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"[ 0.00000000e+00 4.00000000e+00 1.60000000e+01 ..., 3.99999760e+14\n",
" 3.99999840e+14 3.99999920e+14]\n"
]
}
],
"prompt_number": 24
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from multiprocessing.pool import ThreadPool\n",
"# thread pool reused by the blocked_thread timing cell\n",
"t = ThreadPool()\n",
"# expression shared by the numexpr / blocked / blocked_thread timing cells\n",
"s = \"a**2 + b**2 + a*b * 2\""
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 25
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%timeit\n",
"# baseline: the expression evaluated directly with numpy, no blocking or threads\n",
"a**2 + b**2 + a*b * 2"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"1 loops, best of 3: 420 ms per loop\n"
]
}
],
"prompt_number": 29
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%timeit\n",
"# numexpr reference restricted to one thread (set_num_threads runs inside the timed body)\n",
"numexpr.set_num_threads(1)\n",
"numexpr.evaluate(s, local_dict={'a' : a, 'b' : b})"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"10 loops, best of 3: 114 ms per loop\n"
]
}
],
"prompt_number": 30
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%timeit\n",
"# single-threaded blocked evaluation with explicit operands\n",
"blocked(s, local_dict={'a' : a, 'b' : b})"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"10 loops, best of 3: 193 ms per loop\n"
]
}
],
"prompt_number": 31
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%timeit\n",
"# blocked evaluation split across the thread pool t\n",
"blocked_thread(s, local_dict={'a' : a, 'b' : b}, pool=t)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"10 loops, best of 3: 95.3 ms per loop\n"
]
}
],
"prompt_number": 32
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%timeit\n",
"# numexpr reference with 4 threads, for comparison with blocked_thread\n",
"numexpr.set_num_threads(4)\n",
"numexpr.evaluate(s, local_dict={'a' : a, 'b' : b})"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"10 loops, best of 3: 45.2 ms per loop\n"
]
}
],
"prompt_number": 33
},
{
"cell_type": "code",
"collapsed": false,
"input": [],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"1 loops, best of 3: 339 ms per loop\n"
]
}
],
"prompt_number": 21
},
{
"cell_type": "code",
"collapsed": false,
"input": [],
"language": "python",
"metadata": {},
"outputs": []
}
],
"metadata": {}
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment