Skip to content

Instantly share code, notes, and snippets.

@anna-anisienia
Created August 24, 2020 22:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anna-anisienia/978fcf3645096453e0cbe48d0d3dd694 to your computer and use it in GitHub Desktop.
Basic ETL with Prefect using Docker storage
from prefect.environments.storage import Docker
from prefect import Flow, task
import pandas as pd
def score_check(grade, subject, student):
    """Apply the effort bonus to a single exam result.

    This is plain business logic, not a Prefect task. A grade strictly above
    90 is doubled, but only when the subject is present (``pd.notnull``
    treats both ``None`` and NaN as missing). Every other grade is returned
    unchanged.

    :param grade: number of points on an exam
    :param subject: school subject; a missing value disables the bonus
    :param student: name of the student (only used in the printed message)
    :return: final nr of points
    """
    # Subject is checked first so a missing subject short-circuits before
    # the grade comparison is evaluated.
    earns_bonus = pd.notnull(subject) and grade > 90
    if not earns_bonus:
        return grade

    new_grade = grade * 2
    print(f'Doubled score: {new_grade}, Subject: {subject}, Student name: {student}')
    return new_grade
@task
def extract():
    """Build the sample dataset: five exam results for each of three students.

    Each student sits the same five subjects; the last subject is missing
    (``None``) for everyone.

    :return: dataframe with ``Name``, ``Age``, ``Subject`` and ``Score`` columns
    """
    students = ['Hermione', 'Ron', 'Harry']
    subjects = ['History of Magic', 'Dark Arts', 'Potions', 'Flying', None]
    scores = [100, 100, 100, 68, 99,
              45, 53, 39, 87, 99,
              67, 86, 37, 100, 99]

    # One row per (student, subject) pair, grouped by student.
    names = [student for student in students for _ in subjects]

    return pd.DataFrame({
        'Name': names,
        'Age': [12] * len(scores),
        'Subject': subjects * len(students),
        'Score': scores,
    })
@task(log_stdout=True)
def transform(x):
    """Add a ``New_Score`` column with the effort bonus applied to every row.

    Each row's ``Score``, ``Subject`` and ``Name`` are passed to
    ``score_check``. The input frame is copied first, so the upstream task's
    result is not mutated in place.

    :param x: dataframe with ``Name``, ``Subject`` and ``Score`` columns
    :return: a copy of ``x`` with an additional ``New_Score`` column
    """
    result = x.copy()
    # A positional comprehension is used instead of ``DataFrame.apply(axis=1)``:
    # on an empty frame ``apply`` returns a DataFrame rather than a Series,
    # and that cannot be assigned to a single column.
    result["New_Score"] = [
        score_check(grade=grade, subject=subject, student=name)
        for grade, subject, name in zip(x["Score"], x["Subject"], x["Name"])
    ]
    return result
@task(log_stdout=True)
def load(y):
    """Report the scores before and after the transformation.

    :param y: dataframe with ``Score`` and ``New_Score`` columns
    """
    before, after = (y[column].tolist() for column in ("Score", "New_Score"))
    print(f"ETL finished. Old scores: {before}. New scores: {after}")
# Image pushed to the ECR registry; replace the placeholder with a real registry ID.
docker_storage = Docker(
    registry_url="<YOUR_ECR_REGISTRY_ID>.dkr.ecr.eu-central-1.amazonaws.com",
    python_dependencies=["pandas==1.1.0"],
    image_tag='latest',
)

# Wire the tasks together: extract -> transform -> load.
with Flow("basic-prefect-etl-flow", storage=docker_storage) as flow:
    extracted_df = extract()
    transformed_df = transform(extracted_df)
    load(transformed_df)

if __name__ == '__main__':
    flow.register(project_name='Medium_AWS_Prefect')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment