Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
%pyspark
# Read the raw track file from S3; each RDD element is one line of text.
data = sc.textFile('s3://bucket/clustering/zep_tracks.csv')
# Parse each partition's lines with the csv module (rather than a plain
# split) so that quoted fields containing commas stay intact. Every record
# becomes a list of strings.
import csv
rdd = data.mapPartitions(lambda x: csv.reader(x))
# Convert to a DataFrame, naming the columns positionally.
# NOTE(review): this assumes the file has no header row -- a header line would
# be loaded as an ordinary record. Confirm against the data.
dataframe = rdd.toDF(['artist','artist_id','album','album_id','track','track_id','track_number','track_length',
'preview_url','danceability','energy','key','loudness','mode','speechiness','acousticness','instrumentalness',
'liveness','valence','tempo','duration_ms','time_signature'])
# show() prints the rows itself and returns None, so it is called directly
# instead of being wrapped in print (which would emit a stray "None").
dataframe.show(n=2)
# Inspect the column types produced by the conversion.
print(dataframe.dtypes)
%pyspark
# Cast the audio-feature columns to Float, replacing each column in place
# under its existing name.
numeric_columns = [
    'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness',
    'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo',
    'duration_ms', 'time_signature',
]
for column_name in numeric_columns:
    dataframe = dataframe.withColumn(column_name, dataframe[column_name].cast("Float"))
# Confirm the new column types.
print(dataframe.dtypes)
#looking at the data
%pyspark
# Display every track whose energy value is greater than 1.
# NOTE(review): presumably a range check on the feature -- confirm intent.
dataframe.where(dataframe["energy"] > 1).show()
%pyspark
import pyspark.sql.functions as func
# Mean loudness per album, highest first, printed without truncating
# long album names.
loudness_by_album = dataframe.groupBy("album").agg(func.avg("loudness").alias("avg_loudness"))
loudness_by_album.orderBy("avg_loudness", ascending=False).show(truncate=False)
%pyspark
# Register the DataFrame as the temporary view "sqltable" so it can be
# queried with SQL.
dataframe.createOrReplaceTempView("sqltable")
# Mean energy per album, highest first.
avg_energy_query = "select album, avg(energy) as avg_energy from sqltable group by album order by avg(energy) desc"
sqlDF = spark.sql(avg_energy_query)
sqlDF.show()
%sql
-- Mean danceability per album, highest first.
SELECT
    album,
    avg(danceability) AS avg_danceability
FROM sqltable
GROUP BY album
ORDER BY avg(danceability) DESC
%pyspark
# Combine the numeric audio-feature columns into a single vector column
# named "features", which the scaler and clustering steps consume.
from pyspark.ml.feature import VectorAssembler
feature_columns = [
    'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness',
    'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo',
    'duration_ms', 'time_signature',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
cluster_vars = assembler.transform(dataframe)
%pyspark
# Standardize the assembled vectors (centre on the mean, scale to unit
# standard deviation) into a new "scaledFeatures" column.
from pyspark.ml.feature import StandardScaler
scaler = (StandardScaler()
          .setInputCol("features")
          .setOutputCol("scaledFeatures")
          .setWithStd(True)
          .setWithMean(True))
# Fit computes the per-feature statistics; transform applies them.
scalerModel = scaler.fit(cluster_vars)
cluster_vars_scaled = scalerModel.transform(cluster_vars)
#run the clustering
%pyspark
# Fit three clustering models with k=9. All of them are trained on the
# standardized "scaledFeatures" column so that features measured on larger
# numeric scales do not dominate the distance computation, and so that the
# printed costs are comparable between models.
from pyspark.ml.clustering import KMeans
kmeans = KMeans(featuresCol="scaledFeatures").setK(9).setSeed(1)
kmeans_model = kmeans.fit(cluster_vars_scaled)
# Cost of the fitted model on the training data (lower is tighter).
print(kmeans_model.computeCost(cluster_vars_scaled))
from pyspark.ml.clustering import BisectingKMeans
# Previously this model was fitted on the unscaled "features" column, unlike
# the other two models.
bkm = BisectingKMeans(featuresCol="scaledFeatures").setK(9).setSeed(1)
bkm_model = bkm.fit(cluster_vars_scaled)
print(bkm_model.computeCost(cluster_vars_scaled))
from pyspark.ml.clustering import GaussianMixture
# Seeded like the other models so reruns give the same clustering.
gmm = GaussianMixture(featuresCol="scaledFeatures").setK(9).setSeed(1)
gmm_model = gmm.fit(cluster_vars_scaled)
# No cost is printed for the mixture model; the original computeCost call on
# it was commented out.
%pyspark
# Attach each model's cluster assignment to the track rows.
# Every transform() appends a "prediction" column, so it is renamed right
# away to let the next model add its own without a name clash. Chaining the
# transforms keeps exactly one output row per input row; joining separate
# prediction frames on the feature vectors would multiply rows whenever two
# tracks share an identical vector.
output = kmeans_model.transform(cluster_vars_scaled).withColumnRenamed("prediction", "kmeans_pred")
output = bkm_model.transform(output).withColumnRenamed("prediction", "bkm_pred")
# The Gaussian mixture model also appends a "probability" vector, which is
# not needed downstream.
output = gmm_model.transform(output).withColumnRenamed("prediction", "gmm_pred").drop("probability")
print(output.dtypes)
# Preview the three assignments side by side.
output.select("track", "kmeans_pred", "bkm_pred", "gmm_pred").show(n=5)
%pyspark
# Persist the clustered tracks to S3 as CSV.
# Spark's CSV data source cannot write ML vector columns, so the assembled
# "features" and "scaledFeatures" vectors are dropped first; the individual
# feature columns they were built from remain. header=True records the column
# names in the output. Spark writes a directory of part files at this path.
output.drop("features").drop("scaledFeatures").write.csv('s3://bucket/clustering/cluster_output.csv', header=True)
%pyspark
# Register the track, album, feature and cluster-assignment columns as the
# temporary view "sqloutput" so they can be queried with SQL.
sql_columns = [
    "track", 'album',
    'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness',
    'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo',
    'duration_ms', 'time_signature',
    'kmeans_pred', 'bkm_pred', 'gmm_pred',
]
output.select(*sql_columns).createOrReplaceTempView("sqloutput")
%sql
-- Preview the first ten rows of the clustered output.
SELECT *
FROM sqloutput
LIMIT 10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment