@lucahammer
Created February 26, 2020 14:56
Performance Test: Ryzen 9 3900X, Windows 10
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Python Performance Tests\n",
"A small collection of operations that are typical for my daily work with real data to compare different setups."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Created `%t` as an alias for `%timeit`.\n",
"Created `%%t` as an alias for `%%timeit`.\n"
]
}
],
"source": [
"%alias_magic t timeit\n",
"\n",
"import pandas as pd\n",
"#import dask.dataframe as dd\n",
"\n",
"df = pd.read_json('test-tweets.jsonl', lines=True)\n",
"#dask_df = dd.from_pandas(df, npartitions=5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's start with loading data. About half a million Tweet objects as json lines."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1min 11s ± 226 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%t df = pd.read_json('test-tweets.jsonl', lines=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For some multi-threaded processing I want the dataframe as a dask dataframe as well."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"34 s ± 50.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%t dask_df = dd.from_pandas(df, npartitions=5)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"def get_full_text(row):\n",
" if row['truncated']:\n",
" return(row['extended_tweet']['full_text'])\n",
" return (row['text'])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Twitter hides the full text of Tweets with more than 140 characters in a sub-field. I want one column that has always the complete text."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"25.2 s ± 64.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%t df.apply(get_full_text, axis=1)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Can this be done faster with Dask? "
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"26.2 s ± 49.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%t dask_df.apply(get_full_text, axis=1, meta=('string')).compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Maybe with processes instead of threads?"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"50.5 s ± 110 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%t dask_df.apply(get_full_text, axis=1, meta=('string')).compute(scheduler='processes')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Or by computing it partition wise?"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"scrolled": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"26.2 s ± 43.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%t dask_df.map_partitions(lambda ldf: ldf.apply(get_full_text, axis=1), meta=('string')).compute()"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"50.6 s ± 88.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%t dask_df.map_partitions(lambda ldf: ldf.apply(get_full_text, axis=1), meta=('string')).compute(scheduler='processes')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"That didn't work. It's the first time I tried Dask as I hoped that it would make better use of the high core count. I will have to find a better approach.\n",
"\n",
"But grouping is faster with Dask. For example to show which apps were used and for how many Tweets."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"902 ms ± 5.89 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%t len(df.groupby('source').count())"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"799 ms ± 4.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%t len(dask_df.groupby('source').count().compute())"
]
},
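{
"cell_type": "markdown",
"metadata": {},
"source": [
"As mentioned above, here is a sketch of a possible faster way to build the full-text column (not benchmarked here; the column name `complete_text` is just an example): a plain list comprehension over the three relevant columns avoids the per-row overhead of `DataFrame.apply(axis=1)`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: iterate over the three columns directly instead of applying row-wise.\n",
"# 'complete_text' is an example column name, not part of the original benchmark.\n",
"df['complete_text'] = [\n",
"    ext['full_text'] if trunc else text\n",
"    for trunc, ext, text in zip(df['truncated'], df['extended_tweet'], df['text'])\n",
"]"
]
},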
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, I want to store some data…"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"C:\\Users\\lucah\\Anaconda3\\envs\\Performance Test\\lib\\site-packages\\pyarrow\\feather.py:83: FutureWarning: The SparseDataFrame class is removed from pandas. Accessing it from the top-level namespace will also be removed in the next version\n",
" if isinstance(df, _pandas_api.pd.SparseDataFrame):\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"167 ms ± 659 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)\n"
]
}
],
"source": [
"%t df[['created_at', 'id', 'text']].to_feather('perf_test.feather')"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"C:\\Users\\lucah\\Anaconda3\\envs\\Performance Test\\lib\\site-packages\\pyarrow\\pandas_compat.py:383: FutureWarning: RangeIndex._start is deprecated and will be removed in a future version. Use RangeIndex.start instead\n",
" 'start': level._start,\n",
"C:\\Users\\lucah\\Anaconda3\\envs\\Performance Test\\lib\\site-packages\\pyarrow\\pandas_compat.py:384: FutureWarning: RangeIndex._stop is deprecated and will be removed in a future version. Use RangeIndex.stop instead\n",
" 'stop': level._stop,\n",
"C:\\Users\\lucah\\Anaconda3\\envs\\Performance Test\\lib\\site-packages\\pyarrow\\pandas_compat.py:385: FutureWarning: RangeIndex._step is deprecated and will be removed in a future version. Use RangeIndex.step instead\n",
" 'step': level._step\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"315 ms ± 741 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%t df[['created_at', 'id', 'text']].to_parquet('perf_test.parquet')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"…and read it again."
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"52.8 ms ± 266 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)\n"
]
}
],
"source": [
"%t pd.read_feather('perf_test.feather')"
]
},
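{
"cell_type": "markdown",
"metadata": {},
"source": [
"For completeness, the Parquet file could be read back the same way (a sketch, not timed in the original run)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: read back the Parquet file written above (not part of the original timings).\n",
"%t pd.read_parquet('perf_test.parquet')"
]
},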
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Save the IDs to share them with other researchers."
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1.32 ms ± 13.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)\n"
]
}
],
"source": [
"%t df.iloc[0:100]['id'].to_csv('perf_test.csv', index=False)"
]
},
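{
"cell_type": "markdown",
"metadata": {},
"source": [
"The cell above only writes the first 100 IDs, presumably to keep the timing short. For actually sharing the dataset, all IDs would be written (a sketch; `tweet_ids.csv` is just an example filename)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: write all Tweet IDs, not just the first 100 used for the timing above.\n",
"df['id'].to_csv('tweet_ids.csv', index=False)"
]
},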
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Dasking around"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table style=\"border: 2px solid white;\">\n",
"<tr>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3 style=\"text-align: left;\">Client</h3>\n",
"<ul style=\"text-align: left; list-style: none; margin: 0; padding: 0;\">\n",
" <li><b>Scheduler: </b>tcp://127.0.0.1:53440</li>\n",
" <li><b>Dashboard: </b><a href='http://127.0.0.1:8787/status' target='_blank'>http://127.0.0.1:8787/status</a>\n",
"</ul>\n",
"</td>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3 style=\"text-align: left;\">Cluster</h3>\n",
"<ul style=\"text-align: left; list-style:none; margin: 0; padding: 0;\">\n",
" <li><b>Workers: </b>12</li>\n",
" <li><b>Cores: </b>24</li>\n",
" <li><b>Memory: </b>68.65 GB</li>\n",
"</ul>\n",
"</td>\n",
"</tr>\n",
"</table>"
],
"text/plain": [
"<Client: 'tcp://127.0.0.1:53440' processes=12 threads=24, memory=68.65 GB>"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import dask.dataframe as dd\n",
"from dask.distributed import Client, LocalCluster\n",
"cluster = LocalCluster(n_workers=12, threads_per_worker=2)\n",
"client = Client(cluster)\n",
"client"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"dask_df = dd.from_pandas(df, npartitions=24)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"43.3 s ± 191 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%t dask_df.apply(get_full_text, axis=1, meta=('string')).compute()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"6min 22s ± 1.61 s per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%t dask_df.apply(get_full_text, axis=1, meta=('string')).compute(scheduler='processes')"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"dask_df = dd.from_pandas(df, npartitions=6)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"52.3 s ± 3.29 s per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%t dask_df.apply(get_full_text, axis=1, meta=('string')).compute()"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"dask_df = dd.from_pandas(df, npartitions=6)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting\n",
"distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting\n",
"distributed.nanny - WARNING - Restarting worker\n",
"distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting\n",
"distributed.nanny - WARNING - Restarting worker\n",
"distributed.nanny - WARNING - Restarting worker\n",
"distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting\n",
"distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting\n",
"distributed.nanny - WARNING - Restarting worker\n",
"distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting\n",
"distributed.nanny - WARNING - Restarting worker\n",
"distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting\n",
"distributed.nanny - WARNING - Restarting worker\n",
"distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting\n",
"distributed.nanny - WARNING - Restarting worker\n",
"distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting\n",
"distributed.nanny - WARNING - Restarting worker\n",
"distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting\n",
"distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting\n",
"distributed.nanny - WARNING - Restarting worker\n",
"distributed.nanny - WARNING - Restarting worker\n",
"distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting\n",
"distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting\n",
"distributed.nanny - WARNING - Restarting worker\n",
"distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting\n",
"distributed.nanny - WARNING - Restarting worker\n",
"distributed.nanny - WARNING - Restarting worker\n",
"distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting\n",
"distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting\n",
"distributed.nanny - WARNING - Restarting worker\n",
"distributed.nanny - WARNING - Restarting worker\n",
"distributed.nanny - WARNING - Restarting worker\n",
"distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting\n",
"distributed.nanny - WARNING - Restarting worker\n",
"distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting\n",
"distributed.nanny - WARNING - Restarting worker\n"
]
}
],
"source": [
"%t dask_df.apply(get_full_text, axis=1, meta=('string')).compute()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cluster.close()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 4
}