Last active
September 17, 2021 09:01
-
-
Save WillSams/233d9c84fa63d88b9ec6a45f7f7291e1 to your computer and use it in GitHub Desktop.
Simple ETL example In Python Using PETL
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh
# Simple ETL example
# Just extracting one table from a database and loading it into another
# Adapted from https://www.linkedin.com/pulse/easy-etl-python-beginners-oscar-valles
# Pre-req is Docker. If you are a savage and haven't used Docker yet, here are the install instructions on any distro with Ubuntu as the base:
#   . /etc/os-release; RELEASE=$UBUNTU_CODENAME
#   sudo bash -c "add-apt-repository 'deb [arch=amd64] https://download.docker.com/linux/ubuntu $RELEASE stable'"
#   curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
#   sudo bash -c "apt update && apt install docker-ce docker-ce-cli containerd.io docker-compose -y"
#   sudo bash -c "groupadd docker"
#   sudo bash -c "usermod -aG docker $USER"
#   newgrp docker
# This script uses Python version management so I don't pollute my system's environment. Not necessary for
# this example, but to install pyenv, just follow these GNU Bash instructions for Linux, WSL, or (maybe) Git Bash on Windows:
#   git clone https://github.com/pyenv/pyenv.git ~/.pyenv
#   echo 'export PYENV_ROOT="$HOME/.pyenv"' >> ~/.bashrc
#   echo 'export PATH="$PYENV_ROOT/bin:$PATH"' >> ~/.bashrc
#   echo 'eval "$(pyenv init -)"' >> ~/.bashrc
#   source ~/.bashrc
#   git clone https://github.com/pyenv/pyenv-virtualenv.git $HOME/.pyenv/plugins/pyenv-virtualenv
#   echo 'eval "$(pyenv virtualenv-init -)"' >> ~/.bashrc
#   source ~/.bashrc
########## 1 - Setup ##########
# Creates the simple_etl project directory, a Python virtualenv with the ETL
# dependencies, the .envrc holding the database settings, an .editorconfig,
# and a git repository with a Python .gitignore.

# -p lets the script be rerun; the directory is named explicitly because "$_"
# is not available in every /bin/sh. Stop if we cannot enter it, otherwise the
# files below would be written into the wrong directory.
mkdir -p simple_etl || exit 1
cd simple_etl || exit 1

# The pyenv steps can be skipped if you don't want to use it
pyenv virtualenv-delete simple_etl # delete it if it already exists
pyenv virtualenv simple_etl
pyenv activate simple_etl
python3 -m pip install --upgrade pip
pip install petl psycopg2

# Database settings used by docker-compose, psql and the generated Python
# script. Demo credentials only. The quoted delimiter keeps the text literal.
cat >| .envrc <<'EOF'
export DB_USER=acme
export DB_PORT_PRODUCTS=15432
export DB_PORT_SALES=15433
export DB_PASS=passw0rd123
EOF
direnv allow
# direnv applies .envrc through its shell prompt hook, which never fires while
# this file runs as a script, so also load the variables into this shell.
. ./.envrc

# Creating of .editorconfig is not necessary, only using it to help enforce PEP-8 in my code editor
cat > .editorconfig <<'EOF'
root = true
# Unix-style newlines with a newline ending every file
[*]
end_of_line = lf
insert_final_newline = true
trim_trailing_whitespace = true
charset = utf-8
# 4 space indentation
[*.py]
indent_style = space
indent_size = 4
[{*.yml,*.json}]
indent_style = space
indent_size = 2
EOF

git init .
wget https://raw.githubusercontent.com/github/gitignore/master/Python.gitignore -O .gitignore
########## 2 - Set up database ##########
# Writes docker-compose.yml describing two PostgreSQL containers (products and
# sales), starts them, and waits until both accept connections.
#
# The delimiter is quoted so ${DB_USER}, ${DB_PASS} and the port variables are
# written literally and interpolated by docker-compose from the environment.
#
# NOTE(review): the named volumes are mounted at /var/lib/postgresql/data1 and
# /var/lib/postgresql/data2; the postgres image normally keeps its data under
# /var/lib/postgresql/data, so these volumes may not hold the database files.
# Left unchanged - confirm the intent before altering the paths.
cat >| docker-compose.yml <<'EOF'
version: "3"
services:
  products-database:
    container_name: acme-products-db
    image: postgres:13.1-alpine
    environment:
      - POSTGRES_USER=${DB_USER}
      - POSTGRES_PASSWORD=${DB_PASS}
    ports:
      - "${DB_PORT_PRODUCTS}:5432"
    volumes:
      - acme-products-db-data:/var/lib/postgresql/data1
    networks:
      - acme
  sales-database:
    container_name: acme-sales-db
    image: postgres:13.1-alpine
    environment:
      - POSTGRES_USER=${DB_USER}
      - POSTGRES_PASSWORD=${DB_PASS}
    ports:
      - "${DB_PORT_SALES}:5432"
    volumes:
      - acme-sales-db-data:/var/lib/postgresql/data2
    networks:
      - acme
volumes:
  acme-products-db-data:
  acme-sales-db-data:
networks:
  acme:
EOF
docker-compose pull && docker-compose up -d

# "up -d" returns before PostgreSQL has finished initialising, so the psql
# calls that follow could be refused. Poll each published port for up to
# roughly 30 seconds; skipped when pg_isready is not installed.
if command -v pg_isready >/dev/null 2>&1; then
  for db_port in "$DB_PORT_PRODUCTS" "$DB_PORT_SALES"; do
    attempts=0
    until pg_isready -q -h 127.0.0.1 -p "$db_port"; do
      attempts=$((attempts + 1))
      if [ "$attempts" -ge 30 ]; then
        echo "PostgreSQL on port $db_port is still not ready; continuing anyway" >&2
        break
      fi
      sleep 1
    done
  done
fi
# Generate products.sql (schema plus five sample rows) and load it into the
# products database. The quoted delimiter keeps the SQL literal, including the
# doubled single quotes that escape the apostrophes in the product names.
cat >| products.sql <<'EOF'
DROP TABLE IF EXISTS products;
CREATE TABLE products (
    id smallint NOT NULL,
    product_name character varying(40) NOT NULL,
    quantity_per_unit character varying(20),
    unit_price real
);
INSERT INTO products VALUES (1, 'Chai', '10 boxes x 30 bags', 18);
INSERT INTO products VALUES (2, 'Chang', '24 - 12 oz bottles', 19);
INSERT INTO products VALUES (3, 'Aniseed Syrup', '12 - 550 ml bottles', 10);
INSERT INTO products VALUES (4, 'Chef Anton''s Cajun Seasoning', '48 - 6 oz jars', 22);
INSERT INTO products VALUES (5, 'Chef Anton''s Gumbo Mix', '36 boxes', 21.3500004);
EOF
# -a echoes each statement as it runs; the password is passed via PGPASSWORD
# so psql does not prompt for it.
PGPASSWORD="$DB_PASS" psql -h 127.0.0.1 -p "$DB_PORT_PRODUCTS" -U "$DB_USER" -a -f products.sql
# Generate sales.sql (sales_person and orders tables with sample rows) and
# load it into the sales database.
#
# The DROP statements must name the same tables that are created below,
# otherwise a second run fails on CREATE TABLE because the table still exists.
#
# NOTE(review): each sales_person INSERT supplies two values for three
# columns, so the person names are stored in "region" and "manager" stays
# NULL, and all three rows share id 100. Data left as-is - confirm the intent.
cat >| sales.sql <<'EOF'
DROP TABLE IF EXISTS sales_person;
DROP TABLE IF EXISTS orders;
CREATE TABLE sales_person (
    id smallint NOT NULL,
    region character varying(255),
    manager character varying(255)
);
CREATE TABLE orders (
    id smallint NOT NULL,
    order_date date
);
INSERT INTO sales_person VALUES (100, 'Maria Anders');
INSERT INTO sales_person VALUES (100, 'Ana Trujillo');
INSERT INTO sales_person VALUES (100, 'Francisco Chang');
INSERT INTO orders VALUES (10249, '2021-09-15');
INSERT INTO orders VALUES (10250, '2021-09-18');
INSERT INTO orders VALUES (10251, '2021-09-18');
INSERT INTO orders VALUES (10252, '2021-09-19');
EOF
# -a echoes each statement as it runs; the password is passed via PGPASSWORD
# so psql does not prompt for it.
PGPASSWORD="$DB_PASS" psql -h 127.0.0.1 -p "$DB_PORT_SALES" -U "$DB_USER" -a -f sales.sql
########## 3 - Using Python, copy Products table from acme-products-db to acme-sales-db ##########
# Generates pyetl.py and runs it. The here-doc delimiter is deliberately
# unquoted: the shell substitutes the DB_* values into the connection strings
# while writing the file, so the generated script contains the credentials.
cat >| pyetl.py <<EOF
import petl as etl
import psycopg2 as pg

dbconns = {
    'products': 'dbname=acme user=$DB_USER password=$DB_PASS host=127.0.0.1 port=$DB_PORT_PRODUCTS',
    'sales': 'dbname=acme user=$DB_USER password=$DB_PASS host=127.0.0.1 port=$DB_PORT_SALES',
}

# set connections and cursors
sourceconn = pg.connect(dbconns['products'])  # grab value by referencing key dictionary
targetconn = pg.connect(dbconns['sales'])  # grab value by referencing key dictionary
sourcecursor = sourceconn.cursor()
targetcursor = targetconn.cursor()

# retrieve the names of the source tables to be copied
sourcecursor.execute("""select table_name from information_schema.columns where table_name in ('products','returns') group by 1""")
sourcetables = sourcecursor.fetchall()

# iterate through table names to copy over
for t in sourcetables:
    sourceDs = etl.fromdb(sourceconn, 'select * from %s' % (t[0]))
    etl.todb(sourceDs, targetconn, t[0], create=True, sample=10000)
EOF
# Run the copy; without this the verification step that follows has no
# products table to query in the sales database.
python3 pyetl.py
########## 4 - Verify products table now exists in sales database ##########
# Query the sales database for the copied table; psql prints its rows, or an
# error if the products table is missing.
PGPASSWORD="$DB_PASS" psql -h 127.0.0.1 -p "$DB_PORT_SALES" -U "$DB_USER" -c 'SELECT * FROM products'
########## 5 - Tear down ##########
# Stop and remove the containers and network defined in docker-compose.yml,
# then leave the simple_etl virtualenv.
docker-compose down
pyenv deactivate
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment