Skip to content

Instantly share code, notes, and snippets.

@daradecic
Created June 26, 2021 12:36
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save daradecic/3479e50de5189aecdbe39d9e7e3a2862 to your computer and use it in GitHub Desktop.
Save daradecic/3479e50de5189aecdbe39d9e7e3a2862 to your computer and use it in GitHub Desktop.
import json
from datetime import datetime
from pathlib import Path

import pandas as pd
import requests
from prefect import task, Flow, Parameter
@task
def extract(url: str, timeout: float = 10.0) -> list:
    """Fetch *url* and return its decoded JSON body.

    Args:
        url: Endpoint returning JSON; downstream ``transform`` expects a list
            of user objects (as served by the JSONPlaceholder ``/users`` API).
        timeout: Seconds to wait for the server. Without a timeout,
            ``requests`` may block indefinitely on an unresponsive host.

    Returns:
        The decoded JSON payload.

    Raises:
        Exception: If the server answers with a 4xx/5xx status code.
        requests.RequestException: On connection failures or timeouts.
    """
    res = requests.get(url, timeout=timeout)
    # A requests.Response is falsy when its status code is 4xx/5xx.
    if not res:
        raise Exception('No data fetched!')
    return res.json()
@task
def transform(data: list) -> pd.DataFrame:
    """Flatten raw user records into one DataFrame row per user.

    Each record must carry the JSONPlaceholder user fields used below,
    including the nested ``address`` and ``company`` objects; a missing key
    raises ``KeyError``.

    Args:
        data: Iterable of user dicts, as returned by ``extract``.

    Returns:
        DataFrame with columns ID, Name, Username, Email, Address,
        PhoneNumber and Company, in that order. Empty input still yields
        these columns so the CSV written downstream keeps its header row.
    """
    columns = ['ID', 'Name', 'Username', 'Email', 'Address', 'PhoneNumber', 'Company']
    rows = [
        {
            'ID': user['id'],
            'Name': user['name'],
            'Username': user['username'],
            'Email': user['email'],
            'Address': f"{user['address']['street']}, {user['address']['suite']}, {user['address']['city']}",
            'PhoneNumber': user['phone'],
            'Company': user['company']['name'],
        }
        for user in data
    ]
    # Explicit columns: pd.DataFrame([]) alone would have no columns at all.
    return pd.DataFrame(rows, columns=columns)
@task
def load(data: pd.DataFrame, path: str) -> None:
    """Write *data* to a CSV file at *path*, omitting the DataFrame index.

    Missing parent directories are created first. Otherwise ``to_csv`` raises
    ``OSError`` when the output folder (e.g. ``data/``) does not exist yet,
    after the extract and transform steps have already done their work.

    Args:
        data: Table to persist.
        path: Destination file path.
    """
    Path(path).parent.mkdir(parents=True, exist_ok=True)
    data.to_csv(path_or_buf=path, index=False)
@task
def _users_csv_path() -> str:
    """Return a timestamped CSV path; as a task it is evaluated on every flow run."""
    return f'data/users_{int(datetime.now().timestamp())}.csv'


def prefect_flow():
    """Build the ``simple_etl_pipeline`` flow: extract -> transform -> load.

    The flow declares one required parameter, ``p_url``, the URL passed to
    ``extract``. The transformed users are written to
    ``data/users_<unix-timestamp>.csv``.

    Returns:
        The constructed, not yet executed, Prefect ``Flow``.
    """
    with Flow(name='simple_etl_pipeline') as flow:
        param_url = Parameter(name='p_url', required=True)
        users = extract(url=param_url)
        df_users = transform(users)
        # The path comes from a task rather than an inline f-string. Plain
        # Python expressions in this block run once, when the flow is built,
        # so every run of the same Flow object would overwrite one file.
        load(data=df_users, path=_users_csv_path())
    return flow
if __name__ == '__main__':
    # Run the pipeline once, locally, against the public JSONPlaceholder users endpoint.
    run_parameters = {'p_url': 'https://jsonplaceholder.typicode.com/users'}
    flow = prefect_flow()
    flow.run(parameters=run_parameters)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment