Skip to content

Instantly share code, notes, and snippets.

@KayvanShah1
Created May 27, 2022 11:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save KayvanShah1/ac5db4fb481b9683069f2c0d1784cd11 to your computer and use it in GitHub Desktop.
Save KayvanShah1/ac5db4fb481b9683069f2c0d1784cd11 to your computer and use it in GitHub Desktop.
Prefect 2.0 ORION: Flow in Flow Example
tickers:
- ticker_code: ^IXIC
  name: NASDAQ Composite
- ticker_code: ^GSPC
  name: S&P 500
- ticker_code: ^DJI
  name: Dow Jones Industrial Average
- ticker_code: ^BSESN
  name: S&P BSE SENSEX
- ticker_code: ^RUT
  name: Russell 2000
- ticker_code: ^GDAXI
  name: DAX PERFORMANCE-INDEX
- ticker_code: ^N225
  name: Nikkei 225
import json
import os
import random
from datetime import date
from pathlib import Path
from typing import Optional

import pandas as pd
import yaml
import yfinance as yf
from pandas_datareader import data as pdr
from prefect import flow, task
from prefect.logging import get_logger, get_run_logger
from prefect.task_runners import SequentialTaskRunner

# Patch pandas_datareader so that pdr.get_data_yahoo (called in
# get_ticker_data) is served by yfinance.
yf.pdr_override()
# All paths are anchored at the directory containing this script, so the
# config and data locations do not depend on the current working directory.
BASE_PATH = Path(__file__).parent
YAHOO_CONFIG_YAML, YAHOO_DATA_PATH = (
    os.path.join(BASE_PATH, leaf) for leaf in ("yahoo.yaml", "data")
)
@task
def read_config(metadata_path: str) -> dict:
    """Read configuration metadata from a JSON or YAML file.

    The parser is chosen from the file extension: ``.json`` files (any case)
    are parsed with ``json.load``. Every other file is parsed as YAML with the
    safe loader, which does not construct arbitrary Python objects.

    Args:
        metadata_path (str): Path to the metadata JSON or YAML file. A
            ``pathlib.Path`` is accepted as well.

    Returns:
        dict: The parsed configuration.
    """
    # Compare the actual file suffix. ``str.split(".")`` returns a list,
    # which never equals the string "json".
    is_json = Path(metadata_path).suffix.lower() == ".json"
    with open(metadata_path, "r", encoding="utf-8") as f:
        if is_json:
            return json.load(f)
        return yaml.safe_load(f)
@task
def get_ticker_data(
    ticker_dict: dict,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
):
    """Download price history for one ticker via ``pdr.get_data_yahoo``.

    Args:
        ticker_dict (dict): Ticker entry from the config. Its
            ``"ticker_code"`` key holds the Yahoo symbol (e.g. ``"^GSPC"``).
        start_date (str, optional): Start date as ``YYYY-MM-DD``. Defaults to
            today's date, computed at call time.
        end_date (str, optional): End date as ``YYYY-MM-DD``. Defaults to
            today's date, computed at call time.

    Returns:
        Whatever ``pdr.get_data_yahoo`` returns. Downstream code treats it
        as a date-indexed DataFrame.
    """
    # Compute "today" per call. A ``date.today()`` expression in the
    # signature is evaluated once at import time, so a long-lived worker
    # would keep requesting the date the module was first loaded.
    today = date.today().strftime("%Y-%m-%d")
    if start_date is None:
        start_date = today
    if end_date is None:
        # NOTE(review): yfinance documents ``end`` as exclusive, so with this
        # default the current day's bar may be omitted. Confirm the intent.
        end_date = today
    ticker = ticker_dict["ticker_code"]
    return pdr.get_data_yahoo(ticker, start=start_date, end=end_date)
@task
def clean_ticker_data(df):
    """Normalise a downloaded price frame.

    The index is promoted to a regular column. Column names are lower-cased
    and spaces become underscores. The resulting ``date`` column is parsed
    to datetimes (unparseable values become NaT). Rows are sorted by date
    in ascending order.

    Args:
        df: Frame returned by ``get_ticker_data``. After ``reset_index`` it
            must contain a column whose normalised name is ``date``.

    Returns:
        A new, date-sorted DataFrame.
    """
    frame = df.reset_index()
    frame.columns = [label.lower().replace(" ", "_") for label in frame.columns]
    parsed_dates = pd.to_datetime(frame["date"], errors="coerce")
    frame = frame.assign(date=parsed_dates)
    return frame.sort_values(by="date", ascending=True)
@task
def basic_preprocess(df):
    """Cast value columns to float64 and fill missing values.

    Every column except the first is cast to ``float64``. In this pipeline
    the first column is the ``date`` column produced by ``clean_ticker_data``.
    Gaps are then filled by ``interpolate()`` followed by a forward fill.

    Args:
        df: Frame whose first column is left as-is and whose remaining
            columns are numeric-convertible.

    Returns:
        A new DataFrame. The input frame is not modified.
    """
    # ``astype`` with a column->dtype mapping returns a new frame with the
    # requested dtypes. Assigning through ``df.iloc[:, 1:] = ...`` writes
    # into the caller's frame in place, and on recent pandas it may keep
    # the existing (e.g. int64) dtypes instead of casting.
    df = df.astype({col: "float64" for col in df.columns[1:]})
    df = df.interpolate()
    # ``fillna(method="ffill")`` is deprecated; ``ffill()`` is equivalent.
    return df.ffill()
@task
def save_ticker_data(ticker_dict: dict, cleaned_data):
    """Merge a ticker's cleaned data into its parquet file and write it back.

    The file is ``YAHOO_DATA_PATH/<name>(<ticker_code>).parquet``. If it
    already exists, its rows are combined with ``cleaned_data``. For dates
    present in both, the newly supplied row wins. The merged frame is sorted
    by date and written without its index.

    Args:
        ticker_dict (dict): Config entry with ``"ticker_code"`` and
            ``"name"`` keys.
        cleaned_data: Preprocessed frame containing a ``date`` column.
    """
    logger = get_run_logger()
    ticker, ticker_name = ticker_dict["ticker_code"], ticker_dict["name"]
    # exist_ok avoids the exists()/makedirs() race when tasks run concurrently.
    os.makedirs(YAHOO_DATA_PATH, exist_ok=True)

    file_name = f"{ticker_name}({ticker}).parquet"
    file_path = os.path.join(YAHOO_DATA_PATH, file_name)
    if os.path.exists(file_path):
        existing = pd.read_parquet(file_path)
        # DataFrame.append was removed in pandas 2.0; concat replaces it.
        cleaned_data = pd.concat([existing, cleaned_data])

    # drop=True discards the old index. Without it, reset_index inserts the
    # index as an "index" column, then "level_0" on the next run, and raises
    # ValueError once both names exist.
    cleaned_data = cleaned_data.reset_index(drop=True)
    # keep="last" prefers the freshly downloaded row for overlapping dates,
    # because the new data was concatenated after the existing file contents.
    cleaned_data = cleaned_data.drop_duplicates(subset="date", keep="last")
    cleaned_data = cleaned_data.sort_values(by="date", kind="stable")
    cleaned_data.to_parquet(file_path, index=False)
    logger.info(f"{ticker_name} data saved at {file_path}")
@flow(task_runner=SequentialTaskRunner())
def ticker_single_pipeline(ticker_dict: dict, start_date: str = "2022-05-01"):
    """Download, clean, preprocess and save the data for one ticker.

    Args:
        ticker_dict (dict): Config entry with ``"ticker_code"`` and
            ``"name"`` keys.
        start_date (str): First date to download, ``YYYY-MM-DD``.
            Defaults to ``"2022-05-01"``.
    """
    raw = get_ticker_data(ticker_dict, start_date=start_date)
    cleaned = clean_ticker_data(raw)
    processed = basic_preprocess(cleaned)
    # save_ticker_data has no return value, so its result is not bound.
    save_ticker_data(ticker_dict, processed)
@flow
def ticker_mutli_pipeline():
    """Run ``ticker_single_pipeline`` as a subflow for each configured ticker.

    Tickers are the entries of the ``tickers`` list in the config file at
    ``YAHOO_CONFIG_YAML``.
    """
    config_future = read_config(YAHOO_CONFIG_YAML)
    ticker_entries = config_future.result()["tickers"]
    for entry in ticker_entries:
        ticker_single_pipeline(entry)
if __name__ == "__main__":
    # Only run the flow when executed as a script. Importing this module
    # (e.g. by a Prefect deployment loader or a test) must not trigger a
    # full download-and-write run.
    ticker_mutli_pipeline()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment