@andersy005
Forked from cicdw/prefect_coiled_demo.ipynb
Created July 10, 2020 00:39
Outline of Prefect + Coiled demo
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Objective and Outline\n",
"\n",
"The main goal is to have some fun with the PPP dataset [located here](https://home.treasury.gov/policy-issues/cares-act/assistance-for-small-businesses/sba-paycheck-protection-program-loan-level-data). Inspired by [this tweet](https://twitter.com/dataeditor/status/1280278987797942272), we'll look at city misspellings in the PPP data, starting with Philadelphia. Specifically, we'll compute a histogram of the [Levenshtein distance](https://en.wikipedia.org/wiki/Levenshtein_distance) between the correct spelling and each misspelling in the dataset.\n",
"\n",
"Some features we hope to highlight:\n",
"- cloud-based file caching for efficient re-use of task results\n",
"- task retries\n",
"- Prefect's dynamic mapping feature, and how this plays nicely with Dask\n",
"- running Prefect Flows on different Dask cluster configs, beginning with a `LocalCluster` and then moving to a `CoiledCluster`; we'll also highlight configuration options for a `CoiledCluster`\n",
"\n",
"After iterative local development, and time permitting, we also want to look at what a \"deployed\" Prefect Flow looks like in the Prefect UI; we'll compare / contrast Coiled's UI with Prefect's and kick off a few Flow runs against a Coiled cluster from Prefect's UI."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## The Data\n",
"\n",
"- download the data\n",
"- look at it\n",
"- eyeball the Philadelphia misspellings"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>LoanRange</th>\n",
" <th>BusinessName</th>\n",
" <th>Address</th>\n",
" <th>City</th>\n",
" <th>State</th>\n",
" <th>Zip</th>\n",
" <th>NAICSCode</th>\n",
" <th>BusinessType</th>\n",
" <th>RaceEthnicity</th>\n",
" <th>Gender</th>\n",
" <th>Veteran</th>\n",
" <th>NonProfit</th>\n",
" <th>JobsRetained</th>\n",
" <th>DateApproved</th>\n",
" <th>Lender</th>\n",
" <th>CD</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>a $5-10 million</td>\n",
" <td>ARCTIC SLOPE NATIVE ASSOCIATION, LTD.</td>\n",
" <td>7000 Uula St</td>\n",
" <td>BARROW</td>\n",
" <td>AK</td>\n",
" <td>99723.0</td>\n",
" <td>813920.0</td>\n",
" <td>Non-Profit Organization</td>\n",
" <td>Unanswered</td>\n",
" <td>Unanswered</td>\n",
" <td>Unanswered</td>\n",
" <td>Y</td>\n",
" <td>295.0</td>\n",
" <td>04/14/2020</td>\n",
" <td>National Cooperative Bank, National Association</td>\n",
" <td>AK - 00</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>a $5-10 million</td>\n",
" <td>CRUZ CONSTRUCTION INC</td>\n",
" <td>7000 East Palmer Wasilla Hwy</td>\n",
" <td>PALMER</td>\n",
" <td>AK</td>\n",
" <td>99645.0</td>\n",
" <td>238190.0</td>\n",
" <td>Subchapter S Corporation</td>\n",
" <td>Unanswered</td>\n",
" <td>Unanswered</td>\n",
" <td>Unanswered</td>\n",
" <td>NaN</td>\n",
" <td>215.0</td>\n",
" <td>04/15/2020</td>\n",
" <td>First National Bank Alaska</td>\n",
" <td>AK - 00</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>a $5-10 million</td>\n",
" <td>I. C. E. SERVICES, INC</td>\n",
" <td>2606 C Street</td>\n",
" <td>ANCHORAGE</td>\n",
" <td>AK</td>\n",
" <td>99503.0</td>\n",
" <td>722310.0</td>\n",
" <td>Corporation</td>\n",
" <td>Unanswered</td>\n",
" <td>Unanswered</td>\n",
" <td>Unanswered</td>\n",
" <td>NaN</td>\n",
" <td>367.0</td>\n",
" <td>04/11/2020</td>\n",
" <td>KeyBank National Association</td>\n",
" <td>AK - 00</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>a $5-10 million</td>\n",
" <td>KATMAI HEALTH SERVICES LLC</td>\n",
" <td>11001 O'MALLEY CENTRE DRIVE, SUITE 204</td>\n",
" <td>ANCHORAGE</td>\n",
" <td>AK</td>\n",
" <td>99515.0</td>\n",
" <td>621111.0</td>\n",
" <td>Limited Liability Company(LLC)</td>\n",
" <td>Unanswered</td>\n",
" <td>Unanswered</td>\n",
" <td>Unanswered</td>\n",
" <td>NaN</td>\n",
" <td>0.0</td>\n",
" <td>04/29/2020</td>\n",
" <td>Truist Bank d/b/a Branch Banking &amp; Trust Co</td>\n",
" <td>AK - 00</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>a $5-10 million</td>\n",
" <td>MATANUSKA TELEPHONE ASSOCIATION</td>\n",
" <td>1740 S. CHUGACH ST</td>\n",
" <td>PALMER</td>\n",
" <td>AK</td>\n",
" <td>99645.0</td>\n",
" <td>517311.0</td>\n",
" <td>Cooperative</td>\n",
" <td>Unanswered</td>\n",
" <td>Unanswered</td>\n",
" <td>Unanswered</td>\n",
" <td>NaN</td>\n",
" <td>267.0</td>\n",
" <td>06/10/2020</td>\n",
" <td>CoBank ACB</td>\n",
" <td>AK - 00</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" LoanRange BusinessName \\\n",
"0 a $5-10 million ARCTIC SLOPE NATIVE ASSOCIATION, LTD. \n",
"1 a $5-10 million CRUZ CONSTRUCTION INC \n",
"2 a $5-10 million I. C. E. SERVICES, INC \n",
"3 a $5-10 million KATMAI HEALTH SERVICES LLC \n",
"4 a $5-10 million MATANUSKA TELEPHONE ASSOCIATION \n",
"\n",
" Address City State Zip \\\n",
"0 7000 Uula St BARROW AK 99723.0 \n",
"1 7000 East Palmer Wasilla Hwy PALMER AK 99645.0 \n",
"2 2606 C Street ANCHORAGE AK 99503.0 \n",
"3 11001 O'MALLEY CENTRE DRIVE, SUITE 204 ANCHORAGE AK 99515.0 \n",
"4 1740 S. CHUGACH ST PALMER AK 99645.0 \n",
"\n",
" NAICSCode BusinessType RaceEthnicity Gender \\\n",
"0 813920.0 Non-Profit Organization Unanswered Unanswered \n",
"1 238190.0 Subchapter S Corporation Unanswered Unanswered \n",
"2 722310.0 Corporation Unanswered Unanswered \n",
"3 621111.0 Limited Liability Company(LLC) Unanswered Unanswered \n",
"4 517311.0 Cooperative Unanswered Unanswered \n",
"\n",
" Veteran NonProfit JobsRetained DateApproved \\\n",
"0 Unanswered Y 295.0 04/14/2020 \n",
"1 Unanswered NaN 215.0 04/15/2020 \n",
"2 Unanswered NaN 367.0 04/11/2020 \n",
"3 Unanswered NaN 0.0 04/29/2020 \n",
"4 Unanswered NaN 267.0 06/10/2020 \n",
"\n",
" Lender CD \n",
"0 National Cooperative Bank, National Association AK - 00 \n",
"1 First National Bank Alaska AK - 00 \n",
"2 KeyBank National Association AK - 00 \n",
"3 Truist Bank d/b/a Branch Banking & Trust Co AK - 00 \n",
"4 CoBank ACB AK - 00 "
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import pandas as pd\n",
"\n",
"df = pd.read_csv(\"/Users/chris/Downloads/150k plus/PPP Data 150k plus.csv\")\n",
"df.head()"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"57"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"len(df.State.value_counts())"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>LoanRange</th>\n",
" <th>BusinessName</th>\n",
" <th>Address</th>\n",
" <th>City</th>\n",
" <th>State</th>\n",
" <th>Zip</th>\n",
" <th>NAICSCode</th>\n",
" <th>BusinessType</th>\n",
" <th>RaceEthnicity</th>\n",
" <th>Gender</th>\n",
" <th>Veteran</th>\n",
" <th>NonProfit</th>\n",
" <th>JobsRetained</th>\n",
" <th>DateApproved</th>\n",
" <th>Lender</th>\n",
" <th>CD</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>29024</th>\n",
" <td>b $2-5 million</td>\n",
" <td>YEEZY LLC</td>\n",
" <td>6 CENTERPOINTE DR #700</td>\n",
" <td>LA PALMA</td>\n",
" <td>CA</td>\n",
" <td>90623.0</td>\n",
" <td>448110.0</td>\n",
" <td>Limited Liability Company(LLC)</td>\n",
" <td>Black or African American</td>\n",
" <td>Male Owned</td>\n",
" <td>Non-Veteran</td>\n",
" <td>NaN</td>\n",
" <td>106.0</td>\n",
" <td>04/13/2020</td>\n",
" <td>City National Bank</td>\n",
" <td>CA - 38</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" LoanRange BusinessName Address City State \\\n",
"29024 b $2-5 million YEEZY LLC 6 CENTERPOINTE DR #700 LA PALMA CA \n",
"\n",
" Zip NAICSCode BusinessType \\\n",
"29024 90623.0 448110.0 Limited Liability Company(LLC) \n",
"\n",
" RaceEthnicity Gender Veteran NonProfit \\\n",
"29024 Black or African American Male Owned Non-Veteran NaN \n",
"\n",
" JobsRetained DateApproved Lender CD \n",
"29024 106.0 04/13/2020 City National Bank CA - 38 "
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df[df['BusinessName'].str.lower().str.contains('yeez') == True]"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"PHILADELPHIA 2459\n",
"PHILA 23\n",
"PHILIPSBURG 16\n",
"PHILADELPHIA, 3\n",
"PHILADEPHIA 1\n",
"PHILADELPIA 1\n",
"PHILA. 1\n",
"PHILADELKPHIA 1\n",
"PHILDADELPHIA 1\n",
"PHILADELPHILA 1\n",
"PHILDELPHIA 1\n",
"Name: City, dtype: int64"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"philly_mask = (df['City'].str.lower().str.startswith('phil') == True) & (df['State'] == 'PA')\n",
"df[philly_mask].City.value_counts()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"df2 = pd.read_csv(\"/Users/chris/Downloads/Pennsylvania/PPP Data up to 150K - PA.csv\")"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"PHILADELPHIA 13498\n",
"PHILA 263\n",
"PHILIPSBURG 85\n",
"PHILADEPHIA 19\n",
"PHILADLEPHIA 19\n",
"PHILDELPHIA 15\n",
"PHILADELPIA 8\n",
"PHILADELPHIA, 6\n",
"PHILADELHIA 4\n",
"PHILADELPHA 4\n",
"PHILADLPHIA 4\n",
"PHILA. 3\n",
"PHILLY 2\n",
"PHILDADELPHIA 2\n",
"PHILADEPHILA 2\n",
"PHILADELLPHIA 2\n",
"PHILADELPHIA, PA 2\n",
"PHILADEPLHIA 2\n",
"PHILADELPHIAP 2\n",
"PHILLIPSBURG 2\n",
"PHILAELPHIA 2\n",
"PHILADELPHIA PA 2\n",
"PHILADELPHI 2\n",
"PHILLADELPHIA 2\n",
"PHILADELPHAI 2\n",
"PHILADELPPHIA 1\n",
"PHILADELPH 1\n",
"PHILADALPHIA 1\n",
"PHILDADLPHIA 1\n",
"PHILADELOHIA 1\n",
"PHILADEPHA 1\n",
"PHILADELPHIOA 1\n",
"PHILADELAPHIA 1\n",
"PHILIDELPHIA 1\n",
"PHILAD 1\n",
"PHILLA 1\n",
"PHILADEDLPHIA 1\n",
"PHILIADELPHIA 1\n",
"PHILADLELPHIA 1\n",
"PHILADERLPHIA 1\n",
"PHILDAELPHIA 1\n",
"PHILADELPHILA 1\n",
"PHILOADELPHIA 1\n",
"PHILADELPHIAPHIA 1\n",
"PHILDEPPHIA 1\n",
"PHILADPHIA 1\n",
"PHILADRLPHIA 1\n",
"PHILADELPHIA` 1\n",
"PHILADELHPIA 1\n",
"PHILADELPOHIA 1\n",
"Name: City, dtype: int64"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"philly_mask2 = (df2['City'].str.lower().str.startswith('phil') == True) & (df2['State'] == 'PA')\n",
"df2[philly_mask2].City.value_counts()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Build a Prefect Flow for reproducibility + efficiency\n",
"\n",
"Now we'll build a more programmatic set of tasks for downloading / processing our files. Some Prefect features this is intended to highlight:\n",
"- parametrization\n",
"- caching via S3-bucket targets\n",
"- retries\n",
"- Prefect \"mapping\"\n",
"\n",
"We should walk through / build this incrementally live but here's the final product:"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"image/svg+xml": [
"<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
"<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
" \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
"<!-- Generated by graphviz version 2.40.1 (20161225.0304)\n",
" -->\n",
"<!-- Title: %3 Pages: 1 -->\n",
"<svg width=\"374pt\" height=\"496pt\"\n",
" viewBox=\"0.00 0.00 374.16 496.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
"<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 492)\">\n",
"<title>%3</title>\n",
"<polygon fill=\"#ffffff\" stroke=\"transparent\" points=\"-4,4 -4,-492 370.1625,-492 370.1625,4 -4,4\"/>\n",
"<!-- 140267427671184 -->\n",
"<g id=\"node1\" class=\"node\">\n",
"<title>140267427671184</title>\n",
"<ellipse fill=\"none\" stroke=\"#000000\" cx=\"39.0813\" cy=\"-384\" rx=\"39.1627\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"39.0813\" y=\"-379.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">GetItem</text>\n",
"</g>\n",
"<!-- 140267427712464 -->\n",
"<g id=\"node2\" class=\"node\">\n",
"<title>140267427712464</title>\n",
"<ellipse fill=\"none\" stroke=\"#000000\" cx=\"113.0813\" cy=\"-276\" rx=\"106.7251\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"113.0813\" y=\"-271.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">download_and_extract_csv</text>\n",
"</g>\n",
"<!-- 140267427671184&#45;&gt;140267427712464 -->\n",
"<g id=\"edge5\" class=\"edge\">\n",
"<title>140267427671184&#45;&gt;140267427712464</title>\n",
"<path fill=\"none\" stroke=\"#000000\" d=\"M40.4718,-365.9085C42.4459,-350.3504 47.2204,-327.9601 58.7639,-312 62.0789,-307.4166 66.1711,-303.2503 70.6103,-299.513\"/>\n",
"<polygon fill=\"#000000\" stroke=\"#000000\" points=\"73.0215,-302.0775 78.8829,-293.2517 68.797,-296.496 73.0215,-302.0775\"/>\n",
"<text text-anchor=\"middle\" x=\"91.74\" y=\"-325.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">shared_link</text>\n",
"</g>\n",
"<!-- 140267427711696 -->\n",
"<g id=\"node4\" class=\"node\">\n",
"<title>140267427711696</title>\n",
"<ellipse fill=\"none\" stroke=\"#000000\" cx=\"211.0813\" cy=\"-190\" rx=\"87.9239\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"211.0813\" y=\"-185.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">extract_city_spellings</text>\n",
"</g>\n",
"<!-- 140267427712464&#45;&gt;140267427711696 -->\n",
"<g id=\"edge7\" class=\"edge\">\n",
"<title>140267427712464&#45;&gt;140267427711696</title>\n",
"<path fill=\"none\" stroke=\"#000000\" d=\"M133.3869,-258.1807C147.7095,-245.6119 167.0823,-228.6113 182.8998,-214.7307\"/>\n",
"<polygon fill=\"#000000\" stroke=\"#000000\" points=\"185.5939,-217.0231 190.8016,-207.7964 180.9768,-211.7617 185.5939,-217.0231\"/>\n",
"<text text-anchor=\"middle\" x=\"178.74\" y=\"-228.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">data</text>\n",
"</g>\n",
"<!-- 140267427669648 -->\n",
"<g id=\"node3\" class=\"node\">\n",
"<title>140267427669648</title>\n",
"<ellipse fill=\"none\" stroke=\"#000000\" cx=\"158.0813\" cy=\"-384\" rx=\"61.9082\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"158.0813\" y=\"-379.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">BOX_CREDS</text>\n",
"</g>\n",
"<!-- 140267427669648&#45;&gt;140267427712464 -->\n",
"<g id=\"edge9\" class=\"edge\">\n",
"<title>140267427669648&#45;&gt;140267427712464</title>\n",
"<path fill=\"none\" stroke=\"#000000\" d=\"M150.4476,-365.6793C143.3258,-348.5868 132.6071,-322.8619 124.4735,-303.3413\"/>\n",
"<polygon fill=\"#000000\" stroke=\"#000000\" points=\"127.6791,-301.9348 120.6022,-294.0502 121.2176,-304.6271 127.6791,-301.9348\"/>\n",
"<text text-anchor=\"middle\" x=\"170.8503\" y=\"-325.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">box_creds</text>\n",
"</g>\n",
"<!-- 140267404397200 -->\n",
"<g id=\"node7\" class=\"node\">\n",
"<title>140267404397200</title>\n",
"<polygon fill=\"none\" stroke=\"#000000\" points=\"336.4105,-122 201.7521,-122 201.7521,-86 336.4105,-86 336.4105,-122\"/>\n",
"<text text-anchor=\"middle\" x=\"269.0813\" y=\"-99.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">compute_dist &lt;map&gt;</text>\n",
"</g>\n",
"<!-- 140267427711696&#45;&gt;140267404397200 -->\n",
"<g id=\"edge4\" class=\"edge\">\n",
"<title>140267427711696&#45;&gt;140267404397200</title>\n",
"<path fill=\"none\" stroke=\"#000000\" stroke-dasharray=\"5,2\" d=\"M223.3816,-171.7616C231.4557,-159.7896 242.1498,-143.9328 251.1629,-130.5685\"/>\n",
"<polygon fill=\"#000000\" stroke=\"#000000\" points=\"254.109,-132.4598 256.7987,-122.2121 248.3055,-128.5458 254.109,-132.4598\"/>\n",
"<text text-anchor=\"middle\" x=\"259.0261\" y=\"-142.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">string</text>\n",
"</g>\n",
"<!-- 140268010669776 -->\n",
"<g id=\"node5\" class=\"node\">\n",
"<title>140268010669776</title>\n",
"<ellipse fill=\"none\" stroke=\"#000000\" cx=\"248.0813\" cy=\"-470\" rx=\"32.4324\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"248.0813\" y=\"-465.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">config</text>\n",
"</g>\n",
"<!-- 140268010669776&#45;&gt;140267427671184 -->\n",
"<g id=\"edge2\" class=\"edge\">\n",
"<title>140268010669776&#45;&gt;140267427671184</title>\n",
"<path fill=\"none\" stroke=\"#000000\" d=\"M222.0338,-459.2819C185.7918,-444.3689 119.8815,-417.248 77.8519,-399.9535\"/>\n",
"<polygon fill=\"#000000\" stroke=\"#000000\" points=\"79.0698,-396.6699 68.4903,-396.1013 76.4061,-403.1433 79.0698,-396.6699\"/>\n",
"<text text-anchor=\"middle\" x=\"186.4089\" y=\"-422.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">task_result</text>\n",
"</g>\n",
"<!-- 140268010669776&#45;&gt;140267427711696 -->\n",
"<g id=\"edge1\" class=\"edge\">\n",
"<title>140268010669776&#45;&gt;140267427711696</title>\n",
"<path fill=\"none\" stroke=\"#000000\" d=\"M247.4958,-451.9969C246.0761,-415.2675 241.5938,-329.1514 229.0813,-258 226.7183,-244.5629 222.9897,-229.9034 219.5374,-217.6475\"/>\n",
"<polygon fill=\"#000000\" stroke=\"#000000\" points=\"222.8856,-216.6266 216.7385,-207.9974 216.1627,-218.5766 222.8856,-216.6266\"/>\n",
"<text text-anchor=\"middle\" x=\"257.5813\" y=\"-325.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">config</text>\n",
"</g>\n",
"<!-- 140267427712528 -->\n",
"<g id=\"node6\" class=\"node\">\n",
"<title>140267427712528</title>\n",
"<ellipse fill=\"none\" stroke=\"#000000\" cx=\"327.0813\" cy=\"-330\" rx=\"39.1627\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"327.0813\" y=\"-325.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">GetItem</text>\n",
"</g>\n",
"<!-- 140268010669776&#45;&gt;140267427712528 -->\n",
"<g id=\"edge3\" class=\"edge\">\n",
"<title>140268010669776&#45;&gt;140267427712528</title>\n",
"<path fill=\"none\" stroke=\"#000000\" d=\"M257.9269,-452.552C271.5279,-428.449 296.1831,-384.7562 312.0835,-356.5784\"/>\n",
"<polygon fill=\"#000000\" stroke=\"#000000\" points=\"315.2452,-358.0971 317.1115,-347.6679 309.1488,-354.657 315.2452,-358.0971\"/>\n",
"<text text-anchor=\"middle\" x=\"305.4089\" y=\"-422.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">task_result</text>\n",
"</g>\n",
"<!-- 140267427712528&#45;&gt;140267404397200 -->\n",
"<g id=\"edge6\" class=\"edge\">\n",
"<title>140267427712528&#45;&gt;140267404397200</title>\n",
"<path fill=\"none\" stroke=\"#000000\" d=\"M327.2245,-311.9611C326.9569,-282.0817 324.2563,-220.7764 308.0813,-172 303.3023,-157.5888 295.2819,-142.824 287.7529,-130.7506\"/>\n",
"<polygon fill=\"#000000\" stroke=\"#000000\" points=\"290.523,-128.5889 282.1531,-122.0932 284.6454,-132.3907 290.523,-128.5889\"/>\n",
"<text text-anchor=\"middle\" x=\"335.302\" y=\"-228.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">truth</text>\n",
"</g>\n",
"<!-- 140267404431696 -->\n",
"<g id=\"node8\" class=\"node\">\n",
"<title>140267404431696</title>\n",
"<ellipse fill=\"none\" stroke=\"#000000\" cx=\"269.0813\" cy=\"-18\" rx=\"65.2304\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"269.0813\" y=\"-13.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">aggregate_dists</text>\n",
"</g>\n",
"<!-- 140267404397200&#45;&gt;140267404431696 -->\n",
"<g id=\"edge8\" class=\"edge\">\n",
"<title>140267404397200&#45;&gt;140267404431696</title>\n",
"<path fill=\"none\" stroke=\"#000000\" d=\"M269.0813,-85.7616C269.0813,-74.3597 269.0813,-59.4342 269.0813,-46.494\"/>\n",
"<polygon fill=\"#000000\" stroke=\"#000000\" points=\"272.5814,-46.2121 269.0813,-36.2121 265.5814,-46.2121 272.5814,-46.2121\"/>\n",
"<text text-anchor=\"middle\" x=\"294.74\" y=\"-56.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">distances</text>\n",
"</g>\n",
"</g>\n",
"</svg>\n"
],
"text/plain": [
"<graphviz.dot.Digraph at 0x7f928e332650>"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import collections\n",
"import datetime\n",
"import glob\n",
"import io\n",
"import os\n",
"import pandas as pd\n",
"import tempfile\n",
"import zipfile\n",
"\n",
"from boxsdk import OAuth2, Client\n",
"from Levenshtein import distance\n",
"\n",
"from prefect import task, Flow, Parameter, unmapped\n",
"from prefect.engine.results import S3Result\n",
"from prefect.tasks.secrets import PrefectSecret\n",
"\n",
"\n",
"# where we want to checkpoint our data\n",
"BUCKET_RESULT = S3Result(bucket='prefect-coiled-data')\n",
"config = Parameter(\n",
"    \"config\",\n",
"    default=dict(\n",
"        prefix=\"phil\",\n",
"        city=\"Philadelphia\",\n",
"        link=\"https://sba.app.box.com/s/l8myesmuylwckpkf99zerfbdvohxy9qx\",\n",
"    ),\n",
")\n",
"\n",
"\n",
"@task(\n",
"    max_retries=2,\n",
"    retry_delay=datetime.timedelta(seconds=1),\n",
"    target=\"{shared_link}.bytes\",\n",
"    result=BUCKET_RESULT,  # persist the target in our S3 bucket\n",
")\n",
"def download_and_extract_csv(box_creds, shared_link):\n",
"    \"\"\"\n",
"    Given a shared Box link:\n",
"        - downloads the contents\n",
"        - unzips the contents\n",
"        - extracts the CSV into a pandas DataFrame\n",
"    \"\"\"\n",
"    auth = OAuth2(\n",
"        client_id=box_creds.get('client_id'),\n",
"        client_secret=box_creds.get('client_secret'),\n",
"        access_token=box_creds.get('access_token'),\n",
"    )\n",
"    client = Client(auth)\n",
"    box_file = client.get_shared_item(shared_link)\n",
"\n",
"    # download and unzip\n",
"    stream = io.BytesIO()\n",
"    box_file.download_to(stream)\n",
"    stream.seek(0)\n",
"    zipped = zipfile.ZipFile(stream)\n",
"    with tempfile.TemporaryDirectory() as tmpdir:\n",
"        zipped.extractall(tmpdir)\n",
"        csvs = glob.glob(f\"{tmpdir}/*/*.csv\")\n",
"        assert len(csvs) == 1, f\"Expected exactly one .csv file, found {len(csvs)}\"\n",
"        # read while the temporary directory still exists\n",
"        return pd.read_csv(csvs[0])\n",
"\n",
"\n",
"@task\n",
"def extract_city_spellings(config, data):\n",
"    \"\"\"\n",
"    Given config w/ prefix and dataset, returns the unique\n",
"    set of cities beginning with that prefix.\n",
"    \"\"\"\n",
"    prefix = config['prefix'].lower()\n",
"    # na=False treats missing city names as non-matches\n",
"    mask = data['City'].str.lower().str.startswith(prefix, na=False)\n",
"    return list(data[mask].City.unique())\n",
"\n",
"\n",
"@task\n",
"def compute_dist(string, truth):\n",
"    \"\"\"\n",
"    Returns the case-insensitive Levenshtein distance between the two strings\n",
"    \"\"\"\n",
"    # the data is upper case (\"PHILADELPHIA\") while the configured city is\n",
"    # \"Philadelphia\"; without normalizing, the correct spelling scores 11, not 0\n",
"    return distance(string.lower(), truth.lower())\n",
"\n",
"\n",
"@task\n",
"def aggregate_dists(distances):\n",
"    \"\"\"\n",
"    Returns a 'histogram' of the counts\n",
"    \"\"\"\n",
"    return collections.Counter(distances)\n",
"\n",
"\n",
"# compile our tasks into a Flow object\n",
"with Flow(\"PPP Coiled Demo\") as flow:\n",
"\n",
"    # basic parametrized data pull + process\n",
"    box_creds = PrefectSecret(\"BOX_CREDS\")\n",
"    data = download_and_extract_csv(box_creds, config['link'])\n",
"    spellings = extract_city_spellings(config, data)\n",
"\n",
"    # more interesting dynamic fan out + reduce step\n",
"    distances = compute_dist.map(spellings, unmapped(config['city']))\n",
"    results = aggregate_dists(distances)\n",
"\n",
"\n",
"flow.visualize()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Various ways of running the Flow"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# purely local sequential run\n",
"flow_state = flow.run()\n",
"flow_state.result[results].result # final tally"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"# local dask run\n",
"from prefect.engine.executors import DaskExecutor\n",
"\n",
"flow_state = flow.run(executor=DaskExecutor())\n",
"flow_state.result[results].result # final tally"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"# coiled dask run\n",
"from coiled import CoiledCluster\n",
"\n",
"coiled_executor = DaskExecutor(\n",
"    cluster_class=CoiledCluster,\n",
"    cluster_kwargs=dict(name=\"prefect-demo\", configuration=\"cicdw/demo-cluster-config\"),\n",
")\n",
"\n",
"flow_state = flow.run(executor=coiled_executor)\n",
"flow_state.result[results].result # final tally"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Addendum: Coiled Setup"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from coiled import Cloud\n",
"\n",
"cloud = Cloud()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cloud.create_software_environment(name=\"prefect-demo\", container=\"prefecthq/demos:coiled\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cloud.create_cluster_configuration(\n",
"    name=\"demo-cluster-config\",\n",
"    worker_memory=\"8 GiB\",\n",
"    worker_cpu=4,\n",
"    scheduler_memory=\"4 GiB\",\n",
"    scheduler_cpu=1,\n",
"    software=\"cicdw/prefect-demo\",\n",
")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
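
A note on the fan out + reduce step in the notebook above: `compute_dist` is mapped over every spelling and `aggregate_dists` tallies the results. The sketch below reproduces that computation without Prefect, Dask, or the `Levenshtein` package, which may help when sanity-checking a flow run. It is not the notebook's implementation: `levenshtein` is a hand-written stand-in for `Levenshtein.distance`, `distance_histogram` is a hypothetical helper name, and `sample` is just a handful of spellings copied from the `value_counts()` output. Upper-casing both strings before comparing is an assumption about the intended behaviour.

```python
import collections


def levenshtein(a: str, b: str) -> int:
    """Edit distance (insert, delete, substitute) via row-by-row dynamic programming."""
    previous = list(range(len(b) + 1))  # distances from "" to each prefix of b
    for i, char_a in enumerate(a, start=1):
        current = [i]  # distance from a[:i] to ""
        for j, char_b in enumerate(b, start=1):
            substitution_cost = 0 if char_a == char_b else 1
            current.append(
                min(
                    previous[j] + 1,  # delete char_a
                    current[j - 1] + 1,  # insert char_b
                    previous[j - 1] + substitution_cost,  # substitute or match
                )
            )
        previous = current
    return previous[-1]


def distance_histogram(spellings, truth):
    """Count how many spellings fall at each edit distance from `truth`."""
    truth = truth.upper()
    return collections.Counter(levenshtein(s.upper(), truth) for s in spellings)


# a few spellings taken from the value_counts() output in the notebook
sample = ["PHILADELPHIA", "PHILADEPHIA", "PHILADELPIA", "PHILADELKPHIA", "PHILA"]
histogram = distance_histogram(sample, "Philadelphia")
print(sorted(histogram.items()))  # [(0, 1), (1, 3), (7, 1)]
```

Note that this counts each distinct spelling once, as the flow does; weighting by the number of loans per spelling would need the counts from `value_counts()` as well.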