Last active
December 10, 2018 10:17
-
-
Save xnuinside/e680e33cebd31fefee95e7f92fc19648 to your computer and use it in GitHub Desktop.
PostgreSQLCheckTable and PostgreSQLCountRows Operators Airflow Plugin
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Custom PostgreSQL operators packaged as an Airflow plugin.

Import the operators with:

    from airflow.operators.postgres_custom import PostgreSQLCheckTable, PostgreSQLCountRows
"""
import logging | |
from airflow.models import BaseOperator | |
from airflow.plugins_manager import AirflowPlugin | |
from airflow.utils.decorators import apply_defaults | |
from airflow.hooks.postgres_hook import PostgresHook | |
log = logging.getLogger(__name__) | |
class PostgreSQLCheckTable(BaseOperator):
    """Operator that fails the task when a table is missing from a schema.

    The check looks the table up in ``information_schema.tables`` through a
    ``PostgresHook``.
    """

    @apply_defaults
    def __init__(self, table_name, schema,
                 postgres_conn_id='postgres_default',
                 *args, **kwargs):
        """
        :param table_name: name of the table to look for
        :param schema: database schema name; existing schema names can be
            found with ``SELECT * FROM pg_tables;``
        :param postgres_conn_id: Airflow connection id passed to
            ``PostgresHook`` (defaults to ``postgres_default``)
        """
        self.table_name = table_name
        self.schema = schema
        self.postgres_conn_id = postgres_conn_id
        self.hook = PostgresHook(postgres_conn_id=postgres_conn_id)
        super(PostgreSQLCheckTable, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Check that the configured table exists.

        :param context: Airflow task context (unused)
        :return: ``True`` when a matching row is found
        :raises ValueError: when no table with that name exists in the schema
        """
        # Bind the values as query parameters rather than formatting them
        # into the SQL text, so quotes in the names cannot alter the query.
        result = self.hook.get_first(
            sql="SELECT * FROM information_schema.tables "
                "WHERE table_schema = %s AND table_name = %s;",
            parameters=(self.schema, self.table_name))
        if not result:
            raise ValueError("table {} does not exist".format(self.table_name))
        return True
class PostgreSQLCountRows(BaseOperator):
    """Operator that runs ``SELECT COUNT(*)`` against a table."""

    @apply_defaults
    def __init__(self, table_name,
                 postgres_conn_id='postgres_default',
                 *args, **kwargs):
        """
        :param table_name: name of the table whose rows are counted
        :param postgres_conn_id: Airflow connection id passed to
            ``PostgresHook`` (defaults to ``postgres_default``)
        """
        self.table_name = table_name
        self.postgres_conn_id = postgres_conn_id
        self.hook = PostgresHook(postgres_conn_id=postgres_conn_id)
        super(PostgreSQLCountRows, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Count the rows of the configured table and log the result.

        :param context: Airflow task context (unused)
        :return: whatever ``hook.get_first`` returns for the count query
            (presumably a one-element row holding the count -- verify
            against the hook in use)
        """
        # NOTE(review): a table name is an identifier and cannot be bound as
        # a query parameter, so it is interpolated into the SQL text. Only
        # pass table names that come from trusted DAG code.
        result = self.hook.get_first(
            sql="SELECT COUNT(*) FROM {};".format(self.table_name))
        # Lazy logging arguments: the message is only built if it is emitted.
        log.info("Result: %s", result)
        return result
class PostgreSQLCustomOperatorsPlugin(AirflowPlugin):
    """Airflow plugin that exposes the custom PostgreSQL operators."""

    # Plugin name; the operators are imported from
    # ``airflow.operators.postgres_custom``.
    name = "postgres_custom"

    # Operator classes registered by this plugin.
    operators = [
        PostgreSQLCheckTable,
        PostgreSQLCountRows,
    ]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment