Created December 16, 2022 22:44
Save rossturk/4b5c14df30dc9876155edfe5ad9fb913 to your computer and use it in GitHub Desktop.
Grab Homebrew download stats
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
from astro import sql as aql | |
from astro.sql.table import Table | |
from airflow.models import DAG | |
from git import Repo | |
from os.path import exists | |
from airflow.operators.python import get_current_context | |
from airflow.exceptions import AirflowSkipException | |
import pandas as pd | |
import io | |
import json | |
# Airflow connection id used for both the staging table and the reporting table.
CONN_ID = "dwh"
# Local working copy of the Homebrew formulae repo; it is reused (pulled, not
# re-cloned) when the directory already exists.
CLONE_DIR = "/tmp/homebrew-formulae-repo"
@aql.dataframe
def get_homebrew_stats():
    """
    #### Get Homebrew Download Stats
    Grab the 30-day install stats for the current data interval from the
    Homebrew formulae git repo.

    The repo is cloned into ``CLONE_DIR`` (or pulled, if a clone is already
    there). The first commit that touched ``_data/analytics/install/30d.json``
    between the run's ``data_interval_start`` and ``data_interval_end`` is
    looked up, that revision of the file is parsed, and only the rows whose
    ``formula`` contains ``"astro"`` are kept.

    Returns:
        A DataFrame built from the JSON ``"items"`` list, restricted to the
        matching formulae, with an extra ``"date"`` column set to the run's
        ``ds``.

    Raises:
        AirflowSkipException: when no commit touched the stats file in the
            interval, when the file lists no items, or when no formula
            matches.
    """
    analytics_path = "_data/analytics/install/30d.json"

    if exists(CLONE_DIR):
        print("Found an existing local clone, pulling from origin.")
        repo = Repo(CLONE_DIR)
        repo.remotes.origin.pull()
    else:
        print("There is no local clone of the repo, creating one.")
        repo = Repo.clone_from(
            "https://github.com/Homebrew/formulae.brew.sh.git", CLONE_DIR
        )

    context = get_current_context()
    data_interval_start = context["data_interval_start"]
    data_interval_end = context["data_interval_end"]
    print(f"getting stats for {data_interval_end}")

    # max_count=1 keeps only the first commit git reports for the interval
    # (normally the most recent one), searched across all refs.
    commits = list(
        repo.iter_commits(
            "--all",
            max_count=1,
            since=data_interval_start,
            until=data_interval_end,
            paths=analytics_path,
        )
    )
    if not commits:
        raise AirflowSkipException(
            f"No commit touched {analytics_path} between "
            f"{data_interval_start} and {data_interval_end}"
        )

    # Read the file as it was at that commit, not as it is in the work tree.
    blob = commits[0].tree / analytics_path
    stats = pd.DataFrame(json.loads(blob.data_stream.read())["items"])
    if stats.empty:
        # An empty "items" list yields a frame with no columns at all, so the
        # "formula" lookup below would fail with a KeyError.
        raise AirflowSkipException(f"{analytics_path} contains no items")

    # TODO: do better filtering
    # na=False treats a missing formula name as "no match" instead of
    # producing a NaN mask that cannot be used for indexing.
    is_astro = stats["formula"].str.contains("astro", regex=False, na=False)
    # .copy() so the column added below lands on an independent frame rather
    # than on a slice of `stats` (avoids SettingWithCopyWarning).
    filtered_stats = stats[is_astro].copy()
    if filtered_stats.empty:
        raise AirflowSkipException("No formula matching 'astro' in the stats")

    filtered_stats["date"] = context["ds"]
    return filtered_stats
with DAG(
    "astro-cli-hb-analytics",
    schedule_interval="@daily",
    start_date=datetime(2018, 5, 29),
    catchup=True,
    default_args={
        "retries": 2,
        "depends_on_past": True,
    },
    tags=["homebrew", "astro"],
    # Passed as doc_md so the description is attached to the DAG; a bare
    # string inside the `with` body is evaluated and thrown away.
    doc_md=(
        "### Astro CLI Homebrew Analytics\n"
        "This is a simple pipeline that collects daily Homebrew install "
        "stats for astro formulae from the Homebrew formulae repo and "
        "appends them to a reporting table.\n"
    ),
) as dag:
    # Stage the day's filtered stats in a table on CONN_ID. No name is given
    # here; presumably the Astro SDK treats it as a temporary table - confirm
    # against the SDK docs.
    stats = get_homebrew_stats(
        output_table=Table(
            conn_id=CONN_ID,
        )
    )

    # Append the staged rows to the long-lived reporting table.
    reporting_table = aql.append(
        target_table=Table(
            name="ASTRO_HOMEBREW_ANALYTICS",
            conn_id=CONN_ID,
        ),
        source_table=stats,
    )

    # Registers the Astro SDK cleanup task for this DAG.
    aql.cleanup()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment