Last active
June 13, 2022 09:53
-
-
Save Wittline/9308a337ec28503b49760faa6bd5bb1e to your computer and use it in GitHub Desktop.
test_livy.ipynb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"metadata": { | |
"id": "-hHsmlGN0ETj" | |
}, | |
"source": [ | |
"# **Installing livyc** 🔧" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 83, | |
"metadata": { | |
"colab": { | |
"base_uri": "https://localhost:8080/" | |
}, | |
"id": "z9HFmSWHybp8", | |
"outputId": "5a9cd433-f9ef-4519-cf8e-e63c1db3655d" | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Collecting livyc==0.0.13Note: you may need to restart the kernel to use updated packages.\n", | |
" Downloading livyc-0.0.13-py3-none-any.whl (4.9 kB)\n", | |
"Installing collected packages: livyc\n", | |
" Attempting uninstall: livyc\n", | |
" Found existing installation: livyc 0.0.12\n", | |
" Uninstalling livyc-0.0.12:\n", | |
" Successfully uninstalled livyc-0.0.12\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"WARNING: You are using pip version 21.3.1; however, version 22.1.2 is available.\n", | |
"You should consider upgrading via the 'c:\\python\\python37\\python.exe -m pip install --upgrade pip' command.\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"\n", | |
"Successfully installed livyc-0.0.13\n" | |
] | |
} | |
], | |
"source": [ | |
"pip install livyc==0.0.13" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 84, | |
"metadata": { | |
"colab": { | |
"base_uri": "https://localhost:8080/" | |
}, | |
"id": "YB7JoZCj9luc", | |
"outputId": "5fba4b3b-d468-42ec-ec08-a7c3597dac99" | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Requirement already satisfied: psycopg2 in c:\\python\\python37\\lib\\site-packages (2.8.6)\n", | |
"Note: you may need to restart the kernel to use updated packages.\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"WARNING: You are using pip version 21.3.1; however, version 22.1.2 is available.\n", | |
"You should consider upgrading via the 'c:\\python\\python37\\python.exe -m pip install --upgrade pip' command.\n" | |
] | |
} | |
], | |
"source": [ | |
"pip install psycopg2" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": { | |
"id": "TsGFrrZG0Xgj" | |
}, | |
"source": [ | |
"# **Importing the livyc library** ⚡" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 86, | |
"metadata": { | |
"id": "svZULhLhy7yo" | |
}, | |
"outputs": [], | |
"source": [ | |
"from livyc import livyc" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": { | |
"id": "nUdzhm0D1ABb" | |
}, | |
"source": [ | |
"# **Setting the Livy configuration** ✍" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 88, | |
"metadata": { | |
"id": "utAaRVFq0b8q" | |
}, | |
"outputs": [], | |
"source": [ | |
"data_livy = {\n", | |
" \"livy_server_url\": \"localhost\",\n", | |
" \"port\": \"8998\",\n", | |
" \"jars\": [\"org.postgresql:postgresql:42.3.1\"]\n", | |
"}" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": { | |
"id": "Ru1gpwWu13hJ" | |
}, | |
"source": [ | |
"# **Populating the PostgreSQL database with data** 🗄" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 136, | |
"metadata": { | |
"colab": { | |
"base_uri": "https://localhost:8080/" | |
}, | |
"id": "86xKmv_43nd_", | |
"outputId": "eb127be2-3453-4fe6-9211-3ab4383217cf" | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Connecting to the PostgreSQL database\n", | |
"PostgreSQL database version:\n", | |
"('PostgreSQL 14.3 (Debian 14.3-1.pgdg110+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 10.2.1-6) 10.2.1 20210110, 64-bit',)\n", | |
"Dropping tables\n", | |
"Tables dropped\n", | |
"Creating created\n", | |
"Tables created\n", | |
"Copying data from .csv to staging zone\n", | |
"Staging ready\n", | |
"Table staging has 106 records\n", | |
"Database connection closed\n" | |
] | |
} | |
], | |
"source": [ | |
"# DROP TABLES\n", | |
"staging_table_drop = \"DROP TABLE IF EXISTS staging\"\n", | |
"\n", | |
"# CREATE TABLES\n", | |
"\n", | |
"staging_table_create = (\"\"\"\n", | |
"CREATE TABLE IF NOT EXISTS staging(\n", | |
" id serial PRIMARY KEY NOT NULL,\n", | |
" first_name varchar,\n", | |
" last_name varchar,\n", | |
" company_name varchar,\n", | |
" address varchar,\n", | |
" city varchar,\n", | |
" state varchar,\n", | |
" zip varchar,\n", | |
" phone1 varchar,\n", | |
" phone2 varchar,\n", | |
" email varchar,\n", | |
" department varchar\n", | |
");\n", | |
"\"\"\")\n", | |
"\n", | |
"create_table_queries = [staging_table_create]\n", | |
"drop_table_queries = [staging_table_drop]\n", | |
"\n", | |
"\n", | |
"import psycopg2\n", | |
"import pandas as pd\n", | |
"import os\n", | |
"\n", | |
"\n", | |
"def create_connection(params):\n", | |
" \"\"\"\n", | |
" Create a new connection to the PostgreSQL \n", | |
" database and return the cursor and connection objects\n", | |
" :param params: dict of keyword arguments unpacked into psycopg2.connect \n", | |
" \"\"\"\n", | |
" conn = None\n", | |
"\n", | |
" try:\n", | |
" print('Connecting to the PostgreSQL database')\n", | |
" conn = psycopg2.connect(**params)\n", | |
" conn.set_session(autocommit=True)\n", | |
"\n", | |
" cur = conn.cursor()\n", | |
"\n", | |
" print('PostgreSQL database version:')\n", | |
" cur.execute('SELECT version()')\n", | |
"\n", | |
" db_version = cur.fetchone()\n", | |
" print(db_version) \n", | |
" return cur, conn\n", | |
" except (Exception, psycopg2.DatabaseError) as error:\n", | |
" print(error)\n", | |
"\n", | |
"\n", | |
"def close_connection(cur, conn):\n", | |
" \"\"\"\n", | |
" Close the cursor and the connection to the PostgreSQL database \n", | |
" :param cur: cursor\n", | |
" :param conn: connection object\n", | |
" \"\"\"\n", | |
" try:\n", | |
" cur.close()\n", | |
" if conn is not None:\n", | |
" conn.close()\n", | |
" print('Database connection closed') \n", | |
" except (Exception, psycopg2.DatabaseError) as error:\n", | |
" print(error)\n", | |
"\n", | |
"\n", | |
"def drop_tables(cur, conn):\n", | |
" \"\"\"\n", | |
" drop all the tables in the example \n", | |
" :param cur: cursor\n", | |
" :param conn: connection object\n", | |
" \"\"\"\n", | |
" print(\"Dropping tables\")\n", | |
" for query in drop_table_queries: \n", | |
" cur.execute(query)\n", | |
" conn.commit()\n", | |
" print(\"Tables dropped\")\n", | |
"\n", | |
"\n", | |
"def create_tables(cur, conn):\n", | |
" \"\"\"\n", | |
" create all the tables in the example \n", | |
" :param cur: cursor\n", | |
" :param conn: connection object\n", | |
" \"\"\"\n", | |
" print(\"Creating created\")\n", | |
" for query in create_table_queries:\n", | |
" cur.execute(query)\n", | |
" conn.commit()\n", | |
" print(\"Tables created\")\n", | |
"\n", | |
"\n", | |
"def check_data(cur, conn, tables):\n", | |
" \"\"\"\n", | |
" Count the records in each of the given tables and return a dict mapping table name to count\n", | |
" :param cur: cursor\n", | |
" :param conn: connection object\n", | |
" :param tables: tables to check\n", | |
" \"\"\"\n", | |
"\n", | |
" count_values = {}\n", | |
"\n", | |
" for table in tables:\n", | |
" query_count = \"SELECT COUNT(*) FROM {0}\".format(table)\n", | |
"\n", | |
" try:\n", | |
" cur = conn.cursor()\n", | |
" cur.execute(query_count)\n", | |
" count_values[table] = cur.fetchone()[0] \n", | |
" except (Exception, psycopg2.DatabaseError) as error:\n", | |
" print(\"Error: %s\" % error)\n", | |
" raise\n", | |
"\n", | |
" return count_values\n", | |
"\n", | |
"def set_staging(cur, conn, staging_file, columns):\n", | |
"\n", | |
" print(\"Copying data from .csv to staging zone\")\n", | |
"\n", | |
" try:\n", | |
" copy_cmd = f\"copy staging({','.join(columns)}) from stdout (format csv)\"\n", | |
" with open(staging_file, 'r') as f:\n", | |
" next(f)\n", | |
" cur.copy_expert(copy_cmd, f) \n", | |
" conn.commit()\n", | |
" print(\"Staging ready\")\n", | |
" except (psycopg2.Error) as e:\n", | |
" print(e)\n", | |
"\n", | |
" \n", | |
" \n", | |
"class Pipeline:\n", | |
"\n", | |
" def __init__(self, params, staging_file):\n", | |
" self.params = params\n", | |
" self.staging_file = staging_file\n", | |
"\n", | |
" def run(self):\n", | |
" tables = ['staging']\n", | |
" columns_staging = ['first_name','last_name','company_name','address','city','state','zip','phone1','phone2','email','department']\n", | |
" cur, conn = create_connection(self.params)\n", | |
" drop_tables(cur, conn)\n", | |
" create_tables(cur, conn)\n", | |
" set_staging(cur, conn, self.staging_file, columns_staging)\n", | |
" count_tables = check_data(cur, conn, tables)\n", | |
" for k, v in count_tables.items():\n", | |
" print(\"Table {0} has {1} records\".format(k, v))\n", | |
" close_connection(cur, conn)\n", | |
"\n", | |
"\n", | |
"params = {\"host\": \"localhost\", \"port\":\"5432\", \"database\": \"db\", \"user\": \"postgres\", \"password\": \"pg12345\"}\n", | |
"\n", | |
"\n", | |
"staging_file = \"./Documents/sample.csv\"\n", | |
"pipeline = Pipeline(params, staging_file)\n", | |
"pipeline.run()\n" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": { | |
"id": "eHoROghy3m9o" | |
}, | |
"source": [ | |
"# **Let's try launching a PySpark script on the Apache Livy server** 🤓" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 137, | |
"metadata": { | |
"colab": { | |
"base_uri": "https://localhost:8080/" | |
}, | |
"id": "GlujsSQK4ZIo", | |
"outputId": "a9d562e8-27c6-490b-f8ec-11ab0109aa1e" | |
}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"'\\n\\n from pyspark.sql.functions import udf, col, explode\\n from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType\\n from pyspark.sql import Row\\n from pyspark.sql import SparkSession\\n\\n\\n df = spark.read.format(\"jdbc\") .option(\"url\", \"jdbc:postgresql://pg_container:5432/db\") .option(\"driver\", \"org.postgresql.Driver\") .option(\"dbtable\", \"staging\") .option(\"user\", \"postgres\") .option(\"password\", \"pg12345\") .load()\\n \\n n_rows = df.count()\\n\\n spark.stop()\\n'" | |
] | |
}, | |
"execution_count": 137, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"\n", | |
"params[\"table\"] = \"staging\"\n", | |
"\n", | |
"pyspark_script = \"\"\"\n", | |
"\n", | |
" from pyspark.sql.functions import udf, col, explode\n", | |
" from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType\n", | |
" from pyspark.sql import Row\n", | |
" from pyspark.sql import SparkSession\n", | |
"\n", | |
"\n", | |
" df = spark.read.format(\"jdbc\") \\\n", | |
" .option(\"url\", \"jdbc:postgresql://pg_container:{port}/{database}\") \\\n", | |
" .option(\"driver\", \"org.postgresql.Driver\") \\\n", | |
" .option(\"dbtable\", \"{table}\") \\\n", | |
" .option(\"user\", \"{user}\") \\\n", | |
" .option(\"password\", \"{password}\") \\\n", | |
" .load()\n", | |
" \n", | |
" n_rows = df.count()\n", | |
"\n", | |
" spark.stop()\n", | |
"\"\"\"\n", | |
"\n", | |
"pyspark_script = pyspark_script.format(**params)\n", | |
"pyspark_script\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 157, | |
"metadata": { | |
"id": "MEcMsmKh12FK" | |
}, | |
"outputs": [], | |
"source": [ | |
"lvy = livyc.LivyC(data_livy)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 161, | |
"metadata": { | |
"id": "65aOoDKv2J0S" | |
}, | |
"outputs": [], | |
"source": [ | |
"session = lvy.create_session()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 162, | |
"metadata": { | |
"colab": { | |
"base_uri": "https://localhost:8080/" | |
}, | |
"id": "Uv8PYIBPykd7", | |
"outputId": "5292aff7-3de1-42e9-cd8b-cc6687f79c9e" | |
}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"''" | |
] | |
}, | |
"execution_count": 162, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"lvy.run_script(session, pyspark_script)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 163, | |
"metadata": { | |
"colab": { | |
"base_uri": "https://localhost:8080/" | |
}, | |
"id": "g8HL2K4fypX7", | |
"outputId": "c2626dc0-5eec-4342-ac9e-b0c51c5a6a7b" | |
}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"106" | |
] | |
}, | |
"execution_count": 163, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"lvy.read_variable(session, \"n_rows\")" | |
] | |
} | |
], | |
"metadata": { | |
"colab": { | |
"collapsed_sections": [], | |
"name": "test-livy.ipynb", | |
"provenance": [] | |
}, | |
"kernelspec": { | |
"display_name": "Python 3", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.7.9" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 1 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment