Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chinmaychandak/3137d8825d2f34ed7abdf72068bf78a7 to your computer and use it in GitHub Desktop.
Save chinmaychandak/3137d8825d2f34ed7abdf72068bf78a7 to your computer and use it in GitHub Desktop.
custreamz-CCU-state-management
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# cudf and streamz imports\n",
"import cudf\n",
"from streamz import Stream\n",
"from streamz.dataframe import DataFrame"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"'''\n",
"This function filters data for relevant events. It also creates a new column based on event start/end value.\n",
"An event marked with \"Started\" indicates that the CCUs for that particular Zone/Application/UserPriority has increased.\n",
"Similarly, an event marked with \"Ended\" indicates that the CCUs for that particular Zone/Application/UserPriority has decreased.\n",
"'''\n",
"def filter_by_event(df):\n",
" mask = (df[\"Event\"] == \"Started\") | (df[\"Event\"] == \"Ended\")\n",
" df = df.loc[mask]\n",
" df[\"CCU\"] = 0\n",
" ccu_inc_mask = (df[\"Event\"] == \"Started\")\n",
" ccu_dec_mask = (df[\"Event\"] == \"Ended\")\n",
" df[\"CCU\"].loc[ccu_inc_mask] = 1\n",
" df[\"CCU\"].loc[ccu_dec_mask] = -1\n",
" return df\n",
"\n",
"'''\n",
"This function calculates the CCU results for this batch only.\n",
"''' \n",
"def ccu_include_aggregations(df):\n",
" groups = df.groupby([\"ZoneName\", \"ApplicationName\", \"UserPriority\"]).CCU.sum()\n",
" return groups.reset_index()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"# Create a cu-stream\n",
"stream = Stream()\n",
"\n",
"# Preprocess batch of data, and calculate local CCUs\n",
"source = stream.map(filter_by_event).map(ccu_include_aggregations)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"example = cudf.DataFrame({'ZoneName': [], 'ApplicationName':[], 'UserPriority':[], 'CCU':[]})\n",
"\n",
"# Create a Streamz Dataframe to maintain CCU state from the start of the stream\n",
"sdf = DataFrame(source, example=example)\n",
"\n",
"# Calculate global (from time t=0) CCUs \n",
"sdf.groupby([\"ZoneName\", \"ApplicationName\", \"UserPriority\"]).CCU.sum().stream.gather().sink(print)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"ZoneName ApplicationName UserPriority\n",
"z1 AnotherApp High 1\n",
" SomeApp High 1\n",
"z2 AnotherApp High 1\n",
" SomeApp High 1\n",
"z3 RandomApp High 1\n",
" SomeApp High 1\n",
"Name: CCU, dtype: int64\n"
]
}
],
"source": [
"df1 = cudf.DataFrame({'UserId': [\"Alice\", \"John\", \"Joe\", \"Tim\", \"Newton\", \"Rando\"],\n",
" 'ZoneName': [\"z1\", \"z1\", \"z2\", \"z2\", \"z3\", \"z3\"],\n",
" 'ApplicationName':[\"SomeApp\", \"AnotherApp\", \"SomeApp\", \"AnotherApp\", \"SomeApp\", \"RandomApp\"],\n",
" 'Event':[\"Started\",\"Started\", \"Started\",\"Started\",\"Started\", \"Started\"],\n",
" 'UserPriority':[\"High\",\"High\",\"High\",\"High\",\"High\",\"High\"]})\n",
"\n",
"# Emitting first batch of data\n",
"stream.emit(df1)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"ZoneName ApplicationName UserPriority\n",
"z1 AnotherApp High 0\n",
" SomeApp High 1\n",
"z2 AnotherApp High 1\n",
" SomeApp High 1\n",
"z3 RandomApp High 1\n",
" SomeApp High 0\n",
"Name: CCU, dtype: int64\n"
]
}
],
"source": [
"df2 = cudf.DataFrame({'UserId': [\"Alice\", \"John\", \"Joe\", \"Tim\", \"Newton\", \"Rando\"],\n",
" 'ZoneName': [\"z1\", \"z1\", \"z2\", \"z2\", \"z3\", \"z3\"],\n",
" 'ApplicationName':[\"SomeApp\", \"AnotherApp\", \"SomeApp\", \"AnotherApp\", \"SomeApp\", \"RandomApp\"],\n",
" 'Event':[\"Processing\",\"Ended\", \"Processing\",\"Processing\",\"Ended\", \"Processing\"],\n",
" 'UserPriority':[\"High\",\"High\",\"High\",\"High\",\"High\",\"High\"]})\n",
"\n",
"# Emitting second batch of data\n",
"stream.emit(df2)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"ZoneName ApplicationName UserPriority\n",
"z1 AnotherApp High 1\n",
" SomeApp High 0\n",
"z2 AnotherApp High 0\n",
" SomeApp High 1\n",
"z3 AnotherApp Low 1\n",
" RandomApp High 2\n",
" Low 1\n",
" SomeApp High 0\n",
"Name: CCU, dtype: int64\n"
]
}
],
"source": [
"df3 = cudf.DataFrame({'UserId': [\"Alice\", \"Jim\", \"Joe\", \"Tim\", \"Einstein\", \"Fluff\", \"Snuff\"],\n",
" 'ZoneName': [\"z1\", \"z1\", \"z2\", \"z2\", \"z3\", \"z3\", \"z3\"],\n",
" 'ApplicationName':[\"SomeApp\", \"AnotherApp\", \"SomeApp\", \"AnotherApp\", \"AnotherApp\", \"RandomApp\", \"RandomApp\"],\n",
" 'Event':[\"Ended\",\"Started\", \"Processing\",\"Ended\",\"Started\", \"Started\", \"Started\"],\n",
" 'UserPriority':[\"High\",\"High\",\"High\",\"High\",\"Low\",\"Low\", \"High\"]})\n",
"\n",
"# Emitting third batch of data\n",
"stream.emit(df3)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python (kc3)",
"language": "python",
"name": "kc3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment