Last active: April 5, 2019 15:24
-
-
Save super-eben/b1d49538b46fa3f55c5bddfe73405b34 to your computer and use it in GitHub Desktop.
SuperQuery operator for Apache Airflow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import uuid | |
import logging | |
import pandas as pd | |
import json | |
from contextlib import closing | |
import sys | |
from airflow.exceptions import AirflowException | |
from airflow.models import BaseOperator | |
from airflow.plugins_manager import AirflowPlugin | |
from airflow.utils.decorators import apply_defaults | |
from airflow.contrib.hooks.aws_dynamodb_hook import AwsDynamoDBHook | |
from airflow.contrib.hooks.bigquery_hook import BigQueryHook, BigQueryPandasConnector | |
from airflow.hooks.mysql_hook import MySqlHook | |
# Module-level logger for this file; the operator below logs through the
# ``self.log`` attribute it gets from BaseOperator instead.
LOG = logging.getLogger(name=__name__)
class SuperQueryOperator(BaseOperator):
    """
    Executes sql code in Google BigQuery via the SuperQuery proxy MySQL interface.

    :param sql: the sql code to be executed. Can receive a str representing a
        sql statement, a list of str (sql statements), or a reference to a
        template file. Template references are recognized by a str ending in
        '.sql' (templated). When ``explain`` is True only a single statement
        is accepted (a str, or a list/tuple holding exactly one str).
    :type sql: str or list[str]
    :param mysql_conn_id: reference to a specific mysql database
    :type mysql_conn_id: str
    :param parameters: (optional) the parameters to render the SQL query with.
        They are passed to every user statement; in explain mode they are NOT
        passed to the statements the operator adds around ``sql``.
    :type parameters: mapping or iterable
    :param autocommit: if True, each command is automatically committed.
        (default value: False)
        NOTE(review): the value is stored on the operator but this class never
        applies it to the connection -- confirm whether it should be.
    :type autocommit: bool
    :param database: name of database which overwrites the one defined in the
        connection (passed to MySqlHook as ``schema``)
    :type database: str
    :param explain: if True, the execution plan (and cost) of the sql is
        returned to the Airflow logs. (default value: False)
    :type explain: bool
    :raises AirflowException: if ``explain`` is True and ``sql`` holds more
        than one statement, or if ``sql`` is an empty list/tuple.
    """
    template_fields = ('sql',)
    template_ext = ('.sql',)
    ui_color = '#ededed'

    # Statements the operator wraps around the user's SQL when ``explain`` is
    # True: the first runs before it, the second after it, and the rows of the
    # last one are what get logged.
    _USER_AGENT_STATEMENT = "SET super_userAgent = airflow"
    _EXPLAIN_STATEMENT = "explain;"

    @apply_defaults
    def __init__(
            self, sql, mysql_conn_id='mysql_default', parameters=None,
            autocommit=False, database=None, explain=False, *args, **kwargs):
        super(SuperQueryOperator, self).__init__(*args, **kwargs)
        self.mysql_conn_id = mysql_conn_id
        self.sql = sql
        self.autocommit = autocommit
        self.parameters = parameters
        self.database = database
        self.explain = explain

    def execute(self, context):
        """Run ``self.sql`` through the MySQL hook and log the fetched rows.

        In explain mode the single user statement is bracketed by the
        user-agent ``SET`` and ``explain;`` statements, so the logged rows are
        the result of ``explain;``.
        """
        self.log.info('Executing: %s', self.sql)
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                         schema=self.database)
        if self.explain:
            records = self._get_explain_records(
                hook, self.sql, parameters=self.parameters)
        else:
            records = self._get_records(
                hook, self.sql, parameters=self.parameters)
        self.log.info(records)

    def _get_records(self, hook, sql, parameters=None):
        """
        Executes the sql and returns the rows fetched after the last statement.

        :param sql: a single statement, or a list/tuple of statements executed
            in order on one cursor
        :type sql: str or list[str]
        :param parameters: The parameters to render each SQL statement with.
        :type parameters: mapping or iterable
        :raises AirflowException: if ``sql`` is an empty list/tuple
        """
        statements = self._as_statement_list(sql)
        return self._run_statements(
            hook, [(statement, parameters) for statement in statements])

    def _get_explain_records(self, hook, sql, parameters=None):
        """
        Executes a single statement wrapped in the explain statements and
        returns the rows produced by the trailing ``explain;``.

        ``parameters`` are applied only to the user's statement; the wrapper
        statements have no placeholders and are executed without them.

        :raises AirflowException: if ``sql`` holds more than one statement
        """
        statement = self._single_statement(sql)
        return self._run_statements(hook, [
            (self._USER_AGENT_STATEMENT, None),
            (statement, parameters),
            (self._EXPLAIN_STATEMENT, None),
        ])

    @staticmethod
    def _as_statement_list(sql):
        """Return ``sql`` as a list of statements (a lone statement becomes a one-item list)."""
        if isinstance(sql, (list, tuple)):
            statements = list(sql)
        else:
            statements = [sql]
        if not statements:
            # Otherwise fetchall() would be called without any executed query.
            raise AirflowException("No SQL statement to execute.")
        return statements

    @staticmethod
    def _single_statement(sql):
        """Return the only statement in ``sql``, rejecting multi-statement lists."""
        if isinstance(sql, (list, tuple)):
            if len(sql) != 1:
                raise AirflowException(
                    "Can only execute a single SQL statement, "
                    "not a list of statements.")
            return sql[0]
        return sql

    @staticmethod
    def _run_statements(hook, batch):
        """Execute ``(statement, params)`` pairs on one cursor; return the last result set."""
        with closing(hook.get_conn()) as conn:
            with closing(conn.cursor()) as cur:
                for statement, params in batch:
                    # Python 2 only: hand the driver UTF-8 bytes. Encoding per
                    # statement (rather than the whole ``sql`` argument) keeps
                    # lists working and avoids re-encoding byte strings.
                    if sys.version_info[0] < 3 and not isinstance(statement, bytes):
                        statement = statement.encode('utf-8')
                    if params is not None:
                        cur.execute(statement, params)
                    else:
                        cur.execute(statement)
                return cur.fetchall()
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.