Skip to content

Instantly share code, notes, and snippets.

@anna-geller
Last active June 28, 2021 16:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anna-geller/ea4372f2f2d76df277ea3cb4d85bab0a to your computer and use it in GitHub Desktop.
Save anna-geller/ea4372f2f2d76df277ea3cb4d85bab0a to your computer and use it in GitHub Desktop.
import logging
import pandas as pd
import awswrangler as wr
from src.timeseries_data_generator import TimeseriesGenerator
def upload_timeseries_data_to_s3(df: pd.DataFrame) -> None:
    """Write ``df`` as a Parquet dataset under the bronze time series prefix.

    The frame is handed to ``awswrangler.s3.to_parquet`` in dataset mode,
    without the DataFrame index, targeting
    ``s3://data-lake-bronze/timeseries/``. ``database="timeseries"`` and
    ``table="ge-demo"`` are forwarded to the writer (per the awswrangler
    documentation these name the data catalog table for the dataset).
    The writer's return value is logged at INFO level.

    Args:
        df: The time series data to upload.
    """
    # Resolve the logger locally: the module-global ``logger`` is only bound
    # inside the ``__main__`` guard, so relying on it would raise NameError
    # (after the upload already happened) whenever this module is imported
    # instead of being run as a script.
    log = logging.getLogger(__name__)
    result = wr.s3.to_parquet(
        df,
        path="s3://data-lake-bronze/timeseries/",
        index=False,
        dataset=True,
        database="timeseries",
        table="ge-demo",
    )
    log.info("New file uploaded: %s", result)
def test_happy_path() -> None:
    """Upload an unmodified series for 2021-01-01 through 2021-01-31 23:59."""
    generator = TimeseriesGenerator(
        start_date="2021-01-01",
        end_date="2021-01-31 23:59",
    )
    # No tampering here: the generated frame goes to S3 exactly as produced.
    upload_timeseries_data_to_s3(generator.get_timeseries())
def test_bad_order_of_columns() -> None:
    """Upload February 2021 data with columns ordered ``value``, ``timestamp``.

    This ordering is the opposite of what is expected in a time series.
    """
    generator = TimeseriesGenerator(
        start_date="2021-02-01",
        end_date="2021-02-28 23:59",
    )
    series = generator.get_timeseries()
    # Select the two columns in swapped order before uploading.
    swapped_columns = ["value", "timestamp"]
    upload_timeseries_data_to_s3(series[swapped_columns])
def test_incomplete_data() -> None:
    """Upload data that doesn't contain the full month of March 2021.

    The generated range stops at 2021-03-25 23:59, so the last days of the
    month are absent from the uploaded frame.
    """
    df = TimeseriesGenerator(
        start_date="2021-03-01", end_date="2021-03-25 23:59"
    ).get_timeseries()
    upload_timeseries_data_to_s3(df)
def test_missing_nr_values() -> None:
    """Upload April 2021 data whose ``value`` at index label 0 is set to None."""
    generator = TimeseriesGenerator(
        start_date="2021-04-01",
        end_date="2021-04-30 23:59",
    )
    series = generator.get_timeseries()
    # Blank out a single measurement so the uploaded frame has a missing value.
    series.at[0, "value"] = None
    upload_timeseries_data_to_s3(series)
def test_missing_timestamp() -> None:
    """Upload May 2021 data whose ``timestamp`` at index label 0 is set to None."""
    generator = TimeseriesGenerator(
        start_date="2021-05-01",
        end_date="2021-05-31 23:59",
    )
    series = generator.get_timeseries()
    # Blank out a single timestamp so the uploaded frame has a missing one.
    series.at[0, "timestamp"] = None
    upload_timeseries_data_to_s3(series)
def test_incorrect_data_type_nr_column() -> None:
    """Upload June 2021 data with the ``value`` column cast to ``float``.

    NOTE(review): presumably the expected dtype for ``value`` is not float;
    confirm against TimeseriesGenerator and the downstream validation.
    """
    generator = TimeseriesGenerator(
        start_date="2021-06-01",
        end_date="2021-06-30 23:59",
    )
    series = generator.get_timeseries()
    value_as_float = series["value"].astype(float)
    series["value"] = value_as_float
    upload_timeseries_data_to_s3(series)
def test_incorrect_data_type_dt_column() -> None:
    """Upload July 2021 data with the ``timestamp`` column cast to ``str``.

    NOTE(review): presumably the expected dtype for ``timestamp`` is a
    datetime type; confirm against TimeseriesGenerator.
    """
    generator = TimeseriesGenerator(
        start_date="2021-07-01",
        end_date="2021-07-31 23:59",
    )
    series = generator.get_timeseries()
    timestamp_as_text = series["timestamp"].astype(str)
    series["timestamp"] = timestamp_as_text
    upload_timeseries_data_to_s3(series)
def test_incorrect_value_range() -> None:
    """Upload August 2021 data generated with ``min_value=0``, ``max_value=120``.

    NOTE(review): presumably these bounds differ from the generator's
    defaults and from the range expected downstream; confirm.
    """
    generator = TimeseriesGenerator(
        start_date="2021-08-01",
        end_date="2021-08-31 23:59",
        min_value=0,
        max_value=120,
    )
    upload_timeseries_data_to_s3(generator.get_timeseries())
if __name__ == "__main__":
    # Set up logging before any scenario runs so INFO records are emitted.
    logging.basicConfig(
        level="INFO",
        format="[%(levelname)s] [%(name)s] [%(asctime)s]: %(message)s",
    )
    # Bound as a module-level global (this guard is not a function scope).
    logger = logging.getLogger(__name__)

    # Scenarios run sequentially in this fixed order; each generates its own
    # month of data and uploads it.
    scenarios = (
        test_happy_path,
        test_bad_order_of_columns,
        test_incomplete_data,
        test_missing_nr_values,
        test_missing_timestamp,
        test_incorrect_data_type_nr_column,
        test_incorrect_data_type_dt_column,
        test_incorrect_value_range,
    )
    for run_scenario in scenarios:
        run_scenario()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment