@rossturk
Created December 16, 2022 22:36
Gather commits by file
from datetime import datetime
from os.path import exists

import pandas as pd
from airflow.exceptions import AirflowSkipException
from airflow.models import DAG
from airflow.operators.python import get_current_context
from astro import sql as aql
from astro.sql.table import Table
from git import Repo
from pydriller import ModificationType, Repository

CONN_ID = "dwh"
CLONE_DIR = "/tmp/airflow-repo"
@aql.dataframe
def get_commits_and_files():
    """
    #### Get Commit List
    This uses the pydriller package to grab the commits and files changed
    for a given day.
    """
    # Reuse the local clone if one exists; otherwise clone the repo fresh.
    if exists(CLONE_DIR):
        print("Found an existing local clone, pulling from origin.")
        repo = Repo(CLONE_DIR)
        repo.remotes.origin.pull()
    else:
        print("There is no local clone of the repo, creating one.")
        repo = Repo.clone_from("https://github.com/apache/airflow", CLONE_DIR)

    # The Airflow context supplies this run's data interval, so each daily
    # run gathers exactly one day of commits.
    context = get_current_context()
    data_interval_start = context["data_interval_start"]
    data_interval_end = context["data_interval_end"]
    print(
        "getting commits from {} to {}".format(data_interval_start, data_interval_end)
    )

    commits = Repository(
        CLONE_DIR,
        since=data_interval_start,
        to=data_interval_end,
        only_in_branch="main",
    ).traverse_commits()

    changed_files = []
    num_files = 0
    for commit in commits:
        for file in commit.modified_files:
            # A deleted file has no new_path, so fall back to its old_path.
            if file.change_type == ModificationType.DELETE:
                path = file.old_path
            else:
                path = file.new_path
            changed_files.append(
                {
                    "hash": commit.hash,
                    "commit_date": commit.committer_date.strftime("%Y-%m-%d"),
                    "path": path,
                    "added_lines": file.added_lines,
                    "deleted_lines": file.deleted_lines,
                    "nloc": file.nloc,
                    "complexity": file.complexity,
                }
            )
            num_files += 1

    # On days with no commits, skip the task rather than fail it.
    if num_files == 0:
        raise AirflowSkipException

    return pd.DataFrame(
        changed_files,
        columns=[
            "hash",
            "commit_date",
            "path",
            "added_lines",
            "deleted_lines",
            "nloc",
            "complexity",
        ],
    )
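
# For reference, a single row of the DataFrame returned above looks roughly
# like this (illustrative values only, not taken from the repository):
#
#   hash         commit_date  path                     added_lines  deleted_lines  nloc  complexity
#   "a1b2c3..."  "2022-12-16" "airflow/models/dag.py"           12              3  2500         310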
with DAG(
    "git-commits-by-file",
    schedule_interval="@daily",
    start_date=datetime(2014, 10, 6),
    catchup=True,
    default_args={
        "retries": 2,
        "depends_on_past": True,
    },
    tags=["git", "airflow"],
) as dag:
    """
    ### Airflow Commits by File
    This is a simple pipeline that analyzes commits in the Airflow repo.
    """

    # Load each day's commits into a temporary table managed by the SDK.
    daily_commits = get_commits_and_files(
        output_table=Table(
            conn_id=CONN_ID,
        )
    )

    # Append that day's rows onto the persistent reporting table.
    reporting_table = aql.append(
        target_table=Table(
            name="COMMITS_AND_FILES",
            conn_id=CONN_ID,
        ),
        source_table=daily_commits,
    )

    # Drop the temporary tables created by the SDK once the run completes.
    aql.cleanup()
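
Once a backfill has populated COMMITS_AND_FILES (for example via `airflow dags backfill git-commits-by-file` with a start and end date), the table can be queried like any other. Below is a minimal sketch of one such query, assuming the "dwh" connection points at a Postgres database; the DSN, credentials, and identifier casing are placeholders to adapt to your setup.

import pandas as pd
from sqlalchemy import create_engine

# Hypothetical DSN; substitute your warehouse's connection string.
engine = create_engine("postgresql://user:pass@localhost:5432/dwh")

# Rank files by total churn (lines added + deleted) across all commits.
# Adjust the quoting/casing of the table name to match how your database
# stored it.
churn = pd.read_sql(
    """
    SELECT path,
           SUM(added_lines + deleted_lines) AS churn,
           COUNT(DISTINCT hash) AS commits
    FROM "COMMITS_AND_FILES"
    GROUP BY path
    ORDER BY churn DESC
    LIMIT 20
    """,
    engine,
)
print(churn)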