This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
### metrics_layer.py | |
import pandas as pd | |
from hamilton import driver | |
def workspaces(db: str, table: str='workspaces') -> pd.DataFrame: | |
return query("SELECT * from {db}.{table}".format(db)) | |
def count_workspaces(workspaces: pd.DataFrame, grain: str) -> pd.Series: | |
"""Pandas code that does... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from pandas.core.indexes.datetimes import DatetimeIndex | |
import pandas as pd | |
from typing import Tuple, List, Dict | |
import functools as ft | |
from hamilton.function_modifiers import inject, resolve, ResolveAt, parameterized_subdag, source, value, group | |
# Quick hack to keep everything in the same runnable file | |
# But you should probably store these in their own modules :) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from hamilton.function_modifiers import check_output_custom | |
from hamilton.data_quality.base import DataValidator, ValidationResult, DataValidationLevel | |
from hamilton.ad_hoc_utils import create_temporary_module | |
from hamilton import driver | |
class UniqueColumnsValidator(DataValidator): | |
def __init__(self, importance: str): | |
super(UniqueColumnsValidator, self).__init__(importance=importance) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Dict, Any, List | |
from hamilton.ad_hoc_utils import create_temporary_module | |
from hamilton import driver | |
from hamilton.driver import Variable | |
from hamilton.function_modifiers import parameterize_sources | |
import pandas as pd | |
foos = ['a', 'b', 'c'] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import inspect | |
from typing import Callable | |
import pandas as pd | |
from pyspark.sql import Column, DataFrame, SparkSession | |
from hamilton import base, driver, htypes | |
from hamilton.ad_hoc_utils import create_temporary_module | |
from hamilton.experimental.h_spark import with_columns | |
from hamilton.function_modifiers.dependencies import source, value |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def file_to_load() -> Parallelizable[str]: | |
for file_ in _list_files(...): | |
yield file_ | |
def file_contents(file_to_load: str) -> str: | |
with open(file_to_load) as f: | |
return f.read() | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functions | |
from hamilton.execution import executors | |
dr = ( | |
driver.Builder() | |
.enable_dynamic_execution(allow_experimental_mode=True) | |
# these are the deafults -- use the SynchronousLocalTaskExecutor | |
# for remote if you want to iterate on a small dataset | |
.with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5)) | |
.with_local_executor(executors.SynchronousLocalTaskExecutor()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def starcount_url(repositories: List[str]) -> Parallelizable[str]: | |
"""Generates API URLs for counting stars on a repo. We do this | |
so we can paginate requests later. | |
:param repo: The repository name in the format 'organization/repo' | |
:return: A URL to the GitHub API | |
""" | |
for repo in repositories: | |
yield f"https://api.github.com/repos/{repo}" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def star_count(starcount_url: str, github_api_key: str) -> Tuple[str, int]: | |
"""Generates the star count for a given repo. | |
:param starcount_url: URL of the repo | |
:param github_api_key: API key for GitHub | |
:return: A tuple of the repo name and the star count | |
""" | |
response = requests.get( | |
starcount_url, headers={"Authorization": f"token {github_api_key}"} | |
) | |
response.raise_for_status() # Raise an exception for unsuccessful requests |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def stars_by_repo(star_count: Collect[Tuple[str, int]]) -> Dict[str, int]: | |
"""Aggregates the star count for each repo into a dictionary, so we | |
can generate paginated requests. | |
:param star_count: A tuple of the repo name and the star count | |
:return: The star count for each repo | |
""" | |
star_count_dict = {} | |
for repo_name, stars in star_count: | |
star_count_dict[repo_name] = stars | |
return star_count_dict |
OlderNewer