Skip to content

Instantly share code, notes, and snippets.

@daradecic
Created June 26, 2021 12:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daradecic/b99a98617a7e9a8044c1501f835c9e7d to your computer and use it in GitHub Desktop.
Save daradecic/b99a98617a7e9a8044c1501f835c9e7d to your computer and use it in GitHub Desktop.
import json
import requests
import pandas as pd
from datetime import datetime
from prefect import task, Flow, Parameter
from prefect.schedules import CronSchedule
@task
def extract(url: str) -> dict:
    """Fetch *url* over HTTP and return its body parsed as JSON.

    Args:
        url: Address of the JSON endpoint to download.

    Returns:
        The decoded JSON payload (whatever ``json.loads`` produces for the
        response body).

    Raises:
        Exception: If the response has an error status (``res.ok`` is false).
        requests.exceptions.RequestException: On connection problems or when
            the server does not answer within the timeout.
    """
    # Without a timeout, requests waits forever on a stalled server, which
    # would block every subsequent scheduled run of the flow.
    res = requests.get(url, timeout=30)
    # Response truthiness is defined as ``res.ok``; spell it out for clarity.
    if not res.ok:
        raise Exception('No data fetched!')
    return json.loads(res.content)
@task
def transform(data: dict) -> pd.DataFrame:
    """Flatten raw user records into a tabular DataFrame.

    ``data`` is iterated and every element is indexed by key, so in practice
    it is a sequence of user mappings (NOTE(review): the ``dict`` annotation
    looks too narrow -- confirm against the caller).

    Each record contributes one row with the columns ``ID``, ``Name``,
    ``Username``, ``Email``, ``Address`` (street, suite and city joined by
    ``", "``), ``PhoneNumber`` and ``Company``.

    Raises:
        KeyError: If a record lacks one of the keys read below.
    """
    rows = [
        {
            'ID': record['id'],
            'Name': record['name'],
            'Username': record['username'],
            'Email': record['email'],
            'Address': f"{record['address']['street']}, {record['address']['suite']}, {record['address']['city']}",
            'PhoneNumber': record['phone'],
            'Company': record['company']['name'],
        }
        for record in data
    ]
    return pd.DataFrame(rows)
@task
def load(data: pd.DataFrame, path: str) -> None:
    """Write *data* to *path* as CSV, without the index column.

    Args:
        data: Frame to persist.
        path: Destination passed to ``DataFrame.to_csv`` as ``path_or_buf``.

    The parent directory of *path* is created when it does not exist yet;
    ``to_csv`` itself refuses to write into a missing directory.
    """
    import os

    # Only filesystem paths have a parent directory; leave file-like
    # buffers (also accepted by ``to_csv``) untouched.
    if isinstance(path, (str, os.PathLike)):
        parent = os.path.dirname(os.fspath(path))
        if parent:
            os.makedirs(parent, exist_ok=True)
    data.to_csv(path_or_buf=path, index=False)
# Cron expression with a wildcard in every field: the schedule fires once per minute.
scheduler = CronSchedule(cron='* * * * *')
@task
def _timestamped_csv_path() -> str:
    """Return a ``data/users_<unix-seconds>.csv`` path for the current time."""
    return f'data/users_{int(datetime.now().timestamp())}.csv'


def prefect_flow():
    """Build the ``simple_etl_pipeline`` flow: extract -> transform -> load.

    The flow takes one required parameter, ``p_url``, which is passed to
    ``extract``. It is attached to the module-level ``scheduler``.

    Returns:
        The assembled ``Flow`` object (not yet run).
    """
    with Flow(name='simple_etl_pipeline', schedule=scheduler) as flow:
        param_url = Parameter(name='p_url', required=True)
        users = extract(url=param_url)
        df_users = transform(users)
        # The output path must be computed by a task. An f-string written
        # here would be evaluated once, while the flow is being built, so
        # every scheduled run would overwrite the same file.
        load(data=df_users, path=_timestamped_csv_path())
    return flow
if __name__ == '__main__':
    # Script entry point: build the flow and run it against the sample endpoint.
    run_parameters = {'p_url': 'https://jsonplaceholder.typicode.com/users'}
    flow = prefect_flow()
    flow.run(parameters=run_parameters)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment