@vykhand
Created May 12, 2016 10:30
Spark SQL leakage solution for Kaggle Expedia competition
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Expedia Kaggle competition\n",
"### Author: [Andrey Vykhodtsev](https://www.kaggle.com/vykhand)\n",
"### Date: 09.05.2016\n",
"\n",
"\n",
"## Exploiting the information leak with Apache Spark and Pandas\n",
"\n",
"This notebook was prepared for the [Expedia Kaggle Competition](https://www.kaggle.com/c/expedia-hotel-recommendations).\n",
"\n",
"Details about the information leak can be found in the [Data leak forum post](https://www.kaggle.com/c/expedia-hotel-recommendations/forums/t/20345/data-leak).\n",
"\n",
"The logic is a direct copy of [ZFTurbo's](https://www.kaggle.com/zfturbo) script:\n",
"\n",
"https://www.kaggle.com/zfturbo/expedia-hotel-recommendations/leakage-solution/run/231363\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Preparation\n",
"\n",
"In order to run this notebook, you need Apache Spark and Hive installed.\n",
"\n",
"I use the [IBM Apache Hadoop](https://www.ibm.com/support/knowledgecenter/SSPT3X_4.0.0/com.ibm.swg.im.infosphere.biginsights.install.doc/doc/bi_install_iop_biginsights.html) distribution and a standalone [Apache Spark 1.6.1](http://spark.apache.org) installation.\n",
"\n",
"The notebook assumes that Spark is installed in `/opt/spark`.\n",
"\n",
"It also relies on the [Spark-CSV](https://github.com/databricks/spark-csv) package developed by Databricks."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setting up Spark\n",
"\n",
"I set up Spark manually from a plain vanilla Python kernel:"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import sys\n",
"import os\n",
"import pandas as pd\n",
"import numpy as np\n",
"sys.path.append('/opt/spark/python')\n",
"os.environ['SPARK_HOME']='/opt/spark'\n",
"os.environ['HADOOP_CONF_DIR']='/etc/hadoop/conf'\n",
"os.environ['HIVE_CONF_DIR']='/etc/hive/conf'\n",
"os.environ['PYSPARK_SUBMIT_ARGS'] = '--master local[8] --driver-memory 12g --packages com.databricks:spark-csv_2.11:1.4.0 pyspark-shell'"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from pyspark import SparkContext, SparkConf, HiveContext, SQLContext"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"conf = SparkConf()\n",
"conf.setAppName('Expedia')\n",
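"# Note: the driver JVM is already running at this point (local/client mode), so spark.driver.memory\n",
"# set here is ignored; pass --driver-memory via PYSPARK_SUBMIT_ARGS instead. In local mode the\n",
"# executors run inside the driver JVM, so spark.executor.memory has no effect either.\n",
"conf.set(\"spark.driver.memory\", \"12g\")\n",
"conf.set(\"spark.executor.memory\", \"8g\")\n",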
"\n",
"sc = SparkContext(conf=conf)\n",
"sqlContext = HiveContext(sc)\n",
"#sqlContext = SQLContext(sc)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can also install a separate kernel that uses the pyspark shell settings. Instructions can be found [here](http://thepowerofdata.io/configuring-jupyteripython-notebook-to-work-with-pyspark-1-4-0). Note that some library names have changed; here is my config:\n",
"\n",
"```\n",
"{\n",
" \"display_name\": \"pySpark (Spark 1.6.1)\",\n",
" \"language\": \"python\",\n",
" \"argv\": [\n",
" \"python\",\n",
" \"-m\",\n",
" \"IPython.kernel\",\n",
" \"-f\",\n",
" \"{connection_file}\"\n",
" ],\n",
" \"env\": {\n",
" \"SPARK_HOME\": \"/opt/spark\",\n",
" \"PYTHONPATH\": \"/opt/spark/python/:/opt/spark/python/lib/py4j-0.9-src.zip\",\n",
" \"PYTHONSTARTUP\": \"/opt/spark/python/pyspark/shell.py\",\n",
" \"PYSPARK_SUBMIT_ARGS\": \"--master local --packages com.databricks:spark-csv_2.11:1.4.0 pyspark-shell\"\n",
" }\n",
"}\n",
"```\n",
"\n",
"### Checking contexts"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false,
"scrolled": false
},
"outputs": [
{
"data": {
"text/plain": [
"'1.6.1'"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sc.version"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"<pyspark.sql.context.HiveContext at 0x7ffb3ec67ef0>"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sqlContext"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Loading data\n",
"\n",
"I put the files into HDFS, but you can read local files too by using a `file:///<your filepath>` URI."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 88 ms, sys: 8 ms, total: 96 ms\n",
"Wall time: 2min 35s\n"
]
}
],
"source": [
"%%time\n",
"train = (sqlContext\n",
" .read\n",
" .format('com.databricks.spark.csv')\n",
" .options(header='true', inferschema='true')\n",
" .load('hdfs:///projects/kaggle-expedia/input/train.csv'))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Inferring the schema is slow (about 2.5 min on my machine) because spark-csv makes an extra pass over the data to do it. You can monitor progress in the Spark web UI at `<driver address>:4040`."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- date_time: timestamp (nullable = true)\n",
" |-- site_name: integer (nullable = true)\n",
" |-- posa_continent: integer (nullable = true)\n",
" |-- user_location_country: integer (nullable = true)\n",
" |-- user_location_region: integer (nullable = true)\n",
" |-- user_location_city: integer (nullable = true)\n",
" |-- orig_destination_distance: double (nullable = true)\n",
" |-- user_id: integer (nullable = true)\n",
" |-- is_mobile: integer (nullable = true)\n",
" |-- is_package: integer (nullable = true)\n",
" |-- channel: integer (nullable = true)\n",
" |-- srch_ci: string (nullable = true)\n",
" |-- srch_co: string (nullable = true)\n",
" |-- srch_adults_cnt: integer (nullable = true)\n",
" |-- srch_children_cnt: integer (nullable = true)\n",
" |-- srch_rm_cnt: integer (nullable = true)\n",
" |-- srch_destination_id: integer (nullable = true)\n",
" |-- srch_destination_type_id: integer (nullable = true)\n",
" |-- is_booking: integer (nullable = true)\n",
" |-- cnt: integer (nullable = true)\n",
" |-- hotel_continent: integer (nullable = true)\n",
" |-- hotel_country: integer (nullable = true)\n",
" |-- hotel_market: integer (nullable = true)\n",
" |-- hotel_cluster: integer (nullable = true)\n",
"\n"
]
}
],
"source": [
"train.printSchema()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### As you can see, many methods are similar to Pandas"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"data": {
"text/plain": [
"[('date_time', 'timestamp'),\n",
" ('site_name', 'int'),\n",
" ('posa_continent', 'int'),\n",
" ('user_location_country', 'int'),\n",
" ('user_location_region', 'int'),\n",
" ('user_location_city', 'int'),\n",
" ('orig_destination_distance', 'double'),\n",
" ('user_id', 'int'),\n",
" ('is_mobile', 'int'),\n",
" ('is_package', 'int'),\n",
" ('channel', 'int'),\n",
" ('srch_ci', 'string'),\n",
" ('srch_co', 'string'),\n",
" ('srch_adults_cnt', 'int'),\n",
" ('srch_children_cnt', 'int'),\n",
" ('srch_rm_cnt', 'int'),\n",
" ('srch_destination_id', 'int'),\n",
" ('srch_destination_type_id', 'int'),\n",
" ('is_booking', 'int'),\n",
" ('cnt', 'int'),\n",
" ('hotel_continent', 'int'),\n",
" ('hotel_country', 'int'),\n",
" ('hotel_market', 'int'),\n",
" ('hotel_cluster', 'int')]"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"train.dtypes"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"StructType(List(StructField(date_time,TimestampType,true),StructField(site_name,IntegerType,true),StructField(posa_continent,IntegerType,true),StructField(user_location_country,IntegerType,true),StructField(user_location_region,IntegerType,true),StructField(user_location_city,IntegerType,true),StructField(orig_destination_distance,DoubleType,true),StructField(user_id,IntegerType,true),StructField(is_mobile,IntegerType,true),StructField(is_package,IntegerType,true),StructField(channel,IntegerType,true),StructField(srch_ci,StringType,true),StructField(srch_co,StringType,true),StructField(srch_adults_cnt,IntegerType,true),StructField(srch_children_cnt,IntegerType,true),StructField(srch_rm_cnt,IntegerType,true),StructField(srch_destination_id,IntegerType,true),StructField(srch_destination_type_id,IntegerType,true),StructField(is_booking,IntegerType,true),StructField(cnt,IntegerType,true),StructField(hotel_continent,IntegerType,true),StructField(hotel_country,IntegerType,true),StructField(hotel_market,IntegerType,true),StructField(hotel_cluster,IntegerType,true)))"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"train.schema"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Saving the schema in case we need it later, so we don't have to run schema inference again"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import pickle\n",
"# use a context manager so the file is flushed and closed\n",
"with open('../train.schema.p', 'wb') as f:\n",
"    pickle.dump(train.schema, f)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### `cache()` is lazy: the data is actually cached the first time we access it"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"data": {
"text/plain": [
"DataFrame[date_time: timestamp, site_name: int, posa_continent: int, user_location_country: int, user_location_region: int, user_location_city: int, orig_destination_distance: double, user_id: int, is_mobile: int, is_package: int, channel: int, srch_ci: string, srch_co: string, srch_adults_cnt: int, srch_children_cnt: int, srch_rm_cnt: int, srch_destination_id: int, srch_destination_type_id: int, is_booking: int, cnt: int, hotel_continent: int, hotel_country: int, hotel_market: int, hotel_cluster: int]"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"train.cache()"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 104 ms, sys: 0 ns, total: 104 ms\n",
"Wall time: 3min 11s\n"
]
},
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>date_time</th>\n",
" <th>site_name</th>\n",
" <th>posa_continent</th>\n",
" <th>user_location_country</th>\n",
" <th>user_location_region</th>\n",
" <th>user_location_city</th>\n",
" <th>orig_destination_distance</th>\n",
" <th>user_id</th>\n",
" <th>is_mobile</th>\n",
" <th>is_package</th>\n",
" <th>...</th>\n",
" <th>srch_children_cnt</th>\n",
" <th>srch_rm_cnt</th>\n",
" <th>srch_destination_id</th>\n",
" <th>srch_destination_type_id</th>\n",
" <th>is_booking</th>\n",
" <th>cnt</th>\n",
" <th>hotel_continent</th>\n",
" <th>hotel_country</th>\n",
" <th>hotel_market</th>\n",
" <th>hotel_cluster</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>2014-08-11 07:46:59</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>348</td>\n",
" <td>48862</td>\n",
" <td>2234.2641</td>\n",
" <td>12</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>...</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>8250</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>3</td>\n",
" <td>2</td>\n",
" <td>50</td>\n",
" <td>628</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>2014-08-11 08:22:12</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>348</td>\n",
" <td>48862</td>\n",
" <td>2234.2641</td>\n",
" <td>12</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>...</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>8250</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" <td>50</td>\n",
" <td>628</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>2014-08-11 08:24:33</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>348</td>\n",
" <td>48862</td>\n",
" <td>2234.2641</td>\n",
" <td>12</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>...</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>8250</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" <td>50</td>\n",
" <td>628</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>2014-08-09 18:05:16</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>442</td>\n",
" <td>35390</td>\n",
" <td>913.1932</td>\n",
" <td>93</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>...</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>14984</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" <td>50</td>\n",
" <td>1457</td>\n",
" <td>80</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>2014-08-09 18:08:18</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>442</td>\n",
" <td>35390</td>\n",
" <td>913.6259</td>\n",
" <td>93</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>...</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>14984</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" <td>50</td>\n",
" <td>1457</td>\n",
" <td>21</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"<p>5 rows × 24 columns</p>\n",
"</div>"
],
"text/plain": [
" date_time site_name posa_continent user_location_country \\\n",
"0 2014-08-11 07:46:59 2 3 66 \n",
"1 2014-08-11 08:22:12 2 3 66 \n",
"2 2014-08-11 08:24:33 2 3 66 \n",
"3 2014-08-09 18:05:16 2 3 66 \n",
"4 2014-08-09 18:08:18 2 3 66 \n",
"\n",
" user_location_region user_location_city orig_destination_distance \\\n",
"0 348 48862 2234.2641 \n",
"1 348 48862 2234.2641 \n",
"2 348 48862 2234.2641 \n",
"3 442 35390 913.1932 \n",
"4 442 35390 913.6259 \n",
"\n",
" user_id is_mobile is_package ... srch_children_cnt \\\n",
"0 12 0 1 ... 0 \n",
"1 12 0 1 ... 0 \n",
"2 12 0 0 ... 0 \n",
"3 93 0 0 ... 0 \n",
"4 93 0 0 ... 0 \n",
"\n",
" srch_rm_cnt srch_destination_id srch_destination_type_id is_booking cnt \\\n",
"0 1 8250 1 0 3 \n",
"1 1 8250 1 1 1 \n",
"2 1 8250 1 0 1 \n",
"3 1 14984 1 0 1 \n",
"4 1 14984 1 0 1 \n",
"\n",
" hotel_continent hotel_country hotel_market hotel_cluster \n",
"0 2 50 628 1 \n",
"1 2 50 628 1 \n",
"2 2 50 628 1 \n",
"3 2 50 1457 80 \n",
"4 2 50 1457 21 \n",
"\n",
"[5 rows x 24 columns]"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time \n",
"train.limit(5).toPandas()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Let's save train as a Parquet file for later reuse"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"train.write.parquet('/projects/kaggle-expedia/input/train.parquet')\n",
"train.unpersist()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"DataFrame[date_time: timestamp, site_name: int, posa_continent: int, user_location_country: int, user_location_region: int, user_location_city: int, orig_destination_distance: double, user_id: int, is_mobile: int, is_package: int, channel: int, srch_ci: string, srch_co: string, srch_adults_cnt: int, srch_children_cnt: int, srch_rm_cnt: int, srch_destination_id: int, srch_destination_type_id: int, is_booking: int, cnt: int, hotel_continent: int, hotel_country: int, hotel_market: int, hotel_cluster: int]"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"train = sqlContext.read.load('/projects/kaggle-expedia/input/train.parquet')\n",
"train.cache()"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 48 ms, sys: 8 ms, total: 56 ms\n",
"Wall time: 1min 21s\n"
]
},
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>date_time</th>\n",
" <th>site_name</th>\n",
" <th>posa_continent</th>\n",
" <th>user_location_country</th>\n",
" <th>user_location_region</th>\n",
" <th>user_location_city</th>\n",
" <th>orig_destination_distance</th>\n",
" <th>user_id</th>\n",
" <th>is_mobile</th>\n",
" <th>is_package</th>\n",
" <th>...</th>\n",
" <th>srch_children_cnt</th>\n",
" <th>srch_rm_cnt</th>\n",
" <th>srch_destination_id</th>\n",
" <th>srch_destination_type_id</th>\n",
" <th>is_booking</th>\n",
" <th>cnt</th>\n",
" <th>hotel_continent</th>\n",
" <th>hotel_country</th>\n",
" <th>hotel_market</th>\n",
" <th>hotel_cluster</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>2014-08-11 07:46:59</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>348</td>\n",
" <td>48862</td>\n",
" <td>2234.2641</td>\n",
" <td>12</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>...</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>8250</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>3</td>\n",
" <td>2</td>\n",
" <td>50</td>\n",
" <td>628</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>2014-08-11 08:22:12</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>348</td>\n",
" <td>48862</td>\n",
" <td>2234.2641</td>\n",
" <td>12</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>...</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>8250</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" <td>50</td>\n",
" <td>628</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>2014-08-11 08:24:33</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>348</td>\n",
" <td>48862</td>\n",
" <td>2234.2641</td>\n",
" <td>12</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>...</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>8250</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" <td>50</td>\n",
" <td>628</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>2014-08-09 18:05:16</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>442</td>\n",
" <td>35390</td>\n",
" <td>913.1932</td>\n",
" <td>93</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>...</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>14984</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" <td>50</td>\n",
" <td>1457</td>\n",
" <td>80</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>2014-08-09 18:08:18</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>442</td>\n",
" <td>35390</td>\n",
" <td>913.6259</td>\n",
" <td>93</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>...</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>14984</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" <td>50</td>\n",
" <td>1457</td>\n",
" <td>21</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"<p>5 rows × 24 columns</p>\n",
"</div>"
],
"text/plain": [
" date_time site_name posa_continent user_location_country \\\n",
"0 2014-08-11 07:46:59 2 3 66 \n",
"1 2014-08-11 08:22:12 2 3 66 \n",
"2 2014-08-11 08:24:33 2 3 66 \n",
"3 2014-08-09 18:05:16 2 3 66 \n",
"4 2014-08-09 18:08:18 2 3 66 \n",
"\n",
" user_location_region user_location_city orig_destination_distance \\\n",
"0 348 48862 2234.2641 \n",
"1 348 48862 2234.2641 \n",
"2 348 48862 2234.2641 \n",
"3 442 35390 913.1932 \n",
"4 442 35390 913.6259 \n",
"\n",
" user_id is_mobile is_package ... srch_children_cnt \\\n",
"0 12 0 1 ... 0 \n",
"1 12 0 1 ... 0 \n",
"2 12 0 0 ... 0 \n",
"3 93 0 0 ... 0 \n",
"4 93 0 0 ... 0 \n",
"\n",
" srch_rm_cnt srch_destination_id srch_destination_type_id is_booking cnt \\\n",
"0 1 8250 1 0 3 \n",
"1 1 8250 1 1 1 \n",
"2 1 8250 1 0 1 \n",
"3 1 14984 1 0 1 \n",
"4 1 14984 1 0 1 \n",
"\n",
" hotel_continent hotel_country hotel_market hotel_cluster \n",
"0 2 50 628 1 \n",
"1 2 50 628 1 \n",
"2 2 50 628 1 \n",
"3 2 50 1457 80 \n",
"4 2 50 1457 21 \n",
"\n",
"[5 rows x 24 columns]"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"train.limit(5).toPandas()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Now the first pass over the cached data takes 1 min 21 s instead of 3 min 11 s with CSV (and there is no 2.5 min schema inference step beforehand), and the data is much better compressed:"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"3.8 G /projects/kaggle-expedia/input/train.csv\r\n",
"544.6 M /projects/kaggle-expedia/input/train.parquet\r\n"
]
}
],
"source": [
"!hadoop fs -du -h /projects/kaggle-expedia/input"
]
},
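{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick back-of-the-envelope check of the numbers above, as a pure-Python sketch. It makes two assumptions:\n",
"\n",
"* `hadoop fs -du -h` reports binary units (1 G = 1024 M).\n",
"* The wall times are those of the first action on each cached DataFrame in the cells above: 3 min 11 s for the CSV source and 1 min 21 s for Parquet.\n",
"\n",
"```python\n",
"# Sizes reported by `hadoop fs -du -h` (assumed binary units: 1 G = 1024 M)\n",
"csv_mb = 3.8 * 1024\n",
"parquet_mb = 544.6\n",
"\n",
"# Wall time of the first action on the cached DataFrame, in seconds\n",
"csv_first_pass_s = 3 * 60 + 11      # CSV source\n",
"parquet_first_pass_s = 1 * 60 + 21  # Parquet source\n",
"\n",
"compression_ratio = csv_mb / parquet_mb\n",
"speedup = csv_first_pass_s / float(parquet_first_pass_s)\n",
"\n",
"print('Parquet is {:.1f}x smaller than CSV'.format(compression_ratio))\n",
"print('The first pass over Parquet is {:.1f}x faster'.format(speedup))\n",
"```\n",
"\n",
"That is roughly a 7x reduction in size and a bit more than a 2x speedup. The CSV timing does not even include the separate schema inference pass."
]
},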
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Now let's get to work\n",
"\n",
"The plan is simple: build 3 aggregates with Spark SQL, then merge them, keeping only the 5 top-ranked hotel clusters for each prediction. Windowed analytic functions (`rowNumber()` over a window) do the ranking."
]
},
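{
"cell_type": "markdown",
"metadata": {},
"source": [
"The merge step itself is not shown in this part of the notebook. Below is a minimal pure-Python sketch of one way to do it. It assumes the priority-fill logic of the referenced ZFTurbo script:\n",
"\n",
"* take clusters from the most specific aggregate first, then from the less specific ones;\n",
"* skip duplicates;\n",
"* stop at 5.\n",
"\n",
"`merge_top5` and the toy ranked lists are hypothetical.\n",
"\n",
"```python\n",
"def merge_top5(ranked_lists, k=5):\n",
"    # Hypothetical helper: walk the ranked lists in priority order,\n",
"    # add clusters not chosen yet, and stop once k clusters are collected\n",
"    result = []\n",
"    for ranked in ranked_lists:\n",
"        for cluster in ranked:\n",
"            if cluster not in result:\n",
"                result.append(cluster)\n",
"            if len(result) == k:\n",
"                return result\n",
"    return result\n",
"\n",
"\n",
"# Toy ranked hotel_cluster lists for a single test row (made-up values)\n",
"from_city_distance = [62]                   # leak aggregate, most specific\n",
"from_dest_country_market = [44, 30, 3, 93]\n",
"from_dest = [30, 44, 12, 93, 7]             # least specific\n",
"\n",
"prediction = merge_top5([from_city_distance, from_dest_country_market, from_dest])\n",
"\n",
"# The submission expects the predicted clusters as a space-separated string\n",
"print(' '.join(str(c) for c in prediction))\n",
"```\n",
"\n",
"Because the leak aggregate comes first, an exact match on (user_location_city, orig_destination_distance) always takes the top slots."
]
},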
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import pyspark.sql.types as typs\n",
"from pyspark.sql import Window\n",
"# rowNumber/denseRank are the Spark 1.x names; Spark 2.0+ only provides row_number/dense_rank\n",
"from pyspark.sql.functions import desc, rowNumber, denseRank\n",
"w1 = Window.partitionBy(\"user_location_city\", \"orig_destination_distance\").orderBy(desc('cnt'))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Count hotel clusters per (user_location_city, orig_destination_distance) pair, rank them by count within each pair and keep the top 5"
]
},
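{
"cell_type": "markdown",
"metadata": {},
"source": [
"The same count, rank and filter steps on a toy pandas DataFrame, as a sketch of what the Spark window does. Once rows are sorted by count inside each group, `groupby(...).cumcount() + 1` plays the role of `rowNumber()`. The data is made up. `hotel_cluster` is added as a tie-breaker so the numbering is deterministic; the Spark window below has no such tie-breaker.\n",
"\n",
"```python\n",
"import pandas as pd\n",
"\n",
"# Toy event log (made-up values), one row per click or booking\n",
"events = pd.DataFrame({\n",
"    'user_location_city': [14, 14, 14, 14, 21, 21],\n",
"    'orig_destination_distance': [1234.5] * 4 + [678.9] * 2,\n",
"    'hotel_cluster': [62, 62, 57, 62, 20, 30],\n",
"})\n",
"keys = ['user_location_city', 'orig_destination_distance']\n",
"\n",
"# Equivalent of groupBy(keys + hotel_cluster).count(), renamed to cnt\n",
"counts = (events.groupby(keys + ['hotel_cluster'])\n",
"          .size()\n",
"          .reset_index(name='cnt'))\n",
"\n",
"# Equivalent of rowNumber().over(Window.partitionBy(keys).orderBy(desc('cnt'))):\n",
"# sort by cnt descending inside each key, then number the rows 1, 2, 3, ...\n",
"counts = counts.sort_values(keys + ['cnt', 'hotel_cluster'],\n",
"                            ascending=[True, True, False, True])\n",
"counts['rn'] = counts.groupby(keys).cumcount() + 1\n",
"\n",
"# Equivalent of .filter('rn <= 5')\n",
"top5 = counts[counts['rn'] <= 5]\n",
"print(top5)\n",
"```"
]
},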
{
"cell_type": "code",
"execution_count": 75,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"agg_ulc_odd_hc = (train\n",
" # omitting empty orig_destination_distance\n",
" .filter(train['orig_destination_distance'].isNotNull())\n",
" # changing the datatype to integer as I *think* join on integer should be faster\n",
" # though this is something to be tested\n",
" .withColumn('orig_destination_distance',\n",
" ( train['orig_destination_distance']*1e5).cast(typs.IntegerType()))\n",
" .select(['user_location_city', 'orig_destination_distance', 'hotel_cluster', 'is_booking'])\n",
" .groupBy('user_location_city', 'orig_destination_distance', 'hotel_cluster')\n",
" .count()\n",
" #rename count to cnt\n",
" .withColumnRenamed('count', 'cnt')\n",
" #windowed aggregate - assigns numbers in order of descending count\n",
" .select('*', rowNumber().over(w1).alias('rn'))\n",
" .filter('rn <= 5')\n",
" )"
]
},
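{
"cell_type": "markdown",
"metadata": {},
"source": [
"The cell above turns `orig_destination_distance` into an integer key by multiplying by 1e5 and casting to an integer, which drops the fractional part. A floating-point product such as 2234.2641 * 1e5 may land just below the intended integer, so truncation can occasionally produce a key one unit lower.\n",
"\n",
"Below is a small pure-Python sketch of the same idea. `to_key_truncate` and `to_key_round` are hypothetical helpers, and rounding before the cast is my suggestion, not what the cell does. Whichever variant is used, the test set must be converted the same way before joining. The scaled value must also fit in a 32-bit integer, which holds for distances below about 21,474 (2,147,483,647 / 1e5).\n",
"\n",
"```python\n",
"SCALE = 100000  # the Spark cell multiplies by 1e5\n",
"\n",
"def to_key_truncate(distance):\n",
"    # Like the cast to IntegerType: the fractional part is dropped\n",
"    return int(distance * SCALE)\n",
"\n",
"def to_key_round(distance):\n",
"    # Hypothetical alternative: round to the nearest integer before casting\n",
"    return int(round(distance * SCALE))\n",
"\n",
"for d in [2234.2641, 913.1932, 0.29]:\n",
"    # The truncated key can be one less than the rounded key when the\n",
"    # floating-point product lands just below an integer\n",
"    print(d, to_key_truncate(d), to_key_round(d))\n",
"```"
]
},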
{
"cell_type": "code",
"execution_count": 76,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"DataFrame[user_location_city: int, orig_destination_distance: int, hotel_cluster: int, cnt: bigint, rn: int]"
]
},
"execution_count": 76,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"agg_ulc_odd_hc.cache()"
]
},
{
"cell_type": "code",
"execution_count": 77,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 0 ns, sys: 4 ms, total: 4 ms\n",
"Wall time: 18.4 s\n"
]
},
{
"data": {
"text/plain": [
"10105676"
]
},
"execution_count": 77,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"agg_ulc_odd_hc.count()"
]
},
{
"cell_type": "code",
"execution_count": 78,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 12 ms, sys: 0 ns, total: 12 ms\n",
"Wall time: 290 ms\n"
]
},
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>user_location_city</th>\n",
" <th>orig_destination_distance</th>\n",
" <th>hotel_cluster</th>\n",
" <th>cnt</th>\n",
" <th>rn</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>14</td>\n",
" <td>16512629</td>\n",
" <td>62</td>\n",
" <td>4</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>14</td>\n",
" <td>129810629</td>\n",
" <td>57</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>14</td>\n",
" <td>410116029</td>\n",
" <td>92</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>14</td>\n",
" <td>428735229</td>\n",
" <td>18</td>\n",
" <td>2</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>14</td>\n",
" <td>987809629</td>\n",
" <td>81</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>21</td>\n",
" <td>13909970</td>\n",
" <td>20</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>21</td>\n",
" <td>29756570</td>\n",
" <td>30</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>21</td>\n",
" <td>37319370</td>\n",
" <td>85</td>\n",
" <td>3</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>21</td>\n",
" <td>51158570</td>\n",
" <td>77</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>21</td>\n",
" <td>55512570</td>\n",
" <td>16</td>\n",
" <td>2</td>\n",
" <td>1</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" user_location_city orig_destination_distance hotel_cluster cnt rn\n",
"0 14 16512629 62 4 1\n",
"1 14 129810629 57 1 1\n",
"2 14 410116029 92 1 1\n",
"3 14 428735229 18 2 1\n",
"4 14 987809629 81 1 1\n",
"5 21 13909970 20 1 1\n",
"6 21 29756570 30 1 1\n",
"7 21 37319370 85 3 1\n",
"8 21 51158570 77 1 1\n",
"9 21 55512570 16 2 1"
]
},
"execution_count": 78,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"agg_ulc_odd_hc.limit(10).toPandas()"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"0"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"train.filter(train['srch_destination_id'].isNull()).count()"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"0"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"train.filter(train['hotel_country'].isNull()).count()"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"0"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"train.filter(train['hotel_market'].isNull()).count()"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"formula1 = 3+17*train['is_booking']\n",
"formula2 = 2+5*train['is_booking']"
]
},
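{
"cell_type": "markdown",
"metadata": {},
"source": [
"`formula1` weights each event by `3 + 17 * is_booking`, so a booking counts 20 and a click counts 3. `formula2` gives 7 and 2 instead. Below is a pure-Python sketch of the weighted sum that the next aggregate computes per group; the event list is made up for illustration.\n",
"\n",
"```python\n",
"from collections import defaultdict\n",
"\n",
"def weight1(is_booking):\n",
"    # formula1: 3 for a click, 20 for a booking\n",
"    return 3 + 17 * is_booking\n",
"\n",
"def weight2(is_booking):\n",
"    # formula2: 2 for a click, 7 for a booking\n",
"    return 2 + 5 * is_booking\n",
"\n",
"# Hypothetical (hotel_cluster, is_booking) events for one search destination\n",
"events = [(44, 1), (44, 0), (44, 0), (30, 0), (30, 0), (30, 0), (30, 0)]\n",
"\n",
"# Equivalent of summing formula1 per hotel_cluster\n",
"sum_wb = defaultdict(int)\n",
"for cluster, is_booking in events:\n",
"    sum_wb[cluster] += weight1(is_booking)\n",
"\n",
"print(dict(sum_wb))\n",
"```\n",
"\n",
"Under `formula1` a single booking outweighs six clicks (20 > 18). Under `formula2` it outweighs only three (7 > 6)."
]
},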
{
"cell_type": "code",
"execution_count": 83,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"w2 = Window.partitionBy('srch_destination_id', 'hotel_country', 'hotel_market').orderBy(desc('sum_wb'))"
]
},
{
"cell_type": "code",
"execution_count": 89,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"agg_best_search_dest_ctry = (train\n",
" .filter('year(date_time) = 2014')\n",
" .select('srch_destination_id', 'hotel_country', 'hotel_market', 'hotel_cluster', (formula1).alias('wb'))\n",
" .groupby('srch_destination_id', 'hotel_country', 'hotel_market', 'hotel_cluster')\n",
" .sum('wb')\n",
" .withColumnRenamed('sum(wb)', 'sum_wb')\n",
" # redundant: the window w2 already orders rows by sum_wb within each partition\n",
" .orderBy(desc('sum_wb'))\n",
" .select('*', rowNumber().over(w2).alias(\"rn\"))\n",
" .filter('rn <= 5')\n",
" )"
]
},
{
"cell_type": "code",
"execution_count": 90,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"DataFrame[srch_destination_id: int, hotel_country: int, hotel_market: int, hotel_cluster: int, sum_wb: bigint, rn: int]"
]
},
"execution_count": 90,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"agg_best_search_dest_ctry.cache()"
]
},
{
"cell_type": "code",
"execution_count": 91,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 0 ns, sys: 4 ms, total: 4 ms\n",
"Wall time: 536 ms\n"
]
},
{
"data": {
"text/plain": [
"174673"
]
},
"execution_count": 91,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"agg_best_search_dest_ctry.count()"
]
},
{
"cell_type": "code",
"execution_count": 92,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>srch_destination_id</th>\n",
" <th>hotel_country</th>\n",
" <th>hotel_market</th>\n",
" <th>hotel_cluster</th>\n",
" <th>sum_wb</th>\n",
" <th>rn</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>267</td>\n",
" <td>70</td>\n",
" <td>1926</td>\n",
" <td>44</td>\n",
" <td>26</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>267</td>\n",
" <td>70</td>\n",
" <td>1926</td>\n",
" <td>30</td>\n",
" <td>21</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>267</td>\n",
" <td>70</td>\n",
" <td>1926</td>\n",
" <td>3</td>\n",
" <td>9</td>\n",
" <td>3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>267</td>\n",
" <td>70</td>\n",
" <td>1926</td>\n",
" <td>93</td>\n",
" <td>9</td>\n",
" <td>4</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>304</td>\n",
" <td>82</td>\n",
" <td>229</td>\n",
" <td>3</td>\n",
" <td>160</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>330</td>\n",
" <td>119</td>\n",
" <td>2066</td>\n",
" <td>53</td>\n",
" <td>42</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>811</td>\n",
" <td>127</td>\n",
" <td>1281</td>\n",
" <td>85</td>\n",
" <td>79</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>811</td>\n",
" <td>127</td>\n",
" <td>1281</td>\n",
" <td>57</td>\n",
" <td>15</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>965</td>\n",
" <td>203</td>\n",
" <td>243</td>\n",
" <td>30</td>\n",
" <td>32</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>965</td>\n",
" <td>203</td>\n",
" <td>243</td>\n",
" <td>53</td>\n",
" <td>3</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>10</th>\n",
" <td>965</td>\n",
" <td>203</td>\n",
" <td>243</td>\n",
" <td>60</td>\n",
" <td>3</td>\n",
" <td>3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>11</th>\n",
" <td>965</td>\n",
" <td>203</td>\n",
" <td>243</td>\n",
" <td>85</td>\n",
" <td>3</td>\n",
" <td>4</td>\n",
" </tr>\n",
" <tr>\n",
" <th>12</th>\n",
" <td>1156</td>\n",
" <td>106</td>\n",
" <td>753</td>\n",
" <td>62</td>\n",
" <td>799</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>13</th>\n",
" <td>1156</td>\n",
" <td>106</td>\n",
" <td>753</td>\n",
" <td>43</td>\n",
" <td>788</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>14</th>\n",
" <td>1156</td>\n",
" <td>106</td>\n",
" <td>753</td>\n",
" <td>78</td>\n",
" <td>585</td>\n",
" <td>3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>15</th>\n",
" <td>1156</td>\n",
" <td>106</td>\n",
" <td>753</td>\n",
" <td>29</td>\n",
" <td>554</td>\n",
" <td>4</td>\n",
" </tr>\n",
" <tr>\n",
" <th>16</th>\n",
" <td>1156</td>\n",
" <td>106</td>\n",
" <td>753</td>\n",
" <td>77</td>\n",
" <td>522</td>\n",
" <td>5</td>\n",
" </tr>\n",
" <tr>\n",
" <th>17</th>\n",
" <td>1227</td>\n",
" <td>50</td>\n",
" <td>426</td>\n",
" <td>40</td>\n",
" <td>23</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>18</th>\n",
" <td>1227</td>\n",
" <td>50</td>\n",
" <td>426</td>\n",
" <td>15</td>\n",
" <td>3</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>19</th>\n",
" <td>1298</td>\n",
" <td>63</td>\n",
" <td>1146</td>\n",
" <td>36</td>\n",
" <td>35</td>\n",
" <td>1</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" srch_destination_id hotel_country hotel_market hotel_cluster sum_wb \\\n",
"0 267 70 1926 44 26 \n",
"1 267 70 1926 30 21 \n",
"2 267 70 1926 3 9 \n",
"3 267 70 1926 93 9 \n",
"4 304 82 229 3 160 \n",
"5 330 119 2066 53 42 \n",
"6 811 127 1281 85 79 \n",
"7 811 127 1281 57 15 \n",
"8 965 203 243 30 32 \n",
"9 965 203 243 53 3 \n",
"10 965 203 243 60 3 \n",
"11 965 203 243 85 3 \n",
"12 1156 106 753 62 799 \n",
"13 1156 106 753 43 788 \n",
"14 1156 106 753 78 585 \n",
"15 1156 106 753 29 554 \n",
"16 1156 106 753 77 522 \n",
"17 1227 50 426 40 23 \n",
"18 1227 50 426 15 3 \n",
"19 1298 63 1146 36 35 \n",
"\n",
" rn \n",
"0 1 \n",
"1 2 \n",
"2 3 \n",
"3 4 \n",
"4 1 \n",
"5 1 \n",
"6 1 \n",
"7 2 \n",
"8 1 \n",
"9 2 \n",
"10 3 \n",
"11 4 \n",
"12 1 \n",
"13 2 \n",
"14 3 \n",
"15 4 \n",
"16 5 \n",
"17 1 \n",
"18 2 \n",
"19 1 "
]
},
"execution_count": 92,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"agg_best_search_dest_ctry.limit(20).toPandas()"
]
},
{
"cell_type": "code",
"execution_count": 93,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"w3 = Window.partitionBy('srch_destination_id').orderBy(desc('sum_wb'))"
]
},
{
"cell_type": "code",
"execution_count": 94,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"DataFrame[srch_destination_id: int, hotel_cluster: int, sum_wb: bigint, rn: int]"
]
},
"execution_count": 94,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"agg_best_search_dest_2 = (train\n",
" .select('srch_destination_id', 'hotel_cluster', (formula1).alias('wb'))\n",
" .groupby('srch_destination_id', 'hotel_cluster')\n",
" .sum('wb')\n",
" .withColumnRenamed('sum(wb)', 'sum_wb')\n",
" .orderBy(desc('sum_wb'))\n",
" .select('*', rowNumber().over(w3).alias(\"rn\"))\n",
" .filter('rn <= 5'))\n",
"agg_best_search_dest_2.cache()"
]
},
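{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Sketch: top 5 clusters per destination\n",
"\n",
"The window `w3` ranks hotel clusters within each `srch_destination_id` by descending `sum_wb`, and `filter('rn <= 5')` keeps the first five. The sketch below reproduces that top-k-per-group logic in plain Python. It assumes the aggregated rows arrive as `(srch_destination_id, hotel_cluster, sum_wb)` tuples. `top_k_per_group` is a hypothetical helper and is not part of this notebook.\n",
"\n",
"```python\n",
"from collections import defaultdict\n",
"\n",
"\n",
"def top_k_per_group(rows, k=5):\n",
"    # rows: iterable of (group_key, hotel_cluster, score) tuples\n",
"    groups = defaultdict(list)\n",
"    for key, cluster, score in rows:\n",
"        groups[key].append((cluster, score))\n",
"    result = []\n",
"    for key, items in groups.items():\n",
"        # Highest score first, like orderBy(desc('sum_wb')) inside the window\n",
"        items.sort(key=lambda item: -item[1])\n",
"        # enumerate(start=1) plays the role of the row number; keep rn <= k\n",
"        for rn, (cluster, score) in enumerate(items[:k], start=1):\n",
"            result.append((key, cluster, score, rn))\n",
"    return result\n",
"\n",
"\n",
"# Toy rows; cluster 99 is made-up data that falls outside the top 5\n",
"rows = [(31, 62, 15), (31, 25, 6), (31, 82, 6), (31, 5, 3),\n",
"        (31, 30, 3), (31, 99, 1), (231, 60, 36)]\n",
"for row in top_k_per_group(rows):\n",
"    print(row)\n",
"```\n",
"\n",
"Spark's row numbering breaks ties in `sum_wb` arbitrarily, while Python's stable sort keeps tied rows in input order. Tied clusters may therefore come out in a different order."
]
},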
{
"cell_type": "code",
"execution_count": 95,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 4 ms, sys: 0 ns, total: 4 ms\n",
"Wall time: 6.56 s\n"
]
},
{
"data": {
"text/plain": [
"184373"
]
},
"execution_count": 95,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"agg_best_search_dest_2.count()"
]
},
{
"cell_type": "code",
"execution_count": 96,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>srch_destination_id</th>\n",
" <th>hotel_cluster</th>\n",
" <th>sum_wb</th>\n",
" <th>rn</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>31</td>\n",
" <td>62</td>\n",
" <td>15</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>31</td>\n",
" <td>25</td>\n",
" <td>6</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>31</td>\n",
" <td>82</td>\n",
" <td>6</td>\n",
" <td>3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>31</td>\n",
" <td>5</td>\n",
" <td>3</td>\n",
" <td>4</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>31</td>\n",
" <td>30</td>\n",
" <td>3</td>\n",
" <td>5</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>231</td>\n",
" <td>60</td>\n",
" <td>36</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>231</td>\n",
" <td>30</td>\n",
" <td>29</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>231</td>\n",
" <td>85</td>\n",
" <td>18</td>\n",
" <td>3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>231</td>\n",
" <td>3</td>\n",
" <td>12</td>\n",
" <td>4</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>231</td>\n",
" <td>53</td>\n",
" <td>9</td>\n",
" <td>5</td>\n",
" </tr>\n",
" <tr>\n",
" <th>10</th>\n",
" <td>431</td>\n",
" <td>46</td>\n",
" <td>1554</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>11</th>\n",
" <td>431</td>\n",
" <td>44</td>\n",
" <td>376</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>12</th>\n",
" <td>431</td>\n",
" <td>60</td>\n",
" <td>316</td>\n",
" <td>3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>13</th>\n",
" <td>431</td>\n",
" <td>57</td>\n",
" <td>285</td>\n",
" <td>4</td>\n",
" </tr>\n",
" <tr>\n",
" <th>14</th>\n",
" <td>431</td>\n",
" <td>61</td>\n",
" <td>283</td>\n",
" <td>5</td>\n",
" </tr>\n",
" <tr>\n",
" <th>15</th>\n",
" <td>631</td>\n",
" <td>78</td>\n",
" <td>308</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>16</th>\n",
" <td>631</td>\n",
" <td>82</td>\n",
" <td>182</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>17</th>\n",
" <td>631</td>\n",
" <td>30</td>\n",
" <td>153</td>\n",
" <td>3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>18</th>\n",
" <td>631</td>\n",
" <td>62</td>\n",
" <td>134</td>\n",
" <td>4</td>\n",
" </tr>\n",
" <tr>\n",
" <th>19</th>\n",
" <td>631</td>\n",
" <td>20</td>\n",
" <td>117</td>\n",
" <td>5</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" srch_destination_id hotel_cluster sum_wb rn\n",
"0 31 62 15 1\n",
"1 31 25 6 2\n",
"2 31 82 6 3\n",
"3 31 5 3 4\n",
"4 31 30 3 5\n",
"5 231 60 36 1\n",
"6 231 30 29 2\n",
"7 231 85 18 3\n",
"8 231 3 12 4\n",
"9 231 53 9 5\n",
"10 431 46 1554 1\n",
"11 431 44 376 2\n",
"12 431 60 316 3\n",
"13 431 57 285 4\n",
"14 431 61 283 5\n",
"15 631 78 308 1\n",
"16 631 82 182 2\n",
"17 631 30 153 3\n",
"18 631 62 134 4\n",
"19 631 20 117 5"
]
},
"execution_count": 96,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"agg_best_search_dest_2.limit(20).toPandas()"
]
},
{
"cell_type": "code",
"execution_count": 225,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"DataFrame[hotel_cluster: int, sum_wb: bigint]"
]
},
"execution_count": 225,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"agg_popular_hotel_cluster = (train\n",
" .select('hotel_cluster', (formula2).alias('wb'))\n",
" .groupby('hotel_cluster')\n",
" .sum('wb')\n",
" .orderBy(desc('sum(wb)'))\n",
" .withColumnRenamed('sum(wb)', 'sum_wb'))\n",
"agg_popular_hotel_cluster.cache()"
]
},
{
"cell_type": "code",
"execution_count": 226,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 0 ns, sys: 8 ms, total: 8 ms\n",
"Wall time: 216 ms\n"
]
},
{
"data": {
"text/plain": [
"100"
]
},
"execution_count": 226,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"agg_popular_hotel_cluster.count()"
]
},
{
"cell_type": "code",
"execution_count": 227,
"metadata": {
"collapsed": false,
"scrolled": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>hotel_cluster</th>\n",
" <th>sum_wb</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>91</td>\n",
" <td>2692300</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>48</td>\n",
" <td>1934951</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>41</td>\n",
" <td>1794361</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>64</td>\n",
" <td>1649553</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>59</td>\n",
" <td>1473627</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>42</td>\n",
" <td>1468825</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>5</td>\n",
" <td>1454058</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>65</td>\n",
" <td>1449310</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>98</td>\n",
" <td>1423161</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>18</td>\n",
" <td>1368863</td>\n",
" </tr>\n",
" <tr>\n",
" <th>10</th>\n",
" <td>21</td>\n",
" <td>1357534</td>\n",
" </tr>\n",
" <tr>\n",
" <th>11</th>\n",
" <td>28</td>\n",
" <td>1333922</td>\n",
" </tr>\n",
" <tr>\n",
" <th>12</th>\n",
" <td>46</td>\n",
" <td>1302061</td>\n",
" </tr>\n",
" <tr>\n",
" <th>13</th>\n",
" <td>95</td>\n",
" <td>1294157</td>\n",
" </tr>\n",
" <tr>\n",
" <th>14</th>\n",
" <td>25</td>\n",
" <td>1293042</td>\n",
" </tr>\n",
" <tr>\n",
" <th>15</th>\n",
" <td>82</td>\n",
" <td>1279110</td>\n",
" </tr>\n",
" <tr>\n",
" <th>16</th>\n",
" <td>70</td>\n",
" <td>1275599</td>\n",
" </tr>\n",
" <tr>\n",
" <th>17</th>\n",
" <td>50</td>\n",
" <td>1251829</td>\n",
" </tr>\n",
" <tr>\n",
" <th>18</th>\n",
" <td>62</td>\n",
" <td>1240263</td>\n",
" </tr>\n",
" <tr>\n",
" <th>19</th>\n",
" <td>16</td>\n",
" <td>1239116</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" hotel_cluster sum_wb\n",
"0 91 2692300\n",
"1 48 1934951\n",
"2 41 1794361\n",
"3 64 1649553\n",
"4 59 1473627\n",
"5 42 1468825\n",
"6 5 1454058\n",
"7 65 1449310\n",
"8 98 1423161\n",
"9 18 1368863\n",
"10 21 1357534\n",
"11 28 1333922\n",
"12 46 1302061\n",
"13 95 1294157\n",
"14 25 1293042\n",
"15 82 1279110\n",
"16 70 1275599\n",
"17 50 1251829\n",
"18 62 1240263\n",
"19 16 1239116"
]
},
"execution_count": 227,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"agg_popular_hotel_cluster.limit(20).toPandas()"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 12 ms, sys: 4 ms, total: 16 ms\n",
"Wall time: 26 s\n"
]
}
],
"source": [
"%%time\n",
"test = (sqlContext\n",
" .read\n",
" .format('com.databricks.spark.csv')\n",
" .options(header='true', inferschema='true')\n",
" .load('hdfs:///projects/kaggle-expedia/input/test.csv'))"
]
},
{
"cell_type": "code",
"execution_count": 47,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 16 ms, sys: 0 ns, total: 16 ms\n",
"Wall time: 40.6 s\n"
]
}
],
"source": [
"%%time\n",
"test.write.parquet('/projects/kaggle-expedia/input/test.parquet')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### for the next time we can load like this"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 4 ms, sys: 0 ns, total: 4 ms\n",
"Wall time: 249 ms\n"
]
}
],
"source": [
"%%time\n",
"test = sqlContext.read.load('/projects/kaggle-expedia/input/test.parquet')"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>id</th>\n",
" <th>date_time</th>\n",
" <th>site_name</th>\n",
" <th>posa_continent</th>\n",
" <th>user_location_country</th>\n",
" <th>user_location_region</th>\n",
" <th>user_location_city</th>\n",
" <th>orig_destination_distance</th>\n",
" <th>user_id</th>\n",
" <th>is_mobile</th>\n",
" <th>...</th>\n",
" <th>srch_ci</th>\n",
" <th>srch_co</th>\n",
" <th>srch_adults_cnt</th>\n",
" <th>srch_children_cnt</th>\n",
" <th>srch_rm_cnt</th>\n",
" <th>srch_destination_id</th>\n",
" <th>srch_destination_type_id</th>\n",
" <th>hotel_continent</th>\n",
" <th>hotel_country</th>\n",
" <th>hotel_market</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>0</td>\n",
" <td>2015-09-03 17:09:54</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>174</td>\n",
" <td>37449</td>\n",
" <td>553905670</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" <td>...</td>\n",
" <td>2016-05-19</td>\n",
" <td>2016-05-23</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>12243</td>\n",
" <td>6</td>\n",
" <td>6</td>\n",
" <td>204</td>\n",
" <td>27</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>1</td>\n",
" <td>2015-09-24 17:38:35</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>174</td>\n",
" <td>37449</td>\n",
" <td>587329230</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" <td>...</td>\n",
" <td>2016-05-12</td>\n",
" <td>2016-05-15</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>14474</td>\n",
" <td>7</td>\n",
" <td>6</td>\n",
" <td>204</td>\n",
" <td>1540</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>2</td>\n",
" <td>2015-06-07 15:53:02</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>142</td>\n",
" <td>17440</td>\n",
" <td>397597760</td>\n",
" <td>20</td>\n",
" <td>0</td>\n",
" <td>...</td>\n",
" <td>2015-07-26</td>\n",
" <td>2015-07-27</td>\n",
" <td>4</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>11353</td>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" <td>50</td>\n",
" <td>699</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>3</td>\n",
" <td>2015-09-14 14:49:10</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>258</td>\n",
" <td>34156</td>\n",
" <td>150859750</td>\n",
" <td>28</td>\n",
" <td>0</td>\n",
" <td>...</td>\n",
" <td>2015-09-14</td>\n",
" <td>2015-09-16</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>8250</td>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" <td>50</td>\n",
" <td>628</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>4</td>\n",
" <td>2015-07-17 09:32:04</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>467</td>\n",
" <td>36345</td>\n",
" <td>6679130</td>\n",
" <td>50</td>\n",
" <td>0</td>\n",
" <td>...</td>\n",
" <td>2015-07-22</td>\n",
" <td>2015-07-23</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>11812</td>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" <td>50</td>\n",
" <td>538</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>5</td>\n",
" <td>2015-07-21 11:58:45</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>311</td>\n",
" <td>48189</td>\n",
" <td>35985210</td>\n",
" <td>51</td>\n",
" <td>0</td>\n",
" <td>...</td>\n",
" <td>2015-07-22</td>\n",
" <td>2015-07-24</td>\n",
" <td>4</td>\n",
" <td>0</td>\n",
" <td>2</td>\n",
" <td>11827</td>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" <td>50</td>\n",
" <td>447</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>6</td>\n",
" <td>2015-07-29 07:58:39</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>311</td>\n",
" <td>48189</td>\n",
" <td>23734650</td>\n",
" <td>51</td>\n",
" <td>0</td>\n",
" <td>...</td>\n",
" <td>2015-08-02</td>\n",
" <td>2015-08-03</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>8271</td>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" <td>50</td>\n",
" <td>696</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>7</td>\n",
" <td>2015-08-01 20:13:15</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>348</td>\n",
" <td>24811</td>\n",
" <td>21657850</td>\n",
" <td>51</td>\n",
" <td>0</td>\n",
" <td>...</td>\n",
" <td>2015-08-03</td>\n",
" <td>2015-08-04</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>8291</td>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" <td>50</td>\n",
" <td>191</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>8</td>\n",
" <td>2015-11-07 12:29:09</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>311</td>\n",
" <td>48189</td>\n",
" <td>233767540</td>\n",
" <td>51</td>\n",
" <td>0</td>\n",
" <td>...</td>\n",
" <td>2015-12-30</td>\n",
" <td>2015-12-31</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>8250</td>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" <td>50</td>\n",
" <td>628</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>9</td>\n",
" <td>2015-11-08 16:21:37</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>311</td>\n",
" <td>48189</td>\n",
" <td>253979950</td>\n",
" <td>51</td>\n",
" <td>0</td>\n",
" <td>...</td>\n",
" <td>2016-01-02</td>\n",
" <td>2016-01-03</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>9145</td>\n",
" <td>1</td>\n",
" <td>2</td>\n",
" <td>50</td>\n",
" <td>364</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"<p>10 rows × 22 columns</p>\n",
"</div>"
],
"text/plain": [
" id date_time site_name posa_continent user_location_country \\\n",
"0 0 2015-09-03 17:09:54 2 3 66 \n",
"1 1 2015-09-24 17:38:35 2 3 66 \n",
"2 2 2015-06-07 15:53:02 2 3 66 \n",
"3 3 2015-09-14 14:49:10 2 3 66 \n",
"4 4 2015-07-17 09:32:04 2 3 66 \n",
"5 5 2015-07-21 11:58:45 2 3 66 \n",
"6 6 2015-07-29 07:58:39 2 3 66 \n",
"7 7 2015-08-01 20:13:15 2 3 66 \n",
"8 8 2015-11-07 12:29:09 2 3 66 \n",
"9 9 2015-11-08 16:21:37 2 3 66 \n",
"\n",
" user_location_region user_location_city orig_destination_distance \\\n",
"0 174 37449 553905670 \n",
"1 174 37449 587329230 \n",
"2 142 17440 397597760 \n",
"3 258 34156 150859750 \n",
"4 467 36345 6679130 \n",
"5 311 48189 35985210 \n",
"6 311 48189 23734650 \n",
"7 348 24811 21657850 \n",
"8 311 48189 233767540 \n",
"9 311 48189 253979950 \n",
"\n",
" user_id is_mobile ... srch_ci srch_co srch_adults_cnt \\\n",
"0 1 1 ... 2016-05-19 2016-05-23 2 \n",
"1 1 1 ... 2016-05-12 2016-05-15 2 \n",
"2 20 0 ... 2015-07-26 2015-07-27 4 \n",
"3 28 0 ... 2015-09-14 2015-09-16 2 \n",
"4 50 0 ... 2015-07-22 2015-07-23 2 \n",
"5 51 0 ... 2015-07-22 2015-07-24 4 \n",
"6 51 0 ... 2015-08-02 2015-08-03 2 \n",
"7 51 0 ... 2015-08-03 2015-08-04 2 \n",
"8 51 0 ... 2015-12-30 2015-12-31 2 \n",
"9 51 0 ... 2016-01-02 2016-01-03 2 \n",
"\n",
" srch_children_cnt srch_rm_cnt srch_destination_id \\\n",
"0 0 1 12243 \n",
"1 0 1 14474 \n",
"2 0 1 11353 \n",
"3 0 1 8250 \n",
"4 0 1 11812 \n",
"5 0 2 11827 \n",
"6 0 1 8271 \n",
"7 0 1 8291 \n",
"8 0 1 8250 \n",
"9 0 1 9145 \n",
"\n",
" srch_destination_type_id hotel_continent hotel_country hotel_market \n",
"0 6 6 204 27 \n",
"1 7 6 204 1540 \n",
"2 1 2 50 699 \n",
"3 1 2 50 628 \n",
"4 1 2 50 538 \n",
"5 1 2 50 447 \n",
"6 1 2 50 696 \n",
"7 1 2 50 191 \n",
"8 1 2 50 628 \n",
"9 1 2 50 364 \n",
"\n",
"[10 rows x 22 columns]"
]
},
"execution_count": 30,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"test = (test.withColumn('orig_destination_distance', (test['orig_destination_distance']*1e5).cast(typs.IntegerType())))\n",
"test.cache()\n",
"test.limit(10).toPandas()"
]
},
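{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Sketch: the distance as an exact join key\n",
"\n",
"Scaling `orig_destination_distance` by `1e5` and casting it to an integer turns the float into a key that can be matched with plain equality. The plain-Python sketch below illustrates the idea and assumes the training aggregate was keyed the same way. `distance_key`, `join_on_key` and the dictionary layout of `train_index` are hypothetical.\n",
"\n",
"```python\n",
"import math\n",
"\n",
"\n",
"def distance_key(distance, scale=1e5):\n",
"    # Missing distances give no key: in Spark a null key never satisfies\n",
"    # an equality join, so such rows cannot match the leak aggregate\n",
"    if distance is None or math.isnan(distance):\n",
"        return None\n",
"    # int() truncates toward zero, as Spark's cast from double to int does\n",
"    return int(distance * scale)\n",
"\n",
"\n",
"def join_on_key(test_rows, train_index):\n",
"    # test_rows: (id, user_location_city, orig_destination_distance) tuples\n",
"    # train_index: dict {(user_location_city, distance key): [hotel_cluster, ...]}\n",
"    matches = {}\n",
"    for row_id, city, distance in test_rows:\n",
"        key = distance_key(distance)\n",
"        if key is None:\n",
"            continue\n",
"        clusters = train_index.get((city, key))\n",
"        if clusters:\n",
"            matches[row_id] = clusters\n",
"    return matches\n",
"\n",
"\n",
"# Toy data: only the first test row shares city and distance with training\n",
"train_index = {(37449, 150000): [91]}\n",
"test_rows = [(0, 37449, 1.5), (1, 37449, None), (2, 17440, 1.5)]\n",
"print(join_on_key(test_rows, train_index))  # {0: [91]}\n",
"```\n",
"\n",
"Missing distances become null keys, and rows with a null key cannot match the leak aggregate. This is consistent with the `NaN` distances among the unmatched test IDs shown further down."
]
},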
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Some stats about test"
]
},
{
"cell_type": "code",
"execution_count": 305,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"2528243"
]
},
"execution_count": 305,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"test.count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### all ids are unique"
]
},
{
"cell_type": "code",
"execution_count": 306,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"2528243"
]
},
"execution_count": 306,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"n_test_ids = test.select('id').distinct().count()\n",
"n_test_ids"
]
},
{
"cell_type": "code",
"execution_count": 322,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>srch_destination_id</th>\n",
" <th>count</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>8250</td>\n",
" <td>62718</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>8267</td>\n",
" <td>41098</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>8279</td>\n",
" <td>23141</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>8745</td>\n",
" <td>21162</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>8268</td>\n",
" <td>20906</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" srch_destination_id count\n",
"0 8250 62718\n",
"1 8267 41098\n",
"2 8279 23141\n",
"3 8745 21162\n",
"4 8268 20906"
]
},
"execution_count": 322,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"test.groupby('srch_destination_id').count().orderBy(desc('count')).limit(5).toPandas()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Joining first aggregate that represents data leak"
]
},
{
"cell_type": "code",
"execution_count": 101,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"DataFrame[id: int, hotel_cluster: int, rn: int]"
]
},
"execution_count": 101,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"test_join_1 = (test.join(agg_ulc_odd_hc, ['user_location_city', 'orig_destination_distance'], 'inner')\n",
" .select('id', 'hotel_cluster', 'rn')\n",
" .orderBy('id', 'rn')\n",
" )\n",
"test_join_1.cache()"
]
},
{
"cell_type": "code",
"execution_count": 102,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 8 ms, sys: 4 ms, total: 12 ms\n",
"Wall time: 9.31 s\n"
]
},
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>id</th>\n",
" <th>hotel_cluster</th>\n",
" <th>rn</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>2</td>\n",
" <td>91</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>3</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>4</td>\n",
" <td>50</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>4</td>\n",
" <td>51</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>8</td>\n",
" <td>88</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>25</td>\n",
" <td>98</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>25</td>\n",
" <td>95</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>26</td>\n",
" <td>31</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>27</td>\n",
" <td>39</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>29</td>\n",
" <td>71</td>\n",
" <td>1</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" id hotel_cluster rn\n",
"0 2 91 1\n",
"1 3 1 1\n",
"2 4 50 1\n",
"3 4 51 2\n",
"4 8 88 1\n",
"5 25 98 1\n",
"6 25 95 2\n",
"7 26 31 1\n",
"8 27 39 1\n",
"9 29 71 1"
]
},
"execution_count": 102,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"test_join_1.limit(10).toPandas()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### How many IDs are matched by the first join"
]
},
{
"cell_type": "code",
"execution_count": 304,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"852711"
]
},
"execution_count": 304,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"n_ids_j1 = test_join_1.select('id').distinct().count()\n",
"n_ids_j1"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Percentage of test records affected by dataleak - 1/3rd"
]
},
{
"cell_type": "code",
"execution_count": 310,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"33.72741465120244"
]
},
"execution_count": 310,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"n_ids_j1 / n_test_ids * 100"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### joining 2nd aggregate"
]
},
{
"cell_type": "code",
"execution_count": 103,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"DataFrame[id: int, hotel_cluster: int, rn: int]"
]
},
"execution_count": 103,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"test_join_2 = (test.join(agg_best_search_dest_ctry, ['srch_destination_id', 'hotel_country', 'hotel_market'], 'inner')\n",
" .select('id', 'hotel_cluster', (10*agg_best_search_dest_ctry['rn']).alias('rn'))\n",
" .orderBy('id', 'rn')\n",
" )\n",
"test_join_2.cache()"
]
},
{
"cell_type": "code",
"execution_count": 105,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"12326935"
]
},
"execution_count": 105,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"test_join_2.count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Almost all test ids have some match with the second aggreagate"
]
},
{
"cell_type": "code",
"execution_count": 311,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"data": {
"text/plain": [
"2510819"
]
},
"execution_count": 311,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"n_ids_j2 = test_join_2.select('id').distinct().count()\n",
"n_ids_j2"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Majority of IDs have 5 or more matches "
]
},
{
"cell_type": "code",
"execution_count": 315,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>count</th>\n",
" <th>count</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>1</td>\n",
" <td>20625</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>2</td>\n",
" <td>22951</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>3</td>\n",
" <td>23621</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>4</td>\n",
" <td>28565</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>5</td>\n",
" <td>2415057</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" count count\n",
"0 1 20625\n",
"1 2 22951\n",
"2 3 23621\n",
"3 4 28565\n",
"4 5 2415057"
]
},
"execution_count": 315,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"test_join_2.groupby('id').count().groupby('count').count().toPandas()"
]
},
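{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Sketch: a histogram of matches per ID\n",
"\n",
"The expression `groupby('id').count().groupby('count').count()` counts candidate rows per ID and then counts how many IDs share each of those numbers. The same two-level count in plain Python follows. `match_count_histogram` is a hypothetical name used only for this illustration.\n",
"\n",
"```python\n",
"from collections import Counter\n",
"\n",
"\n",
"def match_count_histogram(ids):\n",
"    # First level: number of candidate rows per id\n",
"    per_id = Counter(ids)\n",
"    # Second level: how many ids have each number of candidates\n",
"    return Counter(per_id.values())\n",
"\n",
"\n",
"# Toy input: id 0 has five candidate rows, id 1 one, id 2 four\n",
"ids = [0, 0, 0, 0, 0, 1, 2, 2, 2, 2]\n",
"print(sorted(match_count_histogram(ids).items()))  # [(1, 1), (4, 1), (5, 1)]\n",
"```\n",
"\n",
"In the Spark version, renaming the first `count` column (for example with `withColumnRenamed`) before the second `groupby` would avoid the two identically named `count` columns in the output above."
]
},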
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### a quick look:"
]
},
{
"cell_type": "code",
"execution_count": 106,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>id</th>\n",
" <th>hotel_cluster</th>\n",
" <th>rn</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>0</td>\n",
" <td>5</td>\n",
" <td>10</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>0</td>\n",
" <td>37</td>\n",
" <td>20</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>0</td>\n",
" <td>55</td>\n",
" <td>30</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>0</td>\n",
" <td>22</td>\n",
" <td>40</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>0</td>\n",
" <td>11</td>\n",
" <td>50</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>1</td>\n",
" <td>5</td>\n",
" <td>10</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>10</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>2</td>\n",
" <td>31</td>\n",
" <td>20</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>2</td>\n",
" <td>96</td>\n",
" <td>30</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>2</td>\n",
" <td>91</td>\n",
" <td>40</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" id hotel_cluster rn\n",
"0 0 5 10\n",
"1 0 37 20\n",
"2 0 55 30\n",
"3 0 22 40\n",
"4 0 11 50\n",
"5 1 5 10\n",
"6 2 0 10\n",
"7 2 31 20\n",
"8 2 96 30\n",
"9 2 91 40"
]
},
"execution_count": 106,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"test_join_2.limit(10).toPandas()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### joining with 3rd aggregate"
]
},
{
"cell_type": "code",
"execution_count": 109,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"DataFrame[id: int, hotel_cluster: int, rn: int]"
]
},
"execution_count": 109,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"test_join_3 = (test.join(agg_best_search_dest_2, ['srch_destination_id'])\n",
" .select('id', 'hotel_cluster', (100*agg_best_search_dest_2['rn']).alias('rn'))\n",
" .orderBy('id', 'rn')\n",
" )\n",
"test_join_3.cache()"
]
},
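{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Sketch: combining the ranked candidate lists\n",
"\n",
"The three joins put their ranks on different scales. The leak join keeps `rn` as is, the second join multiplies it by 10, and the third join multiplies it by 100. The leak join needs to yield at most nine candidates per ID. Under that condition, sorting all candidates of an ID by `rn` lists leak matches first, then destination + country + market matches, then destination-only matches.\n",
"\n",
"The plain-Python sketch below shows one possible way to finish a prediction from such a union. It keeps each cluster's best-ranked occurrence and takes the first five. Short lists are padded with globally popular clusters, for example the top of `agg_popular_hotel_cluster`. `merge_candidates` is a hypothetical helper, and the notebook's own combination step may differ.\n",
"\n",
"```python\n",
"def merge_candidates(candidates, popular, k=5):\n",
"    # candidates: (id, hotel_cluster, rn) rows from all joins;\n",
"    # a smaller rn means a higher-priority source\n",
"    by_id = {}\n",
"    for row_id, cluster, rn in candidates:\n",
"        by_id.setdefault(row_id, []).append((rn, cluster))\n",
"    predictions = {}\n",
"    for row_id, ranked in by_id.items():\n",
"        chosen = []\n",
"        for rn, cluster in sorted(ranked):\n",
"            if cluster not in chosen:  # keep the best-ranked occurrence only\n",
"                chosen.append(cluster)\n",
"        # Pad short lists with globally popular clusters\n",
"        for cluster in popular:\n",
"            if len(chosen) >= k:\n",
"                break\n",
"            if cluster not in chosen:\n",
"                chosen.append(cluster)\n",
"        predictions[row_id] = chosen[:k]\n",
"    return predictions\n",
"\n",
"\n",
"# Toy candidates in the style of the join outputs above\n",
"candidates = [(0, 5, 10), (0, 37, 20), (0, 55, 30), (0, 22, 40), (0, 11, 50),\n",
"              (0, 5, 100), (0, 37, 200), (4, 50, 1), (4, 51, 2)]\n",
"popular = [91, 48, 41, 64, 59]\n",
"print(merge_candidates(candidates, popular))\n",
"# {0: [5, 37, 55, 22, 11], 4: [50, 51, 91, 48, 41]}\n",
"```"
]
},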
{
"cell_type": "code",
"execution_count": 110,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 0 ns, sys: 0 ns, total: 0 ns\n",
"Wall time: 17.8 s\n"
]
},
{
"data": {
"text/plain": [
"12380153"
]
},
"execution_count": 110,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"test_join_3.count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### how many ids matched"
]
},
{
"cell_type": "code",
"execution_count": 317,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"2514207"
]
},
"execution_count": 317,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"n_ids_3 = test_join_3.select('id').distinct().count()\n",
"n_ids_3"
]
},
{
"cell_type": "code",
"execution_count": 111,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>id</th>\n",
" <th>hotel_cluster</th>\n",
" <th>rn</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>0</td>\n",
" <td>5</td>\n",
" <td>100</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>0</td>\n",
" <td>37</td>\n",
" <td>200</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>0</td>\n",
" <td>55</td>\n",
" <td>300</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>0</td>\n",
" <td>11</td>\n",
" <td>400</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>0</td>\n",
" <td>22</td>\n",
" <td>500</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>1</td>\n",
" <td>5</td>\n",
" <td>100</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>100</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>2</td>\n",
" <td>31</td>\n",
" <td>200</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>2</td>\n",
" <td>96</td>\n",
" <td>300</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>2</td>\n",
" <td>91</td>\n",
" <td>400</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" id hotel_cluster rn\n",
"0 0 5 100\n",
"1 0 37 200\n",
"2 0 55 300\n",
"3 0 11 400\n",
"4 0 22 500\n",
"5 1 5 100\n",
"6 2 0 100\n",
"7 2 31 200\n",
"8 2 96 300\n",
"9 2 91 400"
]
},
"execution_count": 111,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"test_join_3.limit(10).toPandas()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### looking at the IDs that didnot match any of the aggregates"
]
},
{
"cell_type": "code",
"execution_count": 332,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"DataFrame[id: int]"
]
},
"execution_count": 332,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"not_matched_ids = (test.select('id')\n",
" .subtract(test_join_1.select('id').distinct())\n",
" .subtract(test_join_2.select('id').distinct())\n",
" .subtract(test_join_3.select('id').distinct())\n",
" )\n",
"not_matched_ids.cache()"
]
},
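{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Sketch: finding and filling unmatched IDs\n",
"\n",
"`subtract` returns the rows of the left DataFrame that do not appear in the right one, like SQL `EXCEPT`. The chain above therefore leaves the test IDs matched by none of the three joins. Below is a stdlib sketch of the same set difference. It is paired with one possible fallback for such IDs: the globally most popular clusters. Both helpers are hypothetical and only illustrate the idea.\n",
"\n",
"```python\n",
"def unmatched_ids(all_ids, *matched_id_lists):\n",
"    # Set difference, similar in spirit to chaining DataFrame.subtract\n",
"    remaining = set(all_ids)\n",
"    for ids in matched_id_lists:\n",
"        remaining -= set(ids)\n",
"    return sorted(remaining)\n",
"\n",
"\n",
"def fallback_predictions(ids, popular, k=5):\n",
"    # Hypothetical fallback: every unmatched id gets the k most popular clusters\n",
"    return {row_id: list(popular[:k]) for row_id in ids}\n",
"\n",
"\n",
"# Toy ids; 286 and 357 appear in none of the match lists\n",
"left = unmatched_ids([0, 1, 2, 286, 357], [2], [0, 1], [0])\n",
"print(left)  # [286, 357]\n",
"print(fallback_predictions(left, [91, 48, 41, 64, 59, 42]))\n",
"```"
]
},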
{
"cell_type": "code",
"execution_count": 333,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"11960"
]
},
"execution_count": 333,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"not_matched_ids.count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### a glimpse at those ids that have not matched"
]
},
{
"cell_type": "code",
"execution_count": 336,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>id</th>\n",
" <th>date_time</th>\n",
" <th>site_name</th>\n",
" <th>posa_continent</th>\n",
" <th>user_location_country</th>\n",
" <th>user_location_region</th>\n",
" <th>user_location_city</th>\n",
" <th>orig_destination_distance</th>\n",
" <th>user_id</th>\n",
" <th>is_mobile</th>\n",
" <th>is_package</th>\n",
" <th>channel</th>\n",
" <th>srch_ci</th>\n",
" <th>srch_co</th>\n",
" <th>srch_adults_cnt</th>\n",
" <th>srch_children_cnt</th>\n",
" <th>srch_rm_cnt</th>\n",
" <th>srch_destination_id</th>\n",
" <th>srch_destination_type_id</th>\n",
" <th>hotel_continent</th>\n",
" <th>hotel_country</th>\n",
" <th>hotel_market</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>286</td>\n",
" <td>2015-09-13 08:37:02</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>215</td>\n",
" <td>780</td>\n",
" <td>38000</td>\n",
" <td>NaN</td>\n",
" <td>1038</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>10</td>\n",
" <td>2015-11-05</td>\n",
" <td>2015-11-06</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>65671</td>\n",
" <td>6</td>\n",
" <td>3</td>\n",
" <td>106</td>\n",
" <td>781</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>357</td>\n",
" <td>2015-08-30 13:54:04</td>\n",
" <td>24</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>51</td>\n",
" <td>9527</td>\n",
" <td>NaN</td>\n",
" <td>1243</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>5</td>\n",
" <td>2015-12-19</td>\n",
" <td>2015-12-21</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>13679</td>\n",
" <td>4</td>\n",
" <td>3</td>\n",
" <td>126</td>\n",
" <td>232</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>445</td>\n",
" <td>2015-06-01 01:12:57</td>\n",
" <td>28</td>\n",
" <td>1</td>\n",
" <td>66</td>\n",
" <td>442</td>\n",
" <td>18617</td>\n",
" <td>NaN</td>\n",
" <td>1462</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>10</td>\n",
" <td>2015-05-31</td>\n",
" <td>2015-06-01</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>44373</td>\n",
" <td>6</td>\n",
" <td>2</td>\n",
" <td>50</td>\n",
" <td>1654</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>458</td>\n",
" <td>2015-06-21 01:22:42</td>\n",
" <td>28</td>\n",
" <td>1</td>\n",
" <td>66</td>\n",
" <td>442</td>\n",
" <td>467</td>\n",
" <td>NaN</td>\n",
" <td>1462</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>2015-06-20</td>\n",
" <td>2015-06-21</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>65106</td>\n",
" <td>6</td>\n",
" <td>2</td>\n",
" <td>50</td>\n",
" <td>1640</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>627</td>\n",
" <td>2015-09-14 19:23:45</td>\n",
" <td>24</td>\n",
" <td>2</td>\n",
" <td>167</td>\n",
" <td>51</td>\n",
" <td>11261</td>\n",
" <td>NaN</td>\n",
" <td>1939</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>10</td>\n",
" <td>2015-10-27</td>\n",
" <td>2015-10-28</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>51983</td>\n",
" <td>7</td>\n",
" <td>3</td>\n",
" <td>106</td>\n",
" <td>160</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>748</td>\n",
" <td>2015-12-01 11:58:37</td>\n",
" <td>11</td>\n",
" <td>3</td>\n",
" <td>205</td>\n",
" <td>354</td>\n",
" <td>13320</td>\n",
" <td>7883610</td>\n",
" <td>2471</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>2015-12-01</td>\n",
" <td>2015-12-02</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>41557</td>\n",
" <td>6</td>\n",
" <td>2</td>\n",
" <td>198</td>\n",
" <td>395</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>913</td>\n",
" <td>2015-11-02 15:59:39</td>\n",
" <td>24</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>50</td>\n",
" <td>5224</td>\n",
" <td>NaN</td>\n",
" <td>3011</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>5</td>\n",
" <td>2016-01-14</td>\n",
" <td>2016-01-15</td>\n",
" <td>5</td>\n",
" <td>0</td>\n",
" <td>5</td>\n",
" <td>65450</td>\n",
" <td>5</td>\n",
" <td>0</td>\n",
" <td>63</td>\n",
" <td>1687</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>1055</td>\n",
" <td>2015-07-24 12:19:40</td>\n",
" <td>8</td>\n",
" <td>4</td>\n",
" <td>77</td>\n",
" <td>824</td>\n",
" <td>22219</td>\n",
" <td>3914870</td>\n",
" <td>3373</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>2</td>\n",
" <td>2015-12-13</td>\n",
" <td>2015-12-14</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>34219</td>\n",
" <td>4</td>\n",
" <td>0</td>\n",
" <td>63</td>\n",
" <td>1473</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>1110</td>\n",
" <td>2015-04-18 11:41:56</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>174</td>\n",
" <td>28950</td>\n",
" <td>596498030</td>\n",
" <td>3606</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>9</td>\n",
" <td>2015-06-02</td>\n",
" <td>2015-06-03</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>18380</td>\n",
" <td>7</td>\n",
" <td>6</td>\n",
" <td>105</td>\n",
" <td>892</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>1222</td>\n",
" <td>2015-06-10 16:42:24</td>\n",
" <td>28</td>\n",
" <td>1</td>\n",
" <td>68</td>\n",
" <td>480</td>\n",
" <td>2861</td>\n",
" <td>NaN</td>\n",
" <td>3962</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>10</td>\n",
" <td>2015-10-14</td>\n",
" <td>2015-10-15</td>\n",
" <td>4</td>\n",
" <td>0</td>\n",
" <td>2</td>\n",
" <td>58246</td>\n",
" <td>4</td>\n",
" <td>5</td>\n",
" <td>39</td>\n",
" <td>803</td>\n",
" </tr>\n",
" <tr>\n",
" <th>10</th>\n",
" <td>1253</td>\n",
" <td>2015-03-22 13:06:19</td>\n",
" <td>17</td>\n",
" <td>1</td>\n",
" <td>133</td>\n",
" <td>24</td>\n",
" <td>46388</td>\n",
" <td>NaN</td>\n",
" <td>4051</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>4</td>\n",
" <td>2015-04-01</td>\n",
" <td>2015-04-02</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>59282</td>\n",
" <td>4</td>\n",
" <td>6</td>\n",
" <td>204</td>\n",
" <td>27</td>\n",
" </tr>\n",
" <tr>\n",
" <th>11</th>\n",
" <td>1265</td>\n",
" <td>2015-08-29 00:46:45</td>\n",
" <td>24</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>50</td>\n",
" <td>5703</td>\n",
" <td>NaN</td>\n",
" <td>4119</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>3</td>\n",
" <td>2015-09-11</td>\n",
" <td>2015-09-12</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>65114</td>\n",
" <td>6</td>\n",
" <td>3</td>\n",
" <td>0</td>\n",
" <td>798</td>\n",
" </tr>\n",
" <tr>\n",
" <th>12</th>\n",
" <td>1277</td>\n",
" <td>2015-10-07 18:21:59</td>\n",
" <td>13</td>\n",
" <td>1</td>\n",
" <td>63</td>\n",
" <td>480</td>\n",
" <td>38276</td>\n",
" <td>12640490</td>\n",
" <td>4132</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>2015-10-10</td>\n",
" <td>2015-10-11</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>2</td>\n",
" <td>21148</td>\n",
" <td>6</td>\n",
" <td>6</td>\n",
" <td>204</td>\n",
" <td>1756</td>\n",
" </tr>\n",
" <tr>\n",
" <th>13</th>\n",
" <td>1336</td>\n",
" <td>2015-11-18 17:17:50</td>\n",
" <td>24</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>50</td>\n",
" <td>5703</td>\n",
" <td>NaN</td>\n",
" <td>4339</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>4</td>\n",
" <td>2016-02-09</td>\n",
" <td>2016-02-12</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>57265</td>\n",
" <td>7</td>\n",
" <td>6</td>\n",
" <td>70</td>\n",
" <td>135</td>\n",
" </tr>\n",
" <tr>\n",
" <th>14</th>\n",
" <td>1471</td>\n",
" <td>2015-11-05 19:33:19</td>\n",
" <td>24</td>\n",
" <td>2</td>\n",
" <td>66</td>\n",
" <td>331</td>\n",
" <td>38729</td>\n",
" <td>668695860</td>\n",
" <td>4745</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>2015-11-23</td>\n",
" <td>2015-11-25</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>65683</td>\n",
" <td>6</td>\n",
" <td>3</td>\n",
" <td>106</td>\n",
" <td>783</td>\n",
" </tr>\n",
" <tr>\n",
" <th>15</th>\n",
" <td>1561</td>\n",
" <td>2015-06-26 14:40:36</td>\n",
" <td>8</td>\n",
" <td>4</td>\n",
" <td>77</td>\n",
" <td>824</td>\n",
" <td>15015</td>\n",
" <td>51145430</td>\n",
" <td>4995</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>5</td>\n",
" <td>2015-08-28</td>\n",
" <td>2015-09-01</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>65083</td>\n",
" <td>6</td>\n",
" <td>0</td>\n",
" <td>63</td>\n",
" <td>1256</td>\n",
" </tr>\n",
" <tr>\n",
" <th>16</th>\n",
" <td>1980</td>\n",
" <td>2015-06-12 12:27:09</td>\n",
" <td>30</td>\n",
" <td>4</td>\n",
" <td>195</td>\n",
" <td>991</td>\n",
" <td>22648</td>\n",
" <td>NaN</td>\n",
" <td>6297</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>5</td>\n",
" <td>2015-08-13</td>\n",
" <td>2015-08-21</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>62958</td>\n",
" <td>4</td>\n",
" <td>0</td>\n",
" <td>63</td>\n",
" <td>966</td>\n",
" </tr>\n",
" <tr>\n",
" <th>17</th>\n",
" <td>2002</td>\n",
" <td>2015-12-21 15:26:20</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>174</td>\n",
" <td>26232</td>\n",
" <td>775607240</td>\n",
" <td>6366</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>2016-03-05</td>\n",
" <td>2016-03-07</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>57273</td>\n",
" <td>1</td>\n",
" <td>3</td>\n",
" <td>182</td>\n",
" <td>800</td>\n",
" </tr>\n",
" <tr>\n",
" <th>18</th>\n",
" <td>2455</td>\n",
" <td>2015-10-02 16:19:00</td>\n",
" <td>2</td>\n",
" <td>3</td>\n",
" <td>66</td>\n",
" <td>335</td>\n",
" <td>54248</td>\n",
" <td>NaN</td>\n",
" <td>8193</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" <td>10</td>\n",
" <td>2015-12-07</td>\n",
" <td>2015-12-15</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>65531</td>\n",
" <td>1</td>\n",
" <td>4</td>\n",
" <td>53</td>\n",
" <td>2102</td>\n",
" </tr>\n",
" <tr>\n",
" <th>19</th>\n",
" <td>2871</td>\n",
" <td>2015-06-14 10:46:47</td>\n",
" <td>13</td>\n",
" <td>1</td>\n",
" <td>46</td>\n",
" <td>157</td>\n",
" <td>34868</td>\n",
" <td>82354160</td>\n",
" <td>9503</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>10</td>\n",
" <td>2015-09-04</td>\n",
" <td>2015-09-06</td>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>44399</td>\n",
" <td>3</td>\n",
" <td>6</td>\n",
" <td>204</td>\n",
" <td>1773</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" id date_time site_name posa_continent \\\n",
"0 286 2015-09-13 08:37:02 2 3 \n",
"1 357 2015-08-30 13:54:04 24 2 \n",
"2 445 2015-06-01 01:12:57 28 1 \n",
"3 458 2015-06-21 01:22:42 28 1 \n",
"4 627 2015-09-14 19:23:45 24 2 \n",
"5 748 2015-12-01 11:58:37 11 3 \n",
"6 913 2015-11-02 15:59:39 24 2 \n",
"7 1055 2015-07-24 12:19:40 8 4 \n",
"8 1110 2015-04-18 11:41:56 2 3 \n",
"9 1222 2015-06-10 16:42:24 28 1 \n",
"10 1253 2015-03-22 13:06:19 17 1 \n",
"11 1265 2015-08-29 00:46:45 24 2 \n",
"12 1277 2015-10-07 18:21:59 13 1 \n",
"13 1336 2015-11-18 17:17:50 24 2 \n",
"14 1471 2015-11-05 19:33:19 24 2 \n",
"15 1561 2015-06-26 14:40:36 8 4 \n",
"16 1980 2015-06-12 12:27:09 30 4 \n",
"17 2002 2015-12-21 15:26:20 2 3 \n",
"18 2455 2015-10-02 16:19:00 2 3 \n",
"19 2871 2015-06-14 10:46:47 13 1 \n",
"\n",
" user_location_country user_location_region user_location_city \\\n",
"0 215 780 38000 \n",
"1 3 51 9527 \n",
"2 66 442 18617 \n",
"3 66 442 467 \n",
"4 167 51 11261 \n",
"5 205 354 13320 \n",
"6 3 50 5224 \n",
"7 77 824 22219 \n",
"8 66 174 28950 \n",
"9 68 480 2861 \n",
"10 133 24 46388 \n",
"11 3 50 5703 \n",
"12 63 480 38276 \n",
"13 3 50 5703 \n",
"14 66 331 38729 \n",
"15 77 824 15015 \n",
"16 195 991 22648 \n",
"17 66 174 26232 \n",
"18 66 335 54248 \n",
"19 46 157 34868 \n",
"\n",
" orig_destination_distance user_id is_mobile is_package channel \\\n",
"0 NaN 1038 0 0 10 \n",
"1 NaN 1243 0 0 5 \n",
"2 NaN 1462 0 0 10 \n",
"3 NaN 1462 0 0 1 \n",
"4 NaN 1939 0 0 10 \n",
"5 7883610 2471 0 0 0 \n",
"6 NaN 3011 0 0 5 \n",
"7 3914870 3373 0 0 2 \n",
"8 596498030 3606 0 0 9 \n",
"9 NaN 3962 0 0 10 \n",
"10 NaN 4051 0 0 4 \n",
"11 NaN 4119 0 0 3 \n",
"12 12640490 4132 0 0 0 \n",
"13 NaN 4339 0 0 4 \n",
"14 668695860 4745 0 0 1 \n",
"15 51145430 4995 0 0 5 \n",
"16 NaN 6297 0 0 5 \n",
"17 775607240 6366 0 0 0 \n",
"18 NaN 8193 1 1 10 \n",
"19 82354160 9503 0 0 10 \n",
"\n",
" srch_ci srch_co srch_adults_cnt srch_children_cnt srch_rm_cnt \\\n",
"0 2015-11-05 2015-11-06 2 0 1 \n",
"1 2015-12-19 2015-12-21 1 0 1 \n",
"2 2015-05-31 2015-06-01 2 0 1 \n",
"3 2015-06-20 2015-06-21 2 0 1 \n",
"4 2015-10-27 2015-10-28 2 0 1 \n",
"5 2015-12-01 2015-12-02 2 0 1 \n",
"6 2016-01-14 2016-01-15 5 0 5 \n",
"7 2015-12-13 2015-12-14 2 0 1 \n",
"8 2015-06-02 2015-06-03 2 0 1 \n",
"9 2015-10-14 2015-10-15 4 0 2 \n",
"10 2015-04-01 2015-04-02 1 0 1 \n",
"11 2015-09-11 2015-09-12 1 0 1 \n",
"12 2015-10-10 2015-10-11 2 0 2 \n",
"13 2016-02-09 2016-02-12 1 0 1 \n",
"14 2015-11-23 2015-11-25 1 0 1 \n",
"15 2015-08-28 2015-09-01 1 0 1 \n",
"16 2015-08-13 2015-08-21 2 0 1 \n",
"17 2016-03-05 2016-03-07 2 0 1 \n",
"18 2015-12-07 2015-12-15 2 0 1 \n",
"19 2015-09-04 2015-09-06 2 0 1 \n",
"\n",
" srch_destination_id srch_destination_type_id hotel_continent \\\n",
"0 65671 6 3 \n",
"1 13679 4 3 \n",
"2 44373 6 2 \n",
"3 65106 6 2 \n",
"4 51983 7 3 \n",
"5 41557 6 2 \n",
"6 65450 5 0 \n",
"7 34219 4 0 \n",
"8 18380 7 6 \n",
"9 58246 4 5 \n",
"10 59282 4 6 \n",
"11 65114 6 3 \n",
"12 21148 6 6 \n",
"13 57265 7 6 \n",
"14 65683 6 3 \n",
"15 65083 6 0 \n",
"16 62958 4 0 \n",
"17 57273 1 3 \n",
"18 65531 1 4 \n",
"19 44399 3 6 \n",
"\n",
" hotel_country hotel_market \n",
"0 106 781 \n",
"1 126 232 \n",
"2 50 1654 \n",
"3 50 1640 \n",
"4 106 160 \n",
"5 198 395 \n",
"6 63 1687 \n",
"7 63 1473 \n",
"8 105 892 \n",
"9 39 803 \n",
"10 204 27 \n",
"11 0 798 \n",
"12 204 1756 \n",
"13 70 135 \n",
"14 106 783 \n",
"15 63 1256 \n",
"16 63 966 \n",
"17 182 800 \n",
"18 53 2102 \n",
"19 204 1773 "
]
},
"execution_count": 336,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"pd.options.display.max_columns = 500\n",
"test.join(not_matched_ids, on = 'id', how='left_semi').limit(20).toPandas()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Generating the remainder (most popular clusters for the unmatched IDs)"
]
},
{
"cell_type": "code",
"execution_count": 343,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>id</th>\n",
" <th>hotel_cluster</th>\n",
" <th>rn</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>527449</td>\n",
" <td>91</td>\n",
" <td>999</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>1662907</td>\n",
" <td>91</td>\n",
" <td>999</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>1804545</td>\n",
" <td>91</td>\n",
" <td>999</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>1662903</td>\n",
" <td>91</td>\n",
" <td>999</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>1742843</td>\n",
" <td>91</td>\n",
" <td>999</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>544801</td>\n",
" <td>91</td>\n",
" <td>999</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>2236073</td>\n",
" <td>91</td>\n",
" <td>999</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>668247</td>\n",
" <td>91</td>\n",
" <td>999</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>2018289</td>\n",
" <td>91</td>\n",
" <td>999</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>1819951</td>\n",
" <td>91</td>\n",
" <td>999</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" id hotel_cluster rn\n",
"0 527449 91 999\n",
"1 1662907 91 999\n",
"2 1804545 91 999\n",
"3 1662903 91 999\n",
"4 1742843 91 999\n",
"5 544801 91 999\n",
"6 2236073 91 999\n",
"7 668247 91 999\n",
"8 2018289 91 999\n",
"9 1819951 91 999"
]
},
"execution_count": 343,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"test_remainder = not_matched_ids.join(agg_popular_hotel_cluster.limit(5)).selectExpr('id', 'hotel_cluster', '999 as rn')\n",
"test_remainder.limit(10).toPandas()"
]
},
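{
"cell_type": "markdown",
"metadata": {},
"source": [
"`test_remainder` pairs every unmatched `id` with each of the five most popular clusters (a cross join) and tags all of them with `rn = 999`, so they sort after every leak-based candidate. Because the five rows share one `rn` value, the later `rowNumber()` window may order them arbitrarily. The sketch below is a suggested variation, not what this notebook does: it assumes distinct ranks `1000 + position` to keep the popularity order. `remainder_rows` is a hypothetical helper, the ids are toy values, and `[91, 48, 41, 64, 59]` is the `top5_hotels` value printed further down."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from itertools import product\n",
"\n",
"\n",
"# Hypothetical variation: distinct ranks above every leak-based rank (at most 500 here)\n",
"def remainder_rows(unmatched, top5, base_rank=1000):\n",
"    # cross join: one (id, hotel_cluster, rn) row per unmatched id and popular cluster\n",
"    return [(i, c, base_rank + pos)\n",
"            for i, (pos, c) in product(unmatched, enumerate(top5))]\n",
"\n",
"\n",
"rows = remainder_rows([286, 357], [91, 48, 41, 64, 59])\n",
"print(rows[:3])  # [(286, 91, 1000), (286, 48, 1001), (286, 41, 1002)]"
]
},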
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Now merging all the records together"
]
},
{
"cell_type": "code",
"execution_count": 344,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"DataFrame[id: int, hotel_cluster: int, rn: int, rn_all: int]"
]
},
"execution_count": 344,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"w4 = Window.partitionBy('id').orderBy('rn')\n",
"test_union = (test_join_1\n",
" .unionAll(test_join_2)\n",
" .unionAll(test_join_3)\n",
" .unionAll(test_remainder)\n",
" .select('*', rowNumber().over(w4).alias('rn_all'))\n",
" .filter('rn_all <= 5')\n",
" .orderBy('id', 'rn_all'))\n",
"test_union.cache()"
]
},
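{
"cell_type": "markdown",
"metadata": {},
"source": [
"The union works because each source scales its rank differently. Leak matches keep `rn` 1 to 5, the destination/country/market aggregate multiplies it by 10, the destination-only aggregate multiplies it by 100, and the popularity fallback uses 999. Ordering by `rn` within each `id` therefore puts higher-priority sources first, and `rn_all <= 5` keeps the best five. Below is a pure-Python sketch of that per-id merge, using the hypothetical helper `merge_candidates` and the `id` 0 candidates visible in the outputs."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from collections import defaultdict\n",
"\n",
"\n",
"def merge_candidates(rows, k=5):\n",
"    # rows: (id, hotel_cluster, rn) tuples from all sources\n",
"    by_id = defaultdict(list)\n",
"    for id_, cluster, rn in rows:\n",
"        by_id[id_].append((rn, cluster))\n",
"    # smallest rn first, like rowNumber() over w4; ties fall back to cluster id here,\n",
"    # whereas Spark leaves the order of tied rows unspecified\n",
"    return {id_: [c for _, c in sorted(cands)[:k]] for id_, cands in by_id.items()}\n",
"\n",
"\n",
"join_2 = [(0, 5, 10), (0, 37, 20), (0, 55, 30), (0, 22, 40), (0, 11, 50)]\n",
"join_3 = [(0, 5, 100), (0, 37, 200), (0, 55, 300), (0, 11, 400), (0, 22, 500)]\n",
"print(merge_candidates(join_2 + join_3))  # {0: [5, 37, 55, 22, 11]}"
]
},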
{
"cell_type": "code",
"execution_count": 345,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 4 ms, sys: 4 ms, total: 8 ms\n",
"Wall time: 23.4 s\n"
]
},
{
"data": {
"text/plain": [
"12566494"
]
},
"execution_count": 345,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"test_union.count()"
]
},
{
"cell_type": "code",
"execution_count": 346,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>id</th>\n",
" <th>hotel_cluster</th>\n",
" <th>rn</th>\n",
" <th>rn_all</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>0</td>\n",
" <td>5</td>\n",
" <td>10</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>0</td>\n",
" <td>37</td>\n",
" <td>20</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>0</td>\n",
" <td>55</td>\n",
" <td>30</td>\n",
" <td>3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>0</td>\n",
" <td>22</td>\n",
" <td>40</td>\n",
" <td>4</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>0</td>\n",
" <td>11</td>\n",
" <td>50</td>\n",
" <td>5</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>1</td>\n",
" <td>5</td>\n",
" <td>10</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>1</td>\n",
" <td>5</td>\n",
" <td>100</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>2</td>\n",
" <td>91</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>2</td>\n",
" <td>0</td>\n",
" <td>10</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>2</td>\n",
" <td>31</td>\n",
" <td>20</td>\n",
" <td>3</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" id hotel_cluster rn rn_all\n",
"0 0 5 10 1\n",
"1 0 37 20 2\n",
"2 0 55 30 3\n",
"3 0 22 40 4\n",
"4 0 11 50 5\n",
"5 1 5 10 1\n",
"6 1 5 100 2\n",
"7 2 91 1 1\n",
"8 2 0 10 2\n",
"9 2 31 20 3"
]
},
"execution_count": 346,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"test_union.limit(10).toPandas()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Checking how many IDs have fewer than 5 hotel clusters"
]
},
{
"cell_type": "code",
"execution_count": 348,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"35465"
]
},
"execution_count": 348,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"test_union.groupBy('id').count().filter('count < 5').count()"
]
},
{
"cell_type": "code",
"execution_count": 347,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>id</th>\n",
" <th>count</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>4631</td>\n",
" <td>4</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>29431</td>\n",
" <td>3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>51431</td>\n",
" <td>4</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>54231</td>\n",
" <td>4</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>105231</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>110631</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>148231</td>\n",
" <td>3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>169831</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>196031</td>\n",
" <td>4</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>203431</td>\n",
" <td>4</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" id count\n",
"0 4631 4\n",
"1 29431 3\n",
"2 51431 4\n",
"3 54231 4\n",
"4 105231 2\n",
"5 110631 2\n",
"6 148231 3\n",
"7 169831 1\n",
"8 196031 4\n",
"9 203431 4"
]
},
"execution_count": 347,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"test_union.groupBy('id').count().filter('count < 5').limit(10).toPandas()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Generating submission\n",
"\n",
"### To concatenate the clusters, I could not come up with a better idea than converting the result to an RDD, appending the most popular clusters, and converting it back to a DataFrame\n",
"\n",
"This operation is also slow; I still need to think about how to do it differently."
]
},
{
"cell_type": "code",
"execution_count": 349,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[91, 48, 41, 64, 59]"
]
},
"execution_count": 349,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"top5_hotels = (agg_popular_hotel_cluster.limit(5).rdd.keys().collect())\n",
"top5_hotels"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### A so-called \"broadcast\" variable: a read-only value cached on every node instead of being shipped with each task"
]
},
{
"cell_type": "code",
"execution_count": 350,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"<pyspark.broadcast.Broadcast at 0x7fdb88d6dbe0>"
]
},
"execution_count": 350,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"top5_bc = sc.broadcast(top5_hotels)\n",
"top5_bc"
]
},
{
"cell_type": "code",
"execution_count": 351,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from pyspark.sql import Row"
]
},
{
"cell_type": "code",
"execution_count": 352,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 36.1 s, sys: 244 ms, total: 36.3 s\n",
"Wall time: 1min 40s\n"
]
}
],
"source": [
"%%time\n",
"# carry rn_all with each cluster: reduceByKey does not guarantee the order in which\n",
"# partial lists are concatenated, so the clusters are sorted by rn_all after reducing\n",
"submission = (test_union\n",
" .rdd\n",
" .map(lambda x: (x.id, [(x.rn_all, x.hotel_cluster)]))\n",
" .reduceByKey(add)\n",
" .mapValues(lambda x: [c for _, c in sorted(x)])\n",
" .mapValues(lambda x: (x + top5_bc.value)[:5])\n",
" .mapValues(lambda x: \" \".join([str(i) for i in x]))\n",
" .map(lambda x: Row(id = x[0], hotel_cluster = x[1]))\n",
" .toDF()\n",
" ).toPandas()"
]
},
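{
"cell_type": "markdown",
"metadata": {},
"source": [
"The RDD step can be mirrored in plain Python: take each `id`'s clusters in `rn_all` order, append the five most popular clusters, and keep the first five. As the outputs show (`id` 1 becomes `5 5 91 48 41`), repeated clusters are kept. With a single true cluster per row, MAP@5 credits only its first occurrence, so a repeat uses up a slot. The hypothetical `dedupe` flag below drops repeats before padding. This is an untested suggestion that would change the submission, and it is not what the notebook scored."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def format_submission(merged, top5, dedupe=False, k=5):\n",
"    # merged: {id: [clusters in rn_all order]} -> {id: 'c1 c2 c3 c4 c5'}\n",
"    result = {}\n",
"    for id_, clusters in merged.items():\n",
"        candidates = clusters + top5  # same padding as (x + top5_bc.value)[:5]\n",
"        if dedupe:\n",
"            # keep only the first occurrence of each cluster\n",
"            unique = []\n",
"            for c in candidates:\n",
"                if c not in unique:\n",
"                    unique.append(c)\n",
"            candidates = unique\n",
"        result[id_] = ' '.join(str(c) for c in candidates[:k])\n",
"    return result\n",
"\n",
"\n",
"top5 = [91, 48, 41, 64, 59]\n",
"merged = {0: [5, 37, 55, 22, 11], 1: [5, 5]}\n",
"print(format_submission(merged, top5))  # {0: '5 37 55 22 11', 1: '5 5 91 48 41'}\n",
"print(format_submission(merged, top5, dedupe=True))  # {0: '5 37 55 22 11', 1: '5 91 48 41 64'}"
]
},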
{
"cell_type": "code",
"execution_count": 353,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"submission.set_index('id', inplace=True)\n",
"submission.sort_index(inplace=True)"
]
},
{
"cell_type": "code",
"execution_count": 354,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"(2528243, 1)"
]
},
"execution_count": 354,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"submission.shape"
]
},
{
"cell_type": "code",
"execution_count": 355,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>hotel_cluster</th>\n",
" </tr>\n",
" <tr>\n",
" <th>id</th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>5 37 55 22 11</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>5 5 91 48 41</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>91 0 31 96 91</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>1 1 45 79 24</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>50 51 91 2 59</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" hotel_cluster\n",
"id \n",
"0 5 37 55 22 11\n",
"1 5 5 91 48 41\n",
"2 91 0 31 96 91\n",
"3 1 1 45 79 24\n",
"4 50 51 91 2 59"
]
},
"execution_count": 355,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"submission.head()"
]
},
{
"cell_type": "code",
"execution_count": 356,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"submission.to_csv('../submissions/leakage_spark.csv')"
]
},
{
"cell_type": "code",
"execution_count": 358,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"!rm ../submissions/leakage_spark.csv.gz\n",
"!gzip ../submissions/leakage_spark.csv"
]
},
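{
"cell_type": "markdown",
"metadata": {},
"source": [
"Instead of shelling out to `rm` and `gzip`, the compressed file can also be written directly from Python, which works where those commands are unavailable. Below is a minimal stdlib-only sketch. `write_submission_gz` is a hypothetical helper that expects `(id, hotel_cluster)` pairs, e.g. `zip(submission.index, submission['hotel_cluster'])`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import csv\n",
"import gzip\n",
"\n",
"\n",
"def write_submission_gz(rows, path):\n",
"    # rows: (id, hotel_cluster) pairs; writes a gzip-compressed CSV with a header\n",
"    with gzip.open(path, 'wt', newline='') as f:\n",
"        writer = csv.writer(f)\n",
"        writer.writerow(['id', 'hotel_cluster'])\n",
"        writer.writerows(rows)\n",
"\n",
"\n",
"# usage, e.g.:\n",
"# write_submission_gz(zip(submission.index, submission['hotel_cluster']),\n",
"#                     '../submissions/leakage_spark.csv.gz')"
]
},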
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### The submission scores 0.49604 on the public leaderboard, on par with the source script"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## All-in-one script\n",
"#### You can experiment with `formula1` and `formula2` to achieve better results"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import sys\n",
"import os\n",
"import pandas as pd\n",
"import numpy as np\n",
"sys.path.append('/opt/spark/python')\n",
"os.environ['SPARK_HOME']='/opt/spark'\n",
"os.environ['HADOOP_CONF_DIR']='/etc/hadoop/conf'\n",
"os.environ['HIVE_CONF_DIR']='/etc/hive/conf'\n",
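"# in client mode the driver JVM starts with these arguments, so driver memory is set here;\n",
"# spark.driver.memory set later on SparkConf would be ignored\n",
"os.environ['PYSPARK_SUBMIT_ARGS'] = '--master local[8] --driver-memory 12g --packages com.databricks:spark-csv_2.11:1.4.0 pyspark-shell'\n",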
"\n",
"from pyspark import SparkContext, SparkConf, HiveContext, SQLContext\n",
"import pyspark.sql.types as typs\n",
"from pyspark.sql import Window, Row\n",
"from pyspark.sql.functions import desc, rowNumber, denseRank\n",
"from operator import add"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"conf = SparkConf()\n",
"conf.setAppName('Expedia')\n",
"conf.set(\"spark.driver.memory\", \"12g\")\n",
"conf.set(\"spark.executor.memory\", \"8g\")\n",
"\n",
"sc = SparkContext(conf=conf)\n",
"sqlContext = HiveContext(sc)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def run_solution():\n",
" train = sqlContext.read.load('/projects/kaggle-expedia/input/train.parquet')\n",
" train.cache()\n",
" \n",
" # weights changed to match the best-performing script\n",
" formula1 = 3+12*train['is_booking']\n",
" formula2 = 3+5*train['is_booking']\n",
" \n",
" \n",
" w1 = Window.partitionBy(\"user_location_city\", \"orig_destination_distance\").orderBy(desc('cnt'))\n",
" ## aggregate 1\n",
" agg_ulc_odd_hc = (train\n",
" # omitting empty orig_destination_distance\n",
" .filter(train['orig_destination_distance'].isNotNull())\n",
" # changing the datatype to integer as I *think* join on integer should be faster\n",
" # though this is something to be tested\n",
" .withColumn('orig_destination_distance',\n",
" ( train['orig_destination_distance']*1e5).cast(typs.IntegerType()))\n",
" .select(['user_location_city', 'orig_destination_distance', 'hotel_cluster', 'is_booking'])\n",
" .groupBy('user_location_city', 'orig_destination_distance', 'hotel_cluster')\n",
" .count()\n",
" #rename count to cnt\n",
" .withColumnRenamed('count', 'cnt')\n",
" #windowed aggregate - assigns numbers in order of descending count\n",
" .select('*', rowNumber().over(w1).alias('rn'))\n",
" .filter('rn <= 5')\n",
" )\n",
" agg_ulc_odd_hc.cache()\n",
" # aggregate 2\n",
" w2 = Window.partitionBy('srch_destination_id', 'hotel_country', 'hotel_market').orderBy(desc('sum_wb'))\n",
" agg_best_search_dest_ctry = (train\n",
" .filter('year(date_time) = 2014')\n",
" .select('srch_destination_id', 'hotel_country', 'hotel_market', 'hotel_cluster', (formula1).alias('wb'))\n",
" .groupby('srch_destination_id', 'hotel_country', 'hotel_market', 'hotel_cluster')\n",
" .sum('wb')\n",
" .withColumnRenamed('sum(wb)', 'sum_wb')\n",
" .orderBy(desc('sum_wb'))\n",
" .select('*', rowNumber().over(w2).alias(\"rn\"))\n",
" .filter('rn <= 5')\n",
" )\n",
" agg_best_search_dest_ctry.cache()\n",
" \n",
" # aggregate 3\n",
" w3 = Window.partitionBy('srch_destination_id').orderBy(desc('sum_wb'))\n",
" agg_best_search_dest_2 = (train\n",
" .select('srch_destination_id', 'hotel_cluster', (formula1).alias('wb'))\n",
" .groupby('srch_destination_id', 'hotel_cluster')\n",
" .sum('wb')\n",
" .withColumnRenamed('sum(wb)', 'sum_wb')\n",
" .orderBy(desc('sum_wb'))\n",
" .select('*', rowNumber().over(w3).alias(\"rn\"))\n",
" .filter('rn <= 5'))\n",
" agg_best_search_dest_2.cache()\n",
" \n",
" #most popular hotels\n",
" agg_popular_hotel_cluster = (train\n",
" .select('hotel_cluster', (formula2).alias('wb'))\n",
" .groupby('hotel_cluster')\n",
" .sum('wb')\n",
" .orderBy(desc('sum(wb)'))\n",
" .withColumnRenamed('sum(wb)', 'sum_wb'))\n",
" agg_popular_hotel_cluster.cache()\n",
" \n",
" # broadcasting top5 hotels\n",
" top5_hotels = (agg_popular_hotel_cluster.limit(5).rdd.keys().collect())\n",
" top5_bc = sc.broadcast(top5_hotels)\n",
" \n",
" #flushing train from cache\n",
" train.unpersist()\n",
" \n",
" test = sqlContext.read.load('/projects/kaggle-expedia/input/test.parquet')\n",
" test = (test.withColumn('orig_destination_distance', (test['orig_destination_distance']*1e5).cast(typs.IntegerType())))\n",
" test.cache()\n",
" \n",
" test_join_1 = (test.join(agg_ulc_odd_hc, ['user_location_city', 'orig_destination_distance'], 'inner')\n",
" .select('id', 'hotel_cluster', 'rn')\n",
" .orderBy('id', 'rn')\n",
" )\n",
" test_join_2 = (test.join(agg_best_search_dest_ctry, ['srch_destination_id', 'hotel_country', 'hotel_market'], 'inner')\n",
" .select('id', 'hotel_cluster', (10*agg_best_search_dest_ctry['rn']).alias('rn'))\n",
" .orderBy('id', 'rn')\n",
" )\n",
" test_join_3 = (test.join(agg_best_search_dest_2, ['srch_destination_id'])\n",
" .select('id', 'hotel_cluster', (100*agg_best_search_dest_2['rn']).alias('rn'))\n",
" .orderBy('id', 'rn')\n",
" )\n",
" not_matched_ids = (test.select('id')\n",
" .subtract(test_join_1.select('id').distinct())\n",
" .subtract(test_join_2.select('id').distinct())\n",
" .subtract(test_join_3.select('id').distinct())\n",
" )\n",
" test_remainder = not_matched_ids.join(agg_popular_hotel_cluster.limit(5)).selectExpr('id', 'hotel_cluster', '999 as rn')\n",
" # merging results together\n",
" w4 = Window.partitionBy('id').orderBy('rn')\n",
" test_union = (test_join_1\n",
" .unionAll(test_join_2)\n",
" .unionAll(test_join_3)\n",
" .unionAll(test_remainder)\n",
" .select('*', rowNumber().over(w4).alias('rn_all'))\n",
" .filter('rn_all <= 5')\n",
" .orderBy('id', 'rn_all'))\n",
" \n",
"\n",
" # carry rn_all with each cluster: reduceByKey does not guarantee the order in which\n",
" # partial lists are concatenated, so the clusters are sorted by rn_all after reducing\n",
" submission = (test_union\n",
" .rdd\n",
" .map(lambda x: (x.id, [(x.rn_all, x.hotel_cluster)]))\n",
" .reduceByKey(add)\n",
" .mapValues(lambda x: [c for _, c in sorted(x)])\n",
" .mapValues(lambda x: (x + top5_bc.value)[:5])\n",
" .mapValues(lambda x: \" \".join([str(i) for i in x]))\n",
" .map(lambda x: Row(id = x[0], hotel_cluster = x[1]))\n",
" .toDF()\n",
" ).toPandas()\n",
" \n",
" # generating submission file\n",
" submission.set_index('id', inplace=True)\n",
" submission.sort_index(inplace=True)\n",
" \n",
" return submission"
]
},
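{
"cell_type": "markdown",
"metadata": {},
"source": [
"`formula1` and `formula2` are per-row weights. With `formula1 = 3+12*is_booking`, a click counts 3 and a booking counts 15. With `formula2 = 3+5*is_booking`, a booking counts 8. Aggregates 2 and 3 sum `formula1` per key and rank clusters by that sum, and the popularity ranking does the same with `formula2`. The pure-Python sketch below (hypothetical helper `top_clusters`, toy rows) shows how changing the booking coefficient can reorder clusters, which can be checked before re-running the full Spark job."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from collections import defaultdict\n",
"\n",
"\n",
"def top_clusters(rows, click_w=3, booking_bonus=12, k=5):\n",
"    # rows: (srch_destination_id, hotel_cluster, is_booking) tuples\n",
"    # weight per row = click_w + booking_bonus * is_booking (formula1 by default)\n",
"    totals = defaultdict(lambda: defaultdict(int))\n",
"    for dest, cluster, is_booking in rows:\n",
"        totals[dest][cluster] += click_w + booking_bonus * is_booking\n",
"    # rank clusters by descending summed weight; ties broken by cluster id\n",
"    return {dest: [c for c, _ in sorted(w.items(), key=lambda kv: (-kv[1], kv[0]))[:k]]\n",
"            for dest, w in totals.items()}\n",
"\n",
"\n",
"rows = [(1, 1, 0), (1, 1, 0), (1, 1, 0), (1, 2, 1)]\n",
"print(top_clusters(rows))  # {1: [2, 1]}: one booking (15) beats three clicks (9)\n",
"print(top_clusters(rows, booking_bonus=5))  # {1: [1, 2]}: three clicks (9) beat one booking (8)"
]
},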
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### It takes almost 8 minutes (7min 48s wall time) to run the whole solution. I am sure this performance is not optimal; something to work on\n",
"#### As mentioned earlier, you can monitor the jobs in the Spark web UI, which listens on `spark_machine:4040` by default"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 31.9 s, sys: 840 ms, total: 32.7 s\n",
"Wall time: 7min 48s\n"
]
}
],
"source": [
"%%time\n",
"sub = run_solution()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>hotel_cluster</th>\n",
" </tr>\n",
" <tr>\n",
" <th>id</th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>5 37 55 22 11</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>5 5 91 48 41</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>91 0 31 96 91</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>1 1 45 79 24</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>50 51 91 2 59</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" hotel_cluster\n",
"id \n",
"0 5 37 55 22 11\n",
"1 5 5 91 48 41\n",
"2 91 0 31 96 91\n",
"3 1 1 45 79 24\n",
"4 50 51 91 2 59"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sub.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"sub.to_csv('../submissions/leakage_spark.csv')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"!rm ../submissions/leakage_spark.csv.gz\n",
"!gzip ../submissions/leakage_spark.csv"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.1"
}
},
"nbformat": 4,
"nbformat_minor": 0
}