Skip to content

Instantly share code, notes, and snippets.

@fpaupier
Last active April 12, 2019 11:15
Show Gist options
  • Save fpaupier/7fe0567ac0cc0b6bcec40362442be2eb to your computer and use it in GitHub Desktop.
Save fpaupier/7fe0567ac0cc0b6bcec40362442be2eb to your computer and use it in GitHub Desktop.
What the steps of an ETL pipeline would look like
"""
Sample extract-transform-load (ETL) pipeline illustrating how Apache NiFi can simplify
the construction of a dataflow.
NOTE: This code only illustrates the idea; it is not a working ETL pipeline.
"""
import os
import db_utils
import data_sanitizer
def ingest(source, user, password):
    """Extract step: pull the latest changes from the source database.

    A connection is opened with ``db_utils.connect`` and a change data
    capture request covering every table ("all-tables") is issued on it.

    Args:
        source (str): address of the data source.
        user (str): user name used to authenticate against the source.
        password (str): password used to authenticate against the source.

    Returns:
        str: JSON representation of the ingested data, i.e. whatever
        ``capture_data_changes`` returns (type per the original contract;
        TODO confirm against db_utils).
    """
    # NOTE(review): the connection is never closed explicitly here; whether
    # db_utils releases it on its own is not visible from this file.
    return db_utils.connect(source, user, password).capture_data_changes("all-tables")
def process(data):
    """Transform step: reshape the raw data and enrich it.

    The data goes through three ``data_sanitizer`` helpers in sequence.
    Each one receives the output of the previous one: format, then trim,
    then add_metadata.

    Args:
        data (str): raw data produced by the extract step.

    Returns:
        str: JSON representation of the data enriched with metadata and
        analytics (as returned by ``data_sanitizer.add_metadata``).
    """
    # Normalise the raw payload into a common format.
    result = data_sanitizer.format(data)
    # Trim some values.
    result = data_sanitizer.trim(result)
    # Attach context information such as a timestamp.
    return data_sanitizer.add_metadata(result)
def save(data, fpath):
    """Load step: write ``data`` to ``fpath``, replacing any previous content.

    The text is encoded as UTF-8 and written in binary mode. The bytes on
    disk are therefore exactly ``data`` encoded as UTF-8, with no newline
    translation.

    Args:
        data (str): content to save to the disk.
        fpath (str): path to the saved file. It is created if missing and
            truncated if it already exists.

    Returns:
        bool: always True. Failures surface as exceptions, not as a False
        return value.

    Raises:
        TypeError: if ``data`` is not a ``str``.
        OSError: if the file cannot be opened or written.
    """
    # Encode first so a non-str ``data`` fails before the file is touched.
    payload = str.encode(data, "utf-8")
    # Why "wb" and a with-block instead of the previous raw os.open/os.write:
    #  - "wb" truncates an existing file. O_RDWR | O_CREAT alone left stale
    #    trailing bytes whenever the new content was shorter.
    #  - The with-block closes the handle. The raw file descriptor leaked.
    #  - The buffered writer writes the whole payload. os.write may perform
    #    a partial write.
    #  - New files get 0o666 & ~umask instead of os.open's 0o777 default.
    with open(fpath, "wb") as fh:
        fh.write(payload)
    return True
if __name__ == "__main__":
    # Run the ingestion pipeline when the script is executed directly.
    #
    # Connection settings and the output path can be overridden through
    # environment variables. The defaults are the illustrative placeholders
    # that used to be hard-coded. Never commit real credentials to source;
    # supply ETL_DB_PASSWORD at runtime instead.
    data_source = os.environ.get("ETL_DB_SOURCE", "example.db.com:5432")
    user = os.environ.get("ETL_DB_USER", "jDoe")
    password = os.environ.get("ETL_DB_PASSWORD", "superStrongPassword")
    fpath = os.environ.get("ETL_OUTPUT_PATH", "/path/to/data.json")
    # 1. Extract: get the data from the DB.
    raw_data = ingest(data_source, user, password)
    # 2. Transform: process this data.
    processed_data = process(raw_data)
    # 3. Load: save this data to the disk.
    save(processed_data, fpath)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment