PySpark-CDC
[Open In Colab](https://colab.research.google.com/gist/arifmarias/e5afc8dd078b1b5069e864e0aad24681/pyspark-cdc.ipynb)

This gist is a Jupyter notebook (Python 3 kernel) that applies a change-data-capture (CDC) update to a customer dimension table in Hive. Its cells follow.
```python
# Entry points and type definitions for Spark SQL.
# Note: SQLContext and HiveContext live in pyspark.sql, not in the
# top-level pyspark package.
import sys
import os
from pyspark.sql import *
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, HiveContext
from pyspark.sql.types import *
```
```python
## write to snappy compressed output
conf = (SparkConf()
        .setAppName("spark_cdc")
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.shuffle.service.enabled", "true")
        .set("spark.rdd.compress", "true"))
sc = SparkContext(conf=conf)

sqlContext = HiveContext(sc)
```
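For readers on Spark 2.0 or later, where `HiveContext` is deprecated, here is a minimal sketch of the equivalent setup using the `SparkSession` entry point (assuming a Spark 2+ installation with Hive support compiled in):

```python
# Sketch only: SparkSession replaced SQLContext/HiveContext in Spark 2.0.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("spark_cdc")
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.shuffle.service.enabled", "true")
         .config("spark.rdd.compress", "true")
         .enableHiveSupport()       # needed for the Hive DDL/DML below
         .getOrCreate())
# spark.sql(...) can then stand in for every sqlContext.sql(...) call below.
```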
```python
## read the Parquet files generated by the Sqoop incremental extract
## from MySQL and imported into HDFS
path = "hdfs://localhost:8080/landing/staging_customer_update_spark/*parquet*"
parquetFile = sqlContext.read.parquet(path)
parquetFile.registerTempTable("customer_extract")

sql = "DROP TABLE IF EXISTS customer_update_spark"
sqlContext.sql(sql)

sql = """
CREATE TABLE customer_update_spark
(
  cust_no int
  ,birth_date date
  ,first_name string
  ,last_name string
  ,gender string
  ,join_date date
  ,created_date timestamp
  ,modified_date timestamp
)
STORED AS PARQUET
"""
sqlContext.sql(sql)
```
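Before rebuilding the table, it can help to sanity-check the extract. This snippet is illustrative and not part of the original notebook:

```python
# Illustrative check: confirm the extract's schema and row count
# before loading it into the rebuilt target table.
parquetFile.printSchema()
print(sqlContext.sql("SELECT COUNT(*) FROM customer_extract").collect())
```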
```python
## keep the records that did not change:
## these are the rows in the existing dimension table that are not
## present in the Sqoop extract

sql = """
INSERT INTO TABLE customer_update_spark
SELECT
  a.cust_no,
  a.birth_date,
  a.first_name,
  a.last_name,
  a.gender,
  a.join_date,
  a.created_date,
  a.modified_date
FROM customer a
LEFT OUTER JOIN customer_extract b ON a.cust_no = b.cust_no
WHERE b.cust_no IS NULL
"""

sqlContext.sql(sql)
```
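The same "unchanged rows" selection can be written with the DataFrame API as a left anti join. This is a sketch only, assuming Spark 2+ (where `left_anti` joins are available) and that the existing `customer` dimension table is visible through the metastore:

```python
# Sketch: equivalent anti-join with the DataFrame API (Spark 2+).
customer = sqlContext.table("customer")
extract = sqlContext.table("customer_extract")

# Rows whose cust_no is absent from the extract, i.e. unchanged records.
unchanged = customer.join(extract, on="cust_no", how="left_anti")
unchanged.write.insertInto("customer_update_spark")
```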
```python
## get the changed records from the Parquet extract generated by Sqoop
## the dates in the Parquet file are in epoch time with milliseconds,
## i.e. 13-digit numbers; we don't need milliseconds, so keep only the
## first 10 digits (epoch seconds)

## birth_date and join_date are converted to dates in YYYY-MM-DD format
## created_date and modified_date are converted to YYYY-MM-DD HH:MM:SS

sql = """
INSERT INTO customer_update_spark
SELECT
  a.cust_no,
  TO_DATE(FROM_UNIXTIME(CAST(SUBSTR(a.birth_date, 1, 10) AS INT))) AS birth_date,
  a.first_name,
  a.last_name,
  a.gender,
  TO_DATE(FROM_UNIXTIME(CAST(SUBSTR(a.join_date, 1, 10) AS INT))) AS join_date,
  FROM_UNIXTIME(CAST(SUBSTR(a.created_date, 1, 10) AS INT)) AS created_date,
  FROM_UNIXTIME(CAST(SUBSTR(a.modified_date, 1, 10) AS INT)) AS modified_date
FROM customer_extract a
"""

sqlContext.sql(sql)
```
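As a quick worked example of the truncation (values are illustrative): a 13-digit extract value such as 1609459200000 keeps its first 10 digits, 1609459200, which `FROM_UNIXTIME` renders as 2021-01-01 00:00:00 in the session time zone (UTC here for illustration) and `TO_DATE` reduces to 2021-01-01.

```python
# Illustrative only: the same truncation in plain Python.
from datetime import datetime, timezone

ms = "1609459200000"     # 13-digit epoch milliseconds from the extract
seconds = int(ms[:10])   # keep the first 10 digits -> epoch seconds
print(datetime.fromtimestamp(seconds, tz=timezone.utc))
# 2021-01-01 00:00:00+00:00
```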