from datetime import datetime
from os.path import exists
from pydriller import Repository, ModificationType
from git import Repo
from airflow.models import DAG
from airflow.operators.python import get_current_context
from airflow.exceptions import AirflowSkipException
from astro import sql as aql
from astro.sql.table import Table
import pandas as pd

# The name of the Airflow connection for our target DB
CONN_ID = "dwh"

# A spot to check our repo out - persist this, perhaps?
CLONE_DIR = "/tmp/airflow-repo"


@aql.dataframe
def get_commits_and_files():
"""
#### Get Commit List
This uses the pydriller package to grab the commits and files changed
for a given day.
"""
# First thing, make sure we have a checked out copy of the repo we are studying
if exists(CLONE_DIR):
print("Found an existing local clone, pulling from origin.")
repo = Repo(CLONE_DIR)
repo.remotes.origin.pull()
else:
print("There is no local clone of the repo, creating one.")
repo = Repo.clone_from("https://github.com/apache/airflow", CLONE_DIR)
# Airflow tells us the time interval to fetch data for
context = get_current_context()
data_interval_start = context["data_interval_start"]
data_interval_end = context["data_interval_end"]
print(
"Getting commits from {} to {}".format(data_interval_start, data_interval_end)
)
commits = Repository(
CLONE_DIR,
since=data_interval_start,
to=data_interval_end,
only_in_branch="main",
).traverse_commits()
changed_files = []
num_files = 0
for commit in commits:
for file in commit.modified_files:
if file.change_type == ModificationType.DELETE:
path = file.old_path
else:
path = file.new_path
changed_files.append(
{
"hash": commit.hash,
"commit_date": commit.committer_date.strftime("%Y-%m-%d"),
"path": path,
"added_lines": file.added_lines,
"deleted_lines": file.deleted_lines,
"nloc": file.nloc,
"complexity": file.complexity,
}
)
num_files += 1
# There were no changed files during this interval, so skip the downstream tasks
if num_files == 0:
raise AirflowSkipException
return pd.DataFrame(changed_files,
columns=[
"hash",
"commit_date",
"path",
"added_lines",
"deleted_lines",
"nloc",
"complexity",
]
)
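
# A quick way to sanity-check the pydriller traversal above without running
# Airflow is a standalone snippet along these lines (the dates below are
# illustrative assumptions, not values taken from any DAG run):
#
#   from datetime import datetime
#   from pydriller import Repository
#
#   for c in Repository(
#       "/tmp/airflow-repo",
#       since=datetime(2022, 1, 1),
#       to=datetime(2022, 1, 2),
#       only_in_branch="main",
#   ).traverse_commits():
#       for f in c.modified_files:
#           print(c.hash[:8], f.new_path or f.old_path)
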
with DAG(
    "git-commits-by-file",
    schedule_interval="@daily",
    start_date=datetime(2014, 10, 6),
    catchup=True,
    default_args={
        "retries": 2,
        "depends_on_past": True,
    },
    tags=["git", "airflow"],
) as dag:
    """
    ### Airflow Commits by File
    This is a simple pipeline that analyzes commits in the Airflow repo
    """

    # Get all the commits for our time interval and load them into a temporary table
    daily_commits = get_commits_and_files(
        output_table=Table(
            conn_id=CONN_ID,
        )
    )

    # Append the temporary table to our COMMITS_AND_FILES table
    reporting_table = aql.append(
        target_table=Table(
            name="COMMITS_AND_FILES",
            conn_id=CONN_ID,
        ),
        source_table=daily_commits,
    )

    aql.cleanup()
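
# A minimal sketch of how this DAG might be exercised locally, assuming the
# Airflow CLI is available and that the "dwh" connection points at your
# warehouse (the connection URI and date below are placeholders):
#
#   airflow connections add dwh \
#       --conn-uri 'postgresql://user:password@localhost:5432/warehouse'
#   airflow dags test git-commits-by-file 2022-01-01
#
# With catchup=True and depends_on_past=True, a normal scheduled run will
# backfill intervals in order starting from the 2014-10-06 start_date.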