elijahbenizzy / metrics_layer.py
Created September 15, 2022 00:19
Metrics Layer using Hamilton
### metrics_layer.py
import pandas as pd

from hamilton import driver


def workspaces(db: str, table: str = 'workspaces') -> pd.DataFrame:
    # `query` is assumed to be defined/imported elsewhere in the gist (the preview is truncated)
    return query("SELECT * FROM {db}.{table}".format(db=db, table=table))


def count_workspaces(workspaces: pd.DataFrame, grain: str) -> pd.Series:
    """Pandas code that does..."""
import numpy as np
from pandas.core.indexes.datetimes import DatetimeIndex
import pandas as pd
from typing import Tuple, List, Dict
import functools as ft
from hamilton.function_modifiers import inject, resolve, ResolveAt, parameterized_subdag, source, value, group
# Quick hack to keep everything in the same runnable file
# But you should probably store these in their own modules :)
from hamilton.function_modifiers import check_output_custom
from hamilton.data_quality.base import DataValidator, ValidationResult, DataValidationLevel
from hamilton.ad_hoc_utils import create_temporary_module
from hamilton import driver
class UniqueColumnsValidator(DataValidator):
    def __init__(self, importance: str):
        super(UniqueColumnsValidator, self).__init__(importance=importance)
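To show where a validator like this plugs in, here is a hedged sketch; the `users` function and its logic are hypothetical, and the validator's remaining methods (cut off in this preview) would still need implementing.

# Hypothetical usage: attach the custom validator to a node's output.
@check_output_custom(UniqueColumnsValidator(importance="warn"))
def users(raw_users: pd.DataFrame) -> pd.DataFrame:
    """Example node; the validator runs against its output."""
    return raw_users.drop_duplicates()

# Per the "quick hack" comment above, create_temporary_module keeps this runnable in one file.
temp_module = create_temporary_module(users, module_name="adhoc_pipeline")
dr = driver.Driver({}, temp_module)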
from typing import Dict, Any, List
from hamilton.ad_hoc_utils import create_temporary_module
from hamilton import driver
from hamilton.driver import Variable
from hamilton.function_modifiers import parameterize_sources
import pandas as pd
foos = ['a', 'b', 'c']
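The preview cuts off here; as a rough illustration of what parameterize_sources does with a list like foos (the column names and the doubling logic are hypothetical), it stamps one function template out per upstream source:

# Hypothetical: assumes upstream nodes named 'a', 'b', and 'c' exist in the DAG.
@parameterize_sources(
    **{f"{foo}_doubled": {"input_series": foo} for foo in foos}
)
def doubled(input_series: pd.Series) -> pd.Series:
    """Generates a_doubled, b_doubled, and c_doubled from the corresponding upstream series."""
    return input_series * 2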
import inspect
from typing import Callable
import pandas as pd
from pyspark.sql import Column, DataFrame, SparkSession
from hamilton import base, driver, htypes
from hamilton.ad_hoc_utils import create_temporary_module
from hamilton.experimental.h_spark import with_columns
from hamilton.function_modifiers.dependencies import source, value
from hamilton.htypes import Parallelizable

def file_to_load() -> Parallelizable[str]:
    for file_ in _list_files(...):
        yield file_

def file_contents(file_to_load: str) -> str:
    with open(file_to_load) as f:
        return f.read()
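To close the fan-out, a hedged sketch of the matching fan-in step; the function name is an assumption, but Collect is the counterpart Hamilton type to Parallelizable.

from typing import List

from hamilton.htypes import Collect

def all_file_contents(file_contents: Collect[str]) -> List[str]:
    """Runs once after every file has been read, gathering the per-file results."""
    return list(file_contents)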
import functions

from hamilton import driver
from hamilton.execution import executors

dr = (
    driver.Builder()
    .with_modules(functions)
    .enable_dynamic_execution(allow_experimental_mode=True)
    # these are the defaults -- use the SynchronousLocalTaskExecutor
    # for remote if you want to iterate on a small dataset
    .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5))
    .with_local_executor(executors.SynchronousLocalTaskExecutor())
    .build()  # the preview cut the chain off; with_modules(...) and build() complete it
)
import requests
from typing import Dict, List, Tuple

from hamilton.htypes import Collect, Parallelizable


def starcount_url(repositories: List[str]) -> Parallelizable[str]:
    """Generates API URLs for counting stars on a repo. We do this
    so we can paginate requests later.

    :param repositories: The repository names, in the format 'organization/repo'
    :return: A URL to the GitHub API
    """
    for repo in repositories:
        yield f"https://api.github.com/repos/{repo}"
def star_count(starcount_url: str, github_api_key: str) -> Tuple[str, int]:
    """Generates the star count for a given repo.

    :param starcount_url: URL of the repo
    :param github_api_key: API key for GitHub
    :return: A tuple of the repo name and the star count
    """
    response = requests.get(
        starcount_url, headers={"Authorization": f"token {github_api_key}"}
    )
    response.raise_for_status()  # Raise an exception for unsuccessful requests
    # The return was cut off in the preview; the repos endpoint exposes these fields.
    data = response.json()
    return data["full_name"], data["stargazers_count"]
def stars_by_repo(star_count: Collect[Tuple[str, int]]) -> Dict[str, int]:
    """Aggregates the star count for each repo into a dictionary, so we
    can generate paginated requests.

    :param star_count: The collected (repo name, star count) tuples
    :return: The star count for each repo
    """
    star_count_dict = {}
    for repo_name, stars in star_count:
        star_count_dict[repo_name] = stars
    return star_count_dict
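Putting it together, a hedged sketch of running this DAG with the dynamic-execution driver built above; the repository list and the API key are placeholders.

# Hypothetical run; assumes `dr` is the Builder-produced driver from earlier.
result = dr.execute(
    ["stars_by_repo"],
    inputs={
        "repositories": ["DAGWorks-Inc/hamilton", "apache/airflow"],
        "github_api_key": "<your GitHub token>",
    },
)
print(result)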