Created
September 5, 2020 18:00
-
-
Save zorteran/8ca73ff33c2daee2a5e517c25866bf1f to your computer and use it in GitHub Desktop.
Simple ETL joining data from MySQL and MongoDB and wriiting to Cassandra
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Requirement already satisfied: findspark in /opt/conda/lib/python3.8/site-packages (1.3.0)\r\n" | |
] | |
} | |
], | |
"source": [ | |
"!pip install findspark" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import os\n", | |
"os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0-beta,org.mongodb.spark:mongo-spark-connector_2.12:3.0.0,mysql:mysql-connector-java:5.1.49 pyspark-shell'" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import findspark\n", | |
"findspark.init()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import pyspark\n", | |
"from pyspark.sql import SparkSession" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"spark = (SparkSession.builder\n", | |
" .appName('simple_etl')\n", | |
" .config('spark.mongodb.input.uri', 'mongodb://root:example@mongo/school.students?authSource=admin')\n", | |
" .config('spark.mongodb.output.uri', 'mongodb://root:example@mongo/school.students?authSource=admin')\n", | |
" .config(\"spark.cassandra.connection.host\", \"cassandra\")\n", | |
" .config(\"spark.cassandra.auth.username\", \"cassandra\")\n", | |
" .config(\"spark.cassandra.auth.password\", \"cassandra\")\n", | |
" .config(\"spark.sql.extensions\", \"com.datastax.spark.connector.CassandraSparkExtensions\")\n", | |
" .config(\"spark.sql.catalog.casscatalog\",\"com.datastax.spark.connector.datasource.CassandraCatalog\")\n", | |
" .getOrCreate())" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"# Extract" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Read from MySql" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 6, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"groups = (spark.read\n", | |
" .format(\"jdbc\")\n", | |
" .option(\"url\", \"jdbc:mysql://mysql/school\")\n", | |
" .option(\"driver\", \"com.mysql.jdbc.Driver\")\n", | |
" .option(\"dbtable\", \"groups\")\n", | |
" .option(\"user\", \"user\")\n", | |
" .option(\"password\", \"password\")\n", | |
" .load())" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 7, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"+--------+------------+-----------+-------------------+\n", | |
"|group_id|group_number|description|something_important|\n", | |
"+--------+------------+-----------+-------------------+\n", | |
"| 1| 1A| some group| 100|\n", | |
"| 2| 2A| some group| 102|\n", | |
"| 3| 3A| some group| 101|\n", | |
"| 4| 1B| some group| 123|\n", | |
"| 5| 2B| some group| 133|\n", | |
"| 6| 3B| some group| 144|\n", | |
"+--------+------------+-----------+-------------------+\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"groups.show()" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Read from Mongo" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 8, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"students = spark.read.format(\"mongo\").load()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 9, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"+--------------------+---+-----+--------+-------------------+--------+\n", | |
"| _id|age|group| name| skills| surname|\n", | |
"+--------------------+---+-----+--------+-------------------+--------+\n", | |
"|[5f537b2945e17234...| 22| 1A| John| [drawing, skiing]| Smith|\n", | |
"|[5f537b2945e17234...| 24| 1B| Mike| [chess, swimming]| Jones|\n", | |
"|[5f537b2945e17234...| 28| 2A| Diana|[curling, swimming]|Williams|\n", | |
"|[5f537b2945e17234...| 21| 1B|Samantha| [guitar, singing]| Brown|\n", | |
"+--------------------+---+-----+--------+-------------------+--------+\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"students.show()" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"# Transform" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 10, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"students_with_groups = (students\n", | |
" .join(groups, groups.group_number == students.group, how='left')\n", | |
" .select(\"name\",\"surname\",\"age\",\"group_id\",\"group_number\",\"something_important\",\"skills\")\n", | |
" )" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 17, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"+--------+--------+---+--------+------------+-------------------+-------------------+\n", | |
"| name| surname|age|group_id|group_number|something_important| skills|\n", | |
"+--------+--------+---+--------+------------+-------------------+-------------------+\n", | |
"| John| Smith| 22| 1| 1A| 100| [drawing, skiing]|\n", | |
"| Diana|Williams| 28| 2| 2A| 102|[curling, swimming]|\n", | |
"| Mike| Jones| 24| 4| 1B| 123| [chess, swimming]|\n", | |
"|Samantha| Brown| 21| 4| 1B| 123| [guitar, singing]|\n", | |
"+--------+--------+---+--------+------------+-------------------+-------------------+\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"students_with_groups.show()" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"# Load" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 11, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"(students_with_groups\n", | |
" .write\n", | |
" .format(\"org.apache.spark.sql.cassandra\")\n", | |
" .option(\"keyspace\",\"school\")\n", | |
" .option(\"table\",\"students\")\n", | |
" .mode(\"append\")\n", | |
" .save())" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 16, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"+--------+--------+---+--------+------------+-------------------+-------------------+\n", | |
"| name| surname|age|group_id|group_number| skills|something_important|\n", | |
"+--------+--------+---+--------+------------+-------------------+-------------------+\n", | |
"|Samantha| Brown| 21| 4| 1B| [guitar, singing]| 123|\n", | |
"| John| Smith| 22| 1| 1A| [drawing, skiing]| 100|\n", | |
"| Mike| Jones| 24| 4| 1B| [chess, swimming]| 123|\n", | |
"| Diana|Williams| 28| 2| 2A|[curling, swimming]| 102|\n", | |
"+--------+--------+---+--------+------------+-------------------+-------------------+\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"spark.table(\"casscatalog.school.students\").show()" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.8.5" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 4 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment