Spark DataFrame approach to filling null feature-vector rows and combining multiple feature vectors into one.
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.functions import udf\n",
"from pyspark.sql.types import *\n",
"from pyspark.ml.feature import VectorAssembler\n",
"from pyspark.ml.linalg import Vectors, VectorUDT\n",
"from uuid import uuid4\n",
"import random\n",
"\n",
"spark = SparkSession.builder.getOrCreate()\n",
"\n",
"def generate_row():\n",
" return [str(uuid4()), \n",
" Vectors.dense([random.randint(0, 12) for x in range(10)]), \n",
" random.choice([None, Vectors.dense([random.randint(0, 10) for x in range(10)])])]\n",
"\n",
"@udf(returnType=VectorUDT())\n",
"def none_to_sparse(vec):\n",
" if vec == None:\n",
" return Vectors.sparse(10, [], [])\n",
" return vec"
]
},
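{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick, Spark-free sanity check of the fill logic (a minimal sketch; `fill_null_vector` is just a throwaway helper mirroring the UDF body, assuming the fixed vector size of 10 used throughout)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# minimal local check of the fill logic, mirroring none_to_sparse\n",
"def fill_null_vector(vec, size=10):\n",
"    # return an all-zero sparse vector when the input vector is missing\n",
"    return Vectors.sparse(size, [], []) if vec is None else vec\n",
"\n",
"print(fill_null_vector(None))                       # (10,[],[])\n",
"print(fill_null_vector(Vectors.dense([1.0, 2.0])))  # [1.0,2.0]"
]
},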
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+--------------------+--------------------+\n",
"| idl| features1| features2|\n",
"+--------------------+--------------------+--------------------+\n",
"|54efe347-4b64-405...|[9.0,4.0,2.0,7.0,...|[9.0,5.0,5.0,5.0,...|\n",
"|0420ee46-08b6-4f9...|[0.0,5.0,6.0,4.0,...| null|\n",
"|4c502852-ae76-4ce...|[7.0,8.0,12.0,9.0...|[2.0,6.0,10.0,8.0...|\n",
"|e09cdbf9-19b5-440...|[7.0,1.0,0.0,11.0...| null|\n",
"|c51180e8-250c-47b...|[3.0,11.0,12.0,12...|[6.0,10.0,3.0,7.0...|\n",
"|b640fba4-255d-4c7...|[3.0,4.0,11.0,7.0...|[0.0,6.0,1.0,3.0,...|\n",
"|7c982edf-5f16-41d...|[2.0,4.0,5.0,0.0,...|[3.0,9.0,1.0,0.0,...|\n",
"|4d24e373-115f-41f...|[9.0,4.0,3.0,4.0,...| null|\n",
"|c40a73fb-e966-415...|[3.0,7.0,4.0,9.0,...| null|\n",
"|1dcfd555-b516-4c4...|[11.0,12.0,11.0,1...|[3.0,3.0,1.0,8.0,...|\n",
"+--------------------+--------------------+--------------------+\n",
"\n"
]
}
],
"source": [
"# generate mock data\n",
"df = spark.createDataFrame([generate_row() for x in range(10)], \n",
" schema=StructType([\n",
" StructField(\"idl\", StringType(), nullable=False),\n",
" StructField(\"features1\", VectorUDT(), nullable=False),\n",
" StructField(\"features2\", VectorUDT(), nullable=True)\n",
" ]))\n",
"df.show()"
]
},
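{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before filling, the number of rows that actually need it can be checked (a small sketch; the count varies per run because the mock data picks `None` at random)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# count rows with a null features2 vector\n",
"df.filter(df[\"features2\"].isNull()).count()"
]
},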
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+--------------------+--------------------+\n",
"| idl| features1| features2|\n",
"+--------------------+--------------------+--------------------+\n",
"|54efe347-4b64-405...|[9.0,4.0,2.0,7.0,...|[9.0,5.0,5.0,5.0,...|\n",
"|0420ee46-08b6-4f9...|[0.0,5.0,6.0,4.0,...| (10,[],[])|\n",
"|4c502852-ae76-4ce...|[7.0,8.0,12.0,9.0...|[2.0,6.0,10.0,8.0...|\n",
"|e09cdbf9-19b5-440...|[7.0,1.0,0.0,11.0...| (10,[],[])|\n",
"|c51180e8-250c-47b...|[3.0,11.0,12.0,12...|[6.0,10.0,3.0,7.0...|\n",
"|b640fba4-255d-4c7...|[3.0,4.0,11.0,7.0...|[0.0,6.0,1.0,3.0,...|\n",
"|7c982edf-5f16-41d...|[2.0,4.0,5.0,0.0,...|[3.0,9.0,1.0,0.0,...|\n",
"|4d24e373-115f-41f...|[9.0,4.0,3.0,4.0,...| (10,[],[])|\n",
"|c40a73fb-e966-415...|[3.0,7.0,4.0,9.0,...| (10,[],[])|\n",
"|1dcfd555-b516-4c4...|[11.0,12.0,11.0,1...|[3.0,3.0,1.0,8.0,...|\n",
"+--------------------+--------------------+--------------------+\n",
"\n"
]
}
],
"source": [
"# apply none_to_sparse UDF above to dataframe \n",
"# FYI: I've read that .select() is faster than .withColumn(), nothing to back that up though...\n",
"df = df.select(\"idl\", \"features1\", none_to_sparse(\"features2\").alias(\"features2\"))\n",
"df.show()"
]
},
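{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, the single-column case can also be written with `.withColumn()`; the sketch below is equivalent (and a no-op at this point, since the nulls are already filled)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# equivalent single-column form; .select() scales better when several\n",
"# columns need the same treatment\n",
"df_alt = df.withColumn(\"features2\", none_to_sparse(\"features2\"))"
]
},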
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+--------------------+\n",
"| idl| features|\n",
"+--------------------+--------------------+\n",
"|54efe347-4b64-405...|[9.0,4.0,2.0,7.0,...|\n",
"|0420ee46-08b6-4f9...|(20,[1,2,3,4,5,6,...|\n",
"|4c502852-ae76-4ce...|[7.0,8.0,12.0,9.0...|\n",
"|e09cdbf9-19b5-440...|(20,[0,1,3,4,5,6,...|\n",
"|c51180e8-250c-47b...|[3.0,11.0,12.0,12...|\n",
"|b640fba4-255d-4c7...|[3.0,4.0,11.0,7.0...|\n",
"|7c982edf-5f16-41d...|[2.0,4.0,5.0,0.0,...|\n",
"|4d24e373-115f-41f...|(20,[0,1,2,3,4,5,...|\n",
"|c40a73fb-e966-415...|(20,[0,1,2,3,4,5,...|\n",
"|1dcfd555-b516-4c4...|[11.0,12.0,11.0,1...|\n",
"+--------------------+--------------------+\n",
"\n"
]
}
],
"source": [
"# using the VectorAssembler to combine two feature vectors into one\n",
"va = VectorAssembler(inputCols=[\"features1\", \"features2\"], outputCol=\"features\")\n",
"df = va.transform(df).select(\"idl\", \"features\")\n",
"df.show()"
]
}
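,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a final check (a small sketch; `vec_size` is a throwaway helper, not part of the pipeline), every assembled vector should have length 20, i.e. the two 10-dimensional inputs concatenated."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# throwaway helper to confirm each assembled vector is 10 + 10 = 20 long\n",
"@udf(returnType=IntegerType())\n",
"def vec_size(v):\n",
"    return len(v)\n",
"\n",
"df.select(vec_size(\"features\").alias(\"size\")).distinct().show()"
]
}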
],
"metadata": {
"kernelspec": {
"display_name": "Python3 (sys)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}