Skip to content

Instantly share code, notes, and snippets.

@j450h1
Created June 12, 2020 06:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save j450h1/d249054ccb738283c4019b281335a442 to your computer and use it in GitHub Desktop.
Save j450h1/d249054ccb738283c4019b281335a442 to your computer and use it in GitHub Desktop.
def aggregate_to_user_level(df):
    """
    Roll the event-level frame up to one feature row per ``userId``.

    A ``device_type`` column ('desktop' / 'mobile' / 'other') is derived
    from ``userAgent`` first, then the rows are grouped by ``userId`` and
    reduced to:

    * the max of churn, Gender, level and device_type,
    * 0/1 flags for having visited the Upgrade / Downgrade pages,
    * per-value event counts for selected auth, status and page values,
    * distinct counts of artist, song and sessionId.

    Users whose aggregated ``gender`` is null are dropped from the result.
    """
    # (source column, output alias) -> max() of the column per user
    carried_over = [
        ('churn', 'churn'),
        ('Gender', 'gender'),
        ('level', 'subscription_level'),
        ('device_type', 'device_type'),
    ]
    # (page value, output alias) -> 1 if the user ever hit the page, else 0
    page_flags = [
        ('Upgrade', 'page_upgraded'),
        ('Downgrade', 'page_downgraded'),
    ]
    # (column, matched value, output alias) -> number of matching events
    # NOTE(review): confirm 'Next Song' is spelled exactly as the page value
    # appears in the data; a mismatch would make that count always 0.
    event_counts = [
        ('auth', 'Logged In', 'auth_logged_in_cnt'),
        ('auth', 'Logged Out', 'auth_logged_out_cnt'),
        ('auth', 'Guest', 'auth_guest_cnt'),
        ('status', '404', 'status_404_cnt'),
        ('status', '307', 'status_307_cnt'),
        ('page', 'Next Song', 'page_next_song_cnt'),
        ('page', 'Thumbs Up', 'page_thumbs_up_cnt'),
        ('page', 'Thumbs Down', 'page_thumbs_down_cnt'),
        ('page', 'Add to Playlist', 'page_playlist_cnt'),
        ('page', 'Add Friend', 'page_friend_cnt'),
        ('page', 'Roll Advert', 'page_roll_ad_cnt'),
        ('page', 'Logout', 'page_logout_cnt'),
        ('page', 'Help', 'page_help_cnt'),
    ]
    # (column, output alias) -> number of distinct values per user
    distinct_counts = [
        ('artist', 'artist_cnt'),
        ('song', 'song_cnt'),
        ('sessionId', 'session_cnt'),
    ]

    # The concatenation order below fixes the column order of the output.
    aggregations = [
        sparkMax(col(source)).alias(alias) for source, alias in carried_over
    ]
    aggregations += [
        sparkMax(when(col('page') == page, 1).otherwise(0)).alias(alias)
        for page, alias in page_flags
    ]
    aggregations += [
        # when() without otherwise() yields null for non-matches, and
        # count() skips nulls, so this counts only the matching rows.
        count(when(col(source) == value, True)).alias(alias)
        for source, value, alias in event_counts
    ]
    aggregations += [
        countDistinct(source).alias(alias) for source, alias in distinct_counts
    ]

    # device_type must exist on the frame before aggregating, because the
    # carried-over columns above read it.
    device_case = (
        "CASE WHEN rlike(userAgent, '(Windows|Macintosh|Linux)') THEN 'desktop' "
        "WHEN rlike(userAgent, 'iP') THEN 'mobile' ELSE 'other' END AS device_type"
    )
    events = df.withColumn("device_type", expr(device_case))

    per_user = events.groupBy('userId').agg(*aggregations)

    # Drop users with no gender recorded (original note: this null handling
    # still needs to be moved into the pipeline).
    return per_user.where(col("gender").isNotNull())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment