Skip to content

Instantly share code, notes, and snippets.

View MatrixManAtYrService's full-sized avatar

Matt Rixman MatrixManAtYrService

View GitHub Profile
@MatrixManAtYrService
MatrixManAtYrService / Dockerfile
Last active April 29, 2023 00:27
tini vs dumb-init
FROM ubuntu
RUN apt update
RUN apt install -y tini dumb-init python3-pip
RUN pip install apache-airflow
# place the entrypoint script
COPY entrypoint.sh /entrypoint
RUN chmod +x /entrypoint
# a script that calls airflow (and does test-relevant things too)
from datetime import datetime
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
KubernetesPodOperator,
)
from airflow.configuration import conf
namespace = conf.get("kubernetes", "NAMESPACE")
version: 2.1
jobs:
my-job:
machine:
image: ubuntu-2004:202107-02
steps:
- checkout
- run:
name: snap install
command: sudo snap install microk8s --classic
@MatrixManAtYrService
MatrixManAtYrService / steady_task_stream.py
Created September 20, 2021 02:41
A DAG with tasks that complete one after another
from airflow.decorators import dag, task
from airflow.sensors.date_time import DateTimeSensor, DateTimeSensorAsync
from airflow.utils.dates import days_ago
from time import sleep
def tensec_taskfactory(i):
@task(task_id=f"task_{i}")
def wait():
@MatrixManAtYrService
MatrixManAtYrService / Dockerfile
Last active September 12, 2021 03:16
Running airflow with a faked clock
FROM quay.io/astronomer/ap-airflow-dev:2.2.0-buster-43536
USER root
# deps for faking time and talking to postgres databases
RUN apt-get update && apt-get install -y faketime libpq-dev build-essential
# place shim where airflow was, `airflow` is now `actual_airflow`
RUN sh -c 'set -x ; mv $(which airflow) $(dirname $(which airflow))/actual_airflow'
COPY ./airflow_wrapper.sh /home/root/
asdfasdfasdfa
@MatrixManAtYrService
MatrixManAtYrService / airflow dags test wait_1_async $(date -u +'%Y-%m-%dT%H:%M:%SZ')
Created July 31, 2021 21:32
An attempt to use the Debug Executor with a deferrable task
$ airflow dags test wait_1_async $(date -u +'%Y-%m-%dT%H:%M:%SZ')
INFO - Filling up the DagBag from /Users/matt/src/qa-scenario-dags/utility_dags
INFO - Adding to queue: ['<TaskInstance: wait_1_async.start 2021-07-31 21:26:55+00:00 [queued]>']
INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=wait_1_async
AIRFLOW_CTX_TASK_ID=start
AIRFLOW_CTX_EXECUTION_DATE=2021-07-31T21:26:55+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-07-31T21:26:55+00:00
from airflow.operators.python import task
from airflow.utils.dates import days_ago
from airflow.decorators import dag
from airflow.operators.subdag import SubDagOperator
# uninteresting args
default_args = {
"schedule_interval": None,
"start_date": days_ago(1),
"default_args": {"owner": "airflow"},
# Task callable wrapped by the `@task` decorator: writes its single
# argument to standard output and returns nothing.
@task
def print_one(it):
    print(it)
# Second task callable wrapped by `@task`; like its sibling it only
# prints the value it receives and has no return value.
@task
def print_another(it):
    print(it)
INFO - Adding to queue: ['<TaskInstance: two_virtualenv.a ']
hi
INFO - Marking task as SUCCESS. dag_id=two_virtualenv, task_id=a
ERROR - Failed to execute task: cannot pickle 'module' object.
Traceback (most recent call last):
File "/Users/matt/src/airflow/airflow/executors/debug_executor.py", line 79, in _run_task
ti._run_raw_task(job_id=ti.job_id, **params) # pylint: disable=protected-access
File "/Users/matt/src/airflow/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/Users/matt/src/airflow/airflow/models/taskinstance.py", line 1201, in _run_raw_task