Skip to content

Instantly share code, notes, and snippets.

@rossturk
Last active September 16, 2022 19:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rossturk/79f4075e259c8b16d83f2c1b69e612b4 to your computer and use it in GitHub Desktop.
Save rossturk/79f4075e259c8b16d83f2c1b69e612b4 to your computer and use it in GitHub Desktop.
astro-homebrew-history
from datetime import datetime
from astro import sql as aql
from astro.sql.table import Table
from airflow.models import DAG
from git import Repo
from os.path import exists
from airflow.operators.python import get_current_context
import pandas as pd
import io
import json
CONN_ID = "dwh"
CLONE_DIR = "/tmp/homebrew-formulae-repo"
@aql.dataframe
def get_homebrew_stats():
"""
#### Get Homebrew Download Stats
This grabs download stats for a given day from the formulae git repo.
"""
if exists(CLONE_DIR):
print("Found an existing local clone, pulling from origin.")
repo = Repo(CLONE_DIR)
repo.remotes.origin.pull()
else:
print("There is no local clone of the repo, creating one.")
repo = Repo.clone_from("https://github.com/Homebrew/formulae.brew.sh.git", CLONE_DIR)
context = get_current_context()
data_interval_start = context["data_interval_start"]
data_interval_end = context["data_interval_end"]
print(
"getting stats for {}".format(data_interval_end)
)
commit = list(repo.iter_commits(
'--all',
max_count=1,
since=data_interval_start,
until=data_interval_end,
paths='_data/analytics/install/30d.json',
))[0]
targetfile = commit.tree / '_data/analytics/install/30d.json'
with io.BytesIO(targetfile.data_stream.read()) as f:
#data = json.load(f)
stats = pd.DataFrame(json.load(f)["items"])
stats[stats['formula'].str.contains('astro')] # TODO: do better filtering
return stats
with DAG(
"astro-homebrew-history",
schedule_interval="@daily",
start_date=datetime(2014, 10, 6),
catchup=True,
default_args={
"retries": 2,
"depends_on_past": True,
},
tags=["homebrew", "astro"],
) as dag:
"""
### Astro Homebrew Download Stats
This is a simple pipeline that analyzes Homebrew download stats
"""
stats = get_homebrew_stats(
output_table=Table(
conn_id=CONN_ID,
)
)
reporting_table = aql.append(
target_table=Table(
name="ASTRO_HOMEBREW_HISTORY",
conn_id=CONN_ID,
),
source_table=stats,
)
aql.cleanup()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment