Skip to content

Instantly share code, notes, and snippets.

@anna-geller
Created September 16, 2021 13:57
Show Gist options
  • Save anna-geller/a3c48e5015db00f704827a2ce95f03a6 to your computer and use it in GitHub Desktop.
Save anna-geller/a3c48e5015db00f704827a2ce95f03a6 to your computer and use it in GitHub Desktop.
import awswrangler as wr
from datetime import date
import logging
import pandas as pd
from pandas_datareader.data import DataReader
class StockDataReader:
    """Download stock data for one ticker and append it to a data lake table.

    The pipeline has three steps, chained together by :meth:`run_etl`:
    fetch the data through ``pandas_datareader`` (``"av-daily"`` source),
    tag every row with the ticker symbol, and append the result to S3 as a
    Parquet dataset through ``awswrangler``.
    """

    def __init__(
        self,
        ticker_symbol: str,
        dataset_name: str = "stock_market_data",
        start_date: date = date(2021, 1, 1),
    ) -> None:
        """Store the pipeline configuration.

        Args:
            ticker_symbol: Symbol requested from the data source; also
                written into the ``ticker_symbol`` column and used as the
                last segment of the S3 path.
            dataset_name: Name of the target table; also used as the first
                path segment below the bucket.
            start_date: First day requested from the data source.
        """
        self.ticker_symbol = ticker_symbol
        self.dataset_name = dataset_name
        self.start_date = start_date
        # Named after the concrete class so subclasses log under their own name.
        self._logger = logging.getLogger(type(self).__name__)

    def extract(self) -> pd.DataFrame:
        """Fetch data for the ticker from ``start_date`` up to today.

        Returns:
            Whatever ``DataReader`` returns for the ``"av-daily"`` source.
        """
        # NOTE(review): the "av-daily" source presumably needs an Alpha
        # Vantage API key supplied via the environment - confirm.
        return DataReader(
            name=self.ticker_symbol,
            data_source="av-daily",
            start=self.start_date,
            end=date.today(),
        )

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with a constant ``ticker_symbol`` column.

        The input frame is left unmodified; ``assign`` builds a new frame
        instead of writing the column into the caller's object.

        Args:
            df: Frame produced by :meth:`extract`.

        Returns:
            A new DataFrame holding all columns of ``df`` plus
            ``ticker_symbol`` (overwritten if it was already present).
        """
        return df.assign(ticker_symbol=self.ticker_symbol)

    def load_to_data_lake(self, df: pd.DataFrame) -> None:
        """Append ``df`` to the data lake table as Parquet and log the row count.

        Args:
            df: Frame to write; its index is written along with the columns.
        """
        # NOTE(review): every ticker writes below its own prefix but all of
        # them register under the same table name - confirm awswrangler
        # accepts a path that differs from the table's existing location.
        wr.s3.to_parquet(
            df,
            index=True,
            dataset=True,
            mode="append",
            database="default",
            table=self.dataset_name,
            path=f"s3://data-lake-bronze/{self.dataset_name}/{self.ticker_symbol}",
        )
        self._logger.info(
            "%s data with %d rows successfully loaded to data lake table: %s",
            self.ticker_symbol,
            len(df),
            self.dataset_name,
        )

    def run_etl(self) -> None:
        """Run extract, transform and load in sequence for this ticker."""
        df = self.extract()
        df = self.transform(df)
        self.load_to_data_lake(df)
if __name__ == "__main__":
    # Configure root logging once so the INFO message emitted after each
    # load is printed with a timestamp and level.
    logging.basicConfig(
        level="INFO",
        format="%(asctime)s - [%(levelname)s] %(message)s",
    )

    # Run the full pipeline for each ticker, one after another.
    symbols = ("AAPL", "MSFT", "GOOGL", "TSLA")
    for symbol in symbols:
        reader = StockDataReader(symbol)
        reader.run_etl()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment