cuStreamz Word Count @ Scale
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This custreamz job aims to compute the Streaming Word Count from Kafka in a distributed setting.\n",
"This was tested on 2 Tesla T4 (g4dn.xlarge) instances on AWS EC2, with the Dask scheduler on one instances and Dask workers split evenly on both instances."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import confluent_kafka as ck\n",
"import cudf\n",
"import json\n",
"import random\n",
"from distributed import Client\n",
"from streamz import Stream\n",
"from streamz.dataframe import DataFrame"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table style=\"border: 2px solid white;\">\n",
"<tr>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3 style=\"text-align: left;\">Client</h3>\n",
"<ul style=\"text-align: left; list-style: none; margin: 0; padding: 0;\">\n",
" <li><b>Scheduler: </b>tcp://localhost:8786</li>\n",
" <li><b>Dashboard: </b><a href='http://localhost:8787/status' target='_blank'>http://localhost:8787/status</a></li>\n",
"</ul>\n",
"</td>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3 style=\"text-align: left;\">Cluster</h3>\n",
"<ul style=\"text-align: left; list-style:none; margin: 0; padding: 0;\">\n",
" <li><b>Workers: </b>2</li>\n",
" <li><b>Cores: </b>4</li>\n",
" <li><b>Memory: </b>16.00 GB</li>\n",
"</ul>\n",
"</td>\n",
"</tr>\n",
"</table>"
],
"text/plain": [
"<Client: 'tcp://localhost:8786' processes=2 threads=4, memory=16.00 GB>"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"'''\n",
"Create a Dask client to connect to an already up Dask scheduler and workers started from CLI.\n",
"'''\n",
"client = Client(\"localhost:8786\")\n",
"client.get_versions(check=True)\n",
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To start a Dask scheduler from CLI, run: <br>\n",
"**dask-scheduler --host scheduler_IP --port scheduler_port**\n",
"\n",
"To start Dask workers to connect to the scheduler, use a command like: <br>\n",
"**CUDA_VISIBLE_DEVICES=0 dask-worker scheduler_IP:scheduler_port --nprocs 1 --nthreads 2**\n",
"\n",
"One can add more workers from different machines to run a custreamz job in a distributed mode. In the above command, we're starting 1 bi-threaded Dask worker (each worker being a process) on 2 machines. So a total of 2 workers (4 cores) are ready to be used.\n",
"\n",
"Refer to: https://docs.dask.org/en/latest/setup/cli.html?highlight=cli, for more details on how to start the Dask scheduler and workers from CLI with custom configurations.\n",
"\n",
"For long-running and computationally expensive jobs, you might want to change the default configs in: <br>\n",
"**~./config/dask/distributed.yaml**.\n",
"\n",
"For example, we've usually observed that streaming jobs in a distributed mode run more smoothly after increasing the following timeout configs on both the scheduler and the worker machines in the file config mentioned above: <br>\n",
"comm.timeouts.connect=\"100s\" <br>\n",
"comm.timeouts.tcp=\"600s\" <br>\n",
"\n",
"Please refer to: https://docs.dask.org/en/latest/configuration.html for more information on Dask configurations."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"'''\n",
"Kafka configs\n",
"'''\n",
"\n",
"# This topic has 4 partitions\n",
"KAFKA_INPUT_TOPIC = \"word-count-input\"\n",
"\n",
"# This topic has 1 partition\n",
"KAFKA_OUTPUT_TOPIC = \"word-count-result\"\n",
"\n",
"# Kafka brokers\n",
"bootstrap_servers = \"localhost:9092\"\n",
"\n",
"# Kafka Consumer Configuration\n",
"CONSUMER_ARGS = {\n",
" \"bootstrap.servers\": bootstrap_servers,\n",
" \"group.id\": 'custreamz-word-count',\n",
"}\n",
"# Kafka Producer Configuration\n",
"PRODUCER_ARGS = {\n",
" \"bootstrap.servers\": bootstrap_servers,\n",
" 'compression.type':'snappy'\n",
"}"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"'''\n",
"Helper function to produce messages in JSON format with dummy data to Kafka. \n",
"Each JSON message in Kafka has 3 fields: \"word\", \"num-occurences\", \"metadata\".\n",
"\n",
"An example JSON message looks like the one below: \n",
"{\"word\": \"RAPIDS\", \"num-occurrences\": 4, \"metadata\": \"GPU-accelerated Streaming @ Scale\"}\n",
"'''\n",
"def produce_random_word_count_data_to_kafka(max_messages):\n",
" words = [\"RAPIDS\", \"custreamz\", \"GPU\", \"Streaming\", \"Amazing\"]\n",
" occurrences = occurrences = [5, 4, 10, 1, 99]\n",
" metadata = [\"GPU Streaming Python Libraries\", \"GPU-accelerated Streaming @ Scale\",\n",
" \"NVIDIA\", \"Real-time\", \"Oh, yeah!\"]\n",
" producer = ck.Producer(PRODUCER_ARGS)\n",
" count = 1\n",
" columns = [\"word\", \"num-occurrences\", \"metadata\"] \n",
" while count <= max_messages:\n",
" dict_out = {}\n",
" for col in columns:\n",
" random_index = random.randrange(0,5)\n",
" if col is \"word\": dict_out[col] = words[random_index] \n",
" elif col is \"num-occurrences\": dict_out[col] = occurrences[random_index] \n",
" else: dict_out[col] = metadata[random_index] \n",
" producer.produce(KAFKA_INPUT_TOPIC, json.dumps(dict_out))\n",
" producer.flush()\n",
" count = count + 1"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that the input to the below function is a cuDF dataframe that is created by reading the batch of streaming data from Kafka in an accelerated manner and uploading it to the GPU. This route avoids any JSON -> cuDF preprocessing needing to be done. \n",
"\n",
"This accelerated cuStreamz Kafka consumer currently _only_ works when the data in Kafka is in JSON format, but we will be supporting formats like Avro, CSV, soon. \n",
"\n",
"If one prefers to read JSON data from Kafka as is, and then do some preprocessing before converting it to a cuDF dataframe, please use engine=\"None\" in the from_kafka_batched API while setting up the stream from Kafka as explained below."
]
},
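{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"'''\n",
"A minimal sketch (not part of the original pipeline) of the engine=None route:\n",
"each batch then arrives as a list of raw JSON message payloads, which can be\n",
"preprocessed in Python before building the cuDF dataframe yourself.\n",
"'''\n",
"import pandas as pd\n",
"\n",
"def process_raw_batch(messages):\n",
"    # With engine=None, each element is a raw Kafka message value (bytes)\n",
"    records = [json.loads(m) for m in messages]\n",
"    # Any Python-side preprocessing goes here, before the GPU upload\n",
"    return cudf.DataFrame.from_pandas(pd.DataFrame(records))"
]
},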
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"'''\n",
"This is the entry-point function (as visible to the user) in this custreamz pipeline.\n",
"It is basically a helper function to do some data pre-processing before \n",
"computing/updating the word count. This function drops the metadata column since it \n",
"isn't useful for our use case, and renames a column to \"count\".\n",
"'''\n",
"def process_batch(gdf):\n",
" gdf = gdf.drop([\"metadata\"], axis=1)\n",
" gdf = gdf.rename({\"num-occurrences\":\"count\"}, axis=1)\n",
" return gdf"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"'''\n",
"Writes the cumulative Word Count output back to Kafka in JSON format. This function \n",
"is executed over Dask. Please also take a look at the to_kafka() API as an \n",
"alternative in streamz.\n",
"'''\n",
"def write_to_kafka(sdf_out):\n",
" producer = ck.Producer(PRODUCER_ARGS)\n",
" producer.produce(KAFKA_OUTPUT_TOPIC, sdf_out.to_json(orient=\"records\"))\n",
" producer.flush()\n",
" return sdf_out"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"'''\n",
"Setting up a stream to read from Kafka.\n",
"\n",
"To leverage the cuStreamz accelerated Kafka reader (note that data in Kafka must be \n",
"JSON format), one must use engine=\"cudf\". This reads data from Kafka directly into \n",
"cuDF dataframes. We have observed close to ~2x speedup in the end-to-end processing \n",
"when using this accelerated reader.\n",
"\n",
"If one expects Kafka topic partitions to be added on the fly during the stream run-time \n",
"(maybe to handle additional load), set refresh_partitions to True. \n",
"\n",
"Please refer to below link for more details:\n",
"https://streamz.readthedocs.io/en/latest/api.html#streamz.from_kafka_batched\n",
"'''\n",
"source = Stream.from_kafka_batched(KAFKA_INPUT_TOPIC, CONSUMER_ARGS, refresh_partitions=True, \n",
" poll_interval='10s', max_batch_size=1000, asynchronous=True, \n",
" engine=\"cudf\", dask=True)\n",
"\n",
"# Preprocess every batch\n",
"stream_df = source.map(process_batch)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"SDFs are the data structure that must be used for and form the crux of state management in custreamz. As a rule of thumb, whenever the user wants to maintain some sort of \"state\" across batches of streaming data, SDFs are the way to go."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"'''\n",
"Create a streamz dataframe (SDF) to get the stateful/cumulative word count.\n",
"'''\n",
"sdf = DataFrame(stream_df, example=cudf.DataFrame({'word':[], 'count':[]}))\n",
"\n",
"# Compute cumulative word count from the start of the stream, after every batch. \n",
"out_stream = sdf.groupby('word').sum().stream\n",
"\n",
"# Write the latest Word Count result to Kafka and also sink the output to a list.\n",
"output = out_stream.map(write_to_kafka).gather().sink_to_list()"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"# Start the stream\n",
"source.start()"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"# Produce 10 messages to Kafka\n",
"produce_random_word_count_data_to_kafka(10)"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>count</th>\n",
" </tr>\n",
" <tr>\n",
" <th>word</th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>Amazing</th>\n",
" <td>10</td>\n",
" </tr>\n",
" <tr>\n",
" <th>GPU</th>\n",
" <td>109</td>\n",
" </tr>\n",
" <tr>\n",
" <th>RAPIDS</th>\n",
" <td>105</td>\n",
" </tr>\n",
" <tr>\n",
" <th>Streaming</th>\n",
" <td>9</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" count\n",
"word \n",
"Amazing 10\n",
"GPU 109\n",
"RAPIDS 105\n",
"Streaming 9"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Last element in the output would be the latest word count\n",
"output[-1]"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"# Produce 20 messages to Kafka\n",
"produce_random_word_count_data_to_kafka(20)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>count</th>\n",
" </tr>\n",
" <tr>\n",
" <th>word</th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>Amazing</th>\n",
" <td>213</td>\n",
" </tr>\n",
" <tr>\n",
" <th>GPU</th>\n",
" <td>214</td>\n",
" </tr>\n",
" <tr>\n",
" <th>RAPIDS</th>\n",
" <td>316</td>\n",
" </tr>\n",
" <tr>\n",
" <th>Streaming</th>\n",
" <td>211</td>\n",
" </tr>\n",
" <tr>\n",
" <th>custreamz</th>\n",
" <td>25</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" count\n",
"word \n",
"Amazing 213\n",
"GPU 214\n",
"RAPIDS 316\n",
"Streaming 211\n",
"custreamz 25"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Last element in the output would be the latest word count\n",
"output[-1]"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python (custreamz4)",
"language": "python",
"name": "custreamz4"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.8"
}
},
"nbformat": 4,
"nbformat_minor": 2
}