Last active
May 21, 2019 19:00
-
-
Save graham-thomson/82a44160a1090e925756ea4bdf25abe2 to your computer and use it in GitHub Desktop.
Spark DF way to fill null feature rows and combine multiple feature vectors.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"from pyspark.sql import SparkSession\n", | |
"from pyspark.sql.functions import udf\n", | |
"from pyspark.sql.types import *\n", | |
"from pyspark.ml.feature import VectorAssembler\n", | |
"from pyspark.ml.linalg import Vectors, VectorUDT\n", | |
"from uuid import uuid4\n", | |
"import random\n", | |
"\n", | |
"spark = SparkSession.builder.getOrCreate()\n", | |
"\n", | |
"def generate_row():\n", | |
" return [str(uuid4()), \n", | |
" Vectors.dense([random.randint(0, 12) for x in range(10)]), \n", | |
" random.choice([None, Vectors.dense([random.randint(0, 10) for x in range(10)])])]\n", | |
"\n", | |
"@udf(returnType=VectorUDT())\n", | |
"def none_to_sparse(vec):\n", | |
" if vec == None:\n", | |
" return Vectors.sparse(10, [], [])\n", | |
" return vec" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"+--------------------+--------------------+--------------------+\n", | |
"|                 idl|           features1|           features2|\n", | |
"+--------------------+--------------------+--------------------+\n", | |
"|54efe347-4b64-405...|[9.0,4.0,2.0,7.0,...|[9.0,5.0,5.0,5.0,...|\n", | |
"|0420ee46-08b6-4f9...|[0.0,5.0,6.0,4.0,...|                null|\n", | |
"|4c502852-ae76-4ce...|[7.0,8.0,12.0,9.0...|[2.0,6.0,10.0,8.0...|\n", | |
"|e09cdbf9-19b5-440...|[7.0,1.0,0.0,11.0...|                null|\n", | |
"|c51180e8-250c-47b...|[3.0,11.0,12.0,12...|[6.0,10.0,3.0,7.0...|\n", | |
"|b640fba4-255d-4c7...|[3.0,4.0,11.0,7.0...|[0.0,6.0,1.0,3.0,...|\n", | |
"|7c982edf-5f16-41d...|[2.0,4.0,5.0,0.0,...|[3.0,9.0,1.0,0.0,...|\n", | |
"|4d24e373-115f-41f...|[9.0,4.0,3.0,4.0,...|                null|\n", | |
"|c40a73fb-e966-415...|[3.0,7.0,4.0,9.0,...|                null|\n", | |
"|1dcfd555-b516-4c4...|[11.0,12.0,11.0,1...|[3.0,3.0,1.0,8.0,...|\n", | |
"+--------------------+--------------------+--------------------+\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"# generate 10 mock rows of (id, dense features, dense features or null)\n", | |
"df = spark.createDataFrame([generate_row() for x in range(10)], \n", | |
"                           schema=StructType([\n", | |
"                               StructField(\"idl\", StringType(), nullable=False),\n", | |
"                               StructField(\"features1\", VectorUDT(), nullable=False),\n", | |
"                               # only features2 is nullable -- these are the rows\n", | |
"                               # the none_to_sparse UDF will fill later\n", | |
"                               StructField(\"features2\", VectorUDT(), nullable=True)\n", | |
"                           ]))\n", | |
"df.show()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"+--------------------+--------------------+--------------------+\n", | |
"|                 idl|           features1|           features2|\n", | |
"+--------------------+--------------------+--------------------+\n", | |
"|54efe347-4b64-405...|[9.0,4.0,2.0,7.0,...|[9.0,5.0,5.0,5.0,...|\n", | |
"|0420ee46-08b6-4f9...|[0.0,5.0,6.0,4.0,...|          (10,[],[])|\n", | |
"|4c502852-ae76-4ce...|[7.0,8.0,12.0,9.0...|[2.0,6.0,10.0,8.0...|\n", | |
"|e09cdbf9-19b5-440...|[7.0,1.0,0.0,11.0...|          (10,[],[])|\n", | |
"|c51180e8-250c-47b...|[3.0,11.0,12.0,12...|[6.0,10.0,3.0,7.0...|\n", | |
"|b640fba4-255d-4c7...|[3.0,4.0,11.0,7.0...|[0.0,6.0,1.0,3.0,...|\n", | |
"|7c982edf-5f16-41d...|[2.0,4.0,5.0,0.0,...|[3.0,9.0,1.0,0.0,...|\n", | |
"|4d24e373-115f-41f...|[9.0,4.0,3.0,4.0,...|          (10,[],[])|\n", | |
"|c40a73fb-e966-415...|[3.0,7.0,4.0,9.0,...|          (10,[],[])|\n", | |
"|1dcfd555-b516-4c4...|[11.0,12.0,11.0,1...|[3.0,3.0,1.0,8.0,...|\n", | |
"+--------------------+--------------------+--------------------+\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"# fill null feature2 rows with an empty sparse vector via the UDF above\n", | |
"# (null vectors become (10,[],[]) -- see the output below)\n", | |
"# NOTE: for building several columns, one select() is preferred over chained\n", | |
"# withColumn() calls -- each withColumn triggers its own plan analysis\n", | |
"df = df.select(\"idl\", \"features1\", none_to_sparse(\"features2\").alias(\"features2\"))\n", | |
"df.show()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"+--------------------+--------------------+\n", | |
"|                 idl|            features|\n", | |
"+--------------------+--------------------+\n", | |
"|54efe347-4b64-405...|[9.0,4.0,2.0,7.0,...|\n", | |
"|0420ee46-08b6-4f9...|(20,[1,2,3,4,5,6,...|\n", | |
"|4c502852-ae76-4ce...|[7.0,8.0,12.0,9.0...|\n", | |
"|e09cdbf9-19b5-440...|(20,[0,1,3,4,5,6,...|\n", | |
"|c51180e8-250c-47b...|[3.0,11.0,12.0,12...|\n", | |
"|b640fba4-255d-4c7...|[3.0,4.0,11.0,7.0...|\n", | |
"|7c982edf-5f16-41d...|[2.0,4.0,5.0,0.0,...|\n", | |
"|4d24e373-115f-41f...|(20,[0,1,2,3,4,5,...|\n", | |
"|c40a73fb-e966-415...|(20,[0,1,2,3,4,5,...|\n", | |
"|1dcfd555-b516-4c4...|[11.0,12.0,11.0,1...|\n", | |
"+--------------------+--------------------+\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"# concatenate the two 10-dim feature vectors into one 20-dim 'features' column\n", | |
"# (rows that were null now show up as 20-dim sparse vectors in the output)\n", | |
"# NOTE(review): Spark 2.4+ VectorAssembler accepts handleInvalid=\\\"keep\\\",\n", | |
"# which may make the none_to_sparse fill step unnecessary -- confirm against\n", | |
"# the target Spark version\n", | |
"va = VectorAssembler(inputCols=[\"features1\", \"features2\"], outputCol=\"features\")\n", | |
"df = va.transform(df).select(\"idl\", \"features\")\n", | |
"df.show()" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python3 (sys)", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.6.4" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment