Skip to content

Instantly share code, notes, and snippets.

@kervel
Last active March 1, 2021 11:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kervel/c557f888ffe202f738753c2218766999 to your computer and use it in GitHub Desktop.
Save kervel/c557f888ffe202f738753c2218766999 to your computer and use it in GitHub Desktop.
airflow DAG operator
prototype airflow DAG operator
#!/usr/bin/python3
import getopt, sys, json, yaml, os, shutil
from typing import List, Dict, Union
from dataclasses import dataclass
@dataclass
class OpEvent:
    """One entry of the binding context handed to the hook.

    Attribute names mirror the keys of the binding-context JSON, except that
    the ``type`` and ``object`` keys are stored as ``fType`` and ``fObject``.
    """

    # A string from the name or group parameters. If these parameters have not
    # been set in the binding configuration, then the strings "schedule" or
    # "kubernetes" are used. For a hook executed at startup, this value is
    # always "onStartup".
    binding: str = ""

    # "Schedule" for schedule bindings. "Synchronization" or "Event" for
    # kubernetes bindings. "Synchronization" or "Group" if group is defined.
    fType: str = ""

    # One of the values usable with the executeHookOnEvent parameter:
    # "Added", "Modified" or "Deleted".
    watchEvent: str = ""

    # A JSON dump of the full object related to the event. It contains an
    # exact copy of the corresponding field in the WatchEvent response, so it
    # is the object state at the moment of the event (not at the moment of the
    # hook execution).
    fObject: Union[Dict, None] = None

    # The result of jq execution with the specified jqFilter on the object
    # above. If jqFilter is not specified, filterResult is omitted.
    filterResult: Union[Dict, None] = None

    # The hook receives existing objects on startup for each binding with a
    # "Synchronization"-type binding context: a list of existing objects that
    # match the selectors in the binding configuration. Each item of this list
    # contains "object" and "filterResult" fields. If the list is empty, the
    # value of objects is an empty array.
    objects: Union[List[Dict], None] = None

    # A map that contains a list of objects for each binding name from
    # includeSnapshotsFrom or for each kubernetes binding in a group. If the
    # includeSnapshotsFrom list is empty, the field is omitted.
    snapshots: Union[Dict[str, List[Dict]], None] = None
_SERVICE_ACCOUNT_NAMESPACE_FILE = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"


def get_current_namespace(namespace_file: str = _SERVICE_ACCOUNT_NAMESPACE_FILE) -> str:
    """Return the namespace named in ``namespace_file``, or ``"default"``.

    Args:
        namespace_file: Path of the file holding the namespace name. Defaults
            to the service-account namespace file.

    Returns:
        The file content with surrounding whitespace removed, or ``"default"``
        when the file does not exist or is empty.
    """
    try:
        # The context manager closes the handle; the original left it open.
        with open(namespace_file) as f:
            namespace = f.read().strip()
    except (FileNotFoundError, NotADirectoryError):
        return "default"
    return namespace or "default"
def get_config() -> Dict:
    """Build the hook configuration.

    The configuration declares a single kubernetes binding on ConfigMaps that
    carry the label ``airflow-dag: yes`` in the namespace returned by
    ``get_current_namespace()``, for "Added", "Modified" and "Deleted" events.
    """
    watched_namespaces = {"nameSelector": {"matchNames": [get_current_namespace()]}}
    configmap_binding = {
        "apiVersion": "v1",
        "kind": "ConfigMap",
        "executeHookOnEvent": ["Added", "Modified", "Deleted"],
        "labelSelector": {"matchLabels": {"airflow-dag": "yes"}},
        "namespace": watched_namespaces,
    }
    return {"configVersion": "v1", "kubernetes": [configmap_binding]}
DAG_FOLDER = "/var/dags"


def clear_dags():
    """Remove every entry inside ``DAG_FOLDER`` while keeping the folder itself.

    Removal is best-effort: entries that cannot be deleted are skipped, and a
    missing ``DAG_FOLDER`` is treated as already empty.
    """
    print(">> CLEARING DAGS")
    try:
        entries = os.listdir(DAG_FOLDER)
    except FileNotFoundError:
        return
    for entry in entries:
        path = os.path.join(DAG_FOLDER, entry)
        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path, ignore_errors=True)
            continue
        # rmtree only handles directories, so plain files and symlinks were
        # silently left behind before; remove them explicitly.
        try:
            os.unlink(path)
        except OSError:
            # Same best-effort policy as ignore_errors=True above.
            pass
def add_dag(folder: str, fname: str, contents: str):
    """Write ``contents`` to ``DAG_FOLDER/<folder>/<fname>``.

    Args:
        folder: Sub-folder of ``DAG_FOLDER``; created when it does not exist.
        fname: Name of the file to write inside ``folder``.
        contents: Text written to the file, replacing any previous content.
    """
    print(f">> SAVING DAG {fname} in folder {folder}")
    target_dir = os.path.join(DAG_FOLDER, folder)
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(target_dir, exist_ok=True)
    # Explicit UTF-8 so non-ASCII content does not depend on the locale.
    with open(os.path.join(target_dir, fname), "w", encoding="utf-8") as f:
        f.write(contents)
def delete_dag(folder: str, fname: str):
    """Delete ``DAG_FOLDER/<folder>/<fname>`` if it exists.

    Args:
        folder: Sub-folder of ``DAG_FOLDER`` that holds the file.
        fname: Name of the file to remove.

    A file that is already gone is not an error. The containing folder is
    left in place.
    """
    print(f">> REMOVING DAG {fname}")
    try:
        # Attempt the removal directly instead of exists() + unlink(), which
        # could fail if the file vanished between the two calls.
        os.unlink(os.path.join(DAG_FOLDER, folder, fname))
    except (FileNotFoundError, NotADirectoryError):
        pass
def do_trigger(ctx: List[OpEvent]):
    """Apply a list of binding-context events to the DAG folder.

    * If any event is of type "Synchronization", the DAG folder is cleared
      first and every object listed in such events is written out again.
    * "Added" / "Modified" watch events write each ``data`` entry of the
      object as a file in a folder named after the object.
    * "Deleted" watch events remove the files named by the object's ``data``
      keys.

    Args:
        ctx: Events to process, in order.
    """
    if any(evt.fType == "Synchronization" for evt in ctx):
        clear_dags()

    for evt in ctx:
        if evt.fType == "Synchronization":
            # objects may be absent (None) when nothing matched.
            for item in evt.objects or []:
                _save_object_data(item["object"])
        if evt.watchEvent in ("Added", "Modified"):
            # NOTE(review): keys removed from a modified object are not
            # deleted from disk here; only current keys are (re)written.
            _save_object_data(evt.fObject)
        if evt.watchEvent == "Deleted":
            name = evt.fObject["metadata"]["name"]
            for key in _object_data(evt.fObject):
                delete_dag(name, key)


def _object_data(obj: Dict) -> Dict:
    """Return the ``data`` mapping of ``obj``, or an empty dict if missing or null."""
    return obj.get("data") or {}


def _save_object_data(obj: Dict):
    """Write every ``data`` entry of ``obj`` into the folder named after the object."""
    name = obj["metadata"]["name"]
    for key, value in _object_data(obj).items():
        add_dag(name, key, value)
def usage():
    """Print the one-line command-line help to standard output."""
    help_text = "use --config or nothing"
    print(help_text)
def main():
    """Command-line entry point of the hook.

    * ``-h`` / ``--help``: print usage and exit with status 0.
    * ``-c`` / ``--config``: print the hook configuration as YAML and exit
      with status 0.
    * no option: read the JSON binding context from the file named by the
      ``BINDING_CONTEXT_PATH`` environment variable and process its events.

    Exits with status 2 on an unrecognised option and with status 1 when no
    binding context file is available.
    """
    try:
        # "-v" is still accepted (and ignored) because the previous option
        # string allowed it. "--help" and "-c" were tested for below but never
        # declared, so they used to be rejected by getopt.
        opts, _args = getopt.getopt(sys.argv[1:], "hcv", ["help", "config"])
    except getopt.GetoptError as err:
        # print help information and exit:
        print(err)  # will print something like "option -a not recognized"
        usage()
        sys.exit(2)

    for opt, _value in opts:
        if opt in ("-h", "--help"):
            usage()
            sys.exit(0)
        if opt in ("-c", "--config"):
            print(yaml.dump(get_config()))
            sys.exit(0)

    ctx_path = os.getenv("BINDING_CONTEXT_PATH")
    if ctx_path is None or not os.path.exists(ctx_path):
        print("no options given and no context found")
        sys.exit(1)

    with open(ctx_path, "r", encoding="utf-8") as f:
        raw_events = json.load(f)

    do_trigger([_to_op_event(raw) for raw in raw_events])


def _to_op_event(raw: Dict) -> "OpEvent":
    """Convert one binding-context JSON entry into an ``OpEvent``.

    ``type`` and ``object`` are renamed to ``fType`` and ``fObject``. Keys
    that ``OpEvent`` does not declare are dropped, so an unexpected key does
    not make the constructor raise ``TypeError``.
    """
    from dataclasses import fields

    renamed = {"type": "fType", "object": "fObject"}
    known = {f.name for f in fields(OpEvent)}
    kwargs = {}
    for key, value in raw.items():
        key = renamed.get(key, key)
        if key in known:
            kwargs[key] = value
    return OpEvent(**kwargs)


if __name__ == "__main__":
    main()
# Image for the DAG hook, built on top of shell-operator.
# NOTE(review): ":latest" is a moving tag; consider pinning a specific release.
FROM flant/shell-operator:latest
# python3 and py3-yaml are required by the hook script; /var/dags is the
# folder the hook writes DAG files into. "-p" keeps the step from failing if
# the folder already exists.
RUN apk --no-cache add python3 py3-yaml && mkdir -p /var/dags
# COPY is the recommended instruction for plain local files (ADD adds
# archive-extraction and URL semantics that are not needed here).
COPY hooks /hooks
# Mark the Python hooks as executable without parsing `ls` output.
RUN find /hooks -maxdepth 1 -type f -name '*.py' -exec chmod +x {} \;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment