Skip to content

Instantly share code, notes, and snippets.

@edevil
Forked from mr1azl/Haversine_SparkSQL.ipynb
Created August 17, 2016 15:23
Show Gist options
  • Save edevil/e76b13d0fe1efb1d15dfe37fbe44abd5 to your computer and use it in GitHub Desktop.
Save edevil/e76b13d0fe1efb1d15dfe37fbe44abd5 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from pyspark.sql import Row\n",
"from pyspark.sql.window import Window\n",
"import pyspark.sql.functions as F\n"
]
},
{
"cell_type": "code",
"execution_count": 99,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"df = sc.parallelize([Row(uid=\"49B4361512443A4DA\",tm=1462838368,lat=37.00,lon=-7.054599)\n",
" ,Row(uid=\"49B4361512443A4DA\",tm=1462838468,lat=38.00,lon=-8.054599)\n",
" ,Row(uid=\"49B4361512443A4DA\",tm=1462838568,lat=39.00,lon=-8.054599)]).toDF()"
]
},
{
"cell_type": "code",
"execution_count": 100,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"3"
]
},
"execution_count": 100,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.cache().count()"
]
},
{
"cell_type": "code",
"execution_count": 102,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"windowSpec = Window.partitionBy(df['uid']).orderBy(df['tm'].desc())"
]
},
{
"cell_type": "code",
"execution_count": 103,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"lat2=F.lag(df[\"lat\"]).over(windowSpec).alias('lat2')\n",
"lon2=F.lag(df[\"lon\"]).over(windowSpec).alias('lon2')"
]
},
{
"cell_type": "code",
"execution_count": 104,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df1 = df.select('*',lat2,lon2)"
]
},
{
"cell_type": "code",
"execution_count": 105,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---------+----------+-----------------+----+---------+\n",
"| lat| lon| tm| uid|lat2| lon2|\n",
"+----+---------+----------+-----------------+----+---------+\n",
"|39.0|-8.054599|1462838568|49B4361512443A4DA|null| null|\n",
"|38.0|-8.054599|1462838468|49B4361512443A4DA|39.0|-8.054599|\n",
"|37.0|-7.054599|1462838368|49B4361512443A4DA|38.0|-8.054599|\n",
"+----+---------+----------+-----------------+----+---------+\n",
"\n"
]
}
],
"source": [
"df1.show()"
]
},
{
"cell_type": "code",
"execution_count": 106,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"lat1=F.toRadians(\"lat\").alias(\"lat1\")\n",
"lon1=F.toRadians(\"lon\").alias(\"lon1\")\n",
"lat2=F.toRadians(\"lat2\").alias(\"lat2\")\n",
"lon2=F.toRadians(\"lon2\").alias(\"lon2\")\n",
"dlon = lon2 - lon1 \n",
"dlat = lat2 - lat1 \n",
"a = F.sin(dlat/2)**2 + F.cos(lat1) * F.cos(lat2) * F.sin(dlon/2)**2\n",
"c = F.lit(2) * F.asin(F.sqrt(a)) \n",
"r = F.lit(6371)\n",
"distance = (c * r).alias('dist')"
]
},
{
"cell_type": "code",
"execution_count": 107,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df3 = df1.select('*',distance)"
]
},
{
"cell_type": "code",
"execution_count": 108,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---------+----------+-----------------+----+---------+------------------+\n",
"| lat| lon| tm| uid|lat2| lon2| dist|\n",
"+----+---------+----------+-----------------+----+---------+------------------+\n",
"|39.0|-8.054599|1462838568|49B4361512443A4DA|null| null| null|\n",
"|38.0|-8.054599|1462838468|49B4361512443A4DA|39.0|-8.054599|111.19492664455889|\n",
"|37.0|-7.054599|1462838368|49B4361512443A4DA|38.0|-8.054599|141.93627980150382|\n",
"+----+---------+----------+-----------------+----+---------+------------------+\n",
"\n"
]
}
],
"source": [
"df3.show()"
]
},
{
"cell_type": "code",
"execution_count": 109,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----------------+------------------+\n",
"| uid| sum(dist)|\n",
"+-----------------+------------------+\n",
"|49B4361512443A4DA|253.13120644606272|\n",
"+-----------------+------------------+\n",
"\n"
]
}
],
"source": [
"df3.groupby(\"uid\").agg(F.sum('dist')).show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment