Skip to content

Instantly share code, notes, and snippets.

@Wittline
Last active June 13, 2022 09:53
Show Gist options
  • Save Wittline/9308a337ec28503b49760faa6bd5bb1e to your computer and use it in GitHub Desktop.
Save Wittline/9308a337ec28503b49760faa6bd5bb1e to your computer and use it in GitHub Desktop.
test_livy.ipynb
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "-hHsmlGN0ETj"
},
"source": [
"# **Installing livyc** 🔧"
]
},
{
"cell_type": "code",
"execution_count": 83,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "z9HFmSWHybp8",
"outputId": "5a9cd433-f9ef-4519-cf8e-e63c1db3655d"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Collecting livyc==0.0.13Note: you may need to restart the kernel to use updated packages.\n",
" Downloading livyc-0.0.13-py3-none-any.whl (4.9 kB)\n",
"Installing collected packages: livyc\n",
" Attempting uninstall: livyc\n",
" Found existing installation: livyc 0.0.12\n",
" Uninstalling livyc-0.0.12:\n",
" Successfully uninstalled livyc-0.0.12\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING: You are using pip version 21.3.1; however, version 22.1.2 is available.\n",
"You should consider upgrading via the 'c:\\python\\python37\\python.exe -m pip install --upgrade pip' command.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"Successfully installed livyc-0.0.13\n"
]
}
],
"source": [
"pip install livyc==0.0.13"
]
},
{
"cell_type": "code",
"execution_count": 84,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "YB7JoZCj9luc",
"outputId": "5fba4b3b-d468-42ec-ec08-a7c3597dac99"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Requirement already satisfied: psycopg2 in c:\\python\\python37\\lib\\site-packages (2.8.6)\n",
"Note: you may need to restart the kernel to use updated packages.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING: You are using pip version 21.3.1; however, version 22.1.2 is available.\n",
"You should consider upgrading via the 'c:\\python\\python37\\python.exe -m pip install --upgrade pip' command.\n"
]
}
],
"source": [
"pip install psycopg2"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "TsGFrrZG0Xgj"
},
"source": [
"# **Importing livyc library** ⚡"
]
},
{
"cell_type": "code",
"execution_count": 86,
"metadata": {
"id": "svZULhLhy7yo"
},
"outputs": [],
"source": [
"from livyc import livyc"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "nUdzhm0D1ABb"
},
"source": [
"# **Setting livy configuration** ✍"
]
},
{
"cell_type": "code",
"execution_count": 88,
"metadata": {
"id": "utAaRVFq0b8q"
},
"outputs": [],
"source": [
"data_livy = {\n",
" \"livy_server_url\": \"localhost\",\n",
" \"port\": \"8998\",\n",
" \"jars\": [\"org.postgresql:postgresql:42.3.1\"]\n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Ru1gpwWu13hJ"
},
"source": [
"# **Populate PostgreSQL DB with data** 🗄"
]
},
{
"cell_type": "code",
"execution_count": 136,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "86xKmv_43nd_",
"outputId": "eb127be2-3453-4fe6-9211-3ab4383217cf"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Connecting to the PostgreSQL database\n",
"PostgreSQL database version:\n",
"('PostgreSQL 14.3 (Debian 14.3-1.pgdg110+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 10.2.1-6) 10.2.1 20210110, 64-bit',)\n",
"Dropping tables\n",
"Tables dropped\n",
"Creating created\n",
"Tables created\n",
"Copying data from .csv to staging zone\n",
"Staging ready\n",
"Table staging has 106 records\n",
"Database connection closed\n"
]
}
],
"source": [
"# DROP TABLES\n",
"staging_table_drop = \"DROP TABLE IF EXISTS staging\"\n",
"\n",
"# CREATE TABLES\n",
"\n",
"staging_table_create = (\"\"\"\n",
"CREATE TABLE IF NOT EXISTS staging(\n",
" id serial PRIMARY KEY NOT NULL,\n",
" first_name varchar,\n",
" last_name varchar,\n",
" company_name varchar,\n",
" address varchar,\n",
" city varchar,\n",
" state varchar,\n",
" zip varchar,\n",
" phone1 varchar,\n",
" phone2 varchar,\n",
" email varchar,\n",
" department varchar\n",
");\n",
"\"\"\")\n",
"\n",
"create_table_queries = [staging_table_create]\n",
"drop_table_queries = [staging_table_drop]\n",
"\n",
"\n",
"import psycopg2\n",
"import pandas as pd\n",
"import os\n",
"\n",
"\n",
"def create_connection(params):\n",
" \"\"\"\n",
" create a new connection with the postgreSQL \n",
" database and return the cur and conn object\n",
" :param params: connection string \n",
" \"\"\"\n",
" conn = None\n",
"\n",
" try:\n",
" print('Connecting to the PostgreSQL database')\n",
" conn = psycopg2.connect(**params)\n",
" conn.set_session(autocommit=True)\n",
"\n",
" cur = conn.cursor()\n",
"\n",
" print('PostgreSQL database version:')\n",
" cur.execute('SELECT version()')\n",
"\n",
" db_version = cur.fetchone()\n",
" print(db_version) \n",
" return cur, conn\n",
" except (Exception, psycopg2.DatabaseError) as error:\n",
" print(error)\n",
"\n",
"\n",
"def close_connection(cur, conn):\n",
" \"\"\"\n",
" close the connection with the postgreSQL database \n",
" :param cur: cursor\n",
" :param conn: connection object\n",
" \"\"\"\n",
" try:\n",
" cur.close()\n",
" if conn is not None:\n",
" conn.close()\n",
" print('Database connection closed') \n",
" except (Exception, psycopg2.DatabaseError) as error:\n",
" print(error)\n",
"\n",
"\n",
"def drop_tables(cur, conn):\n",
" \"\"\"\n",
" drop all the tables in the example \n",
" :param cur: cursor\n",
" :param conn: connection object\n",
" \"\"\"\n",
" print(\"Dropping tables\")\n",
" for query in drop_table_queries: \n",
" cur.execute(query)\n",
" conn.commit()\n",
" print(\"Tables dropped\")\n",
"\n",
"\n",
"def create_tables(cur, conn):\n",
" \"\"\"\n",
" create all the tables in the example \n",
" :param cur: cursor\n",
" :param conn: connection object\n",
" \"\"\"\n",
" print(\"Creating created\")\n",
" for query in create_table_queries:\n",
" cur.execute(query)\n",
" conn.commit()\n",
" print(\"Tables created\")\n",
"\n",
"\n",
"def check_data(cur, conn, tables):\n",
" \"\"\"\n",
" Check count of records in tables\n",
" :param cur: cursor\n",
" :param conn: connection object\n",
" :param tables: tables to check\n",
" \"\"\"\n",
"\n",
" count_values = {}\n",
"\n",
" for table in tables:\n",
" query_count = \"SELECT COUNT(*) FROM {0}\".format(table)\n",
"\n",
" try:\n",
" cur = conn.cursor()\n",
" cur.execute(query_count)\n",
" count_values[table] = cur.fetchone()[0] \n",
" except (Exception, psycopg2.DatabaseError) as error:\n",
" print(\"Error: %s\" % error)\n",
" raise\n",
"\n",
" return count_values\n",
"\n",
"def set_staging(cur, conn, staging_file, columns):\n",
"\n",
" print(\"Copying data from .csv to staging zone\")\n",
"\n",
" try:\n",
" copy_cmd = f\"copy staging({','.join(columns)}) from stdout (format csv)\"\n",
" with open(staging_file, 'r') as f:\n",
" next(f)\n",
" cur.copy_expert(copy_cmd, f) \n",
" conn.commit()\n",
" print(\"Staging ready\")\n",
" except (psycopg2.Error) as e:\n",
" print(e)\n",
"\n",
" \n",
" \n",
"class Pipeline:\n",
"\n",
" def __init__(self, params, staging_file):\n",
" self.params = params\n",
" self.staging_file = staging_file\n",
"\n",
" def run(self):\n",
" tables = ['staging']\n",
" columns_staging = ['first_name','last_name','company_name','address','city','state','zip','phone1','phone2','email','department']\n",
" cur, conn = create_connection(self.params)\n",
" drop_tables(cur, conn)\n",
" create_tables(cur, conn)\n",
" set_staging(cur, conn, self.staging_file, columns_staging)\n",
" count_tables = check_data(cur, conn, tables)\n",
" for k, v in count_tables.items():\n",
" print(\"Table {0} has {1} records\".format(k, v))\n",
" close_connection(cur, conn)\n",
"\n",
"\n",
"params = {\"host\": \"localhost\", \"port\":\"5432\", \"database\": \"db\", \"user\": \"postgres\", \"password\": \"pg12345\"}\n",
"\n",
"\n",
"staging_file = \"./Documents/sample.csv\"\n",
"pipeline = Pipeline(params, staging_file)\n",
"pipeline.run()\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "eHoROghy3m9o"
},
"source": [
"# **Let's try launch a pySpark script to Apache Livy Server** 🤓"
]
},
{
"cell_type": "code",
"execution_count": 137,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "GlujsSQK4ZIo",
"outputId": "a9d562e8-27c6-490b-f8ec-11ab0109aa1e"
},
"outputs": [
{
"data": {
"text/plain": [
"'\\n\\n from pyspark.sql.functions import udf, col, explode\\n from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType\\n from pyspark.sql import Row\\n from pyspark.sql import SparkSession\\n\\n\\n df = spark.read.format(\"jdbc\") .option(\"url\", \"jdbc:postgresql://pg_container:5432/db\") .option(\"driver\", \"org.postgresql.Driver\") .option(\"dbtable\", \"staging\") .option(\"user\", \"postgres\") .option(\"password\", \"pg12345\") .load()\\n \\n n_rows = df.count()\\n\\n spark.stop()\\n'"
]
},
"execution_count": 137,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"\n",
"params[\"table\"] = \"staging\"\n",
"\n",
"pyspark_script = \"\"\"\n",
"\n",
" from pyspark.sql.functions import udf, col, explode\n",
" from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType\n",
" from pyspark.sql import Row\n",
" from pyspark.sql import SparkSession\n",
"\n",
"\n",
" df = spark.read.format(\"jdbc\") \\\n",
" .option(\"url\", \"jdbc:postgresql://pg_container:{port}/{database}\") \\\n",
" .option(\"driver\", \"org.postgresql.Driver\") \\\n",
" .option(\"dbtable\", \"{table}\") \\\n",
" .option(\"user\", \"{user}\") \\\n",
" .option(\"password\", \"{password}\") \\\n",
" .load()\n",
" \n",
" n_rows = df.count()\n",
"\n",
" spark.stop()\n",
"\"\"\"\n",
"\n",
"pyspark_script = pyspark_script.format(**params)\n",
"pyspark_script\n"
]
},
{
"cell_type": "code",
"execution_count": 157,
"metadata": {
"id": "MEcMsmKh12FK"
},
"outputs": [],
"source": [
"lvy = livyc.LivyC(data_livy)"
]
},
{
"cell_type": "code",
"execution_count": 161,
"metadata": {
"id": "65aOoDKv2J0S"
},
"outputs": [],
"source": [
"session = lvy.create_session()"
]
},
{
"cell_type": "code",
"execution_count": 162,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "Uv8PYIBPykd7",
"outputId": "5292aff7-3de1-42e9-cd8b-cc6687f79c9e"
},
"outputs": [
{
"data": {
"text/plain": [
"''"
]
},
"execution_count": 162,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"lvy.run_script(session, pyspark_script)"
]
},
{
"cell_type": "code",
"execution_count": 163,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "g8HL2K4fypX7",
"outputId": "c2626dc0-5eec-4342-ac9e-b0c51c5a6a7b"
},
"outputs": [
{
"data": {
"text/plain": [
"106"
]
},
"execution_count": 163,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"lvy.read_variable(session, \"n_rows\")"
]
}
],
"metadata": {
"colab": {
"collapsed_sections": [],
"name": "test-livy.ipynb",
"provenance": []
},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.9"
}
},
"nbformat": 4,
"nbformat_minor": 1
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment