Created
December 16, 2022 22:36
-
-
Save rossturk/d1d8a8461b0e760caa58661164fd74e5 to your computer and use it in GitHub Desktop.
Gather commits by file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Standard library
from datetime import datetime
from os.path import exists

# Third-party
import pandas as pd
from git import Repo
from pydriller import Repository, ModificationType

# Airflow / Astro SDK
from airflow.exceptions import AirflowSkipException
from airflow.models import DAG
from airflow.operators.python import get_current_context
from astro import sql as aql
from astro.sql.table import Table

# Airflow connection ID of the data warehouse the results are written to.
CONN_ID = "dwh"
# Local path where the Airflow repository is cloned and reused across runs.
CLONE_DIR = "/tmp/airflow-repo"
@aql.dataframe
def get_commits_and_files():
    """
    #### Get Commit List

    Uses the pydriller package to collect the commits made to the Airflow
    repository during this DAG run's data interval, together with per-file
    change statistics.

    Returns:
        A pandas DataFrame with one row per (commit, modified file) pair and
        columns: hash, commit_date, path, added_lines, deleted_lines, nloc,
        complexity. The ``@aql.dataframe`` decorator loads it into the task's
        ``output_table``.

    Raises:
        AirflowSkipException: when no files were changed in the interval, so
            downstream tasks skip instead of appending an empty frame.
    """
    # Reuse an existing local clone when possible; cloning apache/airflow
    # from scratch on every run would be slow and wasteful.
    if exists(CLONE_DIR):
        print("Found an existing local clone, pulling from origin.")
        repo = Repo(CLONE_DIR)
        repo.remotes.origin.pull()
    else:
        print("There is no local clone of the repo, creating one.")
        repo = Repo.clone_from("https://github.com/apache/airflow", CLONE_DIR)

    # The data interval comes from the Airflow runtime context, so each
    # (catchup) run analyzes exactly one day's worth of commits.
    context = get_current_context()
    data_interval_start = context["data_interval_start"]
    data_interval_end = context["data_interval_end"]

    print(
        "getting commits from {} to {}".format(data_interval_start, data_interval_end)
    )

    commits = Repository(
        CLONE_DIR,
        since=data_interval_start,
        to=data_interval_end,
        only_in_branch="main",
    ).traverse_commits()

    changed_files = []
    num_files = 0
    for commit in commits:
        for file in commit.modified_files:
            # A deleted file has no new_path; fall back to its old path so
            # the row still identifies which file the change touched.
            if file.change_type == ModificationType.DELETE:
                path = file.old_path
            else:
                path = file.new_path

            changed_files.append(
                {
                    "hash": commit.hash,
                    "commit_date": commit.committer_date.strftime("%Y-%m-%d"),
                    "path": path,
                    "added_lines": file.added_lines,
                    "deleted_lines": file.deleted_lines,
                    "nloc": file.nloc,
                    "complexity": file.complexity,
                }
            )
            num_files += 1

    # Nothing changed in this interval: skip the task (and its downstream
    # append) rather than writing an empty table.
    if num_files == 0:
        raise AirflowSkipException

    # Pass columns explicitly so the output schema is stable even though
    # the rows are built from dicts.
    return pd.DataFrame(
        changed_files,
        columns=[
            "hash",
            "commit_date",
            "path",
            "added_lines",
            "deleted_lines",
            "nloc",
            "complexity",
        ],
    )
with DAG(
    "git-commits-by-file",
    # Daily schedule with catchup=True backfills one run per day since the
    # start_date, so each run covers a single day's commits.
    schedule_interval="@daily",
    start_date=datetime(2014, 10, 6),
    catchup=True,
    default_args={
        "retries": 2,
        # Each day's append depends on the previous day having succeeded,
        # keeping the reporting table in chronological order.
        "depends_on_past": True,
    },
    tags=["git", "airflow"],
) as dag:
    """
    ### Airflow Commits by File
    This is a simple pipeline that analyzes commits in the Airflow repo
    """

    # Extract one day's commit/file stats into a temporary table.
    daily_commits = get_commits_and_files(
        output_table=Table(
            conn_id=CONN_ID,
        )
    )

    # Append the daily rows onto the cumulative reporting table.
    reporting_table = aql.append(
        target_table=Table(
            name="COMMITS_AND_FILES",
            conn_id=CONN_ID,
        ),
        source_table=daily_commits,
    )

    # Drop the temporary tables created by the Astro SDK during this run.
    aql.cleanup()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment