{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Engagement Features"
]
},
{
"cell_type": "code",
"execution_count": 55,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import datetime\n",
"import numpy as np\n",
"import pandas as pd\n",
"import plotly.plotly as py\n",
"import plotly.graph_objs as go\n",
"from pyspark.sql.types import *\n",
"from datetime import date, timedelta\n",
"from pyspark.mllib.stat import Statistics\n",
"from pyspark.sql.window import Window\n",
"from pyspark.sql.functions import *\n",
"from pyspark.sql.functions import col, count\n",
"from pyspark.mllib.regression import LabeledPoint\n",
"from pyspark.mllib.tree import DecisionTree, RandomForest, RandomForestModel\n",
"from pyspark.mllib.classification import * \n",
"from pyspark.mllib.evaluation import MulticlassMetrics"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Import Activity Stream Tables"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"activity_stream_events_daily_url = \"activity_stream_events_daily.csv\"\n",
"activity_stream_stats_daily_url = \"activity_stream_stats_daily.csv\""
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/home/hadoop/anaconda2/lib/python2.7/site-packages/IPython/core/interactiveshell.py:2723: DtypeWarning:\n",
"\n",
"Columns (11,15,16) have mixed types. Specify dtype option on import or set low_memory=False.\n",
"\n",
"/home/hadoop/anaconda2/lib/python2.7/site-packages/IPython/core/interactiveshell.py:2723: DtypeWarning:\n",
"\n",
"Columns (14,19,20) have mixed types. Specify dtype option on import or set low_memory=False.\n",
"\n"
]
}
],
"source": [
"pandas_events = pd.read_csv(activity_stream_events_daily_url, \",\")\n",
"pandas_stats = pd.read_csv(activity_stream_stats_daily_url, \",\")"
]
},
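{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `DtypeWarning` above is pandas flagging mixed-type columns during chunked type inference. As the message suggests, it can be silenced by passing `low_memory=False`; a sketch of that optional tweak follows (the explicit Spark schema built below handles the types regardless)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Optional, illustrative: disable chunked type inference to silence the\n",
"# DtypeWarning (this notebook instead casts via an explicit Spark schema).\n",
"pandas_events = pd.read_csv(activity_stream_events_daily_url, sep=\",\", low_memory=False)\n",
"pandas_stats = pd.read_csv(activity_stream_stats_daily_url, sep=\",\", low_memory=False)"
]
},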
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"integer_types = [\"max_scroll_depth\", \"load_latency\", \"total_bookmarks\", \"total_history_size\", \"session_duration\"]\n",
"\n",
"events_fields = [StructField(field_name, IntegerType(), True) if field_name in integer_types else StructField(field_name, StringType(), True) for field_name in pandas_events.columns]\n",
"stats_fields = [StructField(field_name, IntegerType(), True) if field_name in integer_types else StructField(field_name, StringType(), True) for field_name in pandas_stats.columns]\n",
"\n",
"events_schema = StructType(events_fields)\n",
"stats_schema = StructType(stats_fields)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"activity_stream_events_daily_df = sqlContext.createDataFrame(pandas_events, schema=events_schema)\n",
"activity_stream_stats_daily_df = sqlContext.createDataFrame(pandas_stats, schema=stats_schema) "
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"sqlContext.registerDataFrameAsTable(activity_stream_events_daily_df, \"activity_stream_events_daily\")\n",
"sqlContext.registerDataFrameAsTable(activity_stream_stats_daily_df, \"activity_stream_stats_daily\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Identify \"Engaged\" Users - Returning at Least Once Per Week Over the Past Four Weeks"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Find how many days ago our earliest datapoint is.\n",
"earliest_date = activity_stream_events_daily_df \\\n",
" .select(\"date\") \\\n",
" .orderBy(\"date\").first()[0]\n",
"earliest_datetime = datetime.datetime.strptime(earliest_date, \"%Y-%m-%d\")\n",
"days_ago = (datetime.datetime.today() - earliest_datetime).days"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Create a dataframe of all the dates between now and days_ago\n",
"base = datetime.datetime.today()\n",
"date_list = [(datetime.datetime.today() - datetime.timedelta(days=x)).date() for x in range(0, days_ago)]\n",
"pandas_df = pd.DataFrame(date_list, columns=[\"date\"])\n",
"date_list_df = sqlContext.createDataFrame(pandas_df)\n",
"sqlContext.registerDataFrameAsTable(date_list_df, \"date_list\")"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Map each client to all possible dates\n",
"client_list_df = activity_stream_events_daily_df.select(\"client_id\").distinct()\n",
"all_dates_clients_df = date_list_df.join(client_list_df)\n",
"sqlContext.registerDataFrameAsTable(all_dates_clients_df, \"all_dates_clients\")"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Create a table of user interactions per day\n",
"interactions_per_user_per_day_df = \\\n",
" activity_stream_events_daily_df \\\n",
" .select(\"client_id\", \"date\") \\\n",
" .groupBy(\"client_id\", \"date\") \\\n",
" .count().orderBy(\"client_id\", \"date\")\n",
"interactions_per_user_per_day_df = interactions_per_user_per_day_df.withColumn(\"date\", col(\"date\").cast(\"date\"))\n",
"sqlContext.registerDataFrameAsTable(interactions_per_user_per_day_df, \"interactions_per_user_per_day\")"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+----------+-----+\n",
"| client_id| date|count|\n",
"+--------------------+----------+-----+\n",
"|00025585-a4e0-4cd...|2016-05-24| 13|\n",
"|00025585-a4e0-4cd...|2016-05-25| 3|\n",
"|00025585-a4e0-4cd...|2016-05-26| 1|\n",
"|00025585-a4e0-4cd...|2016-05-27| 3|\n",
"|00025585-a4e0-4cd...|2016-05-28| 3|\n",
"|00025585-a4e0-4cd...|2016-05-30| 5|\n",
"|00025585-a4e0-4cd...|2016-05-31| 8|\n",
"|00025585-a4e0-4cd...|2016-06-01| 5|\n",
"|00025585-a4e0-4cd...|2016-06-02| 10|\n",
"|00025585-a4e0-4cd...|2016-06-03| 11|\n",
"|00025585-a4e0-4cd...|2016-06-04| 10|\n",
"|00025585-a4e0-4cd...|2016-06-05| 1|\n",
"|00025585-a4e0-4cd...|2016-06-06| 17|\n",
"|00025585-a4e0-4cd...|2016-06-07| 5|\n",
"|00025585-a4e0-4cd...|2016-06-08| 3|\n",
"|00025585-a4e0-4cd...|2016-06-09| 5|\n",
"|00025585-a4e0-4cd...|2016-06-10| 3|\n",
"|00025585-a4e0-4cd...|2016-06-13| 3|\n",
"|00025585-a4e0-4cd...|2016-06-14| 5|\n",
"|00025585-a4e0-4cd...|2016-06-15| 1|\n",
"+--------------------+----------+-----+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"interactions_per_user_per_day_df.show()"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"all_users_dates_counts = sqlContext.sql(\n",
" \"SELECT all_dates_clients.client_id, all_dates_clients.date, COALESCE(interactions_per_user_per_day.count, 0) AS count \" +\n",
" \"FROM all_dates_clients \" +\n",
" \"LEFT JOIN interactions_per_user_per_day \" +\n",
" \"ON all_dates_clients.client_id = interactions_per_user_per_day.client_id \" +\n",
" \"AND all_dates_clients.date = interactions_per_user_per_day.date \" +\n",
" \"ORDER BY client_id, date\")"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"DataFrame[client_id: string, date: date, count: bigint]"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"all_users_dates_counts.cache()"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"collapsed": false,
"scrolled": false
},
"outputs": [],
"source": [
"# Hive timestamp is interpreted as UNIX timestamp in seconds*\n",
"days = lambda i: i * 86400 \n",
"\n",
"w = (Window()\n",
" .partitionBy(col(\"client_id\"))\n",
" .orderBy(col(\"date\").cast(\"timestamp\").cast(\"long\"))\n",
" .rangeBetween(-days(6), 0))\n",
"\n",
"weekly_avgs = all_users_dates_counts.select(col(\"client_id\"),\n",
" col(\"date\").cast(\"timestamp\"),\n",
" col(\"count\"),\n",
" mean(\"count\").over(w).alias(\"week_avg\"))"
]
},
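{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a sanity check, here is a pure-Python sketch of the trailing 7-day mean that the window above computes, on toy data: each day's `week_avg` is the mean of that day's count and the six days before it."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Toy cross-check of the trailing 7-day mean (hypothetical counts).\n",
"counts = [0, 7, 0, 0, 0, 0, 0, 7]\n",
"week_avg = [sum(counts[max(0, i - 6):i + 1]) / float(min(i + 1, 7))\n",
"            for i in range(len(counts))]\n",
"print week_avg  # the last day averages days 2-8: 14 / 7 = 2.0"
]
},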
{
"cell_type": "code",
"execution_count": 17,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+--------------------+-----+--------+\n",
"| client_id| date|count|week_avg|\n",
"+--------------------+--------------------+-----+--------+\n",
"|01e6ca6d-0a0b-47b...|2016-04-08 00:00:...| 0| 0.0|\n",
"|01e6ca6d-0a0b-47b...|2016-04-09 00:00:...| 0| 0.0|\n",
"|01e6ca6d-0a0b-47b...|2016-04-10 00:00:...| 0| 0.0|\n",
"|01e6ca6d-0a0b-47b...|2016-04-11 00:00:...| 0| 0.0|\n",
"|01e6ca6d-0a0b-47b...|2016-04-12 00:00:...| 0| 0.0|\n",
"|01e6ca6d-0a0b-47b...|2016-04-13 00:00:...| 0| 0.0|\n",
"|01e6ca6d-0a0b-47b...|2016-04-14 00:00:...| 0| 0.0|\n",
"|01e6ca6d-0a0b-47b...|2016-04-15 00:00:...| 0| 0.0|\n",
"|01e6ca6d-0a0b-47b...|2016-04-16 00:00:...| 0| 0.0|\n",
"|01e6ca6d-0a0b-47b...|2016-04-17 00:00:...| 0| 0.0|\n",
"|01e6ca6d-0a0b-47b...|2016-04-18 00:00:...| 0| 0.0|\n",
"|01e6ca6d-0a0b-47b...|2016-04-19 00:00:...| 0| 0.0|\n",
"|01e6ca6d-0a0b-47b...|2016-04-20 00:00:...| 0| 0.0|\n",
"|01e6ca6d-0a0b-47b...|2016-04-21 00:00:...| 0| 0.0|\n",
"|01e6ca6d-0a0b-47b...|2016-04-22 00:00:...| 0| 0.0|\n",
"|01e6ca6d-0a0b-47b...|2016-04-23 00:00:...| 0| 0.0|\n",
"|01e6ca6d-0a0b-47b...|2016-04-24 00:00:...| 0| 0.0|\n",
"|01e6ca6d-0a0b-47b...|2016-04-25 00:00:...| 0| 0.0|\n",
"|01e6ca6d-0a0b-47b...|2016-04-26 00:00:...| 0| 0.0|\n",
"|01e6ca6d-0a0b-47b...|2016-04-27 00:00:...| 0| 0.0|\n",
"+--------------------+--------------------+-----+--------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"weekly_avgs.show()"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"4686"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# The number of users who return at least once per week over the past 3 weeks\n",
"engaged_users = weekly_avgs \\\n",
" .filter(col(\"date\") >= date_sub(current_date(), 21)) \\\n",
" .filter(col(\"date\") <= date_sub(current_date(), 0)) \\\n",
" .where(col(\"week_avg\") > 0) \\\n",
" .select(\"client_id\") \\\n",
" .groupBy(\"client_id\") \\\n",
" .count() \\\n",
" .where(col(\"count\") == 21) \\\n",
" .select(\"client_id\").distinct()\n",
"\n",
"engaged_users.count()"
]
},
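{
"cell_type": "markdown",
"metadata": {},
"source": [
"The filter above requires `week_avg > 0` on each of the 21 days, i.e. at least one interaction in every trailing 7-day window. A minimal pure-Python sketch of the same rule on toy data (`is_engaged` is a hypothetical name for illustration):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Toy version of the engagement rule: engaged means every trailing 7-day\n",
"# window over the last 21 days contains at least one interaction.\n",
"def is_engaged(counts, days=21, window=7):\n",
"    return all(sum(counts[max(0, i - window + 1):i + 1]) > 0\n",
"               for i in range(len(counts) - days, len(counts)))\n",
"\n",
"print is_engaged([0, 1, 0, 0, 0, 0, 0] * 4)  # True: one visit each week\n",
"print is_engaged([0] * 21 + [1] * 7)         # False: an early window is empty"
]
},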
{
"cell_type": "code",
"execution_count": 19,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"14742"
]
},
"execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"unengaged_users = activity_stream_stats_daily_df \\\n",
" .where(col(\"date\") >= date_sub(current_date(), 21)) \\\n",
" .select(\"client_id\").distinct() \\\n",
" .subtract(engaged_users)\n",
"\n",
"unengaged_users.count()"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2016-07-24\n",
"Engaged Users: 4739\n",
"Unengaged Users: 14905\n",
"Total Users: 19644\n",
"2016-07-17\n",
"Engaged Users: 5083\n",
"Unengaged Users: 15468\n",
"Total Users: 20551\n",
"2016-07-10\n",
"Engaged Users: 5447\n",
"Unengaged Users: 16314\n",
"Total Users: 21761\n",
"2016-07-03\n",
"Engaged Users: 5110\n",
"Unengaged Users: 18356\n",
"Total Users: 23466\n",
"2016-06-26\n",
"Engaged Users: 4649\n",
"Unengaged Users: 20669\n",
"Total Users: 25318\n",
"2016-06-19\n",
"Engaged Users: 4695\n",
"Unengaged Users: 21330\n",
"Total Users: 26025\n",
"2016-06-12\n",
"Engaged Users: 4105\n",
"Unengaged Users: 21180\n",
"Total Users: 25285\n",
"2016-06-05\n",
"Engaged Users: 2622\n",
"Unengaged Users: 22806\n",
"Total Users: 25428\n"
]
}
],
"source": [
"dates = []\n",
"engaged = []\n",
"unengaged = []\n",
"for i in xrange(8):\n",
" engaged_users = weekly_avgs \\\n",
" .filter(col(\"date\") >= date_sub(current_date(), 7 * i + 21)) \\\n",
" .filter(col(\"date\") <= date_sub(current_date(), 7 * i)) \\\n",
" .where(col(\"week_avg\") > 0) \\\n",
" .select(\"client_id\") \\\n",
" .groupBy(\"client_id\") \\\n",
" .count() \\\n",
" .where(col(\"count\") == 21) \\\n",
" .select(\"client_id\").distinct() \n",
" \n",
" unengaged_users = activity_stream_stats_daily_df \\\n",
" .filter(col(\"date\") >= date_sub(current_date(), 7 * i + 21)) \\\n",
" .filter(col(\"date\") <= date_sub(current_date(), 7 * i)) \\\n",
" .select(\"client_id\").distinct() \\\n",
" .subtract(engaged_users)\n",
"\n",
" end_of_week = date.today() - timedelta(days=7 * i)\n",
" print end_of_week\n",
" print \"Engaged Users: \" + str(engaged_users.count())\n",
" print \"Unengaged Users: \" + str(unengaged_users.count())\n",
" print \"Total Users: \" + str(engaged_users.count() + unengaged_users.count())\n",
" dates.append(end_of_week)\n",
" engaged.append(engaged_users.count())\n",
" unengaged.append(unengaged_users.count())\n",
" "
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<iframe id=\"igraph\" scrolling=\"no\" style=\"border:none;\"seamless=\"seamless\" src=\"https://plot.ly/~mozilla/940.embed\" height=\"525\" width=\"100%\"></iframe>"
],
"text/plain": [
"<plotly.tools.PlotlyDisplay object>"
]
},
"execution_count": 22,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import plotly.plotly as py\n",
"import plotly.graph_objs as go\n",
"\n",
"trace1 = go.Bar(\n",
" x=dates,\n",
" y=engaged,\n",
" name='Engaged Users'\n",
")\n",
"trace2 = go.Bar(\n",
" x=dates,\n",
" y=unengaged,\n",
" name='Unengaged Users'\n",
")\n",
"\n",
"data = [trace1, trace2]\n",
"layout = go.Layout(\n",
" barmode='stack',\n",
" xaxis=dict(\n",
" title='3-Week Period End Date',\n",
" titlefont=dict(\n",
" family='Courier New, monospace',\n",
" size=18,\n",
" color='#7f7f7f'\n",
" )\n",
" ),\n",
" yaxis=dict(\n",
" title='Number of Users',\n",
" titlefont=dict(\n",
" family='Courier New, monospace',\n",
" size=18,\n",
" color='#7f7f7f'\n",
" )\n",
" )\n",
")\n",
"\n",
"fig = go.Figure(data=data, layout=layout)\n",
"py.iplot(fig, filename='stacked-bar')\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Aggregating the Various Component Metrics"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Loyalty Index"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {
"collapsed": false,
"scrolled": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+-----------------+\n",
"| client_id| loyalty|\n",
"+--------------------+-----------------+\n",
"|6ed543ad-1a37-f84...|99.91735537190083|\n",
"|b736dd76-567c-4f4...|99.88331388564761|\n",
"|3caf9141-fea2-4b4...|99.86962190352021|\n",
"|201d3217-e202-8d4...|99.84639016897081|\n",
"|f9fbb997-d1ee-4f7...|99.83948635634029|\n",
"|8a4c60ba-08ea-482...|99.79166666666667|\n",
"|1d78ab7e-0ee8-154...|99.79166666666667|\n",
"|fb7b3bcb-151d-439...| 99.7907949790795|\n",
"|ff486d55-cff8-184...|99.79035639412997|\n",
"|8ee47177-9238-8c4...|99.77678571428571|\n",
"|00ec0200-9faf-406...|99.77272727272727|\n",
"|3f6162f3-9319-426...|99.76851851851852|\n",
"|05083203-f7c2-450...|99.76689976689977|\n",
"|8ff28fe8-6401-417...|99.76689976689977|\n",
"|b0db015c-60c4-f84...|99.76635514018692|\n",
"|6a7683fb-a6b5-451...|99.76190476190476|\n",
"|697ea9a1-4909-a34...|99.76019184652279|\n",
"|c9b5ef85-57db-c14...|99.75124378109453|\n",
"|141623e8-f407-496...|99.74554707379136|\n",
"|88685cc1-64a3-4be...|99.74160206718346|\n",
"+--------------------+-----------------+\n",
"only showing top 20 rows\n",
"\n"
]
},
{
"data": {
"text/plain": [
"13979"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Users who haven't used activity stream at all in the past 7 days have a loyalty index of 0.\n",
"loyalty_index = activity_stream_stats_daily_df \\\n",
" .where(col(\"date\") > date_sub(current_date(), 7)) \\\n",
" .select(\"client_id\") \\\n",
" .groupBy(\"client_id\").count() \\\n",
" .select(\"client_id\", ((1 - (1 / col(\"count\"))) * 100).alias(\"loyalty\")) \\\n",
" .orderBy(desc(\"loyalty\"))\n",
"\n",
"loyalty_index.show()\n",
"loyalty_index.count()"
]
},
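{
"cell_type": "markdown",
"metadata": {},
"source": [
"The loyalty index is `(1 - 1/n) * 100`, where `n` is the number of stats rows a client has in the past 7 days: 0 for a single row, approaching 100 as `n` grows. A quick worked example with illustrative values of `n`:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Worked example of the loyalty formula (illustrative n values only).\n",
"# Note the 1.0: plain Python 2 integer division would truncate 1 / n,\n",
"# unlike the Spark column arithmetic above.\n",
"for n in [1, 2, 10, 480]:\n",
"    print n, (1 - 1.0 / n) * 100  # 0.0, 50.0, 90.0, ~99.79"
]
},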
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Block and Delete Rate"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+---------------------+\n",
"| client_id|neg_interaction_index|\n",
"+--------------------+---------------------+\n",
"|efbf10b8-e7b9-4cc...| 100.0|\n",
"|5b0c21a7-3d24-4c9...| 100.0|\n",
"|5a5b465b-8954-41f...| 100.0|\n",
"|6a0be346-b48d-48f...| 100.0|\n",
"|8810a3b6-d858-4ec...| 100.0|\n",
"|51dbdfbc-e5f8-4f8...| 100.0|\n",
"|3a1adc85-e9ad-4a3...| 100.0|\n",
"|12bdd0c3-89c8-4e0...| 100.0|\n",
"|88af7c76-f1c0-45c...| 100.0|\n",
"|62bb8554-65d3-40f...| 100.0|\n",
"|3a05996a-d8c2-4c0...| 100.0|\n",
"|8d38cb1b-3578-46e...| 100.0|\n",
"|f794945f-d161-438...| 100.0|\n",
"|d6a21da9-5088-454...| 100.0|\n",
"|0e92be78-f80c-4c4...| 100.0|\n",
"|f9e60b49-3119-4ee...| 100.0|\n",
"|c4ccdec9-8796-f04...| 100.0|\n",
"|d35948d9-1948-41f...| 100.0|\n",
"|40f7e687-ad3a-47d...| 100.0|\n",
"|f2ffa7c6-7a36-432...| 100.0|\n",
"+--------------------+---------------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"# Only includes sessions where session_id exists\n",
"events = activity_stream_events_daily_df \\\n",
" .where((col(\"event\") == \"DELETE\") | (col(\"event\") == \"BLOCK\"))\n",
"\n",
"neg_interaction_rate = activity_stream_stats_daily_df \\\n",
" .where(col(\"session_id\") != 'n/a') \\\n",
" .where(col(\"session_id\") != 'NaN') \\\n",
" .select(\"client_id\", \"session_id\") \\\n",
" .join(events, \\\n",
" activity_stream_stats_daily_df.session_id == events.session_id, \"outer\") \\\n",
" .select(activity_stream_stats_daily_df.client_id.alias(\"stats_client_id\"), \\\n",
" activity_stream_stats_daily_df.session_id.alias(\"stats_session_id\"), \\\n",
" events.client_id.alias(\"events_client_id\"), \\\n",
" events.session_id.alias(\"events_session_id\"))\n",
" \n",
"neg_interaction_index = neg_interaction_rate.groupBy(\"stats_client_id\") \\\n",
" .agg((countDistinct(neg_interaction_rate.events_session_id) / countDistinct(neg_interaction_rate.stats_session_id) * 100).alias(\"neg_interaction_index\")) \\\n",
" .na.drop().orderBy(desc(\"neg_interaction_index\")) \\\n",
" .select(col(\"stats_client_id\").alias(\"client_id\"), \"neg_interaction_index\")\n",
"\n",
"neg_interaction_index.show()"
]
},
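{
"cell_type": "markdown",
"metadata": {},
"source": [
"`neg_interaction_index` is therefore the share of a client's distinct sessions that contain at least one BLOCK or DELETE event. A toy illustration of the ratio (hypothetical session ids):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Toy illustration of the per-client rate computed above.\n",
"all_sessions = {\"s1\", \"s2\", \"s3\", \"s4\"}  # a client's distinct session_ids\n",
"neg_sessions = {\"s2\"}                     # sessions with a BLOCK/DELETE event\n",
"print len(neg_sessions & all_sessions) / float(len(all_sessions)) * 100  # 25.0"
]
},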
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Interaction Rate"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+-----------------+\n",
"| client_id|interaction_index|\n",
"+--------------------+-----------------+\n",
"|7a6e1b37-c76d-4b7...| 100.0|\n",
"|4ec69f00-1d4b-472...| 100.0|\n",
"|21aa4214-8954-40e...| 100.0|\n",
"|2a5e105a-7bdb-a64...| 100.0|\n",
"|e367cbd3-9dce-434...| 100.0|\n",
"|4ec9cb9e-7dfe-4db...| 100.0|\n",
"|62794d3e-50f9-4eb...| 100.0|\n",
"|13348859-13f0-49d...| 100.0|\n",
"|e438e2ef-c475-488...| 100.0|\n",
"|048ac47e-fb89-4e6...| 100.0|\n",
"|f794945f-d161-438...| 100.0|\n",
"|47f67bfe-3fd3-4fd...| 100.0|\n",
"|27411796-6c7f-480...| 100.0|\n",
"|0a0bed91-eb14-48c...| 100.0|\n",
"|35906457-b148-4ea...| 100.0|\n",
"|7f8272bc-417e-438...| 100.0|\n",
"|8a073505-c22b-4ae...| 100.0|\n",
"|22e44914-0a7d-4a3...| 100.0|\n",
"|006b7c41-5034-5f4...| 100.0|\n",
"|07b5669e-6112-46f...| 100.0|\n",
"+--------------------+-----------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"# Only includes sessions where session_id exists\n",
"positive_events = activity_stream_events_daily_df \\\n",
" .where(col(\"event\") != \"DELETE\") \\\n",
" .where(col(\"event\") != \"BLOCK\")\n",
"\n",
"interaction_rate = activity_stream_stats_daily_df \\\n",
" .where(col(\"session_id\") != 'n/a') \\\n",
" .where(col(\"session_id\") != 'NaN') \\\n",
" .select(\"client_id\", \"session_id\") \\\n",
" .join(positive_events, \\\n",
" activity_stream_stats_daily_df.session_id == positive_events.session_id, \"outer\") \\\n",
" .select(activity_stream_stats_daily_df.client_id.alias(\"stats_client_id\"), \\\n",
" activity_stream_stats_daily_df.session_id.alias(\"stats_session_id\"), \\\n",
" positive_events.client_id.alias(\"events_client_id\"), \\\n",
" positive_events.session_id.alias(\"events_session_id\"))\n",
" \n",
"interaction_index = interaction_rate.groupBy(\"stats_client_id\") \\\n",
" .agg((countDistinct(interaction_rate.events_session_id) / countDistinct(interaction_rate.stats_session_id) * 100).alias(\"interaction_index\")) \\\n",
" .na.drop().orderBy(desc(\"interaction_index\")) \\\n",
" .select(col(\"stats_client_id\").alias(\"client_id\"), \"interaction_index\")\n",
"\n",
"interaction_index.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Recency Index"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+-----------------+\n",
"| client_id| recency_index|\n",
"+--------------------+-----------------+\n",
"|5858346a-8d61-403...|98.64864864864865|\n",
"|a5d56ebb-6abb-480...|98.63013698630137|\n",
"|ceb0f74a-c46d-42e...|98.63013698630137|\n",
"|323263fb-45be-4f3...|98.63013698630137|\n",
"|c1961f31-d1b4-4f7...|98.63013698630137|\n",
"|e9b50d52-fcc8-4d3...|98.63013698630137|\n",
"|ee5ec985-f1f9-42b...|98.63013698630137|\n",
"|8ca2e77b-ff90-469...|98.63013698630137|\n",
"|877da732-b6c1-4a6...|98.63013698630137|\n",
"|35d0d5b6-21d1-414...|98.63013698630137|\n",
"|6338c1e7-755e-449...|98.63013698630137|\n",
"|e3c95daf-cd8a-409...|98.61111111111111|\n",
"|ea9c0078-c6bb-439...|98.61111111111111|\n",
"|42c7c6b1-90e3-45d...|98.61111111111111|\n",
"|1def691d-5ca3-44f...|98.61111111111111|\n",
"|583e64c4-c20b-428...|98.61111111111111|\n",
"|59b23f7e-da53-4b6...|98.61111111111111|\n",
"|cd356540-84b8-404...|98.59154929577466|\n",
"|567fd585-3a27-488...|98.59154929577466|\n",
"|98f01aff-91b0-439...|98.59154929577466|\n",
"+--------------------+-----------------+\n",
"only showing top 20 rows\n",
"\n"
]
},
{
"data": {
"text/plain": [
"33753"
]
},
"execution_count": 23,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"wSpec = Window \\\n",
" .partitionBy(col(\"client_id\")) \\\n",
" .orderBy(desc(\"date\"))\n",
"\n",
"recency_index = activity_stream_events_daily_df \\\n",
" .select(\"client_id\", \"date\", coalesce(1 / datediff(col(\"date\"), lead(\"date\").over(wSpec)), lit(0.0)).alias(\"recency_index\")) \\\n",
" .groupBy(\"client_id\", \"date\").max(\"recency_index\").alias(\"recency_index\") \\\n",
" .orderBy(\"client_id\", \"date\") \\\n",
" .groupBy(\"client_id\").avg(\"max(recency_index)\").select(\"*\")\n",
" \n",
"recency_index = recency_index \\\n",
" .select(\"client_id\", (col(recency_index.columns[1]) * 100).alias(\"recency_index\")) \\\n",
" .orderBy(desc(\"recency_index\"))\n",
"\n",
"recency_index.show()\n",
"recency_index.count()"
]
},
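{
"cell_type": "markdown",
"metadata": {},
"source": [
"Each active day contributes `1 / gap`, where `gap` is the number of days since the client's previous active day; the earliest active day, having no predecessor, contributes 0. The index is the per-client mean of these contributions, times 100. A hypothetical worked example:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Hypothetical worked example of the recency index.\n",
"gaps = [1, 1, 4, 1]                       # days between consecutive active dates\n",
"scores = [1.0 / g for g in gaps] + [0.0]  # earliest date has no predecessor\n",
"print sum(scores) / len(scores) * 100     # (1 + 1 + 0.25 + 1 + 0) / 5 = 65.0"
]
},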
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"## Session Duration Index"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+----------------------+\n",
"| client_id|session_duration_index|\n",
"+--------------------+----------------------+\n",
"|8a405600-9a10-4a4...| 100.0|\n",
"|f1d5ae90-3e6c-4e3...| 100.0|\n",
"|8e13812f-7b4a-4fa...| 100.0|\n",
"|ddfd6d88-367a-474...| 100.0|\n",
"|234540b6-6545-411...| 100.0|\n",
"|ca3ad84b-3c6c-481...| 100.0|\n",
"|36ca25d6-feb1-475...| 100.0|\n",
"|d60166de-69b1-401...| 100.0|\n",
"|663d941a-66ee-4f8...| 100.0|\n",
"|76154650-0e5e-a04...| 100.0|\n",
"|8f9df96f-42ee-4d7...| 100.0|\n",
"|e387dd3c-4f1d-497...| 100.0|\n",
"|765c083c-ae2a-4d5...| 100.0|\n",
"|8bd3b298-50d6-4e9...| 100.0|\n",
"|7af2b542-fe8d-4f3...| 100.0|\n",
"|9afeb00e-d1bc-467...| 100.0|\n",
"|74136c2e-da4d-4c8...| 100.0|\n",
"|cbd950dc-3f72-45a...| 100.0|\n",
"|8661ae81-d0f0-564...| 100.0|\n",
"|d00b3ab2-29a3-446...| 100.0|\n",
"+--------------------+----------------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"# Only includes sessions within the past 30 days\n",
"session_counts = activity_stream_stats_daily_df \\\n",
" .where(col(\"date\") > date_sub(current_date(), 30)) \\\n",
" .groupBy(\"client_id\").count() \\\n",
" .select(col(\"client_id\").alias(\"client_id1\"), col(\"count\").alias(\"count1\"))\n",
" \n",
"long_session_counts = activity_stream_stats_daily_df \\\n",
" .where(col(\"date\") > date_sub(current_date(), 30)) \\\n",
" .where(col(\"session_duration\") >= 5500) \\\n",
" .groupBy(\"client_id\").count() \\\n",
" .select(col(\"client_id\").alias(\"client_id2\"), col(\"count\").alias(\"count2\"))\n",
"\n",
"session_duration_index = session_counts \\\n",
" .join(long_session_counts, session_counts.client_id1 == long_session_counts.client_id2) \\\n",
" .select(session_counts.client_id1, \\\n",
" (long_session_counts.count2 / session_counts.count1 * 100).alias(\"session_duration_index\")) \\\n",
" .orderBy(desc(\"session_duration_index\"))\n",
"\n",
"session_duration_index = session_duration_index.select(col(\"client_id1\").alias(\"client_id\"), \"session_duration_index\")\n",
"\n",
"session_duration_index.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Average Interaction Count"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+------------------+\n",
"| client_id| avg_interactions|\n",
"+--------------------+------------------+\n",
"|09ad4eb8-8e45-4f2...| 9.725|\n",
"|0a016923-fd82-439...|1.6842105263157894|\n",
"|0b6bef36-baf6-46e...| 2.586206896551724|\n",
"|1712a92f-5489-304...| 1.0|\n",
"|17c7d60b-0b4e-44b...| 1.0|\n",
"|2465b3c3-1827-49e...| 1.0|\n",
"|3d5bab68-b285-473...|2.0303030303030303|\n",
"|4a49a612-d836-491...| 2.0|\n",
"|4aa2baf6-7277-433...| 2.0|\n",
"|61e55a37-1885-4f4...|2.2222222222222223|\n",
"|7a13ecb8-078d-4e8...|2.3333333333333335|\n",
"|7a17b649-1a78-46c...| 1.4|\n",
"|86a43b99-1dc5-46c...| 2.5|\n",
"|a2946180-cd03-584...| 1.6875|\n",
"|b1888ec7-2112-42c...| 1.0|\n",
"|bdd2523b-f730-4a2...| 1.0|\n",
"|cab7dfc7-6fb1-454...| 1.0|\n",
"|cfc203b1-b9f2-a64...| 1.0|\n",
"|d97ee8c8-43e8-418...|1.6153846153846154|\n",
"|e9d57a52-27aa-464...|1.8888888888888888|\n",
"+--------------------+------------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"# Compute avg interactions for each client_id\n",
"avg_interactions = interactions_per_user_per_day_df \\\n",
" .groupBy(\"client_id\") \\\n",
" .avg(\"count\") \\\n",
" .select(col(\"client_id\"), col(\"avg(count)\").alias(\"avg_interactions\"))\n",
"avg_interactions.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Generate Engaged User Features Table"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+-----------------+------------------+------------------+----------------------+------------------+---------------------+\n",
"| client_id| loyalty| recency_index| interaction_index|session_duration_index| avg_interactions|neg_interaction_index|\n",
"+--------------------+-----------------+------------------+------------------+----------------------+------------------+---------------------+\n",
"|0653bd05-1463-450...| 96.0| 75.67460317460318| 27.77777777777778| 90.25974025974025| 2.642857142857143| 0.0|\n",
"|09ad4eb8-8e45-4f2...|98.50746268656717| 84.88005050505049| 57.3170731707317| 81.32183908045977| 9.725| 0.34843205574912894|\n",
"|12a4cd26-1ee3-4f3...|98.41269841269842| 88.8888888888889| 35.12974051896208| 68.26568265682657| 4.066666666666666| 0.5988023952095809|\n",
"|1403919f-ca6a-104...|96.96969696969697| 97.2972972972973| 64.26116838487972| 57.74647887323944| 5.27027027027027| 1.718213058419244|\n",
"|1b7f23c1-329e-4d5...|94.11764705882352| 68.41036414565826|10.615711252653929| 44.0| 2.764705882352941| 0.0|\n",
"|2ad75bfd-bfc5-46b...|98.85057471264368| 98.36065573770492| 84.57888493475683| 91.49888143176734|14.114754098360656| 0.11862396204033215|\n",
"|2caa3ee3-545c-4cd...| 75.0| 58.47222222222223| 84.48275862068965| 92.3076923076923|2.7916666666666665| 0.0|\n",
"|348d742b-f8d2-4fb...|98.76543209876543| 62.16666666666666| 2.705882352941176| 42.095238095238095| 2.433333333333333| 0.1176470588235294|\n",
"|389b289d-05e9-4f6...|97.14285714285714| 72.60683760683759|4.3478260869565215| 62.4|1.7179487179487178| 0.37267080745341613|\n",
"|38a89040-b377-43d...| 80.0|53.333333333333336| 60.71428571428571| 44.0|1.7619047619047619| 1.7857142857142856|\n",
"|3d5bab68-b285-473...|85.71428571428572| 70.988455988456| 38.84297520661157| 55.434782608695656|2.0303030303030303| 0.0|\n",
"|4142e8b7-2a78-47a...|91.66666666666666| 40.64814814814815| 19.0| 73.68421052631578|1.8333333333333333| 1.0|\n",
"|41e79e54-a77e-482...|99.23076923076923| 59.27083333333334| 2.913533834586466| 51.03448275862069| 2.25| 0.09398496240601503|\n",
"|4405c630-fe3c-43c...|98.38709677419355| 89.42528735632183| 36.19246861924686| 72.45901639344262|3.9655172413793105| 0.0|\n",
"|4cbf7f71-dfac-44d...|97.36842105263158| 97.91666666666666| 82.88288288288288| 69.58762886597938|10.291666666666666| 1.2612612612612613|\n",
"|612f02dc-eb7a-4c6...|94.11764705882352| 78.66666666666667| 45.13274336283185| 92.94117647058823| 2.76| 5.3097345132743365|\n",
"|64ede5a3-7914-49c...|97.43589743589743| 94.87179487179486| 38.83495145631068| 76.96629213483146| 5.553846153846154| 0.0|\n",
"|6a38334e-2cf2-4b6...|98.63013698630137| 78.04347826086956| 6.557377049180328| 44.166666666666664| 14.91304347826087| 6.557377049180328|\n",
"|762eb5b1-bca9-4ef...|91.66666666666666| 75.0| 84.40366972477065| 54.43037974683544| 4.0| 0.0|\n",
"|8cd05e12-61f6-45e...| 80.0| 66.61764705882354| 88.40579710144928| 81.81818181818183|2.2058823529411766| 0.0|\n",
"+--------------------+-----------------+------------------+------------------+----------------------+------------------+---------------------+\n",
"only showing top 20 rows\n",
"\n",
"4686\n"
]
}
],
"source": [
"# Merge with recency index\n",
"engaged_merged = engaged_users \\\n",
" .join(recency_index, engaged_users.client_id == recency_index.client_id) \\\n",
" .drop(engaged_users.client_id) \\\n",
"\n",
"# Merge with loyalty index\n",
"engaged_merged = engaged_merged \\\n",
" .join(loyalty_index, engaged_merged.client_id == loyalty_index.client_id, \"outer\") \\\n",
" .select(engaged_merged.client_id.alias(\"client_id\"), \"loyalty\", \"recency_index\") \\\n",
" .na.drop('any', None, [\"client_id\"]) \\\n",
" .na.fill({'loyalty': 0})\n",
"\n",
"# Merge with interaction index\n",
"engaged_merged = engaged_merged \\\n",
" .join(interaction_index, engaged_merged.client_id == interaction_index.client_id, \"outer\") \\\n",
" .select(engaged_merged.client_id.alias(\"client_id\"), \"loyalty\", \"recency_index\", \"interaction_index\") \\\n",
" .na.drop('any', None, [\"client_id\"]) \\\n",
" .na.fill({'interaction_index': 0})\n",
" \n",
"# Merge with session duration index\n",
"engaged_merged = engaged_merged \\\n",
" .join(session_duration_index, engaged_merged.client_id == session_duration_index.client_id, \"outer\") \\\n",
" .select(engaged_merged.client_id.alias(\"client_id\"), \"loyalty\", \"recency_index\", \"interaction_index\", \"session_duration_index\") \\\n",
" .na.drop('any', None, [\"client_id\"]) \\\n",
" .na.fill({'session_duration_index': 0})\n",
"\n",
"# Merge with avg interactions\n",
"engaged_merged = engaged_merged \\\n",
" .join(avg_interactions, engaged_merged.client_id == avg_interactions.client_id, \"outer\") \\\n",
" .select(engaged_merged.client_id.alias(\"client_id\"), \"loyalty\", \"recency_index\", \"interaction_index\", \"session_duration_index\", \"avg_interactions\") \\\n",
" .na.drop('any', None, [\"client_id\"]) \\\n",
" .na.fill({'avg_interactions': 0})\n",
" \n",
"# Merge with neg interaction index\n",
"engaged_merged = engaged_merged \\\n",
" .join(neg_interaction_index, engaged_merged.client_id == neg_interaction_index.client_id, \"outer\") \\\n",
" .select(engaged_merged.client_id.alias(\"client_id\"), \"loyalty\", \"recency_index\", \"interaction_index\", \"session_duration_index\", \"avg_interactions\", \"neg_interaction_index\") \\\n",
" .na.drop('any', None, [\"client_id\"]) \\\n",
" .na.fill({'neg_interaction_index': 0})\n",
" \n",
" \n",
"engaged_merged.show()\n",
"print engaged_merged.count()"
]
},
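{
"cell_type": "markdown",
"metadata": {},
"source": [
"The six join-and-fill steps above (repeated for the unengaged table below) all follow one pattern, so they could be collapsed into a small helper. A hedged refactoring sketch, not from the original notebook; `merge_metric` is a hypothetical name:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Sketch: outer-join one metric onto the feature table, keep only rows with a\n",
"# known client_id, and default the missing metric to 0 (mirroring the chained\n",
"# joins above).\n",
"def merge_metric(base, metric_df, metric_col):\n",
"    kept = [c for c in base.columns if c != \"client_id\"]\n",
"    return base.join(metric_df, base.client_id == metric_df.client_id, \"outer\") \\\n",
"               .select(base[\"client_id\"], *(kept + [metric_col])) \\\n",
"               .na.drop('any', None, [\"client_id\"]) \\\n",
"               .na.fill({metric_col: 0})\n",
"\n",
"# Example usage, equivalent to one of the steps above:\n",
"# engaged_merged = merge_metric(engaged_merged, loyalty_index, \"loyalty\")"
]
},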
{
"cell_type": "code",
"execution_count": 71,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[Row(avg(loyalty)=89.21253148964226)]\n",
"[Row(avg(recency_index)=73.92858847678862)]\n",
"[Row(avg(interaction_index)=36.34412466080004)]\n",
"[Row(avg(neg_interaction_index)=0.9039897561336679)]\n",
"[Row(avg(session_duration_index)=64.46732204561148)]\n"
]
}
],
"source": [
"print engaged_merged.agg(avg(col(\"loyalty\"))).take(1)\n",
"print engaged_merged.agg(avg(col(\"recency_index\"))).take(1)\n",
"print engaged_merged.agg(avg(col(\"interaction_index\"))).take(1)\n",
"print engaged_merged.agg(avg(col(\"neg_interaction_index\"))).take(1)\n",
"print engaged_merged.agg(avg(col(\"session_duration_index\"))).take(1)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Generate Unengaged User Features Table"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {
"collapsed": false,
"scrolled": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+-----------------+------------------+------------------+----------------------+------------------+---------------------+\n",
"| client_id| loyalty| recency_index| interaction_index|session_duration_index| avg_interactions|neg_interaction_index|\n",
"+--------------------+-----------------+------------------+------------------+----------------------+------------------+---------------------+\n",
"|03fd18b7-cfcb-4cd...| 0.0| 61.23188405797102| 59.64912280701754| 96.7741935483871| 2.608695652173913| 12.280701754385964|\n",
"|0a016923-fd82-439...|98.11320754716981| 64.8893065998329| 4.703476482617587| 79.92700729927007|1.6842105263157894| 0.0|\n",
"|0b6bef36-baf6-46e...|92.85714285714286|62.523510971786834|31.818181818181817| 34.84848484848485| 2.586206896551724| 0.0|\n",
"|1712a92f-5489-304...| 0.0| 4.166666666666666| 12.5| 100.0| 1.0| 0.0|\n",
"|1c6e939d-8b47-44b...|98.63013698630137| 31.02272727272728| 3.206997084548105| 52.118644067796616|2.1818181818181817| 0.2915451895043732|\n",
"|1d8256ec-7342-43b...| 80.0|50.829725829725824| 53.84615384615385| 76.92307692307693| 2.227272727272727| 0.0|\n",
"|1d8a5a4a-70a5-445...|95.65217391304348| 3.421052631578948|0.3225806451612903| 45.664739884393065| 1.0| 0.3225806451612903|\n",
"|1ed0b7f2-4f8b-483...|97.95918367346938|0.8771929824561403|0.2652519893899204| 53.159851301115246| 1.5| 0.0|\n",
"|2098bfd0-40c7-764...| 0.0| 3.341687552213868|1.2422360248447204| 61.53846153846154|2.3333333333333335| 0.0|\n",
"|2293bf9f-ee57-4cb...| 0.0|49.999999999999986| 45.3125| 86.11111111111111|1.7222222222222223| 0.0|\n",
"|2363c2cc-7f94-467...| 0.0| 0.0| 0.0| 12.121212121212121| 1.0| 0.0|\n",
"|2e004da8-945b-402...| 50.0| 27.20797720797721| 50.0| 100.0|1.3333333333333333| 0.0|\n",
"|2f586e36-2df5-43a...|95.83333333333334| 53.32451499118166|2.3529411764705883| 47.31182795698925| 4.0| 2.3529411764705883|\n",
"|2ff6cb1c-fabe-4e2...| 0.0| 43.6026936026936| 2.564102564102564| 68.55123674911661|1.4444444444444444| 0.0|\n",
"|38745a43-31fc-804...| 96.0|12.895768833849331|3.1088082901554404| 64.22764227642277| 1.2| 0.0|\n",
"|3a6c14f4-2f83-4d2...| 0.0|11.527777777777779| 10.0| 62.5| 2.0| 0.0|\n",
"|3ad43720-f5b8-4bf...|97.22222222222221|14.444444444444446|0.5405405405405406| 59.58549222797927| 1.0| 0.2702702702702703|\n",
"|4aa2baf6-7277-433...| 0.0| 0.0| 100.0| 50.0| 2.0| 0.0|\n",
"|55080658-c5b8-404...| 0.0| 2.380952380952381|15.384615384615385| 88.88888888888889| 1.0| 0.0|\n",
"|55c3e4ca-a87f-411...| 0.0| 48.98989898989899| 62.16216216216216| 87.5|2.4615384615384617| 0.0|\n",
"+--------------------+-----------------+------------------+------------------+----------------------+------------------+---------------------+\n",
"only showing top 20 rows\n",
"\n",
"13506\n"
]
}
],
"source": [
"# Merge with recency index\n",
"unengaged_merged = unengaged_users \\\n",
" .join(recency_index, unengaged_users.client_id == recency_index.client_id) \\\n",
" .drop(unengaged_users.client_id)\n",
"\n",
"# Merge with loyalty index\n",
"unengaged_merged = unengaged_merged \\\n",
" .join(loyalty_index, unengaged_merged.client_id == loyalty_index.client_id, \"outer\") \\\n",
" .select(unengaged_merged.client_id.alias(\"client_id\"), \"loyalty\", \"recency_index\") \\\n",
" .na.drop('any', None, [\"client_id\"]) \\\n",
" .na.fill({'loyalty': 0})\n",
"\n",
"# Merge with interaction index\n",
"unengaged_merged = unengaged_merged \\\n",
" .join(interaction_index, unengaged_merged.client_id == interaction_index.client_id, \"outer\") \\\n",
" .select(unengaged_merged.client_id.alias(\"client_id\"), \"loyalty\", \"recency_index\", \"interaction_index\") \\\n",
" .na.drop('any', None, [\"client_id\"]) \\\n",
" .na.fill({'interaction_index': 0})\n",
" \n",
"# Merge with session duration index\n",
"unengaged_merged = unengaged_merged \\\n",
" .join(session_duration_index, unengaged_merged.client_id == session_duration_index.client_id, \"outer\") \\\n",
" .select(unengaged_merged.client_id.alias(\"client_id\"), \"loyalty\", \"recency_index\", \"interaction_index\", \"session_duration_index\") \\\n",
" .na.drop('any', None, [\"client_id\"]) \\\n",
" .na.fill({'session_duration_index': 0})\n",
"\n",
"# Merge with avg interactions\n",
"unengaged_merged = unengaged_merged \\\n",
" .join(avg_interactions, unengaged_merged.client_id == avg_interactions.client_id, \"outer\") \\\n",
" .select(unengaged_merged.client_id.alias(\"client_id\"), \"loyalty\", \"recency_index\", \"interaction_index\", \"session_duration_index\", \"avg_interactions\") \\\n",
" .na.drop('any', None, [\"client_id\"]) \\\n",
" .na.fill({'avg_interactions': 0})\n",
" \n",
" \n",
"# Merge with neg interaction index\n",
"unengaged_merged = unengaged_merged \\\n",
" .join(neg_interaction_index, unengaged_merged.client_id == neg_interaction_index.client_id, \"outer\") \\\n",
" .select(unengaged_merged.client_id.alias(\"client_id\"), \"loyalty\", \"recency_index\", \"interaction_index\", \"session_duration_index\", \"avg_interactions\", \"neg_interaction_index\") \\\n",
" .na.drop('any', None, [\"client_id\"]) \\\n",
" .na.fill({'neg_interaction_index': 0})\n",
" \n",
"unengaged_merged.show()\n",
"print unengaged_merged.count()"
]
},
{
"cell_type": "code",
"execution_count": 72,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[Row(avg(loyalty)=50.040966179128525)]\n",
"[Row(avg(recency_index)=33.92463385332862)]\n",
"[Row(avg(interaction_index)=19.640492853593006)]\n",
"[Row(avg(neg_interaction_index)=0.8667846971471196)]\n",
"[Row(avg(session_duration_index)=61.39640348508209)]\n"
]
}
],
"source": [
"print unengaged_merged.agg(avg(col(\"loyalty\"))).take(1)\n",
"print unengaged_merged.agg(avg(col(\"recency_index\"))).take(1)\n",
"print unengaged_merged.agg(avg(col(\"interaction_index\"))).take(1)\n",
"print unengaged_merged.agg(avg(col(\"neg_interaction_index\"))).take(1)\n",
"print unengaged_merged.agg(avg(col(\"session_duration_index\"))).take(1)"
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"645\n",
"4041\n"
]
}
],
"source": [
"engaged_subset = engaged_merged.randomSplit([1.0, 6.0])\n",
"print engaged_subset[0].count()\n",
"print engaged_subset[1].count()"
]
},
{
"cell_type": "code",
"execution_count": 43,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"3868\n",
"9638\n"
]
}
],
"source": [
"unengaged_subset = unengaged_merged.randomSplit([8.0, 20.0])\n",
"print unengaged_subset[0].count()\n",
"print unengaged_subset[1].count()"
]
},
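{
"cell_type": "markdown",
"metadata": {},
"source": [
"`randomSplit` normalizes its weights, so `[1.0, 6.0]` keeps roughly 1/7 and 6/7 of the rows and `[8.0, 20.0]` roughly 2/7 and 5/7; exact counts vary with the random seed. The weights appear chosen so the two training sets built next are roughly balanced, at about 4,000 engaged and 3,900 unengaged rows. A quick illustrative check:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Illustrative: randomSplit weights are normalized, so [1.0, 6.0] splits ~1:6.\n",
"parts = sqlContext.range(7000).randomSplit([1.0, 6.0])\n",
"print [p.count() for p in parts]  # roughly [1000, 6000]"
]
},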
{
"cell_type": "code",
"execution_count": 44,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"engaged_data = []\n",
"unengaged_data = []\n",
"\n",
"for row in engaged_subset[1].collect():\n",
" engaged_data.append(LabeledPoint(0, [row.loyalty, row.recency_index, row.interaction_index, row.session_duration_index, row.neg_interaction_index]))\n",
"\n",
"for row in unengaged_subset[0].collect():\n",
" unengaged_data.append(LabeledPoint(1, [row.loyalty, row.recency_index, row.interaction_index, row.session_duration_index, row.neg_interaction_index]))"
]
},
{
"cell_type": "code",
"execution_count": 45,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"engaged_test_data = []\n",
"unengaged_test_data = []\n",
"\n",
"for row in engaged_subset[0].collect():\n",
" engaged_test_data.append(LabeledPoint(0, [row.loyalty, row.recency_index, row.interaction_index, row.session_duration_index, row.neg_interaction_index]))\n",
"\n",
"for row in unengaged_subset[1].collect():\n",
" unengaged_test_data.append(LabeledPoint(1, [row.loyalty, row.recency_index, row.interaction_index, row.session_duration_index, row.neg_interaction_index]))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Train an SVM"
]
},
{
"cell_type": "code",
"execution_count": 46,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"data = sc.parallelize(engaged_data + unengaged_data)\n",
"mcm = SVMWithSGD.train(data, iterations=100, regParam=0.001)"
]
},
{
"cell_type": "code",
"execution_count": 47,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Engaged Accuracy: 0.458914728682\n",
"Unengaged Accuracy: 0.865532268105\n",
"Total Accuracy: 0.840027229408\n",
"Class 0.0 precision = 0.185929648241\n",
"Class 0.0 recall = 0.458914728682\n",
"Class 1.0 precision = 0.959843516281\n",
"Class 1.0 recall = 0.865532268105\n"
]
}
],
"source": [
"score_and_labels = []\n",
"\n",
"test_correct_engaged = 0\n",
"for item in engaged_test_data:\n",
" prediction = mcm.predict(item.features)\n",
" if prediction == item.label:\n",
" test_correct_engaged += 1\n",
" score_and_labels.append((float(prediction), item.label))\n",
"\n",
"print \"Engaged Accuracy: \" + str(test_correct_engaged / float(len(engaged_test_data)))\n",
"\n",
"test_correct_unengaged = 0\n",
"for item in unengaged_test_data:\n",
" prediction = mcm.predict(item.features)\n",
" if prediction == item.label:\n",
" test_correct_unengaged += 1\n",
" score_and_labels.append((float(prediction), item.label))\n",
"\n",
"print \"Unengaged Accuracy: \" + str(test_correct_unengaged / float(len(unengaged_test_data)))\n",
"\n",
"print \"Total Accuracy: \" + str((test_correct_engaged + test_correct_unengaged) / float(len(unengaged_test_data) + len(engaged_test_data)))\n",
"\n",
"metrics = MulticlassMetrics(sc.parallelize(score_and_labels))\n",
"\n",
"# Statistics by class\n",
"labels = [float(1), float(0)]\n",
"for label in sorted(labels):\n",
" print(\"Class %s precision = %s\" % (label, metrics.precision(label)))\n",
" print(\"Class %s recall = %s\" % (label, metrics.recall(label)))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Train Logistic Regression"
]
},
{
"cell_type": "code",
"execution_count": 66,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"lr = LogisticRegressionWithSGD.train(data)"
]
},
{
"cell_type": "code",
"execution_count": 67,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Engaged Accuracy: 0.996899224806\n",
"Unengaged Accuracy: 0.353807843951\n",
"Total Accuracy: 0.394145677332\n",
"Class 0.0 precision = 0.0935817202736\n",
"Class 0.0 recall = 0.996899224806\n",
"Class 1.0 precision = 0.999413833529\n",
"Class 1.0 recall = 0.353807843951\n"
]
}
],
"source": [
"score_and_labels = []\n",
"\n",
"test_correct_engaged = 0\n",
"for item in engaged_test_data:\n",
" prediction = lr.predict(item.features)\n",
" if prediction == item.label:\n",
" test_correct_engaged += 1\n",
" score_and_labels.append((float(prediction), item.label))\n",
"\n",
"print \"Engaged Accuracy: \" + str(test_correct_engaged / float(len(engaged_test_data)))\n",
"\n",
"test_correct_unengaged = 0\n",
"for item in unengaged_test_data:\n",
" prediction = lr.predict(item.features)\n",
" if prediction == item.label:\n",
" test_correct_unengaged += 1\n",
" score_and_labels.append((float(prediction), item.label))\n",
"\n",
"print \"Unengaged Accuracy: \" + str(test_correct_unengaged / float(len(unengaged_test_data)))\n",
"\n",
"print \"Total Accuracy: \" + str((test_correct_engaged + test_correct_unengaged) / float(len(unengaged_test_data) + len(engaged_test_data)))\n",
"\n",
"metrics = MulticlassMetrics(sc.parallelize(score_and_labels))\n",
"\n",
"# Statistics by class\n",
"labels = [float(1), float(0)]\n",
"for label in sorted(labels):\n",
" print(\"Class %s precision = %s\" % (label, metrics.precision(label)))\n",
" print(\"Class %s recall = %s\" % (label, metrics.recall(label)))"
]
},
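{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Train a Decision Tree"
]
},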
{
"cell_type": "code",
"execution_count": 50,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"dt = DecisionTree.trainClassifier(data, 2, {})"
]
},
{
"cell_type": "code",
"execution_count": 51,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Engaged Accuracy: 0.902325581395\n",
"Unengaged Accuracy: 0.817908279726\n",
"Total Accuracy: 0.823203345327\n",
"Class 0.0 precision = 0.249037227214\n",
"Class 0.0 recall = 0.902325581395\n",
"Class 1.0 precision = 0.992071482507\n",
"Class 1.0 recall = 0.817908279726\n"
]
}
],
"source": [
"score_and_labels = []\n",
"\n",
"test_correct_engaged = 0\n",
"for item in engaged_test_data:\n",
" prediction = dt.predict(item.features)\n",
" if prediction == item.label:\n",
" test_correct_engaged += 1\n",
" score_and_labels.append((float(prediction), item.label))\n",
"\n",
"print \"Engaged Accuracy: \" + str(test_correct_engaged / float(len(engaged_test_data)))\n",
"\n",
"test_correct_unengaged = 0\n",
"for item in unengaged_test_data:\n",
" prediction = dt.predict(item.features)\n",
" if prediction == item.label:\n",
" test_correct_unengaged += 1\n",
" score_and_labels.append((float(prediction), item.label))\n",
"\n",
"print \"Unengaged Accuracy: \" + str(test_correct_unengaged / float(len(unengaged_test_data)))\n",
"\n",
"print \"Total Accuracy: \" + str((test_correct_engaged + test_correct_unengaged) / float(len(unengaged_test_data) + len(engaged_test_data)))\n",
"\n",
"metrics = MulticlassMetrics(sc.parallelize(score_and_labels))\n",
"\n",
"# Statistics by class\n",
"labels = [float(1), float(0)]\n",
"for label in sorted(labels):\n",
" print(\"Class %s precision = %s\" % (label, metrics.precision(label)))\n",
" print(\"Class %s recall = %s\" % (label, metrics.recall(label)))"
]
},
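{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Train a Random Forest"
]
},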
{
"cell_type": "code",
"execution_count": 63,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"rf = RandomForest.trainClassifier(data, 2, {}, numTrees=1000)"
]
},
{
"cell_type": "code",
"execution_count": 64,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Engaged Accuracy: 0.886821705426\n",
"Unengaged Accuracy: 0.843328491388\n",
"Total Accuracy: 0.846056598269\n",
"Class 0.0 precision = 0.274735830932\n",
"Class 0.0 recall = 0.886821705426\n",
"Class 1.0 precision = 0.991098646507\n",
"Class 1.0 recall = 0.843328491388\n"
]
}
],
"source": [
"score_and_labels = []\n",
"\n",
"test_correct_engaged = 0\n",
"for item in engaged_test_data:\n",
" prediction = rf.predict(item.features)\n",
" if prediction == item.label:\n",
" test_correct_engaged += 1\n",
" score_and_labels.append((float(prediction), item.label))\n",
"\n",
"print \"Engaged Accuracy: \" + str(test_correct_engaged / float(len(engaged_test_data)))\n",
"\n",
"test_correct_unengaged = 0\n",
"for item in unengaged_test_data:\n",
" prediction = rf.predict(item.features)\n",
" if prediction == item.label:\n",
" test_correct_unengaged += 1\n",
" score_and_labels.append((float(prediction), item.label))\n",
"\n",
"print \"Unengaged Accuracy: \" + str(test_correct_unengaged / float(len(unengaged_test_data)))\n",
"\n",
"print \"Total Accuracy: \" + str((test_correct_engaged + test_correct_unengaged) / float(len(unengaged_test_data) + len(engaged_test_data)))\n",
"\n",
"metrics = MulticlassMetrics(sc.parallelize(score_and_labels))\n",
"\n",
"# Statistics by class\n",
"labels = [float(1), float(0)]\n",
"for label in sorted(labels):\n",
" print(\"Class %s precision = %s\" % (label, metrics.precision(label)))\n",
" print(\"Class %s recall = %s\" % (label, metrics.recall(label)))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Let's see if there is a correlation between average number of interactions and the various engagement metrics"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"print engaged_merged.corr(\"recency_index\", \"avg_interactions\")\n",
"print unengaged_merged.corr(\"recency_index\", \"avg_interactions\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"print engaged_merged.corr(\"loyalty\", \"avg_interactions\")\n",
"print unengaged_merged.corr(\"loyalty\", \"avg_interactions\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"print engaged_merged.corr(\"session_duration_index\", \"avg_interactions\")\n",
"print unengaged_merged.corr(\"session_duration_index\", \"avg_interactions\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"print engaged_merged.corr(\"interaction_index\", \"avg_interactions\")\n",
"print unengaged_merged.corr(\"interaction_index\", \"avg_interactions\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"print engaged_merged.corr(\"interaction_index\", \"loyalty\")\n",
"print unengaged_merged.corr(\"interaction_index\", \"loyalty\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"print engaged_merged.corr(\"interaction_index\", \"recency_index\")\n",
"print unengaged_merged.corr(\"interaction_index\", \"recency_index\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"print engaged_merged.corr(\"interaction_index\", \"session_duration_index\")\n",
"print unengaged_merged.corr(\"interaction_index\", \"session_duration_index\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"print engaged_merged.corr(\"interaction_index\", \"neg_interaction_index\")\n",
"print unengaged_merged.corr(\"interaction_index\", \"neg_interaction_index\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"print engaged_merged.corr(\"loyalty\", \"recency_index\")\n",
"print unengaged_merged.corr(\"loyalty\", \"recency_index\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"print engaged_merged.corr(\"loyalty\", \"session_duration_index\")\n",
"print unengaged_merged.corr(\"loyalty\", \"session_duration_index\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"print engaged_merged.corr(\"loyalty\", \"neg_interaction_index\")\n",
"print unengaged_merged.corr(\"loyalty\", \"neg_interaction_index\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"print engaged_merged.corr(\"recency_index\", \"session_duration_index\")\n",
"print unengaged_merged.corr(\"recency_index\", \"session_duration_index\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"print engaged_merged.corr(\"recency_index\", \"neg_interaction_index\")\n",
"print unengaged_merged.corr(\"recency_index\", \"neg_interaction_index\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"print engaged_merged.corr(\"session_duration_index\", \"neg_interaction_index\")\n",
"print unengaged_merged.corr(\"session_duration_index\", \"neg_interaction_index\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.11"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
# coding: utf-8
# # Engagement Features
# In[55]:
import datetime
import numpy as np
import pandas as pd
import plotly.plotly as py
import plotly.graph_objs as go
from pyspark.sql.types import *
from datetime import date, timedelta
from pyspark.mllib.stat import Statistics
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.functions import col, count
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree, RandomForest, RandomForestModel
from pyspark.mllib.classification import *
from pyspark.mllib.evaluation import MulticlassMetrics
# ## Import Activity Stream Tables
# In[3]:
activity_stream_events_daily_url = "activity_stream_events_daily.csv"
activity_stream_stats_daily_url = "activity_stream_stats_daily.csv"
# In[4]:
pandas_events = pd.read_csv(activity_stream_events_daily_url, ",")
pandas_stats = pd.read_csv(activity_stream_stats_daily_url, ",")
# In[5]:
integer_types = ["max_scroll_depth", "load_latency", "total_bookmarks", "total_history_size", "session_duration"]
events_fields = [StructField(field_name, IntegerType(), True) if field_name in integer_types else StructField(field_name, StringType(), True) for field_name in pandas_events.columns]
stats_fields = [StructField(field_name, IntegerType(), True) if field_name in integer_types else StructField(field_name, StringType(), True) for field_name in pandas_stats.columns]
events_schema = StructType(events_fields)
stats_schema = StructType(stats_fields)
# In[7]:
activity_stream_events_daily_df = sqlContext.createDataFrame(pandas_events, schema=events_schema)
activity_stream_stats_daily_df = sqlContext.createDataFrame(pandas_stats, schema=stats_schema)
# In[8]:
sqlContext.registerDataFrameAsTable(activity_stream_events_daily_df, "activity_stream_events_daily")
sqlContext.registerDataFrameAsTable(activity_stream_stats_daily_df, "activity_stream_stats_daily")
# ## Identify "Engaged" Users - Returning at Least Once Per Week Over the Past Four Weeks
# In[9]:
# Find how many days ago our earliest datapoint is.
earliest_date = activity_stream_events_daily_df .select("date") .orderBy("date").first()[0]
earliest_datetime = datetime.datetime.strptime(earliest_date, "%Y-%m-%d")
days_ago = (datetime.datetime.today() - earliest_datetime).days
# In[10]:
# Create a dataframe of all the dates between now and days_ago
base = datetime.datetime.today()
date_list = [(base - datetime.timedelta(days=x)).date() for x in range(0, days_ago)]
pandas_df = pd.DataFrame(date_list, columns=["date"])
date_list_df = sqlContext.createDataFrame(pandas_df)
sqlContext.registerDataFrameAsTable(date_list_df, "date_list")
# In[11]:
# Map each client to all possible dates
client_list_df = activity_stream_events_daily_df.select("client_id").distinct()
all_dates_clients_df = date_list_df.join(client_list_df)
sqlContext.registerDataFrameAsTable(all_dates_clients_df, "all_dates_clients")
# In[12]:
# Create a table of user interactions per day
interactions_per_user_per_day_df = (activity_stream_events_daily_df
                                    .select("client_id", "date")
                                    .groupBy("client_id", "date")
                                    .count()
                                    .orderBy("client_id", "date"))
interactions_per_user_per_day_df = interactions_per_user_per_day_df.withColumn("date", col("date").cast("date"))
sqlContext.registerDataFrameAsTable(interactions_per_user_per_day_df, "interactions_per_user_per_day")
# In[13]:
interactions_per_user_per_day_df.show()
# In[14]:
all_users_dates_counts = sqlContext.sql(
"SELECT all_dates_clients.client_id, all_dates_clients.date, COALESCE(interactions_per_user_per_day.count, 0) AS count " +
"FROM all_dates_clients " +
"LEFT JOIN interactions_per_user_per_day " +
"ON all_dates_clients.client_id = interactions_per_user_per_day.client_id " +
"AND all_dates_clients.date = interactions_per_user_per_day.date " +
"ORDER BY client_id, date")
# In[15]:
all_users_dates_counts.cache()
# In[16]:
# Hive timestamps are interpreted as UNIX timestamps in seconds.
days = lambda i: i * 86400
w = (Window()
.partitionBy(col("client_id"))
.orderBy(col("date").cast("timestamp").cast("long"))
.rangeBetween(-days(6), 0))
weekly_avgs = all_users_dates_counts.select(col("client_id"),
col("date").cast("timestamp"),
col("count"),
mean("count").over(w).alias("week_avg"))
# In[17]:
weekly_avgs.show()
# In[18]:
# Users whose trailing 7-day average is positive on every one of the last 21 days,
# i.e. at least one visit in each 7-day window, reaching back roughly four weeks.
engaged_users = (weekly_avgs
                 .filter(col("date") >= date_sub(current_date(), 21))
                 .filter(col("date") <= date_sub(current_date(), 0))
                 .where(col("week_avg") > 0)
                 .select("client_id")
                 .groupBy("client_id")
                 .count()
                 .where(col("count") == 21)
                 .select("client_id").distinct())
engaged_users.count()
# In[19]:
unengaged_users = (activity_stream_stats_daily_df
                   .where(col("date") >= date_sub(current_date(), 21))
                   .select("client_id").distinct()
                   .subtract(engaged_users))
unengaged_users.count()
# In[21]:
dates = []
engaged = []
unengaged = []
for i in xrange(8):
    engaged_users = (weekly_avgs
                     .filter(col("date") >= date_sub(current_date(), 7 * i + 21))
                     .filter(col("date") <= date_sub(current_date(), 7 * i))
                     .where(col("week_avg") > 0)
                     .select("client_id")
                     .groupBy("client_id")
                     .count()
                     .where(col("count") == 21)
                     .select("client_id").distinct())
    unengaged_users = (activity_stream_stats_daily_df
                       .filter(col("date") >= date_sub(current_date(), 7 * i + 21))
                       .filter(col("date") <= date_sub(current_date(), 7 * i))
                       .select("client_id").distinct()
                       .subtract(engaged_users))
    end_of_week = date.today() - timedelta(days=7 * i)
    # Count each DataFrame once instead of re-triggering the Spark job per print.
    n_engaged = engaged_users.count()
    n_unengaged = unengaged_users.count()
    print end_of_week
    print "Engaged Users: " + str(n_engaged)
    print "Unengaged Users: " + str(n_unengaged)
    print "Total Users: " + str(n_engaged + n_unengaged)
    dates.append(end_of_week)
    engaged.append(n_engaged)
    unengaged.append(n_unengaged)
# In[22]:
import plotly.plotly as py
import plotly.graph_objs as go
trace1 = go.Bar(
x=dates,
y=engaged,
name='Engaged Users'
)
trace2 = go.Bar(
x=dates,
y=unengaged,
name='Unengaged Users'
)
data = [trace1, trace2]
layout = go.Layout(
barmode='stack',
xaxis=dict(
title='3-Week Period End Date',
titlefont=dict(
family='Courier New, monospace',
size=18,
color='#7f7f7f'
)
),
yaxis=dict(
title='Number of Users',
titlefont=dict(
family='Courier New, monospace',
size=18,
color='#7f7f7f'
)
)
)
fig = go.Figure(data=data, layout=layout)
py.iplot(fig, filename='stacked-bar')
# # Aggregating the Various Component Metrics
# ## Loyalty Index
# In[20]:
# Clients with a single active day in the past 7 score 0 here; clients with no
# activity at all in the past 7 days drop out of this frame entirely and are
# assigned a loyalty of 0 later, when the indices are merged with na.fill.
loyalty_index = (activity_stream_stats_daily_df
                 .where(col("date") > date_sub(current_date(), 7))
                 .select("client_id")
                 .groupBy("client_id").count()
                 .select("client_id", ((1 - (1 / col("count"))) * 100).alias("loyalty"))
                 .orderBy(desc("loyalty")))
loyalty_index.show()
loyalty_index.count()
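# A worked instance of the formula above, assuming roughly one stats row per client
# per active day (if the table is per-session, `count` is sessions instead):
for active_days in (1, 2, 7):
    # plain-Python check; the 1.0 avoids Python 2 integer division
    print active_days, (1 - 1.0 / active_days) * 100
# 1 -> 0.0, 2 -> 50.0, 7 -> ~85.7: the index rises steeply and saturates below 100.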
# ## Block and Delete Rate
# In[21]:
# Only includes sessions where session_id exists
events = (activity_stream_events_daily_df
          .where((col("event") == "DELETE") | (col("event") == "BLOCK")))
neg_interaction_rate = (activity_stream_stats_daily_df
    .where(col("session_id") != 'n/a')
    .where(col("session_id") != 'NaN')
    .select("client_id", "session_id")
    .join(events, activity_stream_stats_daily_df.session_id == events.session_id, "outer")
    .select(activity_stream_stats_daily_df.client_id.alias("stats_client_id"),
            activity_stream_stats_daily_df.session_id.alias("stats_session_id"),
            events.client_id.alias("events_client_id"),
            events.session_id.alias("events_session_id")))
neg_interaction_index = (neg_interaction_rate
    .groupBy("stats_client_id")
    .agg((countDistinct(neg_interaction_rate.events_session_id) /
          countDistinct(neg_interaction_rate.stats_session_id) * 100)
         .alias("neg_interaction_index"))
    .na.drop()
    .orderBy(desc("neg_interaction_index"))
    .select(col("stats_client_id").alias("client_id"), "neg_interaction_index"))
neg_interaction_index.show()
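# A worked instance of the ratio (hypothetical numbers): a client whose 20 distinct
# sessions include 3 containing a DELETE or BLOCK scores 3/20 * 100 = 15. The outer
# join leaves events_session_id NULL for sessions with no negative events, and
# countDistinct ignores NULLs, so such sessions only feed the denominator.
print 3 / 20.0 * 100  # 15.0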
# ## Interaction Rate
# In[22]:
# Only includes sessions where session_id exists
positive_events = (activity_stream_events_daily_df
                   .where(col("event") != "DELETE")
                   .where(col("event") != "BLOCK"))
interaction_rate = (activity_stream_stats_daily_df
    .where(col("session_id") != 'n/a')
    .where(col("session_id") != 'NaN')
    .select("client_id", "session_id")
    .join(positive_events, activity_stream_stats_daily_df.session_id == positive_events.session_id, "outer")
    .select(activity_stream_stats_daily_df.client_id.alias("stats_client_id"),
            activity_stream_stats_daily_df.session_id.alias("stats_session_id"),
            positive_events.client_id.alias("events_client_id"),
            positive_events.session_id.alias("events_session_id")))
interaction_index = (interaction_rate
    .groupBy("stats_client_id")
    .agg((countDistinct(interaction_rate.events_session_id) /
          countDistinct(interaction_rate.stats_session_id) * 100)
         .alias("interaction_index"))
    .na.drop()
    .orderBy(desc("interaction_index"))
    .select(col("stats_client_id").alias("client_id"), "interaction_index"))
interaction_index.show()
# ## Recency Index
# In[23]:
wSpec = Window.partitionBy(col("client_id")).orderBy(desc("date"))
recency_index = (activity_stream_events_daily_df
    .select("client_id", "date",
            coalesce(1 / datediff(col("date"), lead("date").over(wSpec)), lit(0.0))
            .alias("recency_index"))
    .groupBy("client_id", "date").max("recency_index")
    .orderBy("client_id", "date")
    .groupBy("client_id").avg("max(recency_index)"))
recency_index = (recency_index
    .select("client_id", (col(recency_index.columns[1]) * 100).alias("recency_index"))
    .orderBy(desc("recency_index")))
recency_index.show()
recency_index.count()
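# A worked instance (hypothetical dates): a client active on days 1, 2 and 12.
#   day 12: gap to previous active day = 10 -> 1/10 = 0.1
#   day 2:  gap to previous active day = 1  -> 1/1  = 1.0
#   day 1:  no previous active day -> coalesce(..., 0.0) = 0.0
# The index averages these per-day values and scales by 100:
print (0.1 + 1.0 + 0.0) / 3 * 100  # ~36.7; tighter visit spacing -> higher score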
# ## Session Duration Index
# In[24]:
# Only includes sessions within the past 30 days
session_counts = (activity_stream_stats_daily_df
    .where(col("date") > date_sub(current_date(), 30))
    .groupBy("client_id").count()
    .select(col("client_id").alias("client_id1"), col("count").alias("count1")))
long_session_counts = (activity_stream_stats_daily_df
    .where(col("date") > date_sub(current_date(), 30))
    .where(col("session_duration") >= 5500)
    .groupBy("client_id").count()
    .select(col("client_id").alias("client_id2"), col("count").alias("count2")))
session_duration_index = (session_counts
    .join(long_session_counts, session_counts.client_id1 == long_session_counts.client_id2)
    .select(session_counts.client_id1,
            (long_session_counts.count2 / session_counts.count1 * 100)
            .alias("session_duration_index"))
    .orderBy(desc("session_duration_index")))
session_duration_index = session_duration_index.select(
    col("client_id1").alias("client_id"), "session_duration_index")
session_duration_index.show()
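# Assuming session_duration is recorded in milliseconds (an assumption; the schema
# above only marks it as an integer), the 5500 cutoff treats sessions of at least
# 5.5 seconds as "long". A worked instance with hypothetical numbers -- 10 long
# sessions out of 40 in the past 30 days:
print 10 / 40.0 * 100  # 25.0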
# ## Average Interaction Count
# In[25]:
# Compute avg interactions for each client_id
avg_interactions = (interactions_per_user_per_day_df
                    .groupBy("client_id")
                    .avg("count")
                    .select(col("client_id"), col("avg(count)").alias("avg_interactions")))
avg_interactions.show()
# ## Generate Engaged User Features Table
# In[26]:
# Merge with recency index
engaged_merged = (engaged_users
    .join(recency_index, engaged_users.client_id == recency_index.client_id)
    .drop(engaged_users.client_id))
# Merge with loyalty index
engaged_merged = (engaged_merged
    .join(loyalty_index, engaged_merged.client_id == loyalty_index.client_id, "outer")
    .select(engaged_merged.client_id.alias("client_id"), "loyalty", "recency_index")
    .na.drop('any', None, ["client_id"])
    .na.fill({'loyalty': 0}))
# Merge with interaction index
engaged_merged = (engaged_merged
    .join(interaction_index, engaged_merged.client_id == interaction_index.client_id, "outer")
    .select(engaged_merged.client_id.alias("client_id"), "loyalty", "recency_index",
            "interaction_index")
    .na.drop('any', None, ["client_id"])
    .na.fill({'interaction_index': 0}))
# Merge with session duration index
engaged_merged = (engaged_merged
    .join(session_duration_index, engaged_merged.client_id == session_duration_index.client_id, "outer")
    .select(engaged_merged.client_id.alias("client_id"), "loyalty", "recency_index",
            "interaction_index", "session_duration_index")
    .na.drop('any', None, ["client_id"])
    .na.fill({'session_duration_index': 0}))
# Merge with avg interactions
engaged_merged = (engaged_merged
    .join(avg_interactions, engaged_merged.client_id == avg_interactions.client_id, "outer")
    .select(engaged_merged.client_id.alias("client_id"), "loyalty", "recency_index",
            "interaction_index", "session_duration_index", "avg_interactions")
    .na.drop('any', None, ["client_id"])
    .na.fill({'avg_interactions': 0}))
# Merge with neg interaction index
engaged_merged = (engaged_merged
    .join(neg_interaction_index, engaged_merged.client_id == neg_interaction_index.client_id, "outer")
    .select(engaged_merged.client_id.alias("client_id"), "loyalty", "recency_index",
            "interaction_index", "session_duration_index", "avg_interactions",
            "neg_interaction_index")
    .na.drop('any', None, ["client_id"])
    .na.fill({'neg_interaction_index': 0}))
engaged_merged.show()
print engaged_merged.count()
# In[71]:
print engaged_merged.agg(avg(col("loyalty"))).take(1)
print engaged_merged.agg(avg(col("recency_index"))).take(1)
print engaged_merged.agg(avg(col("interaction_index"))).take(1)
print engaged_merged.agg(avg(col("neg_interaction_index"))).take(1)
print engaged_merged.agg(avg(col("session_duration_index"))).take(1)
# ## Generate Unengaged User Features Table
# In[27]:
# Merge with recency index
unengaged_merged = (unengaged_users
    .join(recency_index, unengaged_users.client_id == recency_index.client_id)
    .drop(unengaged_users.client_id))
# Merge with loyalty index
unengaged_merged = (unengaged_merged
    .join(loyalty_index, unengaged_merged.client_id == loyalty_index.client_id, "outer")
    .select(unengaged_merged.client_id.alias("client_id"), "loyalty", "recency_index")
    .na.drop('any', None, ["client_id"])
    .na.fill({'loyalty': 0}))
# Merge with interaction index
unengaged_merged = (unengaged_merged
    .join(interaction_index, unengaged_merged.client_id == interaction_index.client_id, "outer")
    .select(unengaged_merged.client_id.alias("client_id"), "loyalty", "recency_index",
            "interaction_index")
    .na.drop('any', None, ["client_id"])
    .na.fill({'interaction_index': 0}))
# Merge with session duration index
unengaged_merged = (unengaged_merged
    .join(session_duration_index, unengaged_merged.client_id == session_duration_index.client_id, "outer")
    .select(unengaged_merged.client_id.alias("client_id"), "loyalty", "recency_index",
            "interaction_index", "session_duration_index")
    .na.drop('any', None, ["client_id"])
    .na.fill({'session_duration_index': 0}))
# Merge with avg interactions
unengaged_merged = (unengaged_merged
    .join(avg_interactions, unengaged_merged.client_id == avg_interactions.client_id, "outer")
    .select(unengaged_merged.client_id.alias("client_id"), "loyalty", "recency_index",
            "interaction_index", "session_duration_index", "avg_interactions")
    .na.drop('any', None, ["client_id"])
    .na.fill({'avg_interactions': 0}))
# Merge with neg interaction index
unengaged_merged = (unengaged_merged
    .join(neg_interaction_index, unengaged_merged.client_id == neg_interaction_index.client_id, "outer")
    .select(unengaged_merged.client_id.alias("client_id"), "loyalty", "recency_index",
            "interaction_index", "session_duration_index", "avg_interactions",
            "neg_interaction_index")
    .na.drop('any', None, ["client_id"])
    .na.fill({'neg_interaction_index': 0}))
unengaged_merged.show()
print unengaged_merged.count()
# In[72]:
print unengaged_merged.agg(avg(col("loyalty"))).take(1)
print unengaged_merged.agg(avg(col("recency_index"))).take(1)
print unengaged_merged.agg(avg(col("interaction_index"))).take(1)
print unengaged_merged.agg(avg(col("neg_interaction_index"))).take(1)
print unengaged_merged.agg(avg(col("session_duration_index"))).take(1)
# In[42]:
engaged_subset = engaged_merged.randomSplit([1.0, 6.0])
print engaged_subset[0].count()
print engaged_subset[1].count()
# In[43]:
unengaged_subset = unengaged_merged.randomSplit([8.0, 20.0])
print unengaged_subset[0].count()
print unengaged_subset[1].count()
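# randomSplit normalizes its weights, so the expected proportions above are:
print 1.0 / 7, 6.0 / 7     # engaged: ~14% held out for test, ~86% for training
print 8.0 / 28, 20.0 / 28  # unengaged: ~29% for training, ~71% held out
# The split is random per run; passing a seed, e.g. randomSplit([1.0, 6.0], seed=42),
# would make it reproducible (seed is a standard randomSplit parameter).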
# In[44]:
engaged_data = []
unengaged_data = []
for row in engaged_subset[1].collect():
engaged_data.append(LabeledPoint(0, [row.loyalty, row.recency_index, row.interaction_index, row.session_duration_index, row.neg_interaction_index]))
for row in unengaged_subset[0].collect():
unengaged_data.append(LabeledPoint(1, [row.loyalty, row.recency_index, row.interaction_index, row.session_duration_index, row.neg_interaction_index]))
# In[45]:
engaged_test_data = []
unengaged_test_data = []
for row in engaged_subset[0].collect():
engaged_test_data.append(LabeledPoint(0, [row.loyalty, row.recency_index, row.interaction_index, row.session_duration_index, row.neg_interaction_index]))
for row in unengaged_subset[1].collect():
unengaged_test_data.append(LabeledPoint(1, [row.loyalty, row.recency_index, row.interaction_index, row.session_duration_index, row.neg_interaction_index]))
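# The collect() loops above pull every row to the driver. An equivalent,
# distributed construction (a sketch; the helper name is hypothetical) keeps the
# data as RDDs and uses the same label/feature layout:
FEATURE_COLS = ["loyalty", "recency_index", "interaction_index",
                "session_duration_index", "neg_interaction_index"]

def to_labeled_point(label):
    return lambda row: LabeledPoint(label, [row[c] for c in FEATURE_COLS])

engaged_train_rdd = engaged_subset[1].rdd.map(to_labeled_point(0))
unengaged_train_rdd = unengaged_subset[0].rdd.map(to_labeled_point(1))
# train_rdd = engaged_train_rdd.union(unengaged_train_rdd)  # drop-in for `data` below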
# ## Train an SVM
# In[46]:
data = sc.parallelize(engaged_data + unengaged_data)
mcm = SVMWithSGD.train(data, iterations=100, regParam=0.001)
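# SGD-based linear models are sensitive to feature scale. The five features here
# are all 0-100 indices, so training works as-is, but standardizing is cheap
# insurance. A sketch using pyspark.mllib.feature.StandardScaler (not part of the
# original run):
from pyspark.mllib.feature import StandardScaler

labels = data.map(lambda lp: lp.label)
features = data.map(lambda lp: lp.features)
scaler = StandardScaler(withMean=True, withStd=True).fit(features)
scaled_data = labels.zip(scaler.transform(features)) \
                    .map(lambda (l, f): LabeledPoint(l, f))
# mcm_scaled = SVMWithSGD.train(scaled_data, iterations=100, regParam=0.001)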
# In[47]:
score_and_labels = []
test_correct_engaged = 0
for item in engaged_test_data:
prediction = mcm.predict(item.features)
if prediction == item.label:
test_correct_engaged += 1
score_and_labels.append((float(prediction), item.label))
print "Engaged Accuracy: " + str(test_correct_engaged / float(len(engaged_test_data)))
test_correct_unengaged = 0
for item in unengaged_test_data:
prediction = mcm.predict(item.features)
if prediction == item.label:
test_correct_unengaged += 1
score_and_labels.append((float(prediction), item.label))
print "Unengaged Accuracy: " + str(test_correct_unengaged / float(len(unengaged_test_data)))
print "Total Accuracy: " + str((test_correct_engaged + test_correct_unengaged) / float(len(unengaged_test_data) + len(engaged_test_data)))
metrics = MulticlassMetrics(sc.parallelize(score_and_labels))
# Statistics by class
labels = [float(1), float(0)]
for label in sorted(labels):
print("Class %s precision = %s" % (label, metrics.precision(label)))
print("Class %s recall = %s" % (label, metrics.recall(label)))
# ## Train Logistic Regression
# In[66]:
lr = LogisticRegressionWithSGD.train(data)
# In[67]:
score_and_labels = []
test_correct_engaged = 0
for item in engaged_test_data:
prediction = lr.predict(item.features)
if prediction == item.label:
test_correct_engaged += 1
score_and_labels.append((float(prediction), item.label))
print "Engaged Accuracy: " + str(test_correct_engaged / float(len(engaged_test_data)))
test_correct_unengaged = 0
for item in unengaged_test_data:
prediction = lr.predict(item.features)
if prediction == item.label:
test_correct_unengaged += 1
score_and_labels.append((float(prediction), item.label))
print "Unengaged Accuracy: " + str(test_correct_unengaged / float(len(unengaged_test_data)))
print "Total Accuracy: " + str((test_correct_engaged + test_correct_unengaged) / float(len(unengaged_test_data) + len(engaged_test_data)))
metrics = MulticlassMetrics(sc.parallelize(score_and_labels))
# Statistics by class
labels = [float(1), float(0)]
for label in sorted(labels):
print("Class %s precision = %s" % (label, metrics.precision(label)))
print("Class %s recall = %s" % (label, metrics.recall(label)))
# ## Train a Decision Tree
# In[50]:
dt = DecisionTree.trainClassifier(data, 2, {})
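# trainClassifier(data, 2, {}) above means numClasses=2 with no categorical
# features (all five inputs treated as continuous); depth and bins stay at the
# MLlib defaults. The same call with the knobs spelled out (values shown are the
# documented defaults):
# dt = DecisionTree.trainClassifier(data, numClasses=2, categoricalFeaturesInfo={},
#                                   impurity='gini', maxDepth=5, maxBins=32)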
# In[51]:
score_and_labels = []
test_correct_engaged = 0
for item in engaged_test_data:
prediction = dt.predict(item.features)
if prediction == item.label:
test_correct_engaged += 1
score_and_labels.append((float(prediction), item.label))
print "Engaged Accuracy: " + str(test_correct_engaged / float(len(engaged_test_data)))
test_correct_unengaged = 0
for item in unengaged_test_data:
prediction = dt.predict(item.features)
if prediction == item.label:
test_correct_unengaged += 1
score_and_labels.append((float(prediction), item.label))
print "Unengaged Accuracy: " + str(test_correct_unengaged / float(len(unengaged_test_data)))
print "Total Accuracy: " + str((test_correct_engaged + test_correct_unengaged) / float(len(unengaged_test_data) + len(engaged_test_data)))
metrics = MulticlassMetrics(sc.parallelize(score_and_labels))
# Statistics by class
labels = [float(1), float(0)]
for label in sorted(labels):
print("Class %s precision = %s" % (label, metrics.precision(label)))
print("Class %s recall = %s" % (label, metrics.recall(label)))
# ## Train a Random Forest
# In[63]:
rf = RandomForest.trainClassifier(data, 2, {}, numTrees=1000)
# In[64]:
score_and_labels = []
test_correct_engaged = 0
for item in engaged_test_data:
prediction = rf.predict(item.features)
if prediction == item.label:
test_correct_engaged += 1
score_and_labels.append((float(prediction), item.label))
print "Engaged Accuracy: " + str(test_correct_engaged / float(len(engaged_test_data)))
test_correct_unengaged = 0
for item in unengaged_test_data:
prediction = rf.predict(item.features)
if prediction == item.label:
test_correct_unengaged += 1
score_and_labels.append((float(prediction), item.label))
print "Unengaged Accuracy: " + str(test_correct_unengaged / float(len(unengaged_test_data)))
print "Total Accuracy: " + str((test_correct_engaged + test_correct_unengaged) / float(len(unengaged_test_data) + len(engaged_test_data)))
metrics = MulticlassMetrics(sc.parallelize(score_and_labels))
# Statistics by class
labels = [float(1), float(0)]
for label in sorted(labels):
print("Class %s precision = %s" % (label, metrics.precision(label)))
print("Class %s recall = %s" % (label, metrics.recall(label)))
# ## Correlations Between Average Interaction Count and the Engagement Metrics (and Among the Metrics Themselves)
# In[ ]:
print engaged_merged.corr("recency_index", "avg_interactions")
print unengaged_merged.corr("recency_index", "avg_interactions")
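# The cells below compute one Pearson coefficient per pair. Statistics (imported at
# the top) can produce the whole matrix in one pass -- a sketch over the engaged
# frame (the column order here is an assumption, matching the feature table above):
corr_cols = ["loyalty", "recency_index", "interaction_index",
             "session_duration_index", "avg_interactions", "neg_interaction_index"]
vectors = engaged_merged.select(*corr_cols).rdd \
    .map(lambda row: [float(row[c]) for c in corr_cols])
print Statistics.corr(vectors, method="pearson")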
# In[ ]:
print engaged_merged.corr("loyalty", "avg_interactions")
print unengaged_merged.corr("loyalty", "avg_interactions")
# In[ ]:
print engaged_merged.corr("session_duration_index", "avg_interactions")
print unengaged_merged.corr("session_duration_index", "avg_interactions")
# In[ ]:
print engaged_merged.corr("interaction_index", "avg_interactions")
print unengaged_merged.corr("interaction_index", "avg_interactions")
# In[ ]:
print engaged_merged.corr("interaction_index", "loyalty")
print unengaged_merged.corr("interaction_index", "loyalty")
# In[ ]:
print engaged_merged.corr("interaction_index", "recency_index")
print unengaged_merged.corr("interaction_index", "recency_index")
# In[ ]:
print engaged_merged.corr("interaction_index", "session_duration_index")
print unengaged_merged.corr("interaction_index", "session_duration_index")
# In[ ]:
print engaged_merged.corr("interaction_index", "neg_interaction_index")
print unengaged_merged.corr("interaction_index", "neg_interaction_index")
# In[ ]:
print engaged_merged.corr("loyalty", "recency_index")
print unengaged_merged.corr("loyalty", "recency_index")
# In[ ]:
print engaged_merged.corr("loyalty", "session_duration_index")
print unengaged_merged.corr("loyalty", "session_duration_index")
# In[ ]:
print engaged_merged.corr("loyalty", "neg_interaction_index")
print unengaged_merged.corr("loyalty", "neg_interaction_index")
# In[ ]:
print engaged_merged.corr("recency_index", "session_duration_index")
print unengaged_merged.corr("recency_index", "session_duration_index")
# In[ ]:
print engaged_merged.corr("recency_index", "neg_interaction_index")
print unengaged_merged.corr("recency_index", "neg_interaction_index")
# In[ ]:
print engaged_merged.corr("session_duration_index", "neg_interaction_index")
print unengaged_merged.corr("session_duration_index", "neg_interaction_index")