Skip to content

Instantly share code, notes, and snippets.

@rossturk
Created December 16, 2022 22:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rossturk/4b5c14df30dc9876155edfe5ad9fb913 to your computer and use it in GitHub Desktop.
Grab Homebrew download stats
from datetime import datetime
from astro import sql as aql
from astro.sql.table import Table
from airflow.models import DAG
from git import Repo
from os.path import exists
from airflow.operators.python import get_current_context
from airflow.exceptions import AirflowSkipException
import pandas as pd
import io
import json
CONN_ID = "dwh"
CLONE_DIR = "/tmp/homebrew-formulae-repo"
@aql.dataframe
def get_homebrew_stats():
    """
    #### Get Homebrew Download Stats

    Grabs 30-day Homebrew install analytics for the DAG's data interval from
    the formulae.brew.sh git repo and returns the rows whose formula name
    contains "astro" as a pandas DataFrame (materialized to a table by the
    `@aql.dataframe` decorator).

    Raises:
        AirflowSkipException: when no commit touched the stats file during
            the data interval, or when no matching formula rows were found.
    """
    # Reuse a previous clone if one survives on this worker; otherwise clone.
    if exists(CLONE_DIR):
        print("Found an existing local clone, pulling from origin.")
        repo = Repo(CLONE_DIR)
        repo.remotes.origin.pull()
    else:
        print("There is no local clone of the repo, creating one.")
        repo = Repo.clone_from(
            "https://github.com/Homebrew/formulae.brew.sh.git", CLONE_DIR
        )

    context = get_current_context()
    data_interval_start = context["data_interval_start"]
    data_interval_end = context["data_interval_end"]
    print("getting stats for {}".format(data_interval_end))

    # Latest commit within the interval that modified the 30-day stats file.
    commits = list(
        repo.iter_commits(
            "--all",
            max_count=1,
            since=data_interval_start,
            until=data_interval_end,
            paths="_data/analytics/install/30d.json",
        )
    )
    if len(commits) == 0:
        raise AirflowSkipException("No commit touched the stats file in this interval.")

    # Read the stats JSON directly from the commit's tree (no checkout needed).
    targetfile = commits[0].tree / "_data/analytics/install/30d.json"
    with io.BytesIO(targetfile.data_stream.read()) as f:
        stats = pd.DataFrame(json.load(f)["items"])

    # .copy() so the "date" assignment below writes to an independent frame
    # instead of a view of `stats` (avoids SettingWithCopyWarning / silent no-op).
    filtered_stats = stats[stats["formula"].str.contains("astro")].copy()  # TODO: do better filtering
    if len(filtered_stats) == 0:
        raise AirflowSkipException("No formulae matching 'astro' in this snapshot.")

    # Stamp each row with the logical date so appended rows stay distinguishable.
    filtered_stats["date"] = context["ds"]
    return filtered_stats
with DAG(
    "astro-cli-hb-analytics",
    schedule_interval="@daily",
    start_date=datetime(2018, 5, 29),
    # Backfill from start_date so historical daily snapshots are captured.
    catchup=True,
    default_args={
        "retries": 2,
        # Each day's append depends on the previous day's run succeeding.
        "depends_on_past": True,
    },
    tags=["homebrew", "astro"],
) as dag:
    """
    ### Homebrew Download Stats for the Astro CLI

    Daily pipeline that pulls 30-day Homebrew install analytics from the
    formulae.brew.sh repo and appends the astro-related rows to a
    reporting table in the warehouse.
    """
    # Extract: filtered stats frame, loaded into a temp table by the SDK.
    stats = get_homebrew_stats(
        output_table=Table(
            conn_id=CONN_ID,
        )
    )

    # Load: append the day's rows onto the cumulative reporting table.
    reporting_table = aql.append(
        target_table=Table(
            name="ASTRO_HOMEBREW_ANALYTICS",
            conn_id=CONN_ID,
        ),
        source_table=stats,
    )

    # Drop the SDK's temporary tables once downstream tasks finish.
    aql.cleanup()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment