Skip to content

Instantly share code, notes, and snippets.

View oneryalcin's full-sized avatar

Mehmet Öner Yalçın oneryalcin

View GitHub Profile
@oneryalcin
oneryalcin / sparkify_10_skew_fix.py
Created September 23, 2019 23:10
10 Sparkify Skew fix
# Distribution plots for three user-level features: raw values on the top
# row, skew-corrected transforms on the bottom row.
# FIX: sns.distplot is deprecated and removed in seaborn >= 0.14 — use
# histplot with a KDE overlay and density scaling, which reproduces the
# old distplot look.
f, axes = plt.subplots(2, 3, figsize=(14, 7), sharex=False)

# Raw (right-skewed) distributions.
sns.histplot(joined_pandas["sessionCount"], kde=True, stat="density", color="skyblue", ax=axes[0, 0])
sns.histplot(joined_pandas["meanSongCount"], kde=True, stat="density", color="olive", ax=axes[0, 1])
sns.histplot(joined_pandas["sessionsFreqDay"], kde=True, stat="density", color="gold", ax=axes[0, 2])

# Skew handling: log for the heavily skewed session count, sqrt for the rest.
# NOTE(review): np.log yields -inf for zero sessionCount — presumably every
# user has at least one session; confirm upstream.
sns.histplot(np.log(joined_pandas["sessionCount"]), kde=True, stat="density", color="skyblue", ax=axes[1, 0])
sns.histplot(np.sqrt(joined_pandas["meanSongCount"]), kde=True, stat="density", color="olive", ax=axes[1, 1])
sns.histplot(np.sqrt(joined_pandas["sessionsFreqDay"]), kde=True, stat="density", color="gold", ax=axes[1, 2])
@oneryalcin
oneryalcin / sparkify_9_inexer_onehotencoder.py
Created September 23, 2019 23:04
9 StringIndexer and OneHotEncoder
# Encode the categorical columns for ML: StringIndexer maps each distinct
# string value to a numeric index, then OneHotEncoderEstimator expands the
# indices into dummy (one-hot) vector columns.
joined = StringIndexer(inputCol='gender', outputCol='gender_idx')\
.fit(joined)\
.transform(joined)
joined = StringIndexer(inputCol='level', outputCol='level_idx')\
.fit(joined)\
.transform(joined)
# NOTE(review): this statement is truncated in the gist preview — the trailing
# backslash continues onto lines not shown here (presumably
# .fit(joined).transform(joined), matching the indexer calls above).
joined = OneHotEncoderEstimator(inputCols=['gender_idx', 'level_idx'],
outputCols=['gender_dummy','level_dummy'])\
@oneryalcin
oneryalcin / sparkify_8_joined_features.py
Created September 23, 2019 22:48
8 Sparkify Joined Features
# Assemble the per-user feature matrix by left-joining the aggregated
# frames onto user_features, keyed on userId. Left joins keep every user
# from user_features even when an aggregate has no row for that user.
# NOTE(review): the trailing backslash shows the chain continues on lines
# not captured in this gist preview.
joined = user_features\
.join(churn_data_summary,
on=['userId'],
how='left')\
.join(user_engagement,
on=['userId'],
how='left')\
.join(listen_freq,
on=['userId'],
how='left')\
@oneryalcin
oneryalcin / sparkify_7_sql.py
Created September 23, 2019 22:19
7 Sparkify SQL
# Show that we can do the same calculation above using SQL: register the
# DataFrame as a temporary view, then query it with Spark SQL.
data.createOrReplaceTempView('sparkify')
# Inner query: max(itemInSession) per (userId, sessionId), i.e. the number
# of songs in each session — mirroring the DataFrame-API aggregation.
# NOTE(review): the query string is truncated in this gist preview; the
# GROUP BY clause and closing triple quote are not shown.
sub_query = """
SELECT
userId,
sessionId,
max(itemInSession) as itemCount
FROM
sparkify
@oneryalcin
oneryalcin / sparkify_6_avg_song_count_per_session.py
Created September 23, 2019 22:14
6 Sparkify Session Count and Avg Song Count/Session
# Per-user engagement metrics: how many sessions each user had, and the
# average number of songs per session.
# itemInSession is a running counter within a session, so its max per
# (userId, sessionId) gives that session's song count.
songs_per_session = data \
    .groupBy('userId', 'sessionId') \
    .agg(F.max('itemInSession').alias('itemCount'))

# Roll the per-session counts up to user level. The dict-style agg produces
# columns named count(sessionId) / avg(itemCount), which we rename.
user_engagement = songs_per_session \
    .groupBy('userId') \
    .agg({"itemCount": "mean", "sessionId": "count"}) \
    .withColumnRenamed('count(sessionId)', 'sessionCount') \
    .withColumnRenamed('avg(itemCount)', 'meanSongCount') \
    .orderBy('userId')

user_engagement.show(10)
@oneryalcin
oneryalcin / sparkify_5_listen_freq.py
Created September 23, 2019 22:05
5 Sparkify listen_freq
# Create a new aggregated DataFrame called listen_freq
# (stands for listening frequency) for each user:
# first compute each session's start time (earliest event timestamp within
# the session), then per user take the earliest and latest session starts.
# NOTE(review): the agg(...) call is truncated in this gist preview — the
# closing parenthesis and any further aggregate expressions are not shown.
listen_freq = data.select('userId','sessionId', 'timeStamp')\
.groupBy('userId','sessionId')\
.agg(F.min('timeStamp').alias('sessionTime'))\
.orderBy('userId', 'sessionId')\
.groupBy('userId')\
.agg(F.min('sessionTime').alias('minSessionTime'),
F.max('sessionTime').alias('maxSessionTime'),
@oneryalcin
oneryalcin / sparkify_4_null_values_stats.py
Created September 23, 2019 21:13
sparkify_4_null_values_stats
# First let's have a look if we have any NaN values in our dataset:
# for every column, count the rows whose value is NaN, then collect the
# single result row into a {column: count} dict.
# NOTE(review): isnan() only detects floating-point NaN — SQL NULLs require
# a separate isnull()/isNull() check, so zeros here do not rule out nulls.
data.select([count(when(isnan(c), c)).alias(c) for c in data.columns]).head().asDict()
>> {'artist': 0,
'auth': 0,
'firstName': 0,
'gender': 0,
'itemInSession': 0,
'lastName': 0,
'length': 0,
'level': 0,
@oneryalcin
oneryalcin / sparkify_3_read_data.py
Last active September 23, 2019 21:07
3 Sparkify Read Data
# Load the event log into Spark.
# Ideally the data would live in a schema-aware, partition-friendly format
# such as parquet — important when ingesting big data — and on a distributed
# store (HDFS, or a cloud bucket like AWS S3 / Google Cloud Storage) for
# faster reads. Here we simply read the JSON file from local disk.
data = spark.read.format('json').load('mini_sparkify_event_data.json')

# How many user activity rows do we have?
data.count()
@oneryalcin
oneryalcin / sparkify_2_spark_context.py
Last active September 23, 2019 20:58
2 Sparkify create spark context
# Get the active SparkSession, or create a new one if none exists yet.
spark = (SparkSession.builder
         .appName("Sparkify The music streaming platform churn detection")
         .getOrCreate())

# Dump the effective Spark configuration as (key, value) pairs.
spark.sparkContext.getConf().getAll()
>> [('spark.app.id', 'local-1569248217329'),
@oneryalcin
oneryalcin / sparkify_1_import_libs.py
Created September 23, 2019 20:42
1 Sparkify Import libs
# import libraries
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import count, when, isnan, isnull, desc_nulls_first, desc, \
from_unixtime, col, dayofweek, dayofyear, hour, to_date, month
import pyspark.sql.functions as F
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler, StandardScaler, MinMaxScaler
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
# sc = SparkContext(appName="Project_workspace")