Skip to content

Instantly share code, notes, and snippets.

@florianheinemann
Created May 24, 2020 14:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save florianheinemann/86cee8a369362a5545628c52a537fcba to your computer and use it in GitHub Desktop.
Save florianheinemann/86cee8a369362a5545628c52a537fcba to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# CMOT Preprocessing"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setting everything up"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Starting Spark application\n"
]
},
{
"data": {
"text/html": [
"<table>\n",
"<tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th><th>Current session?</th></tr><tr><td>5</td><td>application_1590257372527_0006</td><td>pyspark</td><td>idle</td><td><a target=\"_blank\" href=\"http://ip-172-31-6-97.eu-central-1.compute.internal:20888/proxy/application_1590257372527_0006/\">Link</a></td><td><a target=\"_blank\" href=\"http://ip-172-31-6-40.eu-central-1.compute.internal:8042/node/containerlogs/container_1590257372527_0006_01_000001/livy\">Link</a></td><td>✔</td></tr></table>"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"SparkSession available as 'spark'.\n"
]
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"import sys\n",
"from awsglue.transforms import *\n",
"from awsglue.utils import getResolvedOptions\n",
"from pyspark.context import SparkContext\n",
"from pyspark.sql.functions import when, lit, col, udf, substring\n",
"from awsglue.context import GlueContext\n",
"from awsglue.job import Job\n",
"from functools import reduce\n",
"\n",
"sparkContext = SparkContext.getOrCreate()\n",
"glueContext = GlueContext(sparkContext)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Count: 5114662\n",
"root\n",
"|-- shipment_no: long\n",
"|-- container_deployment_relative_no: long\n",
"|-- shipment_container_sequence_no: long\n",
"|-- op_ref_port_date: string\n",
"|-- industry label: string\n",
"|-- operative_ssy_code: string\n",
"|-- operative_ssy_direction_code: string\n",
"|-- operative_direction_code: string\n",
"|-- operative_ssy_lead_trade_management_code: string\n",
"|-- operative_voyage_no: long\n",
"|-- segmentation validity adjusted: string\n",
"|-- marketing_routing_party_top_bco_nvo_type: string\n",
"|-- marketing_routing_party_name_matchcode: string\n",
"|-- marketing_routing_party_sales_hier_area_code/name: string\n",
"|-- marketing_routing_party_sales_hier_office_code/name: string\n",
"|-- marketing_routing_party_top_global_account_program: string\n",
"|-- marketing_routing_party_sales_hier_subregion_code/name: string\n",
"|-- container_usage_classification: string\n",
"|-- equipment_size_type_marketing_sales_group_1_code: string\n",
"|-- geo_from_subregion_group_code_name: string\n",
"|-- geo_from_subregion_code_name: string\n",
"|-- geo_from_area_code_name: string\n",
"|-- start_terminal_location_code: string\n",
"|-- end_terminal_location_code: string\n",
"|-- geo_to_subregion_group_code_name: string\n",
"|-- result_related_voyage_order_shipsystem_relation: string\n",
"|-- result_related_voyage_order_shipsystem_code: string\n",
"|-- ropssy booked boxes: double\n",
"|-- ropssy booked cl2: double\n",
"|-- ropssy booked container gross weight: double\n",
"|-- ropssy booked teu: double\n",
"|-- full cl2: double\n",
"|-- full icl: double\n",
"|-- start_terminal_subregion_group_code_name: string\n",
"|-- end_terminal_subregion_group_code_name: string"
]
}
],
"source": [
"# Load the raw CMOT export from the Glue Data Catalog and report its size and schema.\n",
"salesDataDynF = glueContext.create_dynamic_frame.from_catalog(\n",
"    database=\"bia-cmot-raw-data-v2\",\n",
"    table_name=\"cmot_ropssy_export\",\n",
")\n",
"print(\"Count: {}\".format(salesDataDynF.count()))\n",
"salesDataDynF.printSchema()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
"|-- trade: string\n",
"|-- ssy: string\n",
"|-- dir: string\n",
"|-- dp_voyage: string\n",
"|-- rate_validity: string\n",
"|-- industry: string\n",
"|-- mr_name_matchcode: string\n",
"|-- mr_sales_hier_subregion: string\n",
"|-- mr_sales_hier_area: string\n",
"|-- mr_sales_hier_office: string\n",
"|-- mr_bco_nvo: string\n",
"|-- mr_global_account_program: string\n",
"|-- container_usage: string\n",
"|-- container_type: string\n",
"|-- geo_from_subregion_group: string\n",
"|-- geo_from_subregion: string\n",
"|-- geo_from_area: string\n",
"|-- pol_subregion_group: string\n",
"|-- pol: string\n",
"|-- pod_subregion_group: string\n",
"|-- rr_relation: string\n",
"|-- pod: string\n",
"|-- geo_to_subregion_group: string\n",
"|-- no_boxes: double\n",
"|-- no_teus: double\n",
"|-- gwt: double\n",
"|-- cl2_usd: double\n",
"|-- icl_usd: double\n",
"\n",
"5114662"
]
}
],
"source": [
"# Select and rename the columns used downstream; operative_voyage_no is cast long -> string.\n",
"salesDataMapped = ApplyMapping.apply(\n",
"    frame=salesDataDynF,\n",
"    mappings=[\n",
"        # (source column, source type, target column, target type)\n",
"        (\"operative_ssy_lead_trade_management_code\", \"string\", \"trade\", \"string\"),\n",
"        (\"operative_ssy_code\", \"string\", \"ssy\", \"string\"),\n",
"        (\"operative_direction_code\", \"string\", \"dir\", \"string\"),\n",
"        (\"operative_voyage_no\", \"long\", \"dp_voyage\", \"string\"),\n",
"        (\"segmentation validity adjusted\", \"string\", \"rate_validity\", \"string\"),\n",
"        (\"industry label\", \"string\", \"industry\", \"string\"),\n",
"        (\"marketing_routing_party_name_matchcode\", \"string\", \"mr_name_matchcode\", \"string\"),\n",
"        (\"marketing_routing_party_sales_hier_subregion_code/name\", \"string\", \"mr_sales_hier_subregion\", \"string\"),\n",
"        (\"marketing_routing_party_sales_hier_area_code/name\", \"string\", \"mr_sales_hier_area\", \"string\"),\n",
"        (\"marketing_routing_party_sales_hier_office_code/name\", \"string\", \"mr_sales_hier_office\", \"string\"),\n",
"        (\"marketing_routing_party_top_bco_nvo_type\", \"string\", \"mr_bco_nvo\", \"string\"),\n",
"        (\"marketing_routing_party_top_global_account_program\", \"string\", \"mr_global_account_program\", \"string\"),\n",
"        (\"container_usage_classification\", \"string\", \"container_usage\", \"string\"),\n",
"        (\"equipment_size_type_marketing_sales_group_1_code\", \"string\", \"container_type\", \"string\"),\n",
"        (\"geo_from_subregion_group_code_name\", \"string\", \"geo_from_subregion_group\", \"string\"),\n",
"        (\"geo_from_subregion_code_name\", \"string\", \"geo_from_subregion\", \"string\"),\n",
"        (\"geo_from_area_code_name\", \"string\", \"geo_from_area\", \"string\"),\n",
"        (\"start_terminal_subregion_group_code_name\", \"string\", \"pol_subregion_group\", \"string\"),\n",
"        (\"start_terminal_location_code\", \"string\", \"pol\", \"string\"),\n",
"        (\"end_terminal_subregion_group_code_name\", \"string\", \"pod_subregion_group\", \"string\"),\n",
"        (\"result_related_voyage_order_shipsystem_relation\", \"string\", \"rr_relation\", \"string\"),\n",
"        (\"end_terminal_location_code\", \"string\", \"pod\", \"string\"),\n",
"        (\"geo_to_subregion_group_code_name\", \"string\", \"geo_to_subregion_group\", \"string\"),\n",
"        (\"ropssy booked boxes\", \"double\", \"no_boxes\", \"double\"),\n",
"        (\"ropssy booked teu\", \"double\", \"no_teus\", \"double\"),\n",
"        (\"ropssy booked container gross weight\", \"double\", \"gwt\", \"double\"),\n",
"        (\"full cl2\", \"double\", \"cl2_usd\", \"double\"),\n",
"        (\"full icl\", \"double\", \"icl_usd\", \"double\"),\n",
"    ],\n",
"    transformation_ctx=\"applymapping1\",\n",
")\n",
"salesDataMapped.printSchema()\n",
"print(salesDataMapped.count())"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"5114662"
]
}
],
"source": [
"# Switch from a Glue DynamicFrame to a Spark DataFrame for column-level transformations.\n",
"salesDataDF = salesDataMapped.toDF()\n",
"rowCount = salesDataDF.count()\n",
"print(rowCount)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"# Placeholder strings treated as missing values (matched exactly, case-sensitive).\n",
"INVALID_VALUES = frozenset([\n",
"    \"\", \"#NA\", \"n/a\", \"n/r / not available\",\n",
"    \"!!!!! / n/r\", \"!!!!!!!! / n/r\", \"!!! / n/r\",\n",
"    \"??? / Not available\", \"not available / n/a\", \"??\",\n",
"])\n",
"\n",
"\n",
"def replaceInvalidValuesByNull(val):\n",
"    \"\"\"Return None if val is one of INVALID_VALUES, otherwise return val unchanged.\n",
"\n",
"    Matching is exact. Hashable non-string inputs (numbers, None) never match a\n",
"    placeholder and are passed through as-is.\n",
"    \"\"\"\n",
"    return None if val in INVALID_VALUES else val"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"5114662\n",
"5104204"
]
}
],
"source": [
"replaceInvalidValuesByNull_udf = udf(replaceInvalidValuesByNull)\n",
"salesDataCleaned = salesDataDF\n",
"# udf() declares a StringType result by default, so only apply it to string columns.\n",
"# Running it over the double measures (no_boxes, no_teus, gwt, cl2_usd, icl_usd)\n",
"# would silently turn them into strings and break numeric aggregation later on.\n",
"stringColumns = [name for name, dtype in salesDataCleaned.dtypes if dtype == \"string\"]\n",
"for col_name in stringColumns:\n",
"    salesDataCleaned = salesDataCleaned.withColumn(col_name, replaceInvalidValuesByNull_udf(col(col_name)))\n",
"salesDataCleaned = salesDataCleaned.withColumn(\"trade\", when(col(\"trade\") == \"na\", None).otherwise(col(\"trade\")))\n",
"print(salesDataCleaned.count())\n",
"# Drop rows missing any of trade, ssy, dir or dp_voyage.\n",
"salesDataCleaned = salesDataCleaned.filter(col(\"trade\").isNotNull() & col(\"ssy\").isNotNull() & col(\"dir\").isNotNull() & col(\"dp_voyage\").isNotNull())\n",
"# Keep the first two characters of container_type (Spark's substring is 1-based); 45HC is reported as 40HC.\n",
"salesDataCleaned = salesDataCleaned.withColumn(\"container_type\", when(col(\"container_type\") == \"45HC\", \"40HC\").otherwise(substring(\"container_type\", 1, 2)))\n",
"print(salesDataCleaned.count())"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------+\n",
"|container_type|\n",
"+--------------+\n",
"| 40|\n",
"| 40HC|\n",
"| 20|\n",
"+--------------+\n",
"\n",
"root\n",
" |-- container_type: string (nullable = true)"
]
}
],
"source": [
"# Aggregate into a separate frame so the cleaned row-level data in salesDataCleaned stays intact.\n",
"containerTypeMax = salesDataCleaned.groupBy(\"container_type\").max()\n",
"containerTypeMax.show()\n",
"containerTypeMax.printSchema()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Sparkmagic (PySpark)",
"language": "",
"name": "pysparkkernel"
},
"language_info": {
"codemirror_mode": {
"name": "python",
"version": 3
},
"mimetype": "text/x-python",
"name": "pyspark",
"pygments_lexer": "python3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment