Ole Christian Langfjæran judoole

@judoole
judoole / turbine_bdd.py
Last active April 19, 2024 10:33
A Python test helper file for writing BDD-like steps for testing Airflow with pytest
from dataclasses import dataclass, field
from typing import Any, Dict, List

import pendulum
from airflow import DAG
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
from airflow.models.operator import Operator
from airflow.models.taskinstance import TaskInstance
from hamcrest import equal_to, has_property

from turbineflow_pytest_airflow.turbine_bdd import bdd, TurbineBDD


def test_task_delete_ephemeral_signals_is_rendered_correctly(bdd: TurbineBDD):
    """Ensure the template fields of delete_ephemeral_signals render correctly."""
    bdd.given_dag("daily_signals")
    bdd.given_task("load_files.delete_ephemeral_signals")
    bdd.given_execution_date("1976-08-13")
    bdd.when_I_render_the_task_template_fields()
    bdd.then_it_should_match(
        has_property(
            "deletion_dataset_table",
            equal_to("daily-signals-project.07_day_retention.signals_in_19760813"),
        )
    )
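The helper itself is cut off in this preview, so here is a minimal sketch, assuming pytest fixtures and only the method names used in the test above, of how a TurbineBDD helper could be shaped. The internals (DagBag loading, the context keys, the use of render_template_fields) are assumptions, not the gist's actual code.

# Minimal sketch of the fixture and helper shape implied by the test above.
# The real turbine_bdd.py is not shown in the preview; internals are assumptions.
import pendulum
import pytest
from airflow.models.dagbag import DagBag
from hamcrest import assert_that


class TurbineBDD:
    def __init__(self, dagbag: DagBag) -> None:
        self.dagbag = dagbag
        self.dag = None
        self.task = None
        self.execution_date = None

    def given_dag(self, dag_id: str) -> None:
        self.dag = self.dagbag.get_dag(dag_id)
        assert self.dag is not None, f"DAG {dag_id} not found"

    def given_task(self, task_id: str) -> None:
        self.task = self.dag.get_task(task_id)

    def given_execution_date(self, date: str) -> None:
        self.execution_date = pendulum.parse(date)

    def when_I_render_the_task_template_fields(self) -> None:
        # Assumed context keys; the real helper may build a full TaskInstance context.
        context = {
            "ds": self.execution_date.to_date_string(),
            "ds_nodash": self.execution_date.format("YYYYMMDD"),
            "execution_date": self.execution_date,
            "dag": self.dag,
            "task": self.task,
        }
        self.task.render_template_fields(context)

    def then_it_should_match(self, matcher) -> None:
        assert_that(self.task, matcher)


@pytest.fixture
def bdd() -> TurbineBDD:
    return TurbineBDD(DagBag(include_examples=False))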
@judoole
judoole / impersonation_chain.feature
Created April 10, 2024 08:13
Example of a behave feature for Airflow
@ci
Feature: Always use impersonation chain where possible
  As a developer
  I want to make sure we use impersonation_chain
  So that I can develop locally and not be afraid that I overwrite production tables
  And that clients don't use other clients' GCP projects

  Scenario: All tasks should use impersonation_chain when applicable
    Given airflow is running
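The matching step definitions are not part of this gist, so below is a minimal behave steps sketch: the Given step simply loads the DagBag, and the Then step, whose wording is hypothetical since the preview stops after the Given, walks every task that exposes impersonation_chain and fails on the ones that leave it unset.

# Sketch of a behave steps file for the feature above; an assumption, not the
# author's implementation. Only "Given airflow is running" appears in the preview.
from airflow.models.dagbag import DagBag
from behave import given, then


@given("airflow is running")
def step_airflow_is_running(context):
    # Loading the DagBag proves the DAGs import cleanly and gives access to all tasks.
    context.dagbag = DagBag(include_examples=False)
    assert not context.dagbag.import_errors, context.dagbag.import_errors


@then("every task that supports it sets impersonation_chain")  # hypothetical step text
def step_every_task_sets_impersonation_chain(context):
    offenders = [
        f"{dag_id}.{task.task_id}"
        for dag_id, dag in context.dagbag.dags.items()
        for task in dag.tasks
        if hasattr(task, "impersonation_chain") and not task.impersonation_chain
    ]
    assert not offenders, f"Tasks missing impersonation_chain: {offenders}"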
@judoole
judoole / readme.md
Created November 11, 2020 08:30
Merging with another repo

Since I merge a repository into an existing one from time to time, I keep this to-do list as a reminder of how to do it:

  1. Create an orphan branch: git checkout --orphan my-merging-repo
  2. Reset all: git reset --hard && git clean -fd
  3. Add remote: git remote add my-merging-repo git@my-merging-repo
  4. Pull master
@judoole
judoole / opsgenie_jinja.py
Last active November 9, 2020 11:42
Blogpost on OpsGenie alerting in Airflow https://medium.com/p/239ddea61d0a
# Slack-friendly message template
_DEFAULT_DESCRIPTION = """
*DAG:*
<{{dag_url}}|{{ti.dag_id}}>
*Task:*
{{ti.task_id}}
*Execution date:*
{{ds}}
*Log:*
{{ti.log_url}}
"""
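How the template gets its values is not shown in this snippet. A minimal sketch, assuming Jinja2 is used directly inside a callback and that dag_url is built from a made-up base URL:

# Sketch only: rendering the Slack-friendly description from an Airflow callback
# context. The https://my-airflow.example base URL is hypothetical.
import jinja2


def render_description(context) -> str:
    ti = context["task_instance"]
    return jinja2.Template(_DEFAULT_DESCRIPTION).render(
        ti=ti,
        ds=context["ds"],
        dag_url=f"https://my-airflow.example/dags/{ti.dag_id}",
    )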
@judoole
judoole / opsgenie_closer.py
Last active November 9, 2020 10:48
Blogpost on OpsGenie alerting in Airflow https://medium.com/p/239ddea61d0a
import requests
from airflow.models.taskinstance import TaskInstance


class OpsGenieCloseHandler:
    def __init__(self, connection_id: str = "opsgenie_default") -> None:
        self.connection_id = connection_id

    def __call__(self, context) -> requests.Response:
        task_instance: TaskInstance = context["task_instance"]
        if task_instance.try_number > 1:
            # Create the alias for this task run's alert
            alias = f"{task_instance.dag_id}.{task_instance.task_id}-{context['ds']}"
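The rest of the close call is cut off above. For context, a sketch of how the two callback classes from these gists could be wired onto a task, so a failure opens an alert and a later successful retry closes it; the DAG, task names, and schedule here are assumptions, not from the blog post, and the callback classes are assumed to be importable.

# Sketch only: pairing the reporter and closer on one task. dag_id, task_id and
# schedule below are made up for illustration.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator  # airflow.operators.bash_operator in Airflow 1.10

with DAG(
    dag_id="opsgenie_example",
    start_date=datetime(2020, 11, 1),
    schedule_interval="@daily",
) as dag:
    flaky_task = BashOperator(
        task_id="sometimes_flaky_task",
        bash_command="maybe-this-fails",
        retries=2,
        on_failure_callback=OpsGenieExceptionReporter(),  # opens the alert
        on_success_callback=OpsGenieCloseHandler(),       # closes it once a retry succeeds
    )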
@judoole
judoole / opsgenie_reporter_simple.py
Last active November 9, 2020 10:45
Blogpost on OpsGenie alerting in Airflow https://medium.com/p/239ddea61d0a
import requests
from airflow.models.taskinstance import TaskInstance


class OpsGenieExceptionReporter:
    def __init__(self, connection_id: str = "opsgenie_default") -> None:
        self.connection_id = connection_id

    def __call__(self, context) -> requests.Response:
        ti: TaskInstance = context["ti"]
        json = {
            "message": f"Something went terribly wrong with {ti.task_id}",
            "description": f"See more at {ti.log_url}",
            "responders": [{"name": "my-fancy-team", "type": "team"}],
@judoole
judoole / failing_operator.py
Last active November 9, 2020 08:34
Blogpost on OpsGenie alerting in Airflow https://medium.com/p/239ddea61d0a
# Attaching the reporter as on_failure_callback opens an OpsGenie alert when the task fails
task_that_fails = BashOperator(
    task_id="task_that_fails",
    bash_command="this-will-fail",
    on_failure_callback=OpsGenieExceptionReporter(),
)
@judoole
judoole / .gitignore
Created April 14, 2016 07:23
gitignore IntelliJ share runConfigurations
.idea/*
!.idea/runConfigurations/
@judoole
judoole / fabfile.py
Last active November 25, 2015 07:06
Fabric whoami
from fabric.api import run


def whoami():
    # Invoke with the Fabric 1.x CLI: fab -H <host> whoami
    run('whoami')