Skip to content

Instantly share code, notes, and snippets.

@mik-laj
Created May 16, 2020 11:35
Show Gist options
  • Save mik-laj/ff50e4991d4735c6caa8b69060cd89ea to your computer and use it in GitHub Desktop.
Save mik-laj/ff50e4991d4735c6caa8b69060cd89ea to your computer and use it in GitHub Desktop.
Usage: python pickle-test.py | bash -
import base64
import inspect
import pickle
import shlex
import textwrap
def get_dag_runs(dag_id_prefix):
    """Print every DagRun whose ``dag_id`` starts with ``dag_id_prefix``.

    The prefix is echoed first, then the list of matching ``DagRun`` rows
    returned by the query is printed. Nothing is returned.

    All imports are kept inside the body on purpose: the source text of this
    function is extracted with ``inspect.getsource`` and executed in a
    separate interpreter, so it cannot depend on this module's globals.
    """
    print("dag_id_prefix=", dag_id_prefix)

    import logging

    # Raise the threshold of the "airflow" logger to CRITICAL before the
    # Airflow modules are imported.
    airflow_logger = logging.getLogger("airflow")
    airflow_logger.setLevel(logging.CRITICAL)

    from airflow.settings import Session
    from airflow.models import DagRun

    prefix_matches = DagRun.dag_id.startswith(dag_id_prefix)
    matching_runs = Session.query(DagRun).filter(prefix_matches).all()
    print(matching_runs)
# Source template of the script executed by the target interpreter.
# Placeholders:
#   {func_code}        source text that defines the function to run
#   {args} / {kwargs}  Python bytes literals holding the base64-encoded
#                      pickles of the positional / keyword arguments
#   {func_name}        name under which {func_code} defines the function
# NOTE: the generated script calls pickle.loads; it must only ever be fed the
# payload produced by prepare_trampoline, never untrusted data.
TRAMPOLINE_TMP = textwrap.dedent("""\
import base64
import pickle
{func_code}
args = pickle.loads(base64.b64decode({args}))
kwargs = pickle.loads(base64.b64decode({kwargs}))
{func_name}(*args, **kwargs)
""")


def prepare_trampoline(func, *args, **kwargs):
    """Build a standalone Python script that calls ``func(*args, **kwargs)``.

    The script embeds the source code of ``func`` together with the pickled,
    base64-encoded arguments, so ``func`` must be self-contained (it may not
    rely on globals or imports of the module it was defined in).

    Args:
        func: Function whose source can be retrieved by ``inspect.getsource``.
        *args: Positional arguments to pass to ``func``; must be picklable.
        **kwargs: Keyword arguments to pass to ``func``; must be picklable.

    Returns:
        The source code of the generated script as a string.

    Raises:
        OSError: If the source code of ``func`` cannot be retrieved.
        TypeError: If ``func`` is a built-in object without Python source.
        pickle.PicklingError: If an argument cannot be pickled.
    """
    # Embed the source of the function that was actually passed in, so the
    # definition in the script always matches ``func.__name__``. Dedent it so
    # that a definition taken from an indented context is valid at top level.
    func_code = textwrap.dedent(inspect.getsource(func))
    return TRAMPOLINE_TMP.format(
        func_code=func_code,
        func_name=func.__name__,
        # repr() produces an explicit ``b'...'`` literal for the template
        # instead of relying on the implicit str() conversion of bytes.
        args=repr(base64.b64encode(pickle.dumps(args))),
        kwargs=repr(base64.b64encode(pickle.dumps(kwargs))),
    )
def invoke_remote(func, *args, **kwargs):
    """Return a shell command line that runs ``func(*args, **kwargs)``.

    The call is turned into a script by ``prepare_trampoline`` and passed to
    ``python -c``; every word of the command is quoted with ``shlex.quote``
    and the words are joined with single spaces.
    """
    script = prepare_trampoline(func, *args, **kwargs)
    argv = ("python", "-c", script)
    return " ".join(map(shlex.quote, argv))
if __name__ == "__main__":
    # Write the generated shell command to stdout so that it can be piped
    # into a shell; guarded so that importing this module prints nothing.
    print(invoke_remote(get_dag_runs, 'dag_id'))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment