Skip to content

Instantly share code, notes, and snippets.

@RMDK
Created June 28, 2014 02:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RMDK/b5342eea9ff20b79c5c7 to your computer and use it in GitHub Desktop.
Save RMDK/b5342eea9ff20b79c5c7 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"worksheets": [
{
"cells": [
{
"metadata": {},
"cell_type": "heading",
"source": "Parallel Processing Example",
"level": 1
},
{
"metadata": {},
"cell_type": "markdown",
"source": "Email me: <a href=\"mailto:email.ryan.kelly@gmail.com?Subject=Hey\" target=\"_top\">email.ryan.kelly@gmail.com</a>"
},
{
"metadata": {},
"cell_type": "code",
"input": "import numpy.random as npr\nimport numpy as np\nimport pandas as pd\nfrom pandas import DataFrame, date_range\n\nfrom datetime import datetime, timedelta",
"prompt_number": 6,
"outputs": [],
"language": "python",
"trusted": true,
"collapsed": false
},
{
"metadata": {},
"cell_type": "code",
"input": "np.random.seed(111)\n\n# Function to generate test data\ndef CreateDataSet(Number=1):\n \n Output = []\n \n for i in range(Number):\n \n # Create a date range with hour frequency\n date = date_range(start='10/1/2012', end='10/31/2012', freq='H')\n \n # Create long lat data\n laty = npr.normal(4815862, 5000,size=len(date))\n longx = npr.normal(687993, 5000,size=len(date))\n \n # status of interest\n status = [0,1]\n \n # Make a random list of statuses\n random_status = [status[npr.randint(low=0,high=len(status))] for i in range(len(date))]\n \n # user pool\n user = ['sally','derik','james','bob','ryan','chris']\n \n # Make a random list of users \n random_user = [user[npr.randint(low=0,high=len(user))] for i in range(len(date))]\n \n Output.extend(zip(random_user, random_status, date, longx, laty))\n \n return pd.DataFrame(Output, columns = ['user', 'status', 'date', 'long', 'lat'])",
"prompt_number": 7,
"outputs": [],
"language": "python",
"trusted": true,
"collapsed": false
},
{
"metadata": {},
"cell_type": "code",
"input": "data = CreateDataSet(20)\ndata.head()",
"prompt_number": 8,
"outputs": [
{
"output_type": "pyout",
"html": "<div style=\"max-height:1000px;max-width:1500px;overflow:auto;\">\n<table border=\"1\" class=\"dataframe\">\n <thead>\n <tr style=\"text-align: right;\">\n <th></th>\n <th>user</th>\n <th>status</th>\n <th>date</th>\n <th>long</th>\n <th>lat</th>\n </tr>\n </thead>\n <tbody>\n <tr>\n <th>0</th>\n <td> ryan</td>\n <td> 0</td>\n <td>2012-10-01 00:00:00</td>\n <td> 692823.716714</td>\n <td> 4810192.808328</td>\n </tr>\n <tr>\n <th>1</th>\n <td> ryan</td>\n <td> 1</td>\n <td>2012-10-01 01:00:00</td>\n <td> 679549.965772</td>\n <td> 4817783.595967</td>\n </tr>\n <tr>\n <th>2</th>\n <td> bob</td>\n <td> 0</td>\n <td>2012-10-01 02:00:00</td>\n <td> 686339.324152</td>\n <td> 4823344.768882</td>\n </tr>\n <tr>\n <th>3</th>\n <td> ryan</td>\n <td> 0</td>\n <td>2012-10-01 03:00:00</td>\n <td> 677609.798732</td>\n <td> 4814085.088514</td>\n </tr>\n <tr>\n <th>4</th>\n <td> sally</td>\n <td> 1</td>\n <td>2012-10-01 04:00:00</td>\n <td> 689556.379975</td>\n <td> 4811924.332295</td>\n </tr>\n </tbody>\n</table>\n<p>5 rows × 5 columns</p>\n</div>",
"metadata": {},
"prompt_number": 8,
"text": " user status date long lat\n0 ryan 0 2012-10-01 00:00:00 692823.716714 4810192.808328\n1 ryan 1 2012-10-01 01:00:00 679549.965772 4817783.595967\n2 bob 0 2012-10-01 02:00:00 686339.324152 4823344.768882\n3 ryan 0 2012-10-01 03:00:00 677609.798732 4814085.088514\n4 sally 1 2012-10-01 04:00:00 689556.379975 4811924.332295\n\n[5 rows x 5 columns]"
}
],
"language": "python",
"trusted": true,
"collapsed": false
},
{
"metadata": {},
"cell_type": "code",
"input": "#some time deltas\nbefore = timedelta(hours = 8)\nafter = timedelta(minutes = 1)",
"prompt_number": 167,
"outputs": [],
"language": "python",
"trusted": true,
"collapsed": false
},
{
"metadata": {},
"cell_type": "code",
"input": "from IPython.parallel import Client\ncli = Client()\ncli.ids\n\ncli = Client()\n#cli[:].apply_sync(get_pid)\ndview=cli[:]\n\nwith dview.sync_imports():\n import numpy as np\n import os\n from datetime import timedelta\n import pandas as pd",
"prompt_number": 168,
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": "importing numpy on engine(s)\nimporting os on engine(s)\nimporting timedelta from datetime on engine(s)\nimporting pandas on engine(s)\n"
}
],
"language": "python",
"trusted": true,
"collapsed": false
},
{
"metadata": {},
"cell_type": "code",
"input": "@dview.parallel(block=True)\ndef work(df):\n before = timedelta(hours = 8)\n after = timedelta(minutes = 1)\n output = []\n #loop through data index's\n for i in range(0, len(df)):\n l = []\n #first we will filter out the data by date to have a smaller list to compute distances for\n \n #create a mask to query all dates between range for date i\n date_mask = (df['date'] >= df['date'].iloc[i]-before) & (df['date'] <= df['date'].iloc[i]+after)\n #create a mask to query all users who are not user i (themselves)\n user_mask = df['user']!=df['user'].iloc[i]\n #apply masks\n dists_to_check = df[date_mask & user_mask]\n \n #for point i, create coordinate to calculate distances from\n a = numpy.array((df['long'].iloc[i], df['lat'].iloc[i]))\n #create array of distances to check on the masked data\n b = numpy.array((dists_to_check['long'].values, dists_to_check['lat'].values))\n \n #for j in the date queried data\n for j in range(1, len(dists_to_check)):\n #compute the ueclidean distance between point a and each point of b (the date masked data)\n x = numpy.linalg.norm(a-numpy.array((b[0][j], b[1][j])))\n \n #if the distance is within our range of interest append the index to a list\n if x <=1000:\n l.append(j)\n else:\n pass\n try:\n #use the list of desired index's 'l' to query a final subset of the data\n data = dists_to_check.iloc[l]\n #summarize the column of interest then append to output list\n output.append(data['status'].sum())\n except IndexError, e:\n pass\n output.append(0)\n #print \"There were no data to add\"\n \n return pandas.DataFrame(output)",
"prompt_number": 194,
"outputs": [],
"language": "python",
"trusted": true,
"collapsed": false
},
{
"metadata": {},
"cell_type": "code",
"input": "start = datetime.now()\nout = work(data)\nprint datetime.now() - start",
"prompt_number": 195,
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": "in sync results <function __call__ at 0x108cccaa0>\n0:00:08.475609"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\n"
}
],
"language": "python",
"trusted": true,
"collapsed": false
},
{
"metadata": {},
"cell_type": "code",
"input": "",
"prompt_number": 0,
"outputs": [],
"language": "python",
"trusted": true,
"collapsed": false
},
{
"metadata": {},
"cell_type": "code",
"input": "c = np.array((data['long'].values, data['lat'].values))\na = np.array((data['long'].iloc[0], data['lat'].iloc[0]))\n\ndef t():\n l = []\n for j in range(1, 2000):\n x = np.linalg.norm(a-np.array((b[0][j], b[1][j])))\n \n if x <=1000:\n l.append(j)\n else:\n pass",
"prompt_number": 124,
"outputs": [],
"language": "python",
"trusted": true,
"collapsed": false
},
{
"metadata": {},
"cell_type": "code",
"input": "%timeit t",
"prompt_number": 125,
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": "10000000 loops, best of 3: 30.4 ns per loop\n"
}
],
"language": "python",
"trusted": true,
"collapsed": false
}
],
"metadata": {}
}
],
"metadata": {
"name": "",
"signature": "sha256:ab0e8723edd987414ef96ed0c0392a5945948e9e9233dd8d1afe7ba755fe8e15"
},
"nbformat": 3
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment