Last active
April 12, 2019 11:15
-
-
Save fpaupier/7fe0567ac0cc0b6bcec40362442be2eb to your computer and use it in GitHub Desktop.
What the steps of an ETL pipeline would look like
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Sample extract transform load pipeline to illustrate how Nifi can simplify the construction | |
of a dataflow. | |
NOTE: This code is simply to illustrate an idea and is not a working ETL pipeline. | |
""" | |
import os | |
import db_utils | |
import data_sanitizer | |
def ingest(source, user, password):
    """
    Pull the most recently changed data from the given source.

    Relies on a Change Data Capture mechanism (via db_utils) to pick up
    whatever changed in the database since the previous capture.

    Args:
        source (str): address of the data source
        user (str): database user name
        password (str): database password

    Returns:
        str: JSON representation of the ingested data
    """
    # Connect first, then ask the CDC layer for every modified row.
    db_connection = db_utils.connect(source, user, password)
    changed_rows = db_connection.capture_data_changes("all-tables")
    return changed_rows
def process(data):
    """
    Transform ingested data into an enriched, well-formed representation.

    Args:
        data (str): JSON payload produced by the ingestion step

    Returns:
        str: the same data after formatting, trimming and metadata enrichment
    """
    # Each sanitizer stage feeds the next: format -> trim -> enrich.
    staged = data_sanitizer.format(data)
    staged = data_sanitizer.trim(staged)
    staged = data_sanitizer.add_metadata(staged)
    return staged
def save(data, fpath):
    """
    Save data to the disk.

    Args:
        data (str): content to save to the disk
        fpath (str): path to the saved file

    Returns:
        bool: True if saving succeeded
    """
    # Use a context manager so the file handle is always released, even if
    # the write raises — the original os.open/os.write pair leaked the fd.
    # Mode "w" also truncates any previous content, so a shorter payload
    # cannot leave stale bytes from an earlier, longer run.
    with open(fpath, "w", encoding="utf-8") as out_file:
        out_file.write(data)
    return True
if __name__ == "__main__": | |
""" | |
Start the ingestion pipeline whe script is launched. | |
""" | |
# Define source and user | |
data_source = "example.db.com:5432" | |
password = "superStrongPassword" | |
user = "jDoe" | |
fpath = "/path/to/data.json" | |
# 1. Get the data from the DB | |
raw_data = ingest(data_source, user, password) | |
# 2. Process this data | |
processed_data = process(raw_data) | |
# 3. Save this data to the disk | |
save(processed_data, fpath) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment