@ayushdg
Last active January 21, 2021 21:24
Prefect Flow example with cuDF
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Orchestrating [Rapids](https://rapids.ai) workflows with [Prefect](https://prefect.io)\n",
"\n",
"\n",
"This snippet demonstrates a simple Prefect Flow that uses Rapids to download, read, and perform a groupby operation on the [NYC Taxi dataset](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page).\n",
"For more information on setting up Prefect Flows, refer to the [Prefect docs](https://docs.prefect.io/core/getting_started/first-steps.html#thinking-prefectly) which have excellent [tutorials](https://docs.prefect.io/core/advanced_tutorials/) and [examples](https://github.com/PrefectHQ/prefect/tree/master/examples).\n",
"\n",
"\n",
"\n",
"### Table of Contents:\n",
"1. [Prefect Task & Flow Setup](#Setting-up-cuDF-tasks)\n",
"2. [Single GPU, cuDF tasks](#Single-GPU,-cuDF-tasks)\n",
"3. [Multi GPU, cuDF tasks](#Multi-GPU-cuDF)\n",
"4. [Multi GPU, dask-cuDF tasks](#Multi-GPU-dask-cuDF)\n",
"5. [Scheduling runs at intervals](#We-can-also-schedule-runs-at-intervals)\n",
"6. [Next Steps](#Next-Steps)"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# Imports\n",
"from prefect import task, Flow, Parameter\n",
"import prefect\n",
"import cudf\n",
"import dask_cudf\n",
"import gcsfs\n",
"import os\n",
"from glob import glob"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Setting up cuDF tasks"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"@task()\n",
"def download_data(source_bucket, destination_path):\n",
" if not os.path.exists(destination_path) or len(os.listdir(destination_path)) == 0:\n",
" fs = gcsfs.GCSFileSystem()\n",
" fs.get(source_bucket, destination_path)\n",
" return glob(f\"{destination_path}/*\")\n",
"\n",
"\n",
"@task()\n",
"def read_data(file_name):\n",
" df = cudf.read_csv(file_name)\n",
" return df\n",
"\n",
"\n",
"@task()\n",
"def groupby_chunk(df):\n",
" grouped_df = df.groupby(\"Passenger_count\", sort=False).agg(\n",
" {\"Tip_amount\": [\"count\", \"sum\"]}\n",
" )\n",
" return grouped_df\n",
"\n",
"\n",
"@task()\n",
"def concat_frames(dfs):\n",
" return cudf.concat(dfs)\n",
"\n",
"\n",
"@task()\n",
"def groupby_aggregate(df):\n",
" df = df.reset_index()\n",
" df.columns = [\"Passenger_count\", \"Tip_count\", \"Tip_sum\"]\n",
" grouped_df = df.groupby(\"Passenger_count\", sort=False).agg(\n",
" {\"Tip_count\": \"sum\", \"Tip_sum\": \"sum\"}\n",
" )\n",
" grouped_df[\"Tip_mean\"] = grouped_df[\"Tip_sum\"] / grouped_df[\"Tip_count\"]\n",
" return grouped_df[\"Tip_mean\"].to_frame()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Creating a Prefect flow using their [functional API](https://docs.prefect.io/core/getting_started/first-steps.html#functional-api)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"with Flow(\"cuDF NYTaxi Prefect Flow\") as nytaxi_cudf_flow:\n",
" source_bucket = Parameter(\"source_bucket\")\n",
" destination_path = Parameter(\"destination_path\")\n",
"\n",
" file_list = download_data(source_bucket, destination_path)\n",
" dfs = read_data.map(file_list) # Maps each downloaded file to a read_data operation\n",
" grouped_dfs = groupby_chunk.map(\n",
" dfs\n",
" ) # Maps each dataframe read to a groupby_chunk operation\n",
" combined_df = concat_frames(grouped_dfs)\n",
" result_df = groupby_aggregate(combined_df)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Visualizing the flow"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"image/svg+xml": [
"<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
"<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
" \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
"<!-- Generated by graphviz version 2.42.3 (20191010.1750)\n",
" -->\n",
"<!-- Title: %3 Pages: 1 -->\n",
"<svg width=\"362pt\" height=\"479pt\"\n",
" viewBox=\"0.00 0.00 362.34 479.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
"<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 475)\">\n",
"<title>%3</title>\n",
"<polygon fill=\"white\" stroke=\"transparent\" points=\"-4,4 -4,-475 358.34,-475 358.34,4 -4,4\"/>\n",
"<!-- 140424197147088 -->\n",
"<g id=\"node1\" class=\"node\">\n",
"<title>140424197147088</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"172.29\" cy=\"-366\" rx=\"80.69\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"172.29\" y=\"-362.3\" font-family=\"Times,serif\" font-size=\"14.00\">download_data</text>\n",
"</g>\n",
"<!-- 140424197146768 -->\n",
"<g id=\"node5\" class=\"node\">\n",
"<title>140424197146768</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"245.79,-297 98.79,-297 98.79,-261 245.79,-261 245.79,-297\"/>\n",
"<text text-anchor=\"middle\" x=\"172.29\" y=\"-275.3\" font-family=\"Times,serif\" font-size=\"14.00\">read_data &lt;map&gt;</text>\n",
"</g>\n",
"<!-- 140424197147088&#45;&gt;140424197146768 -->\n",
"<g id=\"edge2\" class=\"edge\">\n",
"<title>140424197147088&#45;&gt;140424197146768</title>\n",
"<path fill=\"none\" stroke=\"black\" stroke-dasharray=\"5,2\" d=\"M172.29,-347.8C172.29,-336.16 172.29,-320.55 172.29,-307.24\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"175.79,-307.18 172.29,-297.18 168.79,-307.18 175.79,-307.18\"/>\n",
"<text text-anchor=\"middle\" x=\"207.29\" y=\"-318.8\" font-family=\"Times,serif\" font-size=\"14.00\">file_name</text>\n",
"</g>\n",
"<!-- 140424200281872 -->\n",
"<g id=\"node2\" class=\"node\">\n",
"<title>140424200281872</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"79.29\" cy=\"-453\" rx=\"79.09\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"79.29\" y=\"-449.3\" font-family=\"Times,serif\" font-size=\"14.00\">source_bucket</text>\n",
"</g>\n",
"<!-- 140424200281872&#45;&gt;140424197147088 -->\n",
"<g id=\"edge6\" class=\"edge\">\n",
"<title>140424200281872&#45;&gt;140424197147088</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M83.92,-435.02C87.5,-424.47 93.39,-411.2 102.29,-402 108.01,-396.1 114.93,-391.03 122.17,-386.73\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"124.32,-389.55 131.44,-381.7 120.98,-383.39 124.32,-389.55\"/>\n",
"<text text-anchor=\"middle\" x=\"155.29\" y=\"-405.8\" font-family=\"Times,serif\" font-size=\"14.00\">source_bucket</text>\n",
"</g>\n",
"<!-- 140424197144912 -->\n",
"<g id=\"node3\" class=\"node\">\n",
"<title>140424197144912</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"265.79,-210 78.79,-210 78.79,-174 265.79,-174 265.79,-210\"/>\n",
"<text text-anchor=\"middle\" x=\"172.29\" y=\"-188.3\" font-family=\"Times,serif\" font-size=\"14.00\">groupby_chunk &lt;map&gt;</text>\n",
"</g>\n",
"<!-- 140424197076176 -->\n",
"<g id=\"node6\" class=\"node\">\n",
"<title>140424197076176</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"172.29\" cy=\"-105\" rx=\"79.09\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"172.29\" y=\"-101.3\" font-family=\"Times,serif\" font-size=\"14.00\">concat_frames</text>\n",
"</g>\n",
"<!-- 140424197144912&#45;&gt;140424197076176 -->\n",
"<g id=\"edge1\" class=\"edge\">\n",
"<title>140424197144912&#45;&gt;140424197076176</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M172.29,-173.8C172.29,-162.16 172.29,-146.55 172.29,-133.24\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"175.79,-133.18 172.29,-123.18 168.79,-133.18 175.79,-133.18\"/>\n",
"<text text-anchor=\"middle\" x=\"183.29\" y=\"-144.8\" font-family=\"Times,serif\" font-size=\"14.00\">dfs</text>\n",
"</g>\n",
"<!-- 140427728619792 -->\n",
"<g id=\"node4\" class=\"node\">\n",
"<title>140427728619792</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"265.29\" cy=\"-453\" rx=\"89.08\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"265.29\" y=\"-449.3\" font-family=\"Times,serif\" font-size=\"14.00\">destination_path</text>\n",
"</g>\n",
"<!-- 140427728619792&#45;&gt;140424197147088 -->\n",
"<g id=\"edge3\" class=\"edge\">\n",
"<title>140427728619792&#45;&gt;140424197147088</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M246.92,-435.21C233.04,-422.52 213.79,-404.93 198.32,-390.79\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"200.37,-387.92 190.63,-383.76 195.65,-393.09 200.37,-387.92\"/>\n",
"<text text-anchor=\"middle\" x=\"284.79\" y=\"-405.8\" font-family=\"Times,serif\" font-size=\"14.00\">destination_path</text>\n",
"</g>\n",
"<!-- 140424197146768&#45;&gt;140424197144912 -->\n",
"<g id=\"edge5\" class=\"edge\">\n",
"<title>140424197146768&#45;&gt;140424197144912</title>\n",
"<path fill=\"none\" stroke=\"black\" stroke-dasharray=\"5,2\" d=\"M172.29,-260.8C172.29,-249.16 172.29,-233.55 172.29,-220.24\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"175.79,-220.18 172.29,-210.18 168.79,-220.18 175.79,-220.18\"/>\n",
"<text text-anchor=\"middle\" x=\"179.79\" y=\"-231.8\" font-family=\"Times,serif\" font-size=\"14.00\">df</text>\n",
"</g>\n",
"<!-- 140424197078352 -->\n",
"<g id=\"node7\" class=\"node\">\n",
"<title>140424197078352</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"172.29\" cy=\"-18\" rx=\"101.28\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"172.29\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\">groupby_aggregate</text>\n",
"</g>\n",
"<!-- 140424197076176&#45;&gt;140424197078352 -->\n",
"<g id=\"edge4\" class=\"edge\">\n",
"<title>140424197076176&#45;&gt;140424197078352</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M172.29,-86.8C172.29,-75.16 172.29,-59.55 172.29,-46.24\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"175.79,-46.18 172.29,-36.18 168.79,-46.18 175.79,-46.18\"/>\n",
"<text text-anchor=\"middle\" x=\"179.79\" y=\"-57.8\" font-family=\"Times,serif\" font-size=\"14.00\">df</text>\n",
"</g>\n",
"</g>\n",
"</svg>\n"
],
"text/plain": [
"<graphviz.dot.Digraph at 0x7fb70e63b050>"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"nytaxi_cudf_flow.visualize()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Since the downstream tasks depend on the outputs of previous tasks, Prefect automatically handles DAG creation, connecting each task to the next. `read_data` and `groupby_chunk` are mapped tasks: each element in the collection of file names is mapped to a task run, and these runs can execute in parallel on a distributed executor such as a [DaskExecutor](https://docs.prefect.io/api/latest/engine/executors.html#daskexecutor)."
]
},
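{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a side note, when a mapped task also needs an argument that stays constant across all mapped runs, Prefect provides the `unmapped` wrapper. The sketch below is illustrative only: the `usecols` keyword on `read_data` is a hypothetical extension, not part of the tasks defined above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from prefect import unmapped\n",
"\n",
"# Hypothetical: if read_data accepted a column subset, unmapped() would pass\n",
"# the same list whole to every mapped run rather than iterating over it.\n",
"# dfs = read_data.map(file_list, usecols=unmapped([\"Passenger_count\", \"Tip_amount\"]))"
]
},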
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Single GPU, cuDF tasks\n",
"#### Running the flow and viewing the result"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[2020-10-30 19:30:07] INFO - prefect.FlowRunner | Beginning Flow run for 'cuDF NYTaxi Prefect Flow'\n",
"[2020-10-30 19:30:07] INFO - prefect.TaskRunner | Task 'source_bucket': Starting task run...\n",
"[2020-10-30 19:30:07] INFO - prefect.TaskRunner | Task 'source_bucket': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:07] INFO - prefect.TaskRunner | Task 'destination_path': Starting task run...\n",
"[2020-10-30 19:30:07] INFO - prefect.TaskRunner | Task 'destination_path': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:07] INFO - prefect.TaskRunner | Task 'download_data': Starting task run...\n",
"[2020-10-30 19:30:07] INFO - prefect.TaskRunner | Task 'download_data': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:07] INFO - prefect.TaskRunner | Task 'read_data': Starting task run...\n",
"[2020-10-30 19:30:07] INFO - prefect.TaskRunner | Task 'read_data': finished task run for task with final state: 'Mapped'\n",
"[2020-10-30 19:30:07] INFO - prefect.TaskRunner | Task 'read_data[0]': Starting task run...\n",
"[2020-10-30 19:30:09] INFO - prefect.TaskRunner | Task 'read_data[0]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:09] INFO - prefect.TaskRunner | Task 'read_data[1]': Starting task run...\n",
"[2020-10-30 19:30:09] INFO - prefect.TaskRunner | Task 'read_data[1]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:09] INFO - prefect.TaskRunner | Task 'read_data[2]': Starting task run...\n",
"[2020-10-30 19:30:09] INFO - prefect.TaskRunner | Task 'read_data[2]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:09] INFO - prefect.TaskRunner | Task 'read_data[3]': Starting task run...\n",
"[2020-10-30 19:30:09] INFO - prefect.TaskRunner | Task 'read_data[3]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:09] INFO - prefect.TaskRunner | Task 'read_data[4]': Starting task run...\n",
"[2020-10-30 19:30:10] INFO - prefect.TaskRunner | Task 'read_data[4]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:10] INFO - prefect.TaskRunner | Task 'read_data[5]': Starting task run...\n",
"[2020-10-30 19:30:10] INFO - prefect.TaskRunner | Task 'read_data[5]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:10] INFO - prefect.TaskRunner | Task 'read_data[6]': Starting task run...\n",
"[2020-10-30 19:30:10] INFO - prefect.TaskRunner | Task 'read_data[6]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:10] INFO - prefect.TaskRunner | Task 'read_data[7]': Starting task run...\n",
"[2020-10-30 19:30:10] INFO - prefect.TaskRunner | Task 'read_data[7]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:10] INFO - prefect.TaskRunner | Task 'read_data[8]': Starting task run...\n",
"[2020-10-30 19:30:10] INFO - prefect.TaskRunner | Task 'read_data[8]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:10] INFO - prefect.TaskRunner | Task 'read_data[9]': Starting task run...\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'read_data[9]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'read_data[10]': Starting task run...\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'read_data[10]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'read_data[11]': Starting task run...\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'read_data[11]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'groupby_chunk': Starting task run...\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'groupby_chunk': finished task run for task with final state: 'Mapped'\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'groupby_chunk[0]': Starting task run...\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'groupby_chunk[0]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'groupby_chunk[1]': Starting task run...\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'groupby_chunk[1]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'groupby_chunk[2]': Starting task run...\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'groupby_chunk[2]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'groupby_chunk[3]': Starting task run...\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'groupby_chunk[3]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'groupby_chunk[4]': Starting task run...\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'groupby_chunk[4]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'groupby_chunk[5]': Starting task run...\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'groupby_chunk[5]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'groupby_chunk[6]': Starting task run...\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'groupby_chunk[6]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'groupby_chunk[7]': Starting task run...\n",
"[2020-10-30 19:30:11] INFO - prefect.TaskRunner | Task 'groupby_chunk[7]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:12] INFO - prefect.TaskRunner | Task 'groupby_chunk[8]': Starting task run...\n",
"[2020-10-30 19:30:12] INFO - prefect.TaskRunner | Task 'groupby_chunk[8]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:12] INFO - prefect.TaskRunner | Task 'groupby_chunk[9]': Starting task run...\n",
"[2020-10-30 19:30:12] INFO - prefect.TaskRunner | Task 'groupby_chunk[9]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:12] INFO - prefect.TaskRunner | Task 'groupby_chunk[10]': Starting task run...\n",
"[2020-10-30 19:30:12] INFO - prefect.TaskRunner | Task 'groupby_chunk[10]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:12] INFO - prefect.TaskRunner | Task 'groupby_chunk[11]': Starting task run...\n",
"[2020-10-30 19:30:12] INFO - prefect.TaskRunner | Task 'groupby_chunk[11]': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:12] INFO - prefect.TaskRunner | Task 'concat_frames': Starting task run...\n",
"[2020-10-30 19:30:12] INFO - prefect.TaskRunner | Task 'concat_frames': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:12] INFO - prefect.TaskRunner | Task 'groupby_aggregate': Starting task run...\n",
"[2020-10-30 19:30:12] INFO - prefect.TaskRunner | Task 'groupby_aggregate': finished task run for task with final state: 'Success'\n",
"[2020-10-30 19:30:12] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded\n"
]
}
],
"source": [
"flow_run1 = nytaxi_cudf_flow.run(\n",
" source_bucket=\"gs://anaconda-public-data/nyc-taxi/csv/2014/green_tripdata*.csv\",\n",
" destination_path=\"./nyc-taxi/green_tripdata/\",\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The output of a flow run can be used to view the results of each stage. \n",
"\n",
"In this case, we query the result of the task returning `result_df` from the output of `flow_run1`."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>Tip_mean</th>\n",
" </tr>\n",
" <tr>\n",
" <th>Passenger_count</th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>0.370795</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>1.073249</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>1.178949</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>1.184562</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>1.044996</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>1.072943</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>1.276358</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>0.665341</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>0.843497</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>1.316867</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" Tip_mean\n",
"Passenger_count \n",
"0 0.370795\n",
"1 1.073249\n",
"2 1.178949\n",
"3 1.184562\n",
"4 1.044996\n",
"5 1.072943\n",
"6 1.276358\n",
"7 0.665341\n",
"8 0.843497\n",
"9 1.316867"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"flow_run1.result[result_df].result.sort_index()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Scaling the Prefect flow to multiple GPUs <a class=\"anchor\" id=\"Multi-GPU-cuDF\"></a>\n",
"\n",
"Prefect integrates tightly with [Dask](https://dask.org), allowing users to scale tasks out on a Dask cluster. Rapids likewise integrates with Dask to scale workflows to multiple GPUs via [dask-cuda](https://github.com/rapidsai/dask-cuda), which builds on the Dask worker to make it easy to use CUDA-enabled GPUs. \n",
"\n",
"There are multiple ways to scale a Prefect flow out to multiple GPUs (single and multi node).\n",
"\n",
"#### Option 1: Starting a LocalCUDACluster directly using `DaskExecutor` (Single Node only)\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"from prefect.engine.executors import DaskExecutor\n",
"\n",
"executor = DaskExecutor(\n",
" cluster_class=\"dask_cuda.LocalCUDACluster\",\n",
")"
]
},
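{
"cell_type": "markdown",
"metadata": {},
"source": [
"`DaskExecutor` can also forward keyword arguments to the cluster constructor via `cluster_kwargs`. A sketch (the choice of GPUs here is an illustrative assumption; the available `LocalCUDACluster` arguments depend on the dask-cuda version):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Restrict the flow to a subset of the node's GPUs (illustrative)\n",
"executor_two_gpus = DaskExecutor(\n",
"    cluster_class=\"dask_cuda.LocalCUDACluster\",\n",
"    cluster_kwargs={\"CUDA_VISIBLE_DEVICES\": \"0,1\"},\n",
")"
]
},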
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Running the flow on multiple GPUs on the node"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[2020-10-30 19:30:12] INFO - prefect.FlowRunner | Beginning Flow run for 'cuDF NYTaxi Prefect Flow'\n",
"[2020-10-30 19:30:44] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded\n"
]
}
],
"source": [
"flow_run2 = nytaxi_cudf_flow.run(\n",
" source_bucket=\"gs://anaconda-public-data/nyc-taxi/csv/2014/green_tripdata*.csv\",\n",
" destination_path=\"./nyc-taxi/green_tripdata/\",\n",
" executor=executor,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>Tip_mean</th>\n",
" </tr>\n",
" <tr>\n",
" <th>Passenger_count</th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>0.370795</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>1.073249</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>1.178949</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>1.184562</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>1.044996</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>1.072943</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>1.276358</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>0.665341</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>0.843497</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>1.316867</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" Tip_mean\n",
"Passenger_count \n",
"0 0.370795\n",
"1 1.073249\n",
"2 1.178949\n",
"3 1.184562\n",
"4 1.044996\n",
"5 1.072943\n",
"6 1.276358\n",
"7 0.665341\n",
"8 0.843497\n",
"9 1.316867"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"flow_run2.result[result_df].result.sort_index()"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"del flow_run1, flow_run2"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Option 2: Start a dask-cuda cluster through any preferred method and point the `DaskExecutor` object to the scheduler address\n",
"\n",
"For the purposes of this example, a single-node `LocalCUDACluster` is used. It could easily be replaced by a multi-node cluster, with each node running `dask-cuda-worker` connected to a common `dask-scheduler`."
]
},
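{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, a multi-node cluster could be started from the command line along these lines (host name and port are placeholders):\n",
"\n",
"    # On the scheduler node\n",
"    dask-scheduler\n",
"\n",
"    # On each GPU node, pointing at the scheduler's address\n",
"    dask-cuda-worker tcp://scheduler-host:8786"
]
},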
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"from dask_cuda import LocalCUDACluster\n",
"\n",
"cluster = LocalCUDACluster()"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"from prefect.engine.executors import DaskExecutor\n",
"\n",
"executor = DaskExecutor(address=cluster.scheduler_address)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[2020-10-30 19:30:54] INFO - prefect.FlowRunner | Beginning Flow run for 'cuDF NYTaxi Prefect Flow'\n",
"[2020-10-30 19:31:10] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded\n"
]
}
],
"source": [
"flow_run3 = nytaxi_cudf_flow.run(\n",
" source_bucket=\"gs://anaconda-public-data/nyc-taxi/csv/2014/green_tripdata*.csv\",\n",
" destination_path=\"./nyc-taxi/green_tripdata/\",\n",
" executor=executor,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"---\n",
"\n",
"### Using the dask_cudf API to leverage multi-GPU dataframe operations directly <a class=\"anchor\" id=\"Multi-GPU-dask-cuDF\"></a>\n",
"\n",
"The examples above used `cuDF`, a single-GPU dataframe library, with each task performing a small unit of work; the flow scaled by scheduling many such tasks with Dask. In the real world, datasets can often be much larger, and a single GPU may not be sufficient for one unit of work (task). The dask-cuDF distributed dataframe library (built on top of Dask DataFrame) can be used with Prefect tasks just as easily.\n",
"\n",
"#### Setting up dask-cuDF tasks"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"from distributed import worker_client\n",
"\n",
"\n",
"@task(checkpoint=False)\n",
"def dask_read_data(file_names):\n",
" with worker_client() as client:\n",
" return dask_cudf.read_csv(file_names)\n",
"\n",
"\n",
"@task()\n",
"def dask_groupby_task(ddf):\n",
" with worker_client() as client:\n",
" return ddf.groupby(\"Passenger_count\").Tip_amount.mean().compute().to_frame()"
]
},
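{
"cell_type": "markdown",
"metadata": {},
"source": [
"Each task body opens a `worker_client` because the task itself executes on a Dask worker; `worker_client` lets the task submit dask-cuDF work back to the same cluster without blocking that worker. `checkpoint=False` on `dask_read_data` is presumably set so Prefect does not try to persist the lazy dask-cudf dataframe as a task result. An intermediate task could follow the same pattern, e.g. to materialize partitions on the cluster (a sketch, not part of the flow defined in this notebook):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"@task(checkpoint=False)\n",
"def dask_persist_data(ddf):\n",
"    # Sketch: keep partitions in GPU memory for downstream tasks\n",
"    with worker_client():\n",
"        return ddf.persist()"
]
},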
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Creating a dask-cuDF Prefect flow"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"with Flow(\"Dask-cuDF NYTaxi Prefect Flow\") as nytaxi_dask_cudf_flow:\n",
" source_bucket = Parameter(\"source_bucket\")\n",
" destination_path = Parameter(\"destination_path\")\n",
"\n",
" file_list = download_data(source_bucket, destination_path)\n",
" ddf = dask_read_data(file_list)\n",
" result_df = dask_groupby_task(ddf)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Visualizing the flow"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"data": {
"image/svg+xml": [
"<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
"<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
" \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
"<!-- Generated by graphviz version 2.42.3 (20191010.1750)\n",
" -->\n",
"<!-- Title: %3 Pages: 1 -->\n",
"<svg width=\"381pt\" height=\"305pt\"\n",
" viewBox=\"0.00 0.00 381.29 305.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
"<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 301)\">\n",
"<title>%3</title>\n",
"<polygon fill=\"white\" stroke=\"transparent\" points=\"-4,4 -4,-301 377.29,-301 377.29,4 -4,4\"/>\n",
"<!-- 140424158593808 -->\n",
"<g id=\"node1\" class=\"node\">\n",
"<title>140424158593808</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"172.29\" cy=\"-192\" rx=\"80.69\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"172.29\" y=\"-188.3\" font-family=\"Times,serif\" font-size=\"14.00\">download_data</text>\n",
"</g>\n",
"<!-- 140424030740944 -->\n",
"<g id=\"node5\" class=\"node\">\n",
"<title>140424030740944</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"172.29\" cy=\"-105\" rx=\"83.69\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"172.29\" y=\"-101.3\" font-family=\"Times,serif\" font-size=\"14.00\">dask_read_data</text>\n",
"</g>\n",
"<!-- 140424158593808&#45;&gt;140424030740944 -->\n",
"<g id=\"edge1\" class=\"edge\">\n",
"<title>140424158593808&#45;&gt;140424030740944</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M172.29,-173.8C172.29,-162.16 172.29,-146.55 172.29,-133.24\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"175.79,-133.18 172.29,-123.18 168.79,-133.18 175.79,-133.18\"/>\n",
"<text text-anchor=\"middle\" x=\"210.79\" y=\"-144.8\" font-family=\"Times,serif\" font-size=\"14.00\">file_names</text>\n",
"</g>\n",
"<!-- 140424157175376 -->\n",
"<g id=\"node2\" class=\"node\">\n",
"<title>140424157175376</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"79.29\" cy=\"-279\" rx=\"79.09\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"79.29\" y=\"-275.3\" font-family=\"Times,serif\" font-size=\"14.00\">source_bucket</text>\n",
"</g>\n",
"<!-- 140424157175376&#45;&gt;140424158593808 -->\n",
"<g id=\"edge2\" class=\"edge\">\n",
"<title>140424157175376&#45;&gt;140424158593808</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M97.67,-261.21C111.55,-248.52 130.79,-230.93 146.26,-216.79\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"148.94,-219.09 153.96,-209.76 144.21,-213.92 148.94,-219.09\"/>\n",
"<text text-anchor=\"middle\" x=\"185.29\" y=\"-231.8\" font-family=\"Times,serif\" font-size=\"14.00\">source_bucket</text>\n",
"</g>\n",
"<!-- 140424163702352 -->\n",
"<g id=\"node3\" class=\"node\">\n",
"<title>140424163702352</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"265.29\" cy=\"-279\" rx=\"89.08\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"265.29\" y=\"-275.3\" font-family=\"Times,serif\" font-size=\"14.00\">destination_path</text>\n",
"</g>\n",
"<!-- 140424163702352&#45;&gt;140424158593808 -->\n",
"<g id=\"edge4\" class=\"edge\">\n",
"<title>140424163702352&#45;&gt;140424158593808</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M260.51,-260.55C256.91,-250.07 251.07,-237.06 242.29,-228 236.58,-222.1 229.66,-217.03 222.42,-212.73\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"223.61,-209.39 213.15,-207.7 220.27,-215.55 223.61,-209.39\"/>\n",
"<text text-anchor=\"middle\" x=\"312.79\" y=\"-231.8\" font-family=\"Times,serif\" font-size=\"14.00\">destination_path</text>\n",
"</g>\n",
"<!-- 140424163323728 -->\n",
"<g id=\"node4\" class=\"node\">\n",
"<title>140424163323728</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"172.29\" cy=\"-18\" rx=\"100.98\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"172.29\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\">dask_groupby_task</text>\n",
"</g>\n",
"<!-- 140424030740944&#45;&gt;140424163323728 -->\n",
"<g id=\"edge3\" class=\"edge\">\n",
"<title>140424030740944&#45;&gt;140424163323728</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M172.29,-86.8C172.29,-75.16 172.29,-59.55 172.29,-46.24\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"175.79,-46.18 172.29,-36.18 168.79,-46.18 175.79,-46.18\"/>\n",
"<text text-anchor=\"middle\" x=\"184.29\" y=\"-57.8\" font-family=\"Times,serif\" font-size=\"14.00\">ddf</text>\n",
"</g>\n",
"</g>\n",
"</svg>\n"
],
"text/plain": [
"<graphviz.dot.Digraph at 0x7fb7047b9310>"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"nytaxi_dask_cudf_flow.visualize()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here `dask_read_data` and `dask_groupby_task` operations work directly with dask-cudf dataframes and use the entire cluster for the task computations.\n",
"\n",
"#### Running the Flow"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[2020-10-30 19:31:11] INFO - prefect.FlowRunner | Beginning Flow run for 'Dask-cuDF NYTaxi Prefect Flow'\n",
"[2020-10-30 19:31:14] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded\n"
]
}
],
"source": [
"executor = DaskExecutor(address=cluster.scheduler_address)\n",
"\n",
"flow_run4 = nytaxi_dask_cudf_flow.run(\n",
" source_bucket=\"gs://anaconda-public-data/nyc-taxi/csv/2014/green_tripdata*.csv\",\n",
" destination_path=\"./nyc-taxi/green_tripdata/\",\n",
" executor=executor,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>Tip_amount</th>\n",
" </tr>\n",
" <tr>\n",
" <th>Passenger_count</th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>0.370795</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>1.073249</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>1.178949</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>1.184562</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>1.044996</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>1.072943</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>1.276358</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>0.665341</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>0.843497</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>1.316867</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" Tip_amount\n",
"Passenger_count \n",
"0 0.370795\n",
"1 1.073249\n",
"2 1.178949\n",
"3 1.184562\n",
"4 1.044996\n",
"5 1.072943\n",
"6 1.276358\n",
"7 0.665341\n",
"8 0.843497\n",
"9 1.316867"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"flow_run4.result[result_df].result.sort_index()"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": [
"del (flow_run3, flow_run4)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### We can also schedule runs at intervals"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [],
"source": [
"from datetime import timedelta, datetime\n",
"from prefect.schedules import IntervalSchedule\n",
"\n",
"# Schedule runs at intervals of 2 minutes\n",
"schedule = IntervalSchedule(\n",
" start_date=datetime.utcnow() + timedelta(seconds=1),\n",
" interval=timedelta(minutes=2),\n",
")\n",
"\n",
"nytaxi_dask_cudf_flow.schedule = schedule"
]
},
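{
"cell_type": "markdown",
"metadata": {},
"source": [
"Under the hood, `IntervalSchedule` simply emits run times spaced `interval` apart, starting from `start_date` (matching the `Waiting for next scheduled run at ...` timestamps in the log below). As a rough illustration in plain `datetime`, not Prefect's actual implementation:\n",
"\n",
"```python\n",
"from datetime import datetime, timedelta\n",
"\n",
"def next_runs(start, interval, n, now):\n",
"    # Advance to the first scheduled time at or after `now`,\n",
"    # then return the next n run times.\n",
"    t = start\n",
"    while t < now:\n",
"        t += interval\n",
"    return [t + i * interval for i in range(n)]\n",
"\n",
"next_runs(\n",
"    start=datetime(2020, 10, 30, 19, 31, 15),\n",
"    interval=timedelta(minutes=2),\n",
"    n=3,\n",
"    now=datetime(2020, 10, 30, 19, 32, 0),\n",
")\n",
"# first run at 19:33:15, then every 2 minutes\n",
"```"
]
},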
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[2020-10-30 19:31:14] INFO - prefect.Dask-cuDF NYTaxi Prefect Flow | Waiting for next scheduled run at 2020-10-30T19:31:15.262056+00:00\n",
"[2020-10-30 19:31:15] INFO - prefect.FlowRunner | Beginning Flow run for 'Dask-cuDF NYTaxi Prefect Flow'\n",
"[2020-10-30 19:31:16] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded\n",
"[2020-10-30 19:31:16] INFO - prefect.Dask-cuDF NYTaxi Prefect Flow | Waiting for next scheduled run at 2020-10-30T19:33:15.262056+00:00\n",
"[2020-10-30 19:33:15] INFO - prefect.FlowRunner | Beginning Flow run for 'Dask-cuDF NYTaxi Prefect Flow'\n",
"[2020-10-30 19:33:16] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded\n",
"[2020-10-30 19:33:16] INFO - prefect.Dask-cuDF NYTaxi Prefect Flow | Waiting for next scheduled run at 2020-10-30T19:35:15.262056+00:00\n"
]
}
],
"source": [
"interval_run = nytaxi_dask_cudf_flow.run(\n",
" source_bucket=\"gs://anaconda-public-data/nyc-taxi/csv/2014/green_tripdata*.csv\",\n",
" destination_path=\"./nyc-taxi/green_tripdata/\",\n",
" executor=executor,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"---\n",
"\n",
"### Next steps: Prefect [server orchestration](https://docs.prefect.io/orchestration/) <a class=\"anchor\" id=\"Next-Steps\"></a>\n",
"\n",
"The existing flow can be registered to a Prefect backend (server or cloud) which can be used to monitor, schedule and execute multiple Prefect flows across project and environments.\n",
"\n",
"When using the Local server as the backend, the flow can be modified as follows:\n",
"- Setup an execution environment for the flow eg: `LocalEnvironment`\n",
"\n",
"```python\n",
"from prefect.environments import LocalEnvironment\n",
"from prefect.engine.executors import DaskExecutor\n",
"\n",
"executor = DaskExecutor(address=\"{scheduler_address}\")\n",
"environment = LocalEnvironment(executor=executor)\n",
"```\n",
"\n",
"- Register the flow to a project\n",
" ```python\n",
" flow.register(\"project_name\")\n",
" ```\n",
"\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Rapids-16",
"language": "python",
"name": "rapids-16"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.8"
}
},
"nbformat": 4,
"nbformat_minor": 4
}