MemSQL Spark Connector and MySQL JDBC
@robrich · Created June 4, 2020
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Spark ETL with JDBC"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Load the tpch database into MemSQL: https://docs.memsql.com/v7.0/guides/data-ingest/pipelines/step-1/\n",
"\n",
"```sql\n",
"CREATE DATABASE tpch;\n",
"USE tpch;\n",
"\n",
"CREATE TABLE `lineitem` (\n",
" `l_orderkey` bigint(11) NOT NULL,\n",
" `l_partkey` int(11) NOT NULL,\n",
" `l_suppkey` int(11) NOT NULL,\n",
" `l_linenumber` int(11) NOT NULL,\n",
" `l_quantity` decimal(15,2) NOT NULL,\n",
" `l_extendedprice` decimal(15,2) NOT NULL,\n",
" `l_discount` decimal(15,2) NOT NULL,\n",
" `l_tax` decimal(15,2) NOT NULL,\n",
" `l_returnflag` char(1) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,\n",
" `l_linestatus` char(1) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,\n",
" `l_shipdate` date NOT NULL,\n",
" `l_commitdate` date NOT NULL,\n",
" `l_receiptdate` date NOT NULL,\n",
" `l_shipinstruct` char(25) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,\n",
" `l_shipmode` char(10) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,\n",
" `l_comment` varchar(44) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,\n",
" SHARD KEY (`l_orderkey`) USING CLUSTERED COLUMNSTORE\n",
");\n",
"\n",
"CREATE OR REPLACE PIPELINE tpch_100_lineitem\n",
" AS LOAD DATA S3 'memsql-tpch-dataset/sf_100/lineitem/'\n",
" config '{\"region\":\"us-east-1\"}'\n",
" SKIP DUPLICATE KEY ERRORS\n",
" INTO TABLE lineitem\n",
" FIELDS TERMINATED BY '|'\n",
" LINES TERMINATED BY '|\\n';\n",
"\n",
"START ALL PIPELINES;\n",
"STOP ALL PIPELINES;\n",
"\n",
"select count(*) from lineitem;\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Get spark context setup and connected to MemSQL"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install findspark pyspark pandas"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"# https://search.maven.org/artifact/org.mariadb.jdbc/mariadb-java-client\n",
"os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages \"org.mariadb.jdbc:mariadb-java-client:2.6.0\" pyspark-shell'\n",
"\n",
"import findspark\n",
"findspark.init()\n",
"\n",
"import pyspark\n",
"from pyspark import SparkConf, SparkContext\n",
"from pyspark.sql import SparkSession\n",
"\n",
"sc = SparkContext(appName=\"SparkApp\")\n",
"spark = SparkSession(sc)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Setup reading data from the table"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import DataFrameReader, SQLContext\n",
"\n",
"#set variable to be used to connect the database\n",
"machine = \"localhost\"\n",
"database = \"tpch\"\n",
"table = \"lineitem\"\n",
"\n",
"url = f\"jdbc:mysql://{machine}:3306/{database}\"\n",
"properties = {'user': 'root', 'password': '', 'driver': 'org.mariadb.jdbc.Driver'}\n",
"sqlContext = SQLContext(sc)\n",
"df = DataFrameReader(sqlContext).jdbc(url=url, table=table, properties=properties)"
]
},
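{
"cell_type": "markdown",
"metadata": {},
"source": [
"The read above goes through a single JDBC connection. As a sketch, `DataFrameReader.jdbc` also accepts `column`, `lowerBound`, `upperBound`, and `numPartitions` to split the read across parallel connections on a numeric column; the bounds below are illustrative, not measured from the tpch data."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# partition the read on the numeric shard key; lowerBound/upperBound only\n",
"# shape the partition ranges, they do not filter rows\n",
"df_par = spark.read.jdbc(\n",
"    url=url, table=table,\n",
"    column=\"l_orderkey\", lowerBound=1, upperBound=600000000, numPartitions=8,\n",
"    properties=properties)\n",
"df_par.rdd.getNumPartitions()"
]
},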
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Validate input dataframe"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from IPython.display import display, HTML\n",
"\n",
"def printDf(sprkDF,records): \n",
" return HTML(sprkDF.limit(records).toPandas().to_html())\n",
"\n",
"# show first 3 records\n",
"printDf(df, 3)"
]
},
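{
"cell_type": "markdown",
"metadata": {},
"source": [
"Beyond eyeballing a few rows, it is worth checking the schema Spark inferred from the JDBC metadata, plus the row count; note that `count()` scans the whole table, so it can take a while at larger scale factors."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# confirm the JDBC type mapping and the number of rows read\n",
"df.printSchema()\n",
"df.count()"
]
},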
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Transform data on the way through"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql.functions import UserDefinedFunction\n",
"from pyspark.sql import types\n",
"\n",
"perUnitUdf = UserDefinedFunction( \\\n",
" lambda price, quantity: price / quantity, \\\n",
" types.DoubleType() \\\n",
")\n",
"\n",
"df2 = df.withColumn('per_unit', perUnitUdf(df.l_extendedprice, df.l_quantity))"
]
},
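{
"cell_type": "markdown",
"metadata": {},
"source": [
"A Python UDF round-trips every row through a Python worker. The same column can be derived with native column arithmetic, which stays in the JVM and is visible to the Catalyst optimizer; a minimal equivalent sketch:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# same per-unit price without a UDF; the division runs inside the JVM\n",
"df2_native = df.withColumn('per_unit', df.l_extendedprice / df.l_quantity)\n",
"df2_native.select('l_extendedprice', 'l_quantity', 'per_unit').show(3)"
]
},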
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Write the data to a new table"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"%%time\n",
"table2 = \"lineitem2\"\n",
"# modes: 'overwrite', 'append', 'ignore', 'error', 'errorifexists'\n",
"df2.write \\\n",
" .mode(\"overwrite\") \\\n",
" .jdbc(url=url, table=table2, properties=properties)\n",
"# creates table if not exit"
]
},
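{
"cell_type": "markdown",
"metadata": {},
"source": [
"`overwrite` drops and recreates `lineitem2` on every run. As a sketch, `append` keeps the existing table and adds rows to it, and the standard Spark JDBC `batchsize` option controls how many rows go per JDBC batch (10000 here is an illustrative value; rerunning this duplicates the data):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# append into the existing table, 10k rows per JDBC batch\n",
"df2.write \\\n",
"    .mode(\"append\") \\\n",
"    .option(\"batchsize\", 10000) \\\n",
"    .jdbc(url=url, table=table2, properties=properties)"
]
},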
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now check out the new data\n",
"```sql\n",
"select * from lineitem2 limit 10;\n",
"\n",
"select ( select count(*) from lineitem ) as orig, ( select count(*) from lineitem2 ) as copy;\n",
"```"
]
},
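{
"cell_type": "markdown",
"metadata": {},
"source": [
"The same comparison can be made from Spark by reading `lineitem2` back over JDBC; a minimal check, assuming the write above completed:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# read the copy back and compare row counts with the original\n",
"df_copy = spark.read.jdbc(url=url, table=table2, properties=properties)\n",
"print(df.count(), df_copy.count())"
]
}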
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.8"
}
},
"nbformat": 4,
"nbformat_minor": 4
}