Skip to content

Instantly share code, notes, and snippets.

@zorteran
Created September 5, 2020 18:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zorteran/8ca73ff33c2daee2a5e517c25866bf1f to your computer and use it in GitHub Desktop.
Save zorteran/8ca73ff33c2daee2a5e517c25866bf1f to your computer and use it in GitHub Desktop.
Simple ETL joining data from MySQL and MongoDB and writing to Cassandra
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Requirement already satisfied: findspark in /opt/conda/lib/python3.8/site-packages (1.3.0)\r\n"
]
}
],
"source": [
"# Use the %pip magic (not !pip) so the install targets the running kernel's\n",
"# environment, and pin the version so a fresh run resolves the same release.\n",
"%pip install findspark==1.3.0"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"# Maven coordinates handed to spark-submit through --packages, one per line.\n",
"spark_packages = ','.join([\n",
"    'com.datastax.spark:spark-cassandra-connector_2.12:3.0.0-beta',\n",
"    'org.mongodb.spark:mongo-spark-connector_2.12:3.0.0',\n",
"    'mysql:mysql-connector-java:5.1.49',\n",
"])\n",
"\n",
"# Must be set before the Spark session is created further down.\n",
"os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages ' + spark_packages + ' pyspark-shell'"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"import findspark\n",
"findspark.init()"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"import pyspark\n",
"from pyspark.sql import SparkSession"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"# Connection settings are read from environment variables so real credentials\n",
"# never have to be committed with the notebook; the defaults are the values\n",
"# this demo was originally run with.\n",
"mongo_uri = os.environ.get('MONGO_URI', 'mongodb://root:example@mongo/school.students?authSource=admin')\n",
"cassandra_host = os.environ.get('CASSANDRA_HOST', 'cassandra')\n",
"cassandra_username = os.environ.get('CASSANDRA_USERNAME', 'cassandra')\n",
"cassandra_password = os.environ.get('CASSANDRA_PASSWORD', 'cassandra')\n",
"\n",
"# One session configured for both the MongoDB and the Cassandra connector.\n",
"# The Cassandra catalog is registered as `casscatalog` and is used at the end\n",
"# of the notebook to read the written table back.\n",
"spark = (SparkSession.builder\n",
"    .appName('simple_etl')\n",
"    .config('spark.mongodb.input.uri', mongo_uri)\n",
"    .config('spark.mongodb.output.uri', mongo_uri)\n",
"    .config('spark.cassandra.connection.host', cassandra_host)\n",
"    .config('spark.cassandra.auth.username', cassandra_username)\n",
"    .config('spark.cassandra.auth.password', cassandra_password)\n",
"    .config('spark.sql.extensions', 'com.datastax.spark.connector.CassandraSparkExtensions')\n",
"    .config('spark.sql.catalog.casscatalog', 'com.datastax.spark.connector.datasource.CassandraCatalog')\n",
"    .getOrCreate())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Extract"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Read from MySQL"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"# MySQL connection settings come from environment variables so credentials are\n",
"# not baked into the notebook; the defaults are the original demo values.\n",
"mysql_url = os.environ.get('MYSQL_URL', 'jdbc:mysql://mysql/school')\n",
"mysql_user = os.environ.get('MYSQL_USER', 'user')\n",
"mysql_password = os.environ.get('MYSQL_PASSWORD', 'password')\n",
"\n",
"# Load the whole `groups` table over JDBC.\n",
"groups = (spark.read\n",
"    .format('jdbc')\n",
"    .option('url', mysql_url)\n",
"    .option('driver', 'com.mysql.jdbc.Driver')\n",
"    .option('dbtable', 'groups')\n",
"    .option('user', mysql_user)\n",
"    .option('password', mysql_password)\n",
"    .load())"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------+------------+-----------+-------------------+\n",
"|group_id|group_number|description|something_important|\n",
"+--------+------------+-----------+-------------------+\n",
"| 1| 1A| some group| 100|\n",
"| 2| 2A| some group| 102|\n",
"| 3| 3A| some group| 101|\n",
"| 4| 1B| some group| 123|\n",
"| 5| 2B| some group| 133|\n",
"| 6| 3B| some group| 144|\n",
"+--------+------------+-----------+-------------------+\n",
"\n"
]
}
],
"source": [
"groups.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Read from Mongo"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"# No URI is passed here: the reader relies on the `spark.mongodb.input.uri`\n",
"# value set when the session was built.\n",
"students = (spark.read\n",
"    .format(\"mongo\")\n",
"    .load())"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+---+-----+--------+-------------------+--------+\n",
"| _id|age|group| name| skills| surname|\n",
"+--------------------+---+-----+--------+-------------------+--------+\n",
"|[5f537b2945e17234...| 22| 1A| John| [drawing, skiing]| Smith|\n",
"|[5f537b2945e17234...| 24| 1B| Mike| [chess, swimming]| Jones|\n",
"|[5f537b2945e17234...| 28| 2A| Diana|[curling, swimming]|Williams|\n",
"|[5f537b2945e17234...| 21| 1B|Samantha| [guitar, singing]| Brown|\n",
"+--------------------+---+-----+--------+-------------------+--------+\n",
"\n"
]
}
],
"source": [
"students.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Transform"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"# Match each student's `group` label against the MySQL `group_number` column.\n",
"group_match = groups.group_number == students.group\n",
"\n",
"# Columns carried into the output table, in output order.\n",
"output_columns = [\n",
"    'name', 'surname', 'age',\n",
"    'group_id', 'group_number', 'something_important',\n",
"    'skills',\n",
"]\n",
"\n",
"# Left join: students without a matching group row are kept.\n",
"students_with_groups = students.join(groups, group_match, how='left').select(*output_columns)"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------+--------+---+--------+------------+-------------------+-------------------+\n",
"| name| surname|age|group_id|group_number|something_important| skills|\n",
"+--------+--------+---+--------+------------+-------------------+-------------------+\n",
"| John| Smith| 22| 1| 1A| 100| [drawing, skiing]|\n",
"| Diana|Williams| 28| 2| 2A| 102|[curling, swimming]|\n",
"| Mike| Jones| 24| 4| 1B| 123| [chess, swimming]|\n",
"|Samantha| Brown| 21| 4| 1B| 123| [guitar, singing]|\n",
"+--------+--------+---+--------+------------+-------------------+-------------------+\n",
"\n"
]
}
],
"source": [
"students_with_groups.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Load"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"# Append the joined rows to the `school.students` table in Cassandra.\n",
"cassandra_writer = (students_with_groups.write\n",
"    .format('org.apache.spark.sql.cassandra')\n",
"    .options(keyspace='school', table='students')\n",
"    .mode('append'))\n",
"\n",
"cassandra_writer.save()"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------+--------+---+--------+------------+-------------------+-------------------+\n",
"| name| surname|age|group_id|group_number| skills|something_important|\n",
"+--------+--------+---+--------+------------+-------------------+-------------------+\n",
"|Samantha| Brown| 21| 4| 1B| [guitar, singing]| 123|\n",
"| John| Smith| 22| 1| 1A| [drawing, skiing]| 100|\n",
"| Mike| Jones| 24| 4| 1B| [chess, swimming]| 123|\n",
"| Diana|Williams| 28| 2| 2A|[curling, swimming]| 102|\n",
"+--------+--------+---+--------+------------+-------------------+-------------------+\n",
"\n"
]
}
],
"source": [
"spark.table(\"casscatalog.school.students\").show()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.5"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment