Skip to content

Instantly share code, notes, and snippets.

@peacing
Last active May 25, 2021 10:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save peacing/88f4670896fa2cde323b7811657b0986 to your computer and use it in GitHub Desktop.
Save peacing/88f4670896fa2cde323b7811657b0986 to your computer and use it in GitHub Desktop.
from airflow.decorators import dag
from airflow.utils.dates import days_ago
from lakefs_provider.operators.create_branch_operator import CreateBranchOperator
from lakefs_provider.operators.merge_operator import MergeOperator
from lakefs_provider.operators.commit_operator import CommitOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
# Shared settings handed to the DAG below. 'default-branch' is only read
# explicitly via default_args inside the workflow definition.
default_args = {
    'owner': 'lakeFS',
    'branch': 'example-branch',
    'repo': 'example-repo',
    'default-branch': 'main',
    'lakefs_conn_id': 'conn_lakefs',
}
@dag(default_args=default_args, start_date=days_ago(2), schedule_interval=None, tags=['example'])
def lakeFS_workflow():
    """Define a DAG that runs a Spark job on a lakeFS branch and merges it back.

    Task order: create_branch >> spark_submit >> commit >> merge_branches.

    The lakeFS operators are not given a repo, branch or connection id
    here; those are expected to reach them through the DAG-level
    ``default_args`` (NOTE(review): confirm against the lakeFS provider).

    Raises:
        KeyError: at DAG-parse time if 'repo', 'branch' or 'default-branch'
            is missing from ``default_args``.
    """
    # Index rather than .get(): a missing key should fail loudly at parse
    # time instead of silently passing None to an operator.
    repo = default_args['repo']
    work_branch = default_args['branch']
    base_branch = default_args['default-branch']

    # The job's paths follow an s3://<repo>/<branch>/<path> layout. Build them
    # from the shared config so the Spark job always reads and writes the same
    # branch that is created, committed and merged below.
    branch_root = f's3://{repo}/{work_branch}'

    # Create the branch to run on
    task_create_branch = CreateBranchOperator(
        task_id='create_branch',
        source_branch=base_branch,
    )

    # Submit the actual job
    spark_submit = SparkSubmitOperator(
        task_id='spark_submit',
        application='path/to/job/script.py',
        application_args=[
            f'{branch_root}/path/to/input',
            f'{branch_root}/path/to/output',
        ],
    )

    # Commit the changes to the branch.
    # (Also a good place to validate the new changes before committing them)
    task_commit = CommitOperator(
        task_id='commit',
        msg='committing to lakeFS using airflow!',
        metadata={'committed_from': 'airflow-operator'},
    )

    # Merge the changes back to the main branch.
    task_merge = MergeOperator(
        task_id='merge_branches',
        source_ref=work_branch,
        destination_branch=base_branch,
        msg='merging job outputs',
        metadata={'committer': 'airflow-operator'},
    )

    task_create_branch >> spark_submit >> task_commit >> task_merge
# Calling the @dag-decorated function builds the DAG object; it is bound to a
# module-level name (presumably so Airflow's DAG loader can discover it).
sample_workflow_dag = lakeFS_workflow()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment