SuperQuery operator for Apache Airflow
import logging
import sys
from contextlib import closing

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.hooks.mysql_hook import MySqlHook

LOG = logging.getLogger(__name__)
class SuperQueryOperator(BaseOperator):
    """
    Executes SQL code in Google BigQuery via the SuperQuery proxy's MySQL interface.

    :param sql: the SQL code to be executed. Can receive a str representing a
        SQL statement, a list of str (SQL statements), or a reference to a
        template file. Template references are recognized by a str ending in
        '.sql'. (templated)
    :type sql: str or list[str]
    :param mysql_conn_id: reference to a specific MySQL database
    :type mysql_conn_id: str
    :param parameters: (optional) the parameters to render the SQL query with
    :type parameters: mapping or iterable
    :param autocommit: if True, each command is automatically committed
        (default value: False)
    :type autocommit: bool
    :param database: name of the database; overrides the one defined in the
        connection
    :type database: str
    :param explain: if True, the execution plan (and cost) of the SQL is
        written to the Airflow logs (default value: False)
    :type explain: bool
    """
    template_fields = ('sql',)
    template_ext = ('.sql',)
    ui_color = '#ededed'

    @apply_defaults
    def __init__(
            self, sql, mysql_conn_id='mysql_default', parameters=None,
            autocommit=False, database=None, explain=False, *args, **kwargs):
        super(SuperQueryOperator, self).__init__(*args, **kwargs)
        self.mysql_conn_id = mysql_conn_id
        self.sql = sql
        self.autocommit = autocommit
        self.parameters = parameters
        self.database = database
        self.explain = explain
    def execute(self, context):
        self.log.info('Executing: %s', self.sql)
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                         schema=self.database)
        if self.explain:
            # Bracket the query with the proxy's user-agent marker and an
            # EXPLAIN request so the proxy returns the plan and cost.
            result = self._get_records(
                hook,
                ["SET super_userAgent = airflow", self.sql, "explain;"],
                parameters=self.parameters)
        else:
            result = self._get_records(
                hook,
                self.sql,
                parameters=self.parameters)
        self.log.info(result)
    def _get_records(self, hook, sql, parameters=None):
        """
        Executes the SQL and returns a set of records.

        :param sql: the SQL statement(s) to be executed
        :type sql: str or list[str]
        :param parameters: the parameters to render the SQL query with
        :type parameters: mapping or iterable
        """
        if sys.version_info[0] < 3 and isinstance(sql, str):
            sql = sql.encode('utf-8')
        with closing(hook.get_conn()) as conn:
            with closing(conn.cursor()) as cur:
                # A single string: just execute the SQL.
                if isinstance(sql, str):
                    if parameters is not None:
                        cur.execute(sql, parameters)
                    else:
                        cur.execute(sql)
                # A list is the explain path: (SET user agent, query,
                # "explain;"), so allow at most those three statements.
                if isinstance(sql, list):
                    if len(sql) > 3:
                        raise AirflowException(
                            "Can only explain a single SQL statement, "
                            "not a list of statements.")
                    for s in sql:
                        if parameters is not None:
                            cur.execute(s, parameters)
                        else:
                            cur.execute(s)
                # The last statement's result set is the payload (the
                # explain output when running in explain mode).
                return cur.fetchall()
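
A minimal usage sketch, assuming the operator above lives in a module named superquery_operator and that an Airflow connection id superquery_default points at the SuperQuery MySQL proxy. The module name, connection id, DAG id, and sample table below are illustrative assumptions, not part of the gist:

# Illustrative DAG file. 'superquery_operator', 'superquery_default',
# and the queried table are assumptions for the sake of the example.
from datetime import datetime

from airflow import DAG
from superquery_operator import SuperQueryOperator

with DAG(dag_id='superquery_example',
         start_date=datetime(2019, 4, 1),
         schedule_interval='@daily') as dag:

    # Run the query through the SuperQuery proxy and log the rows.
    run_query = SuperQueryOperator(
        task_id='run_query',
        sql='SELECT word FROM `bigquery-public-data.samples.shakespeare` LIMIT 10',
        mysql_conn_id='superquery_default',
    )

    # Same query with explain=True: the proxy's execution plan and
    # cost estimate are written to the task log.
    explain_query = SuperQueryOperator(
        task_id='explain_query',
        sql='SELECT word FROM `bigquery-public-data.samples.shakespeare` LIMIT 10',
        mysql_conn_id='superquery_default',
        explain=True,
    )

    run_query >> explain_query

With explain=True, the operator wraps the statement between the proxy's SET super_userAgent marker and an "explain;" request, and the final fetch returns the proxy's explain payload rather than only the query rows.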