Created
August 24, 2020 20:35
-
-
Save anna-anisienia/11c9f946007b8556d948925ef98608fa to your computer and use it in GitHub Desktop.
Basic ETL pipeline with Prefect
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect import Flow, task | |
import pandas as pd | |
def score_check(grade, subject, student, *, threshold=90):
    """
    This is a normal "business logic" function which is not a Prefect task.

    If a student achieved a score strictly greater than ``threshold``, multiply
    it by 2 for their effort! But only if the subject is not NULL.

    :param grade: number of points on an exam
    :param subject: school subject; a missing value (None or NaN, as judged by
        ``pd.notnull``) disables the bonus
    :param student: name of the student; only used in the printed message
    :param threshold: keyword-only cutoff; the grade must exceed it to be
        doubled. Defaults to 90, the previously hard-coded value.
    :return: final nr of points (doubled grade, or the unchanged grade)
    """
    if pd.notnull(subject) and grade > threshold:
        new_grade = grade * 2
        print(f'Doubled score: {new_grade}, Subject: {subject}, Student name: {student}')
        return new_grade
    return grade
@task
def extract():
    """Build the demo dataset of students and their exam scores.

    Three students are each scored in the same five subject slots. The last
    slot's subject is ``None`` (missing) for every student.

    :return: DataFrame with columns ``Name``, ``Age``, ``Subject``, ``Score``
        (15 rows, grouped by student)
    """
    students = ['Hermione', 'Ron', 'Harry']
    subjects = ['History of Magic', 'Dark Arts', 'Potions', 'Flying', None]
    # One inner list per student, aligned position-by-position with `subjects`.
    scores_by_student = [
        [100, 100, 100, 68, 99],
        [45, 53, 39, 87, 99],
        [67, 86, 37, 100, 99],
    ]

    names = [student for student in students for _ in subjects]
    frame = pd.DataFrame({
        'Name': names,
        'Age': [12] * len(names),
        'Subject': subjects * len(students),
        'Score': [score for row in scores_by_student for score in row],
    })
    return frame
@task(log_stdout=True)
def transform(x):
    """Add a ``New_Score`` column computed row-by-row with ``score_check``.

    :param x: DataFrame with at least ``Score``, ``Subject`` and ``Name`` columns
    :return: a copy of ``x`` with an extra ``New_Score`` column; ``x`` itself is
        left unmodified
    """
    # Work on a copy: assigning a column on `x` directly would mutate the
    # DataFrame produced by the upstream task.
    result = x.copy()
    # Zip the three needed columns instead of DataFrame.apply(axis=1), which
    # materializes a full Series for every row just to read three fields.
    result["New_Score"] = [
        score_check(grade=grade, subject=subject, student=name)
        for grade, subject, name in zip(result["Score"], result["Subject"], result["Name"])
    ]
    return result
@task(log_stdout=True)
def load(y):
    """Report the original and recomputed scores side by side.

    Nothing is persisted. The comparison is only printed, and the
    ``log_stdout=True`` decorator option asks Prefect to forward that output
    to the task logs.

    :param y: DataFrame holding ``Score`` and ``New_Score`` columns
    :return: None
    """
    before, after = (y[column].tolist() for column in ("Score", "New_Score"))
    print(f"ETL finished. Old scores: {before}. New scores: {after}")
# Wire the tasks into a linear extract -> transform -> load pipeline.
with Flow("basic-prefect-etl-flow") as flow:
    extracted_df = extract()
    transformed_df = transform(extracted_df)
    load(transformed_df)

if __name__ == '__main__':
    import sys

    # Default behaviour (no arguments) registers the flow, as before. Passing
    # --local executes it in-process instead, replacing the old practice of
    # commenting/uncommenting `flow.run()` by hand.
    if '--local' in sys.argv[1:]:
        flow.run()
    else:
        flow.register(project_name='Medium_AWS_Prefect')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment