# coding: utf-8
# # Engagement Features
# In[55]:
import datetime
import numpy as np
import pandas as pd
import plotly.plotly as py
import plotly.graph_objs as go
from pyspark.sql.types import *
from datetime import date, timedelta
from pyspark.mllib.stat import Statistics
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.functions import col, count
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree, RandomForest, RandomForestModel
from pyspark.mllib.classification import *
from pyspark.mllib.evaluation import MulticlassMetrics
# ## Import Activity Stream Tables
# In[3]:
activity_stream_events_daily_url = "activity_stream_events_daily.csv"
activity_stream_stats_daily_url = "activity_stream_stats_daily.csv"
# In[4]:
pandas_events = pd.read_csv(activity_stream_events_daily_url, sep=",")
pandas_stats = pd.read_csv(activity_stream_stats_daily_url, sep=",")
# In[5]:
integer_types = ["max_scroll_depth", "load_latency", "total_bookmarks", "total_history_size", "session_duration"]
events_fields = [StructField(field_name, IntegerType(), True) if field_name in integer_types
                 else StructField(field_name, StringType(), True)
                 for field_name in pandas_events.columns]
stats_fields = [StructField(field_name, IntegerType(), True) if field_name in integer_types
                else StructField(field_name, StringType(), True)
                for field_name in pandas_stats.columns]
events_schema = StructType(events_fields)
stats_schema = StructType(stats_fields)
# In[7]:
activity_stream_events_daily_df = sqlContext.createDataFrame(pandas_events, schema=events_schema)
activity_stream_stats_daily_df = sqlContext.createDataFrame(pandas_stats, schema=stats_schema)
# In[8]:
sqlContext.registerDataFrameAsTable(activity_stream_events_daily_df, "activity_stream_events_daily")
sqlContext.registerDataFrameAsTable(activity_stream_stats_daily_df, "activity_stream_stats_daily")
# ## Identify "Engaged" Users - Returning at Least Once Per Week Over the Past Three Weeks
# In[9]:
# Find how many days ago our earliest datapoint is.
earliest_date = (activity_stream_events_daily_df
                 .select("date")
                 .orderBy("date")
                 .first()[0])
earliest_datetime = datetime.datetime.strptime(earliest_date, "%Y-%m-%d")
days_ago = (datetime.datetime.today() - earliest_datetime).days
# In[10]:
# Create a dataframe of all the dates between now and days_ago
base = datetime.datetime.today()
date_list = [(base - datetime.timedelta(days=x)).date() for x in range(0, days_ago)]
pandas_df = pd.DataFrame(date_list, columns=["date"])
date_list_df = sqlContext.createDataFrame(pandas_df)
sqlContext.registerDataFrameAsTable(date_list_df, "date_list")
# In[11]:
# Map each client to all possible dates
client_list_df = activity_stream_events_daily_df.select("client_id").distinct()
all_dates_clients_df = date_list_df.join(client_list_df)
sqlContext.registerDataFrameAsTable(all_dates_clients_df, "all_dates_clients")
# In[12]:
# Create a table of user interactions per day
interactions_per_user_per_day_df = (activity_stream_events_daily_df
                                    .select("client_id", "date")
                                    .groupBy("client_id", "date")
                                    .count()
                                    .orderBy("client_id", "date"))
interactions_per_user_per_day_df = interactions_per_user_per_day_df.withColumn("date", col("date").cast("date"))
sqlContext.registerDataFrameAsTable(interactions_per_user_per_day_df, "interactions_per_user_per_day")
# In[13]:
interactions_per_user_per_day_df.show()
# In[14]:
all_users_dates_counts = sqlContext.sql(
"SELECT all_dates_clients.client_id, all_dates_clients.date, COALESCE(interactions_per_user_per_day.count, 0) AS count " +
"FROM all_dates_clients " +
"LEFT JOIN interactions_per_user_per_day " +
"ON all_dates_clients.client_id = interactions_per_user_per_day.client_id " +
"AND all_dates_clients.date = interactions_per_user_per_day.date " +
"ORDER BY client_id, date")
# In[15]:
all_users_dates_counts.cache()
# In[16]:
# Hive timestamps are interpreted as UNIX timestamps in seconds.
days = lambda i: i * 86400
w = (Window()
.partitionBy(col("client_id"))
.orderBy(col("date").cast("timestamp").cast("long"))
.rangeBetween(-days(6), 0))
weekly_avgs = all_users_dates_counts.select(col("client_id"),
col("date").cast("timestamp"),
col("count"),
mean("count").over(w).alias("week_avg"))
# In[17]:
weekly_avgs.show()
# In[18]:
# The number of users who return at least once per week over the past 3 weeks
engaged_users = (weekly_avgs
                 .filter(col("date") >= date_sub(current_date(), 21))
                 .filter(col("date") <= date_sub(current_date(), 0))
                 .where(col("week_avg") > 0)
                 .select("client_id")
                 .groupBy("client_id")
                 .count()
                 # A client counts as engaged if the trailing 7-day average was positive on
                 # every one of the 21 days, i.e. they returned at least once per week.
                 .where(col("count") == 21)
                 .select("client_id").distinct())
engaged_users.count()
# In[19]:
unengaged_users = (activity_stream_stats_daily_df
                   .where(col("date") >= date_sub(current_date(), 21))
                   .select("client_id").distinct()
                   .subtract(engaged_users))
unengaged_users.count()
# In[21]:
dates = []
engaged = []
unengaged = []
for i in xrange(8):
    engaged_users = (weekly_avgs
                     .filter(col("date") >= date_sub(current_date(), 7 * i + 21))
                     .filter(col("date") <= date_sub(current_date(), 7 * i))
                     .where(col("week_avg") > 0)
                     .select("client_id").groupBy("client_id").count()
                     .where(col("count") == 21).select("client_id").distinct())
    unengaged_users = (activity_stream_stats_daily_df
                       .filter(col("date") >= date_sub(current_date(), 7 * i + 21))
                       .filter(col("date") <= date_sub(current_date(), 7 * i))
                       .select("client_id").distinct().subtract(engaged_users))
    end_of_week = date.today() - timedelta(days=7 * i)
    print end_of_week
    print "Engaged Users: " + str(engaged_users.count())
    print "Unengaged Users: " + str(unengaged_users.count())
    print "Total Users: " + str(engaged_users.count() + unengaged_users.count())
    dates.append(end_of_week)
    engaged.append(engaged_users.count())
    unengaged.append(unengaged_users.count())
# In[22]:
import plotly.plotly as py
import plotly.graph_objs as go
trace1 = go.Bar(
    x=dates,
    y=engaged,
    name='Engaged Users'
)
trace2 = go.Bar(
    x=dates,
    y=unengaged,
    name='Unengaged Users'
)
data = [trace1, trace2]
layout = go.Layout(
    barmode='stack',
    xaxis=dict(
        title='3-Week Period End Date',
        titlefont=dict(
            family='Courier New, monospace',
            size=18,
            color='#7f7f7f'
        )
    ),
    yaxis=dict(
        title='Number of Users',
        titlefont=dict(
            family='Courier New, monospace',
            size=18,
            color='#7f7f7f'
        )
    )
)
fig = go.Figure(data=data, layout=layout)
py.iplot(fig, filename='stacked-bar')
# # Aggregating the Various Component Metrics
# ## Loyalty Index
# In[20]:
# Users who haven't used activity stream at all in the past 7 days have a loyalty index of 0.
loyalty_index = (activity_stream_stats_daily_df
                 .where(col("date") > date_sub(current_date(), 7))
                 .select("client_id")
                 .groupBy("client_id").count()
                 .select("client_id", ((1 - (1 / col("count"))) * 100).alias("loyalty"))
                 .orderBy(desc("loyalty")))
loyalty_index.show()
loyalty_index.count()
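# Worked example (illustrative numbers only) of the loyalty formula above,
# loyalty = (1 - 1/count) * 100, where count is the number of stats rows a client has
# in the past 7 days: a single row maps to 0 and the index approaches 100 as the count grows.
for n in [1, 2, 7, 14]:
    print n, (1 - 1.0 / n) * 100  # 0.0, 50.0, ~85.7, ~92.9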
# ## Block and Delete Rate
# In[21]:
# Only includes sessions where session_id exists
events = activity_stream_events_daily_df.where((col("event") == "DELETE") | (col("event") == "BLOCK"))
neg_interaction_rate = (activity_stream_stats_daily_df
    .where(col("session_id") != 'n/a').where(col("session_id") != 'NaN')
    .select("client_id", "session_id")
    .join(events, activity_stream_stats_daily_df.session_id == events.session_id, "outer")
    .select(activity_stream_stats_daily_df.client_id.alias("stats_client_id"),
            activity_stream_stats_daily_df.session_id.alias("stats_session_id"),
            events.client_id.alias("events_client_id"),
            events.session_id.alias("events_session_id")))
neg_interaction_index = (neg_interaction_rate.groupBy("stats_client_id")
    .agg((countDistinct(neg_interaction_rate.events_session_id) /
          countDistinct(neg_interaction_rate.stats_session_id) * 100).alias("neg_interaction_index"))
    .na.drop().orderBy(desc("neg_interaction_index"))
    .select(col("stats_client_id").alias("client_id"), "neg_interaction_index"))
neg_interaction_index.show()
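# The index above is the share of a client's distinct sessions that contain at least
# one DELETE or BLOCK event, scaled to a percentage. Illustrative check with
# hypothetical counts: 2 such sessions out of 40 total gives an index of 5.0.
print 2 / float(40) * 100  # 5.0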
# ## Interaction Rate
# In[22]:
# Only includes sessions where session_id exists
positive_events = activity_stream_events_daily_df.where(col("event") != "DELETE").where(col("event") != "BLOCK")
interaction_rate = (activity_stream_stats_daily_df
    .where(col("session_id") != 'n/a').where(col("session_id") != 'NaN')
    .select("client_id", "session_id")
    .join(positive_events, activity_stream_stats_daily_df.session_id == positive_events.session_id, "outer")
    .select(activity_stream_stats_daily_df.client_id.alias("stats_client_id"),
            activity_stream_stats_daily_df.session_id.alias("stats_session_id"),
            positive_events.client_id.alias("events_client_id"),
            positive_events.session_id.alias("events_session_id")))
interaction_index = (interaction_rate.groupBy("stats_client_id")
    .agg((countDistinct(interaction_rate.events_session_id) /
          countDistinct(interaction_rate.stats_session_id) * 100).alias("interaction_index"))
    .na.drop().orderBy(desc("interaction_index"))
    .select(col("stats_client_id").alias("client_id"), "interaction_index"))
interaction_index.show()
# ## Recency Index
# In[23]:
wSpec = Window.partitionBy(col("client_id")).orderBy(desc("date"))
recency_index = (activity_stream_events_daily_df
    .select("client_id", "date",
            coalesce(1 / datediff(col("date"), lead("date").over(wSpec)), lit(0.0)).alias("recency_index"))
    .groupBy("client_id", "date").max("recency_index").alias("recency_index")
    .orderBy("client_id", "date")
    .groupBy("client_id").avg("max(recency_index)").select("*"))
recency_index = (recency_index
    .select("client_id", (col(recency_index.columns[1]) * 100).alias("recency_index"))
    .orderBy(desc("recency_index")))
recency_index.show()
recency_index.count()
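# Worked example of the per-visit term above, 1 / datediff(visit, previous visit):
# a next-day return contributes 1.0, a two-day gap 0.5, a ten-day gap 0.1, and the
# earliest visit contributes 0 via the coalesce; the terms are then averaged per
# client and scaled by 100 (the gaps below are illustrative only).
for gap in [1, 2, 10]:
    print gap, 1.0 / gap  # 1.0, 0.5, 0.1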
# ## Session Duration Index
# In[24]:
# Only includes sessions within the past 30 days
session_counts = (activity_stream_stats_daily_df
    .where(col("date") > date_sub(current_date(), 30))
    .groupBy("client_id").count()
    .select(col("client_id").alias("client_id1"), col("count").alias("count1")))
long_session_counts = (activity_stream_stats_daily_df
    .where(col("date") > date_sub(current_date(), 30))
    .where(col("session_duration") >= 5500)
    .groupBy("client_id").count()
    .select(col("client_id").alias("client_id2"), col("count").alias("count2")))
session_duration_index = (session_counts
    .join(long_session_counts, session_counts.client_id1 == long_session_counts.client_id2)
    .select(session_counts.client_id1,
            (long_session_counts.count2 / session_counts.count1 * 100).alias("session_duration_index"))
    .orderBy(desc("session_duration_index")))
session_duration_index = session_duration_index.select(col("client_id1").alias("client_id"), "session_duration_index")
session_duration_index.show()
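# A sketch of an equivalent single-pass version using a conditional aggregate instead
# of the self-join above. This is an assumed alternative, not the original method: it
# should agree for clients with at least one session of duration >= 5500, but keeps
# clients with none (index 0) instead of dropping them in the join.
session_duration_index_alt = (activity_stream_stats_daily_df
    .where(col("date") > date_sub(current_date(), 30))
    .groupBy("client_id")
    .agg((sum(when(col("session_duration") >= 5500, 1).otherwise(0)) / count("*") * 100)
         .alias("session_duration_index")))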
# ## Average Interaction Count
# In[25]:
# Compute avg interactions for each client_id
avg_interactions = (interactions_per_user_per_day_df
    .groupBy("client_id").avg("count")
    .select(col("client_id"), col("avg(count)").alias("avg_interactions")))
avg_interactions.show()
# ## Generate Engaged User Features Table
# In[26]:
# Merge with recency index
engaged_merged = (engaged_users
    .join(recency_index, engaged_users.client_id == recency_index.client_id)
    .drop(engaged_users.client_id))
# Merge with loyalty index
engaged_merged = (engaged_merged
    .join(loyalty_index, engaged_merged.client_id == loyalty_index.client_id, "outer")
    .select(engaged_merged.client_id.alias("client_id"), "loyalty", "recency_index")
    .na.drop('any', None, ["client_id"]).na.fill({'loyalty': 0}))
# Merge with interaction index
engaged_merged = (engaged_merged
    .join(interaction_index, engaged_merged.client_id == interaction_index.client_id, "outer")
    .select(engaged_merged.client_id.alias("client_id"), "loyalty", "recency_index", "interaction_index")
    .na.drop('any', None, ["client_id"]).na.fill({'interaction_index': 0}))
# Merge with session duration index
engaged_merged = (engaged_merged
    .join(session_duration_index, engaged_merged.client_id == session_duration_index.client_id, "outer")
    .select(engaged_merged.client_id.alias("client_id"), "loyalty", "recency_index", "interaction_index", "session_duration_index")
    .na.drop('any', None, ["client_id"]).na.fill({'session_duration_index': 0}))
# Merge with avg interactions
engaged_merged = (engaged_merged
    .join(avg_interactions, engaged_merged.client_id == avg_interactions.client_id, "outer")
    .select(engaged_merged.client_id.alias("client_id"), "loyalty", "recency_index", "interaction_index", "session_duration_index", "avg_interactions")
    .na.drop('any', None, ["client_id"]).na.fill({'avg_interactions': 0}))
# Merge with neg interaction index
engaged_merged = (engaged_merged
    .join(neg_interaction_index, engaged_merged.client_id == neg_interaction_index.client_id, "outer")
    .select(engaged_merged.client_id.alias("client_id"), "loyalty", "recency_index", "interaction_index", "session_duration_index", "avg_interactions", "neg_interaction_index")
    .na.drop('any', None, ["client_id"]).na.fill({'neg_interaction_index': 0}))
engaged_merged.show()
print engaged_merged.count()
# In[71]:
print engaged_merged.agg(avg(col("loyalty"))).take(1)
print engaged_merged.agg(avg(col("recency_index"))).take(1)
print engaged_merged.agg(avg(col("interaction_index"))).take(1)
print engaged_merged.agg(avg(col("neg_interaction_index"))).take(1)
print engaged_merged.agg(avg(col("session_duration_index"))).take(1)
# ## Generate Unengaged User Features Table
# In[27]:
# Merge with recency index
unengaged_merged = (unengaged_users
    .join(recency_index, unengaged_users.client_id == recency_index.client_id)
    .drop(unengaged_users.client_id))
# Merge with loyalty index
unengaged_merged = (unengaged_merged
    .join(loyalty_index, unengaged_merged.client_id == loyalty_index.client_id, "outer")
    .select(unengaged_merged.client_id.alias("client_id"), "loyalty", "recency_index")
    .na.drop('any', None, ["client_id"]).na.fill({'loyalty': 0}))
# Merge with interaction index
unengaged_merged = (unengaged_merged
    .join(interaction_index, unengaged_merged.client_id == interaction_index.client_id, "outer")
    .select(unengaged_merged.client_id.alias("client_id"), "loyalty", "recency_index", "interaction_index")
    .na.drop('any', None, ["client_id"]).na.fill({'interaction_index': 0}))
# Merge with session duration index
unengaged_merged = (unengaged_merged
    .join(session_duration_index, unengaged_merged.client_id == session_duration_index.client_id, "outer")
    .select(unengaged_merged.client_id.alias("client_id"), "loyalty", "recency_index", "interaction_index", "session_duration_index")
    .na.drop('any', None, ["client_id"]).na.fill({'session_duration_index': 0}))
# Merge with avg interactions
unengaged_merged = (unengaged_merged
    .join(avg_interactions, unengaged_merged.client_id == avg_interactions.client_id, "outer")
    .select(unengaged_merged.client_id.alias("client_id"), "loyalty", "recency_index", "interaction_index", "session_duration_index", "avg_interactions")
    .na.drop('any', None, ["client_id"]).na.fill({'avg_interactions': 0}))
# Merge with neg interaction index
unengaged_merged = (unengaged_merged
    .join(neg_interaction_index, unengaged_merged.client_id == neg_interaction_index.client_id, "outer")
    .select(unengaged_merged.client_id.alias("client_id"), "loyalty", "recency_index", "interaction_index", "session_duration_index", "avg_interactions", "neg_interaction_index")
    .na.drop('any', None, ["client_id"]).na.fill({'neg_interaction_index': 0}))
unengaged_merged.show()
print unengaged_merged.count()
# In[72]:
print unengaged_merged.agg(avg(col("loyalty"))).take(1)
print unengaged_merged.agg(avg(col("recency_index"))).take(1)
print unengaged_merged.agg(avg(col("interaction_index"))).take(1)
print unengaged_merged.agg(avg(col("neg_interaction_index"))).take(1)
print unengaged_merged.agg(avg(col("session_duration_index"))).take(1)
# In[42]:
engaged_subset = engaged_merged.randomSplit([1.0, 6.0])
print engaged_subset[0].count()
print engaged_subset[1].count()
# In[43]:
unengaged_subset = unengaged_merged.randomSplit([8.0, 20.0])
print unengaged_subset[0].count()
print unengaged_subset[1].count()
# In[44]:
engaged_data = []
unengaged_data = []
for row in engaged_subset[1].collect():
    engaged_data.append(LabeledPoint(0, [row.loyalty, row.recency_index, row.interaction_index, row.session_duration_index, row.neg_interaction_index]))
for row in unengaged_subset[0].collect():
    unengaged_data.append(LabeledPoint(1, [row.loyalty, row.recency_index, row.interaction_index, row.session_duration_index, row.neg_interaction_index]))
# In[45]:
engaged_test_data = []
unengaged_test_data = []
for row in engaged_subset[0].collect():
    engaged_test_data.append(LabeledPoint(0, [row.loyalty, row.recency_index, row.interaction_index, row.session_duration_index, row.neg_interaction_index]))
for row in unengaged_subset[1].collect():
    unengaged_test_data.append(LabeledPoint(1, [row.loyalty, row.recency_index, row.interaction_index, row.session_duration_index, row.neg_interaction_index]))
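# An alternative sketch that builds the LabeledPoints without collect()-ing every row
# to the driver, by mapping over the DataFrames' underlying RDDs. The helper name
# to_labeled_point is hypothetical; the column names match the tables built above.
def to_labeled_point(label):
    return lambda row: LabeledPoint(label, [row.loyalty, row.recency_index,
                                            row.interaction_index,
                                            row.session_duration_index,
                                            row.neg_interaction_index])
engaged_train_rdd = engaged_subset[1].rdd.map(to_labeled_point(0))
unengaged_train_rdd = unengaged_subset[0].rdd.map(to_labeled_point(1))
# engaged_train_rdd.union(unengaged_train_rdd) could then stand in for the
# sc.parallelize(engaged_data + unengaged_data) call below.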
# ## Train an SVM
# In[46]:
data = sc.parallelize(engaged_data + unengaged_data)
mcm = SVMWithSGD.train(data, iterations=100, regParam=0.001)
# In[47]:
score_and_labels = []
test_correct_engaged = 0
for item in engaged_test_data:
    prediction = mcm.predict(item.features)
    if prediction == item.label:
        test_correct_engaged += 1
    score_and_labels.append((float(prediction), item.label))
print "Engaged Accuracy: " + str(test_correct_engaged / float(len(engaged_test_data)))
test_correct_unengaged = 0
for item in unengaged_test_data:
    prediction = mcm.predict(item.features)
    if prediction == item.label:
        test_correct_unengaged += 1
    score_and_labels.append((float(prediction), item.label))
print "Unengaged Accuracy: " + str(test_correct_unengaged / float(len(unengaged_test_data)))
print "Total Accuracy: " + str((test_correct_engaged + test_correct_unengaged) / float(len(unengaged_test_data) + len(engaged_test_data)))
metrics = MulticlassMetrics(sc.parallelize(score_and_labels))
# Statistics by class
labels = [float(1), float(0)]
for label in sorted(labels):
    print("Class %s precision = %s" % (label, metrics.precision(label)))
    print("Class %s recall = %s" % (label, metrics.recall(label)))
# ## Train Logistic Regression
# In[66]:
lr = LogisticRegressionWithSGD.train(data)
# In[67]:
score_and_labels = []
test_correct_engaged = 0
for item in engaged_test_data:
    prediction = lr.predict(item.features)
    if prediction == item.label:
        test_correct_engaged += 1
    score_and_labels.append((float(prediction), item.label))
print "Engaged Accuracy: " + str(test_correct_engaged / float(len(engaged_test_data)))
test_correct_unengaged = 0
for item in unengaged_test_data:
    prediction = lr.predict(item.features)
    if prediction == item.label:
        test_correct_unengaged += 1
    score_and_labels.append((float(prediction), item.label))
print "Unengaged Accuracy: " + str(test_correct_unengaged / float(len(unengaged_test_data)))
print "Total Accuracy: " + str((test_correct_engaged + test_correct_unengaged) / float(len(unengaged_test_data) + len(engaged_test_data)))
metrics = MulticlassMetrics(sc.parallelize(score_and_labels))
# Statistics by class
labels = [float(1), float(0)]
for label in sorted(labels):
    print("Class %s precision = %s" % (label, metrics.precision(label)))
    print("Class %s recall = %s" % (label, metrics.recall(label)))
# ## Train a Decision Tree
# In[50]:
dt = DecisionTree.trainClassifier(data, 2, {})
# In[51]:
score_and_labels = []
test_correct_engaged = 0
for item in engaged_test_data:
    prediction = dt.predict(item.features)
    if prediction == item.label:
        test_correct_engaged += 1
    score_and_labels.append((float(prediction), item.label))
print "Engaged Accuracy: " + str(test_correct_engaged / float(len(engaged_test_data)))
test_correct_unengaged = 0
for item in unengaged_test_data:
    prediction = dt.predict(item.features)
    if prediction == item.label:
        test_correct_unengaged += 1
    score_and_labels.append((float(prediction), item.label))
print "Unengaged Accuracy: " + str(test_correct_unengaged / float(len(unengaged_test_data)))
print "Total Accuracy: " + str((test_correct_engaged + test_correct_unengaged) / float(len(unengaged_test_data) + len(engaged_test_data)))
metrics = MulticlassMetrics(sc.parallelize(score_and_labels))
# Statistics by class
labels = [float(1), float(0)]
for label in sorted(labels):
    print("Class %s precision = %s" % (label, metrics.precision(label)))
    print("Class %s recall = %s" % (label, metrics.recall(label)))
# ## Train a Random Forest
# In[63]:
rf = RandomForest.trainClassifier(data, 2, {}, numTrees=1000)
# In[64]:
score_and_labels = []
test_correct_engaged = 0
for item in engaged_test_data:
    prediction = rf.predict(item.features)
    if prediction == item.label:
        test_correct_engaged += 1
    score_and_labels.append((float(prediction), item.label))
print "Engaged Accuracy: " + str(test_correct_engaged / float(len(engaged_test_data)))
test_correct_unengaged = 0
for item in unengaged_test_data:
    prediction = rf.predict(item.features)
    if prediction == item.label:
        test_correct_unengaged += 1
    score_and_labels.append((float(prediction), item.label))
print "Unengaged Accuracy: " + str(test_correct_unengaged / float(len(unengaged_test_data)))
print "Total Accuracy: " + str((test_correct_engaged + test_correct_unengaged) / float(len(unengaged_test_data) + len(engaged_test_data)))
metrics = MulticlassMetrics(sc.parallelize(score_and_labels))
# Statistics by class
labels = [float(1), float(0)]
for label in sorted(labels):
    print("Class %s precision = %s" % (label, metrics.precision(label)))
    print("Class %s recall = %s" % (label, metrics.recall(label)))
# ## Correlations Between Average Interaction Count and the Various Engagement Metrics
# In[ ]:
print engaged_merged.corr("recency_index", "avg_interactions")
print unengaged_merged.corr("recency_index", "avg_interactions")
# In[ ]:
print engaged_merged.corr("loyalty", "avg_interactions")
print unengaged_merged.corr("loyalty", "avg_interactions")
# In[ ]:
print engaged_merged.corr("session_duration_index", "avg_interactions")
print unengaged_merged.corr("session_duration_index", "avg_interactions")
# In[ ]:
print engaged_merged.corr("interaction_index", "avg_interactions")
print unengaged_merged.corr("interaction_index", "avg_interactions")
# In[ ]:
# In[ ]:
print engaged_merged.corr("interaction_index", "loyalty")
print unengaged_merged.corr("interaction_index", "loyalty")
# In[ ]:
print engaged_merged.corr("interaction_index", "recency_index")
print unengaged_merged.corr("interaction_index", "recency_index")
# In[ ]:
print engaged_merged.corr("interaction_index", "session_duration_index")
print unengaged_merged.corr("interaction_index", "session_duration_index")
# In[ ]:
print engaged_merged.corr("interaction_index", "neg_interaction_index")
print unengaged_merged.corr("interaction_index", "neg_interaction_index")
# In[ ]:
print engaged_merged.corr("loyalty", "recency_index")
print unengaged_merged.corr("loyalty", "recency_index")
# In[ ]:
print engaged_merged.corr("loyalty", "session_duration_index")
print unengaged_merged.corr("loyalty", "session_duration_index")
# In[ ]:
print engaged_merged.corr("loyalty", "neg_interaction_index")
print unengaged_merged.corr("loyalty", "neg_interaction_index")
# In[ ]:
print engaged_merged.corr("recency_index", "session_duration_index")
print unengaged_merged.corr("recency_index", "session_duration_index")
# In[ ]:
print engaged_merged.corr("recency_index", "neg_interaction_index")
print unengaged_merged.corr("recency_index", "neg_interaction_index")
# In[ ]:
print engaged_merged.corr("session_duration_index", "neg_interaction_index")
print unengaged_merged.corr("session_duration_index", "neg_interaction_index")
# In[ ]:
# In[ ]: