Skip to content

Instantly share code, notes, and snippets.

@gallir
Created April 3, 2021 22:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gallir/5353d7020dfdd397d853063d4fc56b49 to your computer and use it in GitHub Desktop.
Save gallir/5353d7020dfdd397d853063d4fc56b49 to your computer and use it in GitHub Desktop.
base covid
import pandas as pd
import time
from datetime import date
import pathlib
class Series:
    """A named pandas DataFrame bundled with its schema and an optional cache.

    The frame lives in ``self.df`` (``None`` until something loads or assigns
    it). Every method other than ``read`` and ``get_df`` requires ``self.df``
    to be set and raises ``AttributeError`` otherwise.
    """

    def __init__(self, name, schema, start, end, freq, top=0, use_cache=False, flag=False):
        """Store the series parameters and derive the cache file path.

        Args:
            name: Identifier of the series; becomes part of the cache path.
            schema: Mapping of column name to attribute type. Columns missing
                from it are dropped by ``format`` and skipped by
                ``get_schema``.
            start: First date of the series; must provide ``isoformat()``.
            end: Last date of the series; ``None`` means today.
            freq: Frequency label; becomes part of the cache path.
            top: Stored on the instance and encoded in the cache path; this
                class does not use it otherwise.
            use_cache: When true, ``read`` loads the cached pickle if present.
            flag: Stored on the instance and encoded in the cache path; this
                class does not use it otherwise.
        """
        self.df = None
        self.name = name
        self.schema = schema
        self.start = start
        self.end = date.today() if end is None else end
        self.freq = freq
        self.top = top
        self.use_cache = use_cache
        self.flag = flag
        # Every parameter that affects the content is part of the file name,
        # so differently-configured series never share a cache entry.
        self.cache_file = (
            f"/tmp/cache_{self.name}_{self.start.isoformat()}_{self.end.isoformat()}"
            f"_{self.freq}_{self.flag}_top{self.top}.pickle"
        )

    def read(self):
        """Load ``self.df`` from the cache file when caching is enabled.

        Does nothing when caching is disabled, when the cache file does not
        exist, or when the cached frame has no rows.
        """
        if not self.use_cache:
            return
        if not pathlib.Path(self.cache_file).exists():
            return
        # NOTE(security): unpickling can execute arbitrary code and the cache
        # path under /tmp is predictable; only safe where /tmp is trusted.
        cached = pd.read_pickle(self.cache_file)
        if cached.shape[0] > 0:
            self.df = cached

    def name(self):
        """Return the series name."""
        # NOTE(review): __init__ assigns an instance attribute also called
        # ``name``, which takes precedence over this method on initialised
        # instances: ``obj.name`` is the value itself and ``obj.name()`` fails.
        return self.name

    def freq(self):
        """Return the series frequency."""
        # NOTE(review): shadowed by the ``freq`` instance attribute in the
        # same way as ``name`` above; read ``obj.freq`` directly.
        return self.freq

    def get_df(self) -> "pd.DataFrame | None":
        """Return the current DataFrame, or ``None`` if none has been loaded."""
        return self.df

    def store_cache(self):
        """Pickle the current DataFrame to ``self.cache_file``."""
        self.df.to_pickle(self.cache_file)

    def get_schema(self):
        """Return the schema entries for the columns present in the frame.

        Returns:
            ``{"Attributes": [...]}`` with one
            ``{"AttributeName", "AttributeType"}`` dict per DataFrame column
            that appears in ``self.schema``, in DataFrame column order.
        """
        attributes = [
            {"AttributeName": column, "AttributeType": self.schema[column]}
            for column in self.df.columns
            if column in self.schema
        ]
        return {"Attributes": attributes}

    def format(self):
        """Drop every DataFrame column that is not declared in the schema."""
        unknown = [column for column in self.df.columns if column not in self.schema]
        if unknown:
            self.df = self.df.drop(columns=unknown)

    def explode(self, on_column: str, new_column: str, map: dict):
        """Add ``new_column`` by lookup in ``map`` and explode the frame on it.

        Each value of ``on_column`` is looked up in ``map`` (a missing key
        raises ``KeyError``); list-like results are then expanded into one
        row per element.
        """
        def lookup(key):
            return map[key]

        self.df[new_column] = self.df[on_column].apply(lookup)
        self.df = self.df.explode(new_column)

    def rename_col(self, old: str, new: str):
        """Rename column ``old`` to ``new`` in place."""
        self.df.rename(columns={old: new}, inplace=True)

    def to_csv(self, filename: str):
        """Write the DataFrame to ``filename`` as CSV without the index."""
        self.df.to_csv(filename, index=False)

    def top_keys(self, n: int, key_col: str, value_col: str):
        """Return the set of ``n`` keys with the largest summed ``value_col``."""
        pairs = self.df[[key_col, value_col]]
        totals = pairs.groupby([key_col], as_index=False).sum()
        leaders = totals.nlargest(n, [value_col])
        return set(leaders[key_col])

    def filter_top(self, n: int, key_col: str, value_col: str):
        """Keep only rows whose key is among the top ``n`` by summed value.

        A non-positive ``n`` leaves the DataFrame untouched.
        """
        if n <= 0:
            return
        keep = self.top_keys(n, key_col, value_col)
        self.df = self.df[self.df[key_col].isin(keep)]
def cursor_as_pandas_retry(cursor, query, retries=3, sleep=3):
    """Run ``query`` on ``cursor`` and return its pandas result, retrying on error.

    Args:
        cursor: Object whose ``execute(query)`` result exposes ``as_pandas()``.
        query: Query passed unchanged to ``cursor.execute``.
        retries: Maximum number of attempts; no attempt is made when <= 0.
        sleep: Seconds to wait between a failed attempt and the next one.

    Returns:
        Whatever ``as_pandas()`` returns on the first successful attempt, or
        ``None`` when every attempt failed (or none was made).
    """
    while retries > 0:
        try:
            return cursor.execute(query).as_pandas()
        except Exception as e:
            retries -= 1
            if retries <= 0:
                # Last attempt failed: no retry follows, so do not sleep and
                # do not claim one is coming.
                print(f"Giving up on query {query}: {e}")
                break
            print(f"Retrying {retries} query {query}: {str(e)}")
            time.sleep(sleep)
    return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment