@ryan-williams
Last active March 30, 2020 04:55
A couple example notebooks associated with https://github.com/dask/dask/pull/6038 (`to_sql` support for Dask DataFrames)
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# `dask.dataframe.to_sql` example\n",
"See below for examples of writing a Dask DataFrame to a SQL table:\n",
"- [Setup](#Setup)\n",
"- [Writing blocks in serial](#serial-to-sql)\n",
"- [Writing blocks in parallel](#parallel)\n",
" - theoretically faster / more scalable\n",
" - **BUT: [rows out of order!](#parallel-db-table)**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Use the local Dask checkout and make sure `psycopg2` is installed:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from sys import path, executable as python\n",
"path.insert(0, '.')  # prepend in place; rebinding the name `path` would leave sys.path unchanged\n",
"!{python} -m pip install -q psycopg2"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setup\n",
"Make a simple 10-row DataFrame in Pandas and Dask (with 5 partitions in the latter):"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"from dask import delayed\n",
"from dask.dataframe import DataFrame as DDF, from_pandas\n",
"from pandas import DataFrame as DF\n",
"\n",
"df = DF([ {'i':i, 's':str(i)*2 } for i in range(10) ])\n",
"ddf = from_pandas(df, npartitions=5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Declare the DB config and a helper for verifying the DB table's contents:"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"from pandas import read_sql_table\n",
"table = 'test'\n",
"db = 'sqlite:///test_db'\n",
"def read(index_col='index'):\n",
"    df = read_sql_table(table, db, index_col=index_col)\n",
"    print(f'{len(df)} rows, {len(df.index.unique())} distinct')\n",
"    return df"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Check out the Dask DataFrame:"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div><strong>Dask DataFrame Structure:</strong></div>\n",
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>i</th>\n",
" <th>s</th>\n",
" </tr>\n",
" <tr>\n",
" <th>npartitions=5</th>\n",
" <th></th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>int64</td>\n",
" <td>object</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>...</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>\n",
"<div>Dask Name: from_pandas, 5 tasks</div>"
],
"text/plain": [
"Dask DataFrame Structure:\n",
" i s\n",
"npartitions=5 \n",
"0 int64 object\n",
"2 ... ...\n",
"... ... ...\n",
"8 ... ...\n",
"9 ... ...\n",
"Dask Name: from_pandas, 5 tasks"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"5 partitions of 2 rows each:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"0 2\n",
"1 2\n",
"2 2\n",
"3 2\n",
"4 2\n",
"dtype: int64"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf.map_partitions(len).compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Empty `_meta`:"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>i</th>\n",
" <th>s</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
"Empty DataFrame\n",
"Columns: [i, s]\n",
"Index: []"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf._meta"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The 5 partitions don't depend on one another:"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "\n",
"text/plain": [
"<IPython.core.display.Image object>"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf.visualize()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Write to a SQL DB <a id=\"serial-to-sql\"></a>\n",
"Prepare to write to the DB:"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"res = ddf.to_sql(table, db, if_exists='replace', compute=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Task Graph <a id=\"serial-graph\"></a>\n",
"By default, each partition is written in serial:"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "\n",
"text/plain": [
"<IPython.core.display.Image object>"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"res.visualize()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A `None` is returned for the meta write and for each of the 5 blocks:"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Make meta (0)\n"
]
},
{
"data": {
"text/plain": [
"[None, None, None, None, None, None]"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"res.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Verify the DB contents <a id=\"serial-db\"></a>"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"10 rows, 10 distinct\n"
]
},
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>i</th>\n",
" <th>s</th>\n",
" </tr>\n",
" <tr>\n",
" <th>index</th>\n",
" <th></th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>0</td>\n",
" <td>00</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>1</td>\n",
" <td>11</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>2</td>\n",
" <td>22</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>3</td>\n",
" <td>33</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>4</td>\n",
" <td>44</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>5</td>\n",
" <td>55</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>6</td>\n",
" <td>66</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>7</td>\n",
" <td>77</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>8</td>\n",
" <td>88</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>9</td>\n",
" <td>99</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" i s\n",
"index \n",
"0 0 00\n",
"1 1 11\n",
"2 2 22\n",
"3 3 33\n",
"4 4 44\n",
"5 5 55\n",
"6 6 66\n",
"7 7 77\n",
"8 8 88\n",
"9 9 99"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"read()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Writing blocks concurrently <a id=\"parallel\"></a>\n",
"Passing `parallel=True` causes blocks to be written in parallel:"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"res = ddf.to_sql(table, db, if_exists='replace', parallel=True, compute=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This can be faster / more scalable, but generally causes the rows to end up out of order!\n",
"\n",
"Depending on your use-case (and use of indexes at write- and read-time), that may not matter, but it's important to be aware of."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Task Graph\n",
"\"Meta\" is written first, then all blocks are written concurrently (I'm not sure why they're all in the one `combine_list` node as opposed to the graph fanning out there):"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "\n",
"text/plain": [
"<IPython.core.display.Image object>"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"res.visualize()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Compute"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Make meta (0)\n"
]
},
{
"data": {
"text/plain": [
"[None, [None, None, None, None, None]]"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"res.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Check the DB table <a id=\"parallel-db-table\"></a>\n",
"Note that the rows are out of order!"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"10 rows, 10 distinct\n"
]
},
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>i</th>\n",
" <th>s</th>\n",
" </tr>\n",
" <tr>\n",
" <th>index</th>\n",
" <th></th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>6</td>\n",
" <td>66</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>7</td>\n",
" <td>77</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>2</td>\n",
" <td>22</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>3</td>\n",
" <td>33</td>\n",
" </tr>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>0</td>\n",
" <td>00</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>1</td>\n",
" <td>11</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>4</td>\n",
" <td>44</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>5</td>\n",
" <td>55</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>8</td>\n",
" <td>88</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>9</td>\n",
" <td>99</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" i s\n",
"index \n",
"6 6 66\n",
"7 7 77\n",
"2 2 22\n",
"3 3 33\n",
"0 0 00\n",
"1 1 11\n",
"4 4 44\n",
"5 5 55\n",
"8 8 88\n",
"9 9 99"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"read()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "dask-3.8.1",
"language": "python",
"name": "dask-3.8.1"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.1"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
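
The out-of-order rows in the notebook's last cell need not be a problem if readers impose an order at query time. The following is a minimal sketch of that idea, not part of the notebook or of Dask: it uses only the standard-library `sqlite3` module, an in-memory database, and a hypothetical stand-in for the `test` table, and it simulates the parallel write by inserting the five blocks in the arrival order observed above.

```python
import sqlite3

# Hypothetical stand-in for the notebook's 'test' table (an "index" column plus i and s).
conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE test ("index" INTEGER, i INTEGER, s TEXT)')

# Five 2-row blocks, mirroring the notebook's 5-partition DataFrame.
blocks = [[(i, i, str(i) * 2) for i in (2 * p, 2 * p + 1)] for p in range(5)]

# Simulate the parallel run: blocks arrive in the order seen above (3, 1, 0, 2, 4).
for p in (3, 1, 0, 2, 4):
    conn.executemany("INSERT INTO test VALUES (?, ?, ?)", blocks[p])
conn.commit()

# SQL gives no ordering guarantee without ORDER BY, so ask for one explicitly.
ordered = conn.execute('SELECT "index", i, s FROM test ORDER BY "index"').fetchall()
print([row[0] for row in ordered])
```

With the explicit `ORDER BY`, the rows come back sorted by index regardless of the order in which the blocks were written. In pandas, calling `sort_index()` on the frame returned by the notebook's `read()` helper should have the same effect.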