Last active
September 3, 2022 05:38
-
-
Save rossturk/1e593a3482de1a48b2ab2a70ea2cbea0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
from os.path import exists | |
from pydriller import Repository, ModificationType | |
from git import Repo | |
from airflow.models import DAG | |
from airflow.operators.python import get_current_context | |
from airflow.exceptions import AirflowSkipException | |
from astro import sql as aql | |
from astro.sql.table import Table | |
import pandas as pd | |
# The name of the Airflow connection for our target DB (the warehouse the
# commit data is written to).
CONN_ID = "dwh"

# A spot to check our repo out - persist this, perhaps?
# NOTE(review): /tmp is ephemeral, so the clone is re-created after a reboot
# or on a fresh worker; the task below handles both the fresh-clone and the
# pull-existing cases.
CLONE_DIR = "/tmp/airflow-repo"
@aql.dataframe
def get_commits_and_files():
    """
    #### Get Commit List

    Use pydriller to collect every commit on the repository's ``main``
    branch inside the current DAG data interval, emitting one row per
    file changed by each commit.

    Returns:
        pandas.DataFrame with columns ``hash``, ``commit_date``,
        ``path``, ``added_lines``, ``deleted_lines``, ``nloc``,
        ``complexity``. The explicit ``columns=`` list keeps the schema
        stable even when the frame is empty.

    Raises:
        AirflowSkipException: when no files changed in the interval, so
            the downstream append task is skipped rather than fed an
            empty table.
    """
    # First thing, make sure we have a checked out copy of the repo we
    # are studying: pull if a clone already exists, otherwise clone it.
    if exists(CLONE_DIR):
        print("Found an existing local clone, pulling from origin.")
        repo = Repo(CLONE_DIR)
        repo.remotes.origin.pull()
    else:
        print("There is no local clone of the repo, creating one.")
        repo = Repo.clone_from("https://github.com/apache/airflow", CLONE_DIR)

    # Airflow tells us the time interval to fetch data for.
    context = get_current_context()
    data_interval_start = context["data_interval_start"]
    data_interval_end = context["data_interval_end"]
    print(
        "Getting commits from {} to {}".format(data_interval_start, data_interval_end)
    )

    commits = Repository(
        CLONE_DIR,
        since=data_interval_start,
        to=data_interval_end,
        only_in_branch="main",
    ).traverse_commits()

    changed_files = []
    for commit in commits:
        for file in commit.modified_files:
            # A deleted file has no new_path, so fall back to old_path.
            if file.change_type == ModificationType.DELETE:
                path = file.old_path
            else:
                path = file.new_path
            changed_files.append(
                {
                    "hash": commit.hash,
                    "commit_date": commit.committer_date.strftime("%Y-%m-%d"),
                    "path": path,
                    "added_lines": file.added_lines,
                    "deleted_lines": file.deleted_lines,
                    "nloc": file.nloc,
                    "complexity": file.complexity,
                }
            )

    # There were no changed files during this interval, so skip the
    # downstream tasks. (The separate num_files counter in the original
    # was redundant with the length of changed_files.)
    if not changed_files:
        raise AirflowSkipException("No files changed in this data interval.")

    return pd.DataFrame(
        changed_files,
        columns=[
            "hash",
            "commit_date",
            "path",
            "added_lines",
            "deleted_lines",
            "nloc",
            "complexity",
        ],
    )
with DAG(
    "git-commits-by-file",
    schedule_interval="@daily",
    # Backfill daily runs all the way from 2014; catchup=True makes the
    # scheduler create one run per day since start_date.
    start_date=datetime(2014, 10, 6),
    catchup=True,
    default_args={
        "retries": 2,
        # Each run appends to the reporting table, so runs must complete
        # in order: depends_on_past stops a run until the prior one succeeds.
        "depends_on_past": True,
    },
    tags=["git", "airflow"],
) as dag:
    """
    ### Airflow Commits by File
    This is a simple pipeline that analyzes commits in the Airflow repo
    """
    # Get all the commits for our time interval and load them into a
    # temporary table. NOTE(review): the Table has no name, which
    # presumably makes astro-sdk create a temporary staging table —
    # confirm against the astro SDK docs.
    daily_commits = get_commits_and_files(
        output_table=Table(
            conn_id=CONN_ID,
        )
    )

    # Append the temporary table to our COMMITS_AND_FILES table.
    reporting_table = aql.append(
        target_table=Table(
            name="COMMITS_AND_FILES",
            conn_id=CONN_ID,
        ),
        source_table=daily_commits,
    )

    # Clean up the temporary tables created during the run.
    aql.cleanup()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment