Created
September 23, 2016 16:12
-
-
Save shin-nien/2d011972d8631c92675d1de58b000168 to your computer and use it in GitHub Desktop.
airflow mongodb hook attempt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is the class you derive to create a plugin | |
from airflow.plugins_manager import AirflowPlugin | |
from flask import Blueprint | |
from flask_admin import BaseView, expose | |
from flask_admin.base import MenuLink | |
# Importing base classes that we need to derive | |
from airflow.hooks.base_hook import BaseHook | |
from airflow.models import BaseOperator | |
from airflow.executors.base_executor import BaseExecutor | |
import psycopg2 | |
import psycopg2.extensions | |
from airflow.hooks.dbapi_hook import DbApiHook | |
# Will show up under airflow.hooks.PluginHook | |
class MongoDbHook(BaseHook): | |
''' | |
Interact with MongoDb. | |
You can specify ssl parameters in the extra field of your connection | |
as ``{"sslmode": "require", "sslcert": "/path/to/cert.pem", etc}``. | |
''' | |
conn_name_attr = 'mongodb_conn_id' | |
default_conn_name = 'mongodb_default' | |
supports_autocommit = False | |
def get_conn(self): | |
conn = self.get_connection(self.mongodb_conn_id) | |
conn_args = dict( | |
host=conn.host, | |
user=conn.login, | |
password=conn.password, | |
dbname=conn.schema, | |
port=conn.port) | |
# check for ssl parameters in conn.extra | |
for arg_name, arg_val in conn.extra_dejson.items(): | |
if arg_name in ['sslmode', 'sslcert', 'sslkey', 'sslrootcert', 'sslcrl']: | |
conn_args[arg_name] = arg_val | |
psycopg2_conn = psycopg2.connect(**conn_args) | |
if psycopg2_conn.server_version < 70400: | |
self.supports_autocommit = True | |
return psycopg2_conn | |
# Will show up under airflow.operators.PluginOperator | |
class PluginOperator(BaseOperator): | |
pass | |
# Will show up under airflow.executors.PluginExecutor | |
class PluginExecutor(BaseExecutor): | |
pass | |
# Creating a flask admin BaseView | |
class TestView(BaseView): | |
@expose('/') | |
def test(self): | |
# in this example, put your test_plugin/test.html template at airflow/plugins/templates/test_plugin/test.html | |
return self.render("test_plugin/test.html", content="Hello galaxy!") | |
v = TestView(category="Test Plugin", name="Test View") | |
# Creating a flask blueprint to intergrate the templates and static folder | |
bp = Blueprint( | |
"test_plugin", __name__, | |
template_folder='templates', # registers airflow/plugins/templates as a Jinja template folder | |
static_folder='static', | |
static_url_path='/static/test_plugin') | |
ml = MenuLink( | |
category='Test Plugin', | |
name='Test Menu Link', | |
url='http://pythonhosted.org/airflow/') | |
# Defining the plugin class | |
class AirflowTestPlugin(AirflowPlugin): | |
name = "test_plugin" | |
operators = [PluginOperator] | |
flask_blueprints = [bp] | |
hooks = [MongoDbHook] | |
executors = [PluginExecutor] | |
admin_views = [v] | |
menu_links = [ml] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
There's an open issue for this in the Airflow development, I linked this gist there. There's also another related gist here. I look forward to see the hook with MongoDB working soon!