Skip to content

Instantly share code, notes, and snippets.

@shin-nien
Created September 23, 2016 16:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save shin-nien/2d011972d8631c92675d1de58b000168 to your computer and use it in GitHub Desktop.
Save shin-nien/2d011972d8631c92675d1de58b000168 to your computer and use it in GitHub Desktop.
airflow mongodb hook attempt
# This is the class you derive to create a plugin
from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint
from flask_admin import BaseView, expose
from flask_admin.base import MenuLink
# Importing base classes that we need to derive
from airflow.hooks.base_hook import BaseHook
from airflow.models import BaseOperator
from airflow.executors.base_executor import BaseExecutor
import psycopg2
import psycopg2.extensions
from airflow.hooks.dbapi_hook import DbApiHook
# Will show up under airflow.hooks.MongoDbHook
class MongoDbHook(BaseHook):
    '''
    Interact with MongoDb.

    You can specify ssl parameters in the extra field of your connection
    as ``{"sslmode": "require", "sslcert": "/path/to/cert.pem", etc}``.

    NOTE(review): despite the name, ``get_conn`` opens the connection with
    ``psycopg2`` (a PostgreSQL driver) -- confirm this is what is intended
    before pointing the connection at a MongoDB server.
    '''
    conn_name_attr = 'mongodb_conn_id'
    default_conn_name = 'mongodb_default'
    supports_autocommit = False

    def get_conn(self):
        '''
        Open a ``psycopg2`` connection from the stored Airflow connection.

        The connection id is read from the attribute named by
        ``conn_name_attr``; when that attribute has not been set on the
        instance, ``default_conn_name`` is used instead.

        :return: the object returned by ``psycopg2.connect``
        '''
        # Nothing in this class assigns ``mongodb_conn_id``, so reading it
        # directly can raise AttributeError. Resolve it defensively and fall
        # back to the default connection name.
        conn_id = getattr(self, self.conn_name_attr, self.default_conn_name)
        conn = self.get_connection(conn_id)
        conn_args = dict(
            host=conn.host,
            user=conn.login,
            password=conn.password,
            dbname=conn.schema,
            port=conn.port)

        # check for ssl parameters in conn.extra; any other extra key is
        # ignored rather than forwarded to the driver
        ssl_arg_names = ('sslmode', 'sslcert', 'sslkey', 'sslrootcert', 'sslcrl')
        for arg_name, arg_val in conn.extra_dejson.items():
            if arg_name in ssl_arg_names:
                conn_args[arg_name] = arg_val

        psycopg2_conn = psycopg2.connect(**conn_args)
        # Servers reporting a version below 70400 are flagged as supporting
        # autocommit; this sets the flag on the instance, not on the class.
        if psycopg2_conn.server_version < 70400:
            self.supports_autocommit = True
        return psycopg2_conn
# Will show up under airflow.operators.PluginOperator
class PluginOperator(BaseOperator):
    '''Placeholder operator; adds nothing on top of ``BaseOperator``.'''
# Will show up under airflow.executors.PluginExecutor
class PluginExecutor(BaseExecutor):
    '''Placeholder executor; adds nothing on top of ``BaseExecutor``.'''
# Creating a flask admin BaseView
class TestView(BaseView):
    '''Flask-admin view exposing a single page at the view's root URL.'''

    @expose('/')
    def test(self):
        '''Render the test template with a fixed greeting as ``content``.'''
        # In this example the template is expected at
        # airflow/plugins/templates/test_plugin/test.html
        template_path = "test_plugin/test.html"
        return self.render(template_path, content="Hello galaxy!")
# Admin view instance that the plugin class below registers
v = TestView(name="Test View", category="Test Plugin")

# Creating a flask blueprint to integrate the templates and static folder
bp = Blueprint(
    "test_plugin",
    __name__,
    static_folder='static',
    static_url_path='/static/test_plugin',
    # registers airflow/plugins/templates as a Jinja template folder
    template_folder='templates',
)

# Menu link to http://pythonhosted.org/airflow/ in the "Test Plugin" category
ml = MenuLink(
    url='http://pythonhosted.org/airflow/',
    name='Test Menu Link',
    category='Test Plugin',
)
# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
    '''Plugin definition listing the components declared in this file.'''
    name = "test_plugin"

    # Web UI pieces
    admin_views = [v]
    flask_blueprints = [bp]
    menu_links = [ml]

    # Hook, operator and executor classes
    hooks = [MongoDbHook]
    operators = [PluginOperator]
    executors = [PluginExecutor]
@openp2pdesign
Copy link

There's an open issue for this in Airflow's development tracker; I linked this gist there. There's also another related gist here. I look forward to seeing the hook working with MongoDB soon!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment