Skip to content

Instantly share code, notes, and snippets.

@afraenkel
Last active August 14, 2018 19:01
Show Gist options
  • Save afraenkel/04b21d11e4bded0d63252e52bd8c4d4a to your computer and use it in GitHub Desktop.
Save afraenkel/04b21d11e4bded0d63252e52bd8c4d4a to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# Requires Spark >= 2.3 (pandas_udf and PandasUDFType.GROUPED_MAP were introduced in 2.3)\n",
"# Requires PyArrow (pip install pyarrow)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"from pyspark import SparkContext, SQLContext\n",
"\n",
"import pyspark.sql.types as T\n",
"import pyspark.sql.functions as F\n",
"\n",
"from pyspark.sql.functions import pandas_udf\n",
"from pyspark.sql.functions import PandasUDFType"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"# Reuse the already-running SparkContext when there is one: a bare SparkContext()\n",
"# raises ValueError if this cell is executed a second time in the same kernel.\n",
"sc = SparkContext.getOrCreate()\n",
"\n",
"# SQLContext wraps the context and is the entry point for creating DataFrames.\n",
"sqlc = SQLContext(sc)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>group_col</th>\n",
" <th>column1</th>\n",
" <th>column2</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>a</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>c</td>\n",
" <td>3</td>\n",
" <td>4</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>e</td>\n",
" <td>5</td>\n",
" <td>6</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>a</td>\n",
" <td>2</td>\n",
" <td>7</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>a</td>\n",
" <td>3</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>e</td>\n",
" <td>3</td>\n",
" <td>6</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" group_col column1 column2\n",
"0 a 1 1\n",
"1 c 3 4\n",
"2 e 5 6\n",
"3 a 2 7\n",
"4 a 3 2\n",
"5 e 3 6"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Schema: one string grouping key followed by two integer value columns.\n",
"schema = (\n",
"    T.StructType()\n",
"    .add('group_col', T.StringType())\n",
"    .add('column1', T.IntegerType())\n",
"    .add('column2', T.IntegerType())\n",
")\n",
"\n",
"# Six sample rows spread over the groups 'a' (3 rows), 'c' (1 row) and 'e' (2 rows).\n",
"df = sqlc.createDataFrame(\n",
"    [\n",
"        ('a', 1, 1),\n",
"        ('c', 3, 4),\n",
"        ('e', 5, 6),\n",
"        ('a', 2, 7),\n",
"        ('a', 3, 2),\n",
"        ('e', 3, 6),\n",
"    ],\n",
"    schema=schema,\n",
")\n",
"\n",
"# Collect to pandas so the notebook renders the rows as a table.\n",
"df.toPandas()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"# Result schema of the UDF: every column of df, followed by an integer 'size' column.\n",
"udf_schema = T.StructType(\n",
"    [*df.schema.fields, T.StructField('size', T.IntegerType())]\n",
")\n",
"\n",
"\n",
"@pandas_udf(udf_schema, PandasUDFType.GROUPED_MAP)\n",
"def add_count(grouped_df):\n",
"    \"\"\"Return the rows of one group with that group's row count in a new 'size' column.\n",
"\n",
"    grouped_df is the DataFrame holding the rows of a single group; the result\n",
"    is a new DataFrame with the same rows plus 'size' set to the number of rows.\n",
"    \"\"\"\n",
"    n_rows = grouped_df.shape[0]\n",
"    return grouped_df.assign(size=n_rows)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>group_col</th>\n",
" <th>column1</th>\n",
" <th>column2</th>\n",
" <th>size</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>e</td>\n",
" <td>5</td>\n",
" <td>6</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>e</td>\n",
" <td>3</td>\n",
" <td>6</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>c</td>\n",
" <td>3</td>\n",
" <td>4</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>a</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" <td>3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>a</td>\n",
" <td>2</td>\n",
" <td>7</td>\n",
" <td>3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>a</td>\n",
" <td>3</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" group_col column1 column2 size\n",
"0 e 5 6 2\n",
"1 e 3 6 2\n",
"2 c 3 4 1\n",
"3 a 1 1 3\n",
"4 a 2 7 3\n",
"5 a 3 2 3"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Apply add_count to each group of rows sharing a group_col value, then collect to pandas.\n",
"(\n",
"    df\n",
"    .groupBy('group_col')\n",
"    .apply(add_count)\n",
"    .toPandas()\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment