Skip to content

Instantly share code, notes, and snippets.

@DarquesM
Last active May 31, 2018 08:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DarquesM/38d466f5506b96a9765ef461dbd46071 to your computer and use it in GitHub Desktop.
Save DarquesM/38d466f5506b96a9765ef461dbd46071 to your computer and use it in GitHub Desktop.
A bunch of Pyspark snippets
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Fetch first/last occurence of a value \n",
"\n",
"This notebook shows how to find the first or last desired value in a grouped column without using window functions and report the corresponding value in another column\n",
"\n",
"In this example, let's consider the following data : the speed of 2 cars (2 IDs) is recorded as a function of time. We want to find the first time these cars reach a speed of 4 and the last time a speed of 6.\n",
"\n",
"Two methods are used : \n",
"- Sort column and get first desired value\n",
"- Filter by speed and get the min or max of time directly\n",
"\n",
"As expected, using sort is slower.\n",
"\n",
"Also note that depending on the dataset size, the first method may give a wrong answer, maybe due to the sort or the grouping. If using the sort, a good practice may be to groupBy several possible columns."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"from pyspark.sql.types import *\n",
"from pyspark.sql import functions as F\n",
"from pyspark.sql import Window"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"WARNING: calling FoundryManager with a sparkSession is deprecated, use a spark context instead\n"
]
}
],
"source": [
"fm = get_foundry_manager()\n",
"#Create Spark context\n",
"sc = fm._sql_context"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Keep only phases 4 & 5"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---+-----+\n",
"|Time| ID|Speed|\n",
"+----+---+-----+\n",
"| 1|10A| 4|\n",
"| 2|10A| 4|\n",
"| 3|10A| 4|\n",
"| 4|10A| 4|\n",
"| 5|10A| 5|\n",
"| 6|10A| 5|\n",
"| 7|10A| 6|\n",
"| 8|10A| 7|\n",
"| 9|58R| 3|\n",
"| 10|58R| 4|\n",
"| 11|58R| 4|\n",
"| 12|58R| 5|\n",
"| 13|58R| 5|\n",
"| 14|58R| 5|\n",
"| 15|58R| 5|\n",
"| 16|58R| 6|\n",
"+----+---+-----+\n",
"\n"
]
}
],
"source": [
"# Sample DataFrame\n",
"a = sc.createDataFrame([(1, '10A', 4), \n",
" (2, '10A', 4),\n",
" (3, '10A', 4),\n",
" (4, '10A', 4),\n",
" (5, '10A', 5),\n",
" (6, '10A', 5),\n",
" (7, '10A', 6),\n",
" (8, '10A', 7),\n",
" (9, '58R', 3),\n",
" (10, '58R', 4),\n",
" (11, '58R', 4),\n",
" (12, '58R', 5),\n",
" (13, '58R', 5),\n",
" (14, '58R', 5),\n",
" (15, '58R', 5),\n",
" (16, '58R', 6)]\n",
" , ['Time', 'ID', 'Speed'])\n",
"a.show()"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+-------------+\n",
"| ID|First_Speed_4|\n",
"+---+-------------+\n",
"|10A| 1|\n",
"|58R| 10|\n",
"+---+-------------+\n",
"\n",
"Execution time with sort: 1.59730792046 s\n",
"+---+-------------+\n",
"| ID|First_Speed_4|\n",
"+---+-------------+\n",
"|10A| 1|\n",
"|58R| 10|\n",
"+---+-------------+\n",
"\n",
"Execution time without sort: 0.929090976715 s\n",
"+---+------------+\n",
"| ID|Last_Speed_6|\n",
"+---+------------+\n",
"|10A| 7|\n",
"|58R| 16|\n",
"+---+------------+\n",
"\n",
"Execution time with sort: 1.48199009895 s\n",
"+---+------------+\n",
"| ID|Last_Speed_6|\n",
"+---+------------+\n",
"|10A| 7|\n",
"|58R| 16|\n",
"+---+------------+\n",
"\n",
"Execution time without sort: 1.10506987572 s\n"
]
}
],
"source": [
"# All starts grouped by ID\n",
"t_init = time.time()\n",
"starts = a.filter(a[\"Speed\"] == 4.0)\\\n",
" .sort('Time')\\\n",
" .groupBy(\"ID\")\\\n",
" .agg(F.first(\"Time\").alias(\"First_Speed_4\")) \n",
"starts.show()\n",
"print(\"Execution time with sort: {0} s\".format(time.time()-t_init))\n",
"\n",
"t_init = time.time()\n",
"\n",
"starts = a.filter(a[\"Speed\"] == 4.0)\\\n",
" .groupBy(\"ID\")\\\n",
" .agg(F.min(\"Time\").alias(\"First_Speed_4\"))\n",
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import functions as F\n",
"from pyspark.sql import Window"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"WARNING: calling FoundryManager with a sparkSession is deprecated, use a spark context instead\n"
]
}
],
"source": [
"fm = get_foundry_manager()\n",
"sc = fm._sql_context"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"## Sample DataFrame"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"a = sc.createDataFrame([(1, 1, 16), \n",
" (2, 1, 100),\n",
" (3, 2, 150),\n",
" (4, 2, 150),\n",
" (5, 2, 6),\n",
" (6, 3, 99),\n",
" (7, 3, 101),\n",
" (8, 4, 103),\n",
" (9, 4, 12),\n",
" (10, 4, 18),\n",
" (11, 4, 47),\n",
" (12, 4, 44)]\n",
" , ['ID', 'Phase', 'FF_1'])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"There are two ways of using windows : \n",
"\n",
"* Define the window and the range at the same time, or\n",
"* Define the window and use pyspark.sql functions "
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [],
"source": [
"window_w_range = Window.partitionBy('Phase')\\\n",
" .orderBy('ID')\\\n",
" .rowsBetween(0, 1)\n",
"window_wo_range = Window.orderBy('ID')"
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+-----+----+-----+\n",
"| ID|Phase|FF_1| Avg|\n",
"+---+-----+----+-----+\n",
"| 1| 1| 16| 58.0|\n",
"| 2| 1| 100|100.0|\n",
"| 6| 3| 99|100.0|\n",
"| 7| 3| 101|101.0|\n",
"| 3| 2| 150|150.0|\n",
"| 4| 2| 150| 78.0|\n",
"| 5| 2| 6| 6.0|\n",
"| 8| 4| 103| 57.5|\n",
"| 9| 4| 12| 15.0|\n",
"| 10| 4| 18| 32.5|\n",
"| 11| 4| 47| 45.5|\n",
"| 12| 4| 44| 44.0|\n",
"+---+-----+----+-----+\n",
"\n"
]
}
],
"source": [
"a.withColumn(\"Avg\", F.mean(a['FF_1']).over(window_w_range)).show()"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+-----+----+----+\n",
"| ID|Phase|FF_1| sum|\n",
"+---+-----+----+----+\n",
"| 1| 1| 16| 116|\n",
"| 2| 1| 100| 250|\n",
"| 3| 2| 150| 300|\n",
"| 4| 2| 150| 156|\n",
"| 5| 2| 6| 105|\n",
"| 6| 3| 99| 200|\n",
"| 7| 3| 101| 204|\n",
"| 8| 4| 103| 115|\n",
"| 9| 4| 12| 30|\n",
"| 10| 4| 18| 65|\n",
"| 11| 4| 47| 91|\n",
"| 12| 4| 44|null|\n",
"+---+-----+----+----+\n",
"\n"
]
}
],
"source": [
"def func():\n",
" result = F.col('FF_1') + F.lead(F.col('FF_1'), 1).over(window_wo_range)\n",
" return result\n",
"a.withColumn(\"sum\", func()).show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
"starts.show()\n",
"print(\"Execution time without sort: {0} s\".format(time.time()-t_init))\n",
"\n",
"# All stops grouped by ID and MSN\n",
"t_init = time.time()\n",
"stops = a.filter(a[\"Speed\"] == 6.0)\\\n",
" .sort('Time')\\\n",
" .groupBy(\"ID\")\\\n",
" .agg(F.last(\"Time\").alias(\"Last_Speed_6\")) \n",
"stops.show()\n",
"print(\"Execution time with sort: {0} s\".format(time.time()-t_init))\n",
"t_init = time.time()\n",
"stops = a.filter(a[\"Speed\"] == 6.0)\\\n",
" .groupBy(\"ID\")\\\n",
" .agg(F.max(\"Time\").alias(\"Last_Speed_6\")) \n",
"stops.show()\n",
"print(\"Execution time without sort: {0} s\".format(time.time()-t_init))"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment