PySpark-CDC
{
"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"colab": {
"name": "PySpark-CDC",
"provenance": [],
"collapsed_sections": [],
"authorship_tag": "ABX9TyOSr+3CfjahPy2ARBrvh25V",
"include_colab_link": true
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
}
},
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "view-in-github",
"colab_type": "text"
},
"source": [
"<a href=\"https://colab.research.google.com/gist/arifmarias/e5afc8dd078b1b5069e864e0aad24681/pyspark-cdc.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
]
},
{
"cell_type": "code",
"metadata": {
"id": "FaY0Nm6Skjkw",
"colab_type": "code",
"colab": {}
},
"source": [
"import sys\n",
"import os\n",
"from pyspark.sql import *\n",
"from pyspark import SparkConf, SparkContext, SQLContext\n",
"from pyspark.sql import HiveContext\n",
"from pyspark.sql.types import *"
],
"execution_count": 0,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"id": "lA0Hm9XdkuV6",
"colab_type": "code",
"colab": {}
},
"source": [
"## write to snappy compressed output\n",
"conf = (SparkConf()\n",
" .setAppName(\"spark_cdc\")\n",
" .set(\"spark.dynamicAllocation.enabled\", \"true\")\n",
" .set(\"spark.shuffle.service.enabled\", \"true\")\n",
" .set(\"spark.rdd.compress\", \"true\"))\n",
"sc = SparkContext(conf = conf)\n",
"\n",
"sqlContext = HiveContext(sc)"
],
"execution_count": 0,
"outputs": []
},
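{
"cell_type": "markdown",
"metadata": {
"id": "note_sparksession",
"colab_type": "text"
},
"source": [
"*Note: this notebook uses the Spark 1.x entry points (`SparkContext` + `HiveContext`). On Spark 2.x and later, a `SparkSession` built with `enableHiveSupport()` plays the same role. The cell below is a minimal sketch of that equivalent setup, assuming Spark 2.x+; it is left commented out so the Spark 1.x flow runs unchanged.*"
]
},
{
"cell_type": "code",
"metadata": {
"id": "note_sparksession_code",
"colab_type": "code",
"colab": {}
},
"source": [
"## sketch: the Spark 2.x+ equivalent of the setup above\n",
"## (commented out; not part of the original flow)\n",
"# from pyspark.sql import SparkSession\n",
"# spark = (SparkSession.builder\n",
"#     .appName(\"spark_cdc\")\n",
"#     .config(\"spark.dynamicAllocation.enabled\", \"true\")\n",
"#     .config(\"spark.shuffle.service.enabled\", \"true\")\n",
"#     .config(\"spark.rdd.compress\", \"true\")\n",
"#     .enableHiveSupport()\n",
"#     .getOrCreate())\n",
"# SparkSession exposes the same sql()/read API used below"
],
"execution_count": 0,
"outputs": []
},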
{
"cell_type": "code",
"metadata": {
"id": "Al3ovZawkzsQ",
"colab_type": "code",
"colab": {}
},
"source": [
"## read parquet file generated by the Sqoop incremental extract from MySQL and imported into HDFS\n",
"path = \"hdfs://localhost:8080/landing/staging_customer_update_spark/*parquet*\"\n",
"parquetFile = sqlContext.read.parquet(path)\n",
"parquetFile.registerTempTable(\"customer_extract\");\n",
"\n",
"\n",
"sql = \"DROP TABLE IF EXISTS customer_update_spark\"\n",
"sqlContext.sql(sql)\n",
"\n",
"sql = \"\"\"\n",
"CREATE TABLE customer_update_spark\n",
"(\n",
" cust_no int\n",
" ,birth_date date\n",
" ,first_name string\n",
" ,last_name string\n",
" ,gender string\n",
" ,join_date date\n",
" ,created_date timestamp\n",
" ,modified_date timestamp\n",
")\n",
"STORED AS PARQUET\n",
"\n",
"\"\"\"\n",
"sqlContext.sql(sql)"
],
"execution_count": 0,
"outputs": []
},
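{
"cell_type": "markdown",
"metadata": {
"id": "note_sanity_check",
"colab_type": "text"
},
"source": [
"*An optional sanity check before merging (an addition, not part of the original flow): confirm the extract registered as `customer_extract` actually loaded, and that its schema lines up with the target table defined above.*"
]
},
{
"cell_type": "code",
"metadata": {
"id": "note_sanity_check_code",
"colab_type": "code",
"colab": {}
},
"source": [
"## inspect the schema of the Sqoop extract and count its rows\n",
"parquetFile.printSchema()\n",
"sqlContext.sql(\"SELECT COUNT(*) AS extract_rows FROM customer_extract\").show()"
],
"execution_count": 0,
"outputs": []
},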
{
"cell_type": "code",
"metadata": {
"id": "f35DHTgIkz-l",
"colab_type": "code",
"colab": {}
},
"source": [
"## get those records that did not change\n",
"## these are those records from the existing Dimension table that are not in the Sqoop extract\n",
"\n",
"sql = \"\"\"\n",
"INSERT INTO TABLE customer_update_spark\n",
"SELECT\n",
"a.cust_no,\n",
"a.birth_date,\n",
"a.first_name,\n",
"a.last_name,\n",
"a.gender,\n",
"a.join_date,\n",
"a.created_date,\n",
"a.modified_date\n",
"FROM customer a LEFT OUTER JOIN customer_extract b ON a.cust_no = b.cust_no\n",
"WHERE b.cust_no IS NULL\n",
"\"\"\"\n",
"\n",
"sqlContext.sql(sql)"
],
"execution_count": 0,
"outputs": []
},
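{
"cell_type": "markdown",
"metadata": {
"id": "note_anti_join",
"colab_type": "text"
},
"source": [
"*The same \"unchanged records\" filter can be written with the DataFrame API. The sketch below is an alternative, assuming Spark 2.x or later, where the `left_anti` join type is available; it is left commented out so it does not double-insert.*"
]
},
{
"cell_type": "code",
"metadata": {
"id": "note_anti_join_code",
"colab_type": "code",
"colab": {}
},
"source": [
"## sketch: DataFrame-API version of the anti-join above (assumes Spark 2.x+)\n",
"# customer_df = sqlContext.table(\"customer\")\n",
"# extract_df = sqlContext.table(\"customer_extract\")\n",
"# unchanged_df = customer_df.join(extract_df, \"cust_no\", \"left_anti\")\n",
"# unchanged_df.write.insertInto(\"customer_update_spark\")"
],
"execution_count": 0,
"outputs": []
},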
{
"cell_type": "code",
"metadata": {
"id": "Yvrkp5e3k0H3",
"colab_type": "code",
"colab": {}
},
"source": [
"## get the changed records from the Parquet extract generated from Sqoop\n",
"## the dates in the Parquet file will be in epoch time with milliseconds\n",
"## this will be a 13 digit number\n",
"## we don't need milliseconds so only get first 10 digits and not all 13\n",
"\n",
"## for birth_date and join date convert to date in format YYYY-MM-DD\n",
"## for created_date and modified date convert to format YYYY-MM-DD HH:MI:SS\n",
"\n",
"sql = \"\"\"\n",
"INSERT INTO customer_update_spark\n",
"SELECT\n",
"a.cust_no,\n",
"TO_DATE(FROM_UNIXTIME(CAST(SUBSTR(a.created_date, 1,10) AS INT))) AS birth_date,\n",
"a.first_name,\n",
"a.last_name,\n",
"a.gender,\n",
"TO_DATE(FROM_UNIXTIME(CAST(SUBSTR(a.join_date, 1,10) AS INT))) AS join_date,\n",
"FROM_UNIXTIME(CAST(SUBSTR(a.created_date, 1,10) AS INT)) AS created_date,\n",
"FROM_UNIXTIME(CAST(SUBSTR(a.modified_date, 1,10) AS INT)) AS modified_date\n",
"FROM customer_extract a\n",
"\"\"\"\n",
"\n",
"sqlContext.sql(sql)"
],
"execution_count": 0,
"outputs": []
}
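,
{
"cell_type": "markdown",
"metadata": {
"id": "note_epoch_demo",
"colab_type": "text"
},
"source": [
"*To illustrate the conversion used above on a hypothetical 13-digit value: `SUBSTR` keeps the first 10 digits (whole seconds), and `FROM_UNIXTIME` renders them as a timestamp in the session time zone. This check is an addition, not part of the original flow.*"
]
},
{
"cell_type": "code",
"metadata": {
"id": "note_epoch_demo_code",
"colab_type": "code",
"colab": {}
},
"source": [
"## demonstrate the epoch-millisecond conversion on a hypothetical value\n",
"sqlContext.sql(\"\"\"\n",
"SELECT\n",
" SUBSTR('1586900000123', 1, 10) AS whole_seconds\n",
" ,TO_DATE(FROM_UNIXTIME(CAST(SUBSTR('1586900000123', 1, 10) AS INT))) AS as_date\n",
" ,FROM_UNIXTIME(CAST(SUBSTR('1586900000123', 1, 10) AS INT)) AS as_timestamp\n",
"\"\"\").show()"
],
"execution_count": 0,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "note_swap",
"colab_type": "text"
},
"source": [
"*A possible final step, assuming the rebuilt table is meant to replace the `customer` dimension: drop the old table and rename the new one. Left commented out because it is destructive and not confirmed by the original notebook.*"
]
},
{
"cell_type": "code",
"metadata": {
"id": "note_swap_code",
"colab_type": "code",
"colab": {}
},
"source": [
"## sketch: swap the rebuilt table in place of the old dimension table\n",
"# sqlContext.sql(\"DROP TABLE IF EXISTS customer\")\n",
"# sqlContext.sql(\"ALTER TABLE customer_update_spark RENAME TO customer\")"
],
"execution_count": 0,
"outputs": []
}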
]
}