Created May 18, 2021
"name": "Spark_Apache_Sedona.ipynb",
"<a href=\"\" target=\"_parent\"><img src=\"\" alt=\"Open In Colab\"/></a>"
"# Install Spark"
"# Install Java 11 and Spark 3.0.2 (the Apache archive keeps every Spark release, so this URL stays valid)\n",
"!apt-get install openjdk-11-jdk-headless -qq > /dev/null\n",
"!wget -q https://archive.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7.tgz\n",
"!tar xf spark-3.0.2-bin-hadoop2.7.tgz\n",
"!pip install -q findspark"
"# Install Apache Sedona and dependencies"
"!pip install shapely\n",
"!pip install attrs\n",
"# Pin Sedona so the Python package matches the 1.0.0-incubating jars configured in the SparkSession\n",
"!pip install apache-sedona==1.0.0"
"Requirement already satisfied: shapely in /usr/local/lib/python3.7/dist-packages (1.7.1)\n",
"Requirement already satisfied: attrs in /usr/local/lib/python3.7/dist-packages (21.2.0)\n",
"Collecting apache-sedona\n",
"\u001b[?25l Downloading (67kB)\n",
"\u001b[K |████████████████████████████████| 71kB 3.9MB/s \n",
"\u001b[?25hCollecting pyspark<3.1.0\n",
"\u001b[?25l Downloading (204.8MB)\n",
"\u001b[K |████████████████████████████████| 204.8MB 51kB/s \n",
"\u001b[?25hRequirement already satisfied: shapely in /usr/local/lib/python3.7/dist-packages (from apache-sedona) (1.7.1)\n",
"Requirement already satisfied: attrs in /usr/local/lib/python3.7/dist-packages (from apache-sedona) (21.2.0)\n",
"Collecting py4j==0.10.9\n",
"\u001b[?25l Downloading (198kB)\n",
"\u001b[K |████████████████████████████████| 204kB 42.0MB/s \n",
"\u001b[?25hBuilding wheels for collected packages: pyspark\n",
" Building wheel for pyspark ( ... \u001b[?25l\u001b[?25hdone\n",
" Created wheel for pyspark: filename=pyspark-3.0.2-py2.py3-none-any.whl size=205186687 sha256=4aadc827253c8a86653ab441f2e0afa0d94d57d6293bbbe425ad611c161eb877\n",
" Stored in directory: /root/.cache/pip/wheels/8b/09/da/c1f2859bcc86375dc972c5b6af4881b3603269bcc4c9be5d16\n",
"Successfully built pyspark\n",
"Installing collected packages: py4j, pyspark, apache-sedona\n",
"Successfully installed apache-sedona-1.0.0 py4j-0.10.9 pyspark-3.0.2\n"
"# Set location of Java and Spark\n"
"import os\n",
"\n",
"# Point Spark at the JDK installed via apt-get and the Spark distribution extracted with tar\n",
"os.environ.update({\n",
"    \"JAVA_HOME\": \"/usr/lib/jvm/java-11-openjdk-amd64\",\n",
"    \"SPARK_HOME\": \"/content/spark-3.0.2-bin-hadoop2.7\",\n",
"})"
"# Load packages"
"from pyspark.sql import SparkSession\n",
"from pyspark import SparkFiles\n",
"from sedona.register import SedonaRegistrator\n",
"from sedona.utils import SedonaKryoRegistrator, KryoSerializer\n",
"import findspark\n",
"# Create SparkSession"
"spark = (\n",
"    SparkSession.builder\n",
"    .appName('sedonaTest')\n",
"    # Kryo serialization using Sedona's Kryo registrator\n",
"    .config(\"spark.serializer\", KryoSerializer.getName)\n",
"    .config(\"spark.kryo.registrator\", SedonaKryoRegistrator.getName)\n",
"    # Sedona JVM jars; keep this version in step with the apache-sedona pip package\n",
"    .config('spark.jars.packages',\n",
"            'org.apache.sedona:sedona-python-adapter-3.0_2.12:1.0.0-incubating,'\n",
"            'org.datasyslab:geotools-wrapper:geotools-24.0')\n",
"    .getOrCreate()\n",
")\n",
"\n",
"# Register Sedona's ST_* SQL functions (ST_Point, ST_Distance, ...) with this session;\n",
"# the spark.sql queries in this notebook cannot resolve them otherwise\n",
"SedonaRegistrator.registerAll(spark)\n",
"# Read data\n",
"The raw data is a CSV file (`worldcities.csv`) containing the latitude/longitude coordinates of cities worldwide."
"# NOTE(review): the download URL of worldcities.csv is missing from this copy -- set it before running\n",
"url = \"\"\n",
"# SparkFiles.get only resolves files previously added with addFile\n",
"spark.sparkContext.addFile(url)\n",
"world_cities = spark.read.csv(\"file://\" + SparkFiles.get(\"worldcities.csv\"), header=True, inferSchema=True)\n",
"# Expose the DataFrame to spark.sql under the table name the queries use\n",
"world_cities.createOrReplaceTempView(\"world_cities\")\n",
"# Convert lat/long to geometric points\n",
"The Apache Sedona constructor `ST_Point` takes Decimal-typed values X (longitude) and Y (latitude) and constructs a geometric point."
"# create Spark DF containing geometric representation of lat/long coordinates (column name \"Geom\")\n",
"# ST_Point takes (X, Y) = (longitude, latitude) as Decimal values, hence the casts\n",
"geom_df = spark.sql(\n",
"    \"\"\"\n",
"    SELECT\n",
"        *,\n",
"        ST_Point(CAST(world_cities.lng AS Decimal(24,20)), CAST(world_cities.lat AS Decimal(24,20))) AS Geom\n",
"    FROM world_cities\n",
"    \"\"\"\n",
")\n",
"# write a local, temporary table for use with Spark SQL\n",
"geom_df.createOrReplaceTempView(\"geom_df\")\n",
"# Distance calculations\n",
"## All points to a single query point\n",
"Compute the Euclidean distances between all cities and a single query point. Results are ordered from the smallest to the largest distance to the query point.\n",
"Notation: *N x 1*\n",
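"The ST_Distance function takes two geometric inputs and computes the Euclidean (planar) distance between them. Because the points are built from raw longitude/latitude values, the distance is expressed in degrees, not in meters."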
"# Distance from every city to the fixed query point (lng -78.858060, lat 43.062770), nearest first\n",
"knn_query = spark.sql(\n",
"    \"\"\"\n",
"    SELECT Geom, city, admin_name, ST_Distance(ST_Point(-78.858060, 43.062770), Geom) AS distance\n",
"    FROM geom_df\n",
"    ORDER BY distance\n",
"    \"\"\"\n",
")\n",
"## All pairwise points\n",
"Compute the distances between all pairs of cities. This is done by creating a second DataFrame and computing distances between geometric coordinates in the first and second DataFrame using the ST_Distance function. In this toy example, the original DataFrame is replicated. In a real scenario, the first DataFrame may contain member coordinates and the second provider coordinates.\n",
"Notation: *N x N*"
"# Second copy of the cities (geometry also exposed as Geom2) to join against geom_df\n",
"geom_df2 = spark.sql(\n",
"    \"\"\"\n",
"    SELECT *, Geom AS Geom2\n",
"    FROM geom_df\n",
"    \"\"\"\n",
")\n",
"# Register it so the pairwise join can reference geom_df2 by name\n",
"geom_df2.createOrReplaceTempView(\"geom_df2\")\n",
"# Pairs of cities closer than 0.1 to each other; the points are raw lng/lat, so 0.1 is in degrees\n",
"# NOTE(review): the CONCAT arguments were missing in this copy; city/country assumed from worldcities.csv -- confirm\n",
"knn_all = spark.sql(\n",
"    \"\"\"\n",
"    SELECT\n",
"        CONCAT(geom_df.city, ',', geom_df.country) AS Location1,\n",
"        CONCAT(geom_df2.city, ',', geom_df2.country) AS Location2,\n",
"        ST_Distance(geom_df.Geom, geom_df2.Geom) AS Distance\n",
"    FROM geom_df\n",
"    INNER JOIN geom_df2\n",
"        ON ST_Distance(geom_df.Geom, geom_df2.Geom) < 0.1\n",
"    \"\"\"\n",
")\n",
"# Register the pairs so the top-3 query can read them with spark.sql\n",
"knn_all.createOrReplaceTempView(\"knn_all\")\n",
"# Cities and their top 3 closest neighboring cities"
"# For each city keep (at most) its 3 nearest other cities from knn_all\n",
"top_3 = spark.sql(\n",
"    \"\"\"\n",
"    SELECT *\n",
"    FROM (\n",
"        SELECT\n",
"            *,\n",
"            ROW_NUMBER() OVER (PARTITION BY Location1 ORDER BY Distance, Location2) AS rank\n",
"        FROM knn_all\n",
"        -- drop each city's pairing with itself (distance 0), which would otherwise always rank first;\n",
"        -- note this also drops distinct cities that share identical coordinates\n",
"        WHERE Distance > 0\n",
"    ) AS ranked\n",
"    WHERE rank <= 3\n",
"    \"\"\"\n",
")\n",
