Created May 22, 2018 15:38
cluster and example operations in pyspark
```python
df = dataframe()
```

```python
df.printSchema()
```

```
root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: integer (nullable = true)
 |-- centroid: vector (nullable = true)
```

## To plot K vs. your Ray-Turi index, here is K

```python
K = df.agg(F.max(df.prediction)) \
    .collect()[0][0] + 1
```

```python
K
```

```
4
```

## Example: distance to cluster center

```python
from pyspark.sql.types import *
import pyspark.sql.functions as F
```

```python
from scipy.spatial import distance
dist_udf = F.udf(lambda x: float(distance.euclidean(x[0], x[1])), FloatType())
```

```python
distances = df.withColumn('dist_from_ctr',
                          dist_udf(F.array(df.features, df.centroid)))
```

```python
distances.select(distances.prediction, distances.dist_from_ctr).show()
```

```
+----------+-------------+
|prediction|dist_from_ctr|
+----------+-------------+
|         0|          0.0|
|         2|    16.404589|
|         2|          0.0|
|         2|     7.227447|
|         0|          0.0|
|         2|    14.165186|
|         2|     5.071906|
|         0|    13.633969|
|         0|    16.131256|
|         1|      36.0852|
|         2|   11.0845175|
|         1|      36.0852|
|         2|     5.610877|
|         0|     5.522331|
|         3|      91.9762|
|         3|    94.392975|
|         1|    37.393963|
|         2|    2.5086203|
|         0|     7.513262|
|         2|    3.0544722|
+----------+-------------+
only showing top 20 rows
```

## Inferred cluster centres

```python
# oops, that won't work -- explode() takes an array or map column, not a Vector. Wish it did!
# features_long = df.select(F.explode(df.features).alias('feature', 'value'))
```

```python
# pull element x[1] out of the dense vector x[0]
vec_apply_udf = F.udf(lambda x: float(x[0].values[x[1]]), FloatType())
```

```python
feats = df.select(
    df.prediction,
    vec_apply_udf(F.struct(df.features, F.lit(0))).alias('f0'),
    vec_apply_udf(F.struct(df.features, F.lit(1))).alias('f1'),
    vec_apply_udf(F.struct(df.features, F.lit(2))).alias('f2'),
    vec_apply_udf(F.struct(df.features, F.lit(3))).alias('f3'),
    vec_apply_udf(F.struct(df.features, F.lit(4))).alias('f4'),
    vec_apply_udf(F.struct(df.features, F.lit(5))).alias('f5')
)
```

```python
centroids = feats.groupBy(feats.prediction).agg(
    F.mean(feats.f0).alias('f0'),
    F.mean(feats.f1).alias('f1'),
    F.mean(feats.f2).alias('f2'),
    F.mean(feats.f3).alias('f3'),
    F.mean(feats.f4).alias('f4'),
    F.mean(feats.f5).alias('f5')
)
```

```python
centroids.toPandas()
```

```
   prediction        f0        f1        f2        f3        f4         f5
0           1  8.084375  0.548887  0.283125  3.016797  0.090621  24.550781
1           3  8.029630  0.562469  0.316790  3.345679  0.089444  31.364198
2           2  8.176952  0.521657  0.256781  2.402667  0.090796  19.646667
3           0  8.534871  0.521085  0.271832  2.381072  0.083782   8.472185
```

Now we want to create the same Spark dataframe of id and ml.linalg.Vector type for the distance calc above... or we could try to collect all the features into Pandas too.

Kernel: PySpark (Python 2.7.13)
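A Python UDF receives its arguments row by row, which is why the dist_udf cell first packs features and centroid into one array column with F.array and then unpacks the pair inside the lambda. The core of that lambda can be checked locally; as a minimal sketch, the stdlib `math.dist` (Python 3.8+) stands in for scipy's `distance.euclidean`:

```python
import math

# A packed pair, as one row of F.array(df.features, df.centroid) would deliver it
pair = ([1.0, 2.0, 2.0], [0.0, 0.0, 0.0])

# Body of the notebook's dist_udf lambda, with math.dist in place of distance.euclidean
dist_from_ctr = float(math.dist(pair[0], pair[1]))
print(dist_from_ctr)  # 3.0, i.e. sqrt(1 + 4 + 4)
```

The FloatType() return annotation on the Spark UDF matters for the same reason the `float(...)` cast does: the lambda must hand Spark a plain Python float, not a NumPy scalar.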
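The closing note suggests collecting the features into Pandas; the groupBy/mean cell that infers the centroids then translates directly. A small local sketch, with made-up f0/f1 values standing in for the notebook's f0..f5 columns:

```python
import pandas as pd

# Toy stand-in for feats.toPandas(): (prediction, f0, f1) rows
feats_pd = pd.DataFrame({
    "prediction": [0, 0, 1, 1],
    "f0": [8.0, 9.0, 8.0, 8.2],
    "f1": [0.5, 0.7, 0.5, 0.5],
})

# Local equivalent of feats.groupBy(feats.prediction).agg(F.mean(...))
centroids_pd = feats_pd.groupby("prediction", as_index=False).mean()
```

From here, `centroids_pd[["f0", "f1"]].to_numpy()` gives one row per cluster, which is the shape needed for a local distance calculation without round-tripping back to ml.linalg vectors.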
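The first heading promises plotting K against the Ray-Turi index, but the notebook stops at computing K. A minimal NumPy sketch of the index itself, assuming the usual Ray-Turi definition (mean squared distance of points to their own centroid, divided by the minimum squared distance between any two centroids; lower is better) and that per-point distances like dist_from_ctr and the centroids have been collected locally:

```python
import numpy as np

def ray_turi(points, labels, centroids):
    """Ray-Turi validity index: intra-cluster compactness / inter-cluster separation."""
    # Mean squared distance from each point to its assigned centroid
    intra = np.mean(np.sum((points - centroids[labels]) ** 2, axis=1))
    # Minimum squared distance between any pair of distinct centroids
    k = len(centroids)
    inter = min(np.sum((centroids[i] - centroids[j]) ** 2)
                for i in range(k) for j in range(i + 1, k))
    return intra / inter

# Two tight, well-separated clusters -> small index
pts = np.array([[0.0, 0.0], [0.1, 0.0], [10.0, 0.0], [10.1, 0.0]])
lab = np.array([0, 0, 1, 1])
ctr = np.array([[0.05, 0.0], [10.05, 0.0]])
score = ray_turi(pts, lab, ctr)
```

Running the fitted model over a range of K values and plotting K against this score gives the elbow-style plot the heading alludes to.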