Created
July 20, 2022 02:29
-
-
Save rajrao/e031a2d0c147c9d1627226e251a5a6ec to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#This code is an example of how to use text based mapper class names to query the airflow db. | |
import inspect | |
import re | |
from airflow.models.base import Base | |
from airflow import DAG, settings | |
from airflow.operators.python import PythonOperator | |
from airflow.utils.dates import days_ago | |
import sqlalchemy.orm.attributes | |
### run this part of the code at beginning, to build the registry | |
model_registry = {} | |
column_registry = {} | |
for model in Base.registry.mappers: | |
name = re.sub("[<>']", "", str(model.class_))[6:] #extract the class name from text that looks like this: <class 'classname'> | |
model_registry[model.class_.__name__] = model.class_ | |
for key,value in inspect.getmembers(model.class_): | |
if type(value) is sqlalchemy.orm.attributes.InstrumentedAttribute: | |
column_registry[f"{model.class_.__name__}.{key}"] = value | |
#example of how to pull the table and column names | |
table = model_registry["DagRun"] | |
column = column_registry["DagRun.execution_date"] | |
print(table) | |
print(column) | |
session = settings.Session() | |
print("session: ",str(session)) | |
#example of how to use the model_registry | |
query = session.query(table).filter(column >= days_ago(1)) | |
print(query) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment