Created
November 19, 2019 12:18
-
-
Save nehiljain/e7e39751bf9dcecbc83940a6fcab080b to your computer and use it in GitHub Desktop.
Snowpipe Alembic Utils
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''This is the extension on the alembic API to facilitate the custom autogenerate script''' | |
import json | |
import logging | |
import os | |
import boto3 | |
from alembic.autogenerate import comparators, renderers | |
from alembic.operations import MigrateOperation, Operations | |
from alembic.operations.ops import CreateTableOp, ModifyTableOps | |
from update_s3_schemas import get_schemas | |
from utils.file_compare_utils import traverse | |
@Operations.register_operation("build_snowpipe")
class CreateSnowpipeOp(MigrateOperation):
    """Migration operation that provisions the stage and snowpipe for a table."""

    def __init__(self, table_name, metadata_cols=None, json_cols=None):
        self.table_name = table_name
        self.metadata_cols = metadata_cols
        self.json_cols = json_cols

    @classmethod
    def build_snowpipe(cls, operations, table_name, metadata_cols, json_cols):
        """Entry point exposed as ``op.build_snowpipe`` in migration scripts."""
        return operations.invoke(cls(table_name, metadata_cols, json_cols))

    def reverse(self):
        # Only needed to support autogenerate.
        return DropSnowpipeOp(self.table_name)
@Operations.register_operation("drop_snowpipe")
class DropSnowpipeOp(MigrateOperation):
    """Migration operation that removes a table's snowpipe and its stage."""

    def __init__(self, table_name):
        self.table_name = table_name

    @classmethod
    def drop_snowpipe(cls, operations, table_name):
        """Entry point exposed as ``op.drop_snowpipe`` in migration scripts."""
        return operations.invoke(cls(table_name))

    def reverse(self):
        # Only needed to support autogenerate.
        # NOTE(review): the column lists are not recorded on a drop, so the
        # reversed CreateSnowpipeOp is built with metadata_cols/json_cols of
        # None — confirm a downgrade never actually executes that op.
        return CreateSnowpipeOp(self.table_name)
@Operations.register_operation("modify_snowpipe")
class ModifySnowpipeOp(MigrateOperation):
    """Migration operation that recreates a snowpipe with updated column mappings."""

    def __init__(self, table_name, metadata_cols, json_cols, copy_command):
        self.table_name = table_name
        self.metadata_cols = metadata_cols
        self.json_cols = json_cols
        # The pipe's current COPY definition, kept so the change can be reverted.
        self.copy_command = copy_command

    @classmethod
    def modify_snowpipe(cls, operations, table_name, metadata_cols, json_cols, copy_command):
        """Entry point exposed as ``op.modify_snowpipe`` in migration scripts."""
        return operations.invoke(cls(table_name, metadata_cols, json_cols, copy_command))

    def reverse(self):
        return RevertSnowpipeOp(self.table_name, self.metadata_cols, self.json_cols,
                                self.copy_command)
@Operations.register_operation("revert_snowpipe")
class RevertSnowpipeOp(MigrateOperation):
    """Migration operation that restores a snowpipe from a saved COPY definition."""

    def __init__(self, table_name, metadata_cols, json_cols, copy_command):
        self.table_name = table_name
        self.metadata_cols = metadata_cols
        self.json_cols = json_cols
        # The previously captured COPY definition to restore.
        self.copy_command = copy_command

    @classmethod
    def revert_snowpipe(cls, operations, table_name, metadata_cols, json_cols, copy_command):
        """Entry point exposed as ``op.revert_snowpipe`` in migration scripts."""
        return operations.invoke(cls(table_name, metadata_cols, json_cols, copy_command))

    def reverse(self):
        return ModifySnowpipeOp(self.table_name, self.metadata_cols, self.json_cols,
                                self.copy_command)
@Operations.implementation_for(CreateSnowpipeOp)
def build_snowpipe(operations, operation):
    """Execute the SQL that provisions a table's external stage and snowpipe.

    Returns a list with the results of the two executed statements:
    ``[stage_result, pipe_result]``.
    """
    # NOTE(review): embedding AWS credentials directly in the SQL text can leak
    # them into Snowflake query history — consider a STORAGE INTEGRATION instead.
    build_stage_op = operations.execute("""
    CREATE OR REPLACE STAGE SCHEMA.{upper_tablename}_STAGE URL = 's3://{bucket}/{env}/schema_{tablename}/'
    CREDENTIALS = (AWS_KEY_ID = '{key_id}' AWS_SECRET_KEY='{secret_key}')
    FILE_FORMAT=SCHEMA.GZIP_JSON_UEL_FORMAT""".format(
        bucket=os.getenv('S3_BUCKET'),
        env=os.getenv('ENV').lower(),
        upper_tablename=operation.table_name.upper(),
        tablename=operation.table_name,
        key_id=os.getenv('AWS_ACCESS_KEY_ID'),
        secret_key=os.getenv('AWS_SECRET_ACCESS_KEY')))
    # Bug fix: the pipe's FILE_FORMAT previously referenced
    # SCHEMA_UNIFIED_EVENTS.GZIP_JSON_UEL_FORMAT, inconsistent with the stage
    # above and with modify_snowpipe (both use SCHEMA.GZIP_JSON_UEL_FORMAT).
    build_snowpipe_op = operations.execute("""
    CREATE OR REPLACE PIPE SCHEMA.{tablename}_PIPE
    AUTO_INGEST=TRUE AS COPY INTO {schema_name}.{tablename}
    ({metadata_cols}) FROM
    (SELECT {json_cols} FROM @SCHEMA.{tablename}_STAGE)
    FILE_FORMAT=SCHEMA.GZIP_JSON_UEL_FORMAT""".format(
        tablename=operation.table_name.upper(),
        schema_name=os.getenv('SNOWFLAKE_SCHEMA').upper(),
        metadata_cols=','.join(operation.metadata_cols),
        # Render each colon-separated JSON path as a quoted $1:"a":"b" extraction.
        json_cols=','.join('$1:"' + col.replace(":", '":"') + '"'
                           for col in operation.json_cols)))
    return [build_stage_op, build_snowpipe_op]
@Operations.implementation_for(DropSnowpipeOp)
def drop_snowpipe(operations, operation):
    """Execute the SQL that tears down a table's stage and pipe.

    Returns the results of the two executed statements:
    ``[stage_result, pipe_result]``.
    """
    name = operation.table_name.upper()
    results = []
    # Drop the stage first, then the pipe, matching the original statement order.
    for kind in ("STAGE", "PIPE"):
        statement = "DROP {kind} IF EXISTS SCHEMA.{name}_{kind}".format(kind=kind, name=name)
        results.append(operations.execute(statement))
    return results
@Operations.implementation_for(ModifySnowpipeOp)
def modify_snowpipe(operations, operation):
    """Execute the SQL that recreates an existing snowpipe with the table's
    current metadata and JSON column mappings.

    Returns the single statement result in a list.
    """
    # Render each colon-separated JSON path as a quoted $1:"a":"b" extraction.
    json_paths = ','.join('$1:"' + col.replace(":", '":"') + '"'
                          for col in operation.json_cols)
    statement = """
    CREATE OR REPLACE PIPE SCHEMA.{tablename}_PIPE
    AUTO_INGEST=TRUE AS COPY INTO {schema_name}.{tablename}
    ({metadata_cols}) FROM
    (SELECT {json_cols} FROM @SCHEMA.{tablename}_STAGE)
    FILE_FORMAT=SCHEMA.GZIP_JSON_UEL_FORMAT""".format(
        tablename=operation.table_name.upper(),
        schema_name=os.getenv('SNOWFLAKE_SCHEMA').upper(),
        metadata_cols=','.join(operation.metadata_cols),
        json_cols=json_paths)
    return [operations.execute(statement)]
@Operations.implementation_for(RevertSnowpipeOp)
def revert_snowpipe(operations, operation):
    """Execute the SQL that restores a snowpipe from its saved COPY definition.

    Returns the single statement result in a list.
    """
    statement = """
    CREATE OR REPLACE PIPE SCHEMA.{tablename}_PIPE
    AUTO_INGEST=TRUE AS {copy_command}""".format(
        tablename=operation.table_name.upper(),
        copy_command=operation.copy_command)
    return [operations.execute(statement)]
@comparators.dispatch_for("schema")
def compare_pipes(autogen_context, upgrade_ops, schemas):  # pylint: disable=unused-argument
    """Compare the pipes in Snowflake against the JSON schemas and SQLAlchemy
    metadata, appending the snowpipe operation implied by each table change.

    Appends CreateSnowpipeOp for new tables, ModifySnowpipeOp for altered
    tables, and DropSnowpipeOp for pipes whose backing table is gone.
    """
    # Pipes that currently exist in Snowflake, keyed by name -> COPY definition.
    all_conn_pipes = {
        row[0]: row[1] for row in autogen_context.connection.execute(
            "SELECT PIPE_NAME,DEFINITION FROM INFORMATION_SCHEMA.PIPES WHERE PIPE_SCHEMA='SCHEMA';")
    }
    # Pipe names implied by the tables in our SQLAlchemy metadata.
    pipe_names = {key.upper() + "_PIPE" for key in autogen_context.metadata.tables.keys()}
    # Map each table to the sorted, flattened column paths from its JSON schema
    # file. (Local renamed from `schemas`, which shadowed the unused dispatch
    # argument; also dropped a stray "testings the logs" debug line.)
    json_col_mapping = {}
    for filename in get_schemas():
        schema_name, _ = os.path.splitext(os.path.basename(filename))
        with open(filename, 'r') as f:
            schema = json.loads(f.read())
        json_col_mapping[schema_name] = sorted([col['name'] for col in traverse(schema, '', [])])
    for op in upgrade_ops.ops:
        try:
            table_name = op.table_name
            json_cols = json_col_mapping[table_name]
            metadata_table = autogen_context.metadata.tables[table_name]
            metadata_cols = sorted(metadata_table.columns.keys())
        except KeyError as e:
            logging.info('%s This table was not checked in through alembic, or there is not json for it', str(e))
            continue
        if isinstance(op, CreateTableOp):
            upgrade_ops.ops.append(
                CreateSnowpipeOp(table_name, metadata_cols, json_cols))
        elif isinstance(op, ModifyTableOps):
            # Capture the live COPY definition so the modify can be reverted.
            copy_command = all_conn_pipes[table_name.upper() + '_PIPE']
            upgrade_ops.ops.append(
                ModifySnowpipeOp(table_name, metadata_cols, json_cols, copy_command))
    # Pipes in Snowflake with no backing table get dropped.
    # Bug fix: DropSnowpipeOp expects a TABLE name, but these keys are pipe
    # names ("<TABLE>_PIPE"); previously the suffix leaked into the generated
    # "DROP ... <TABLE>_PIPE_STAGE" / "<TABLE>_PIPE_PIPE" statements.
    for pipe in set(all_conn_pipes).difference(pipe_names):
        upgrade_ops.ops.append(
            DropSnowpipeOp(pipe[:-len('_PIPE')]))
@renderers.dispatch_for(CreateSnowpipeOp)
def render_build_snowpipe(autogen_context, op):  # pylint: disable=unused-argument
    """Render the migration-script code that builds a snowpipe and then wires
    its SQS notification channel onto the S3 bucket."""
    template = ("op.build_snowpipe('{0}', {1}, {2})\n"
                "sqs_arn = get_snowpipe_arn(op, '{0}')\n"
                "add_sqs_listener(sqs_arn, '{0}')")
    return template.format(op.table_name, op.metadata_cols, op.json_cols)
@renderers.dispatch_for(DropSnowpipeOp)
def render_drop_snowpipe(autogen_context, op):  # pylint: disable=unused-argument
    """Render the migration-script code that drops a snowpipe and removes its
    S3 notification listener."""
    template = ("op.drop_snowpipe('{0}')\n"
                "remove_sqs_listener('{0}')")
    return template.format(op.table_name)
@renderers.dispatch_for(ModifySnowpipeOp)
def render_modify_snowpipe(autogen_context, op):  # pylint: disable=unused-argument
    """Render the migration-script call that recreates a snowpipe; the COPY
    definition is embedded in a triple-quoted string since it spans lines."""
    template = "op.modify_snowpipe('{}', {}, {}, '''{}''')"
    return template.format(op.table_name, op.metadata_cols, op.json_cols, op.copy_command)
@renderers.dispatch_for(RevertSnowpipeOp)
def render_revert_snowpipe(autogen_context, op):  # pylint: disable=unused-argument
    """Render the migration-script call that restores a snowpipe; the COPY
    definition is embedded in a triple-quoted string since it spans lines."""
    template = "op.revert_snowpipe('{}', {}, {}, '''{}''')"
    return template.format(op.table_name, op.metadata_cols, op.json_cols, op.copy_command)
def get_snowpipe_arn(op, tablename):
    """Return the notification-channel (SQS) ARN of a table's pipe.

    Queries INFORMATION_SCHEMA.PIPES over the migration connection; the ARN is
    used to attach the S3 event notification that feeds the pipe.
    """
    query = """
    SELECT
    NOTIFICATION_CHANNEL_NAME
    FROM INFORMATION_SCHEMA.pipes
    WHERE PIPE_NAME = '{tablename}_PIPE'
    """.format(tablename=tablename.upper())
    row = op.get_bind().execute(query).fetchone()
    return row[0]
def add_sqs_listener(sqs_arn, tablename):
    """Register (or refresh) the S3 event notification that feeds a snowpipe.

    Adds an s3:ObjectCreated:* notification on the "<env>/schema_<tablename>/"
    prefix of the bucket, pointed at the pipe's SQS queue.
    Code adapted from
    https://blog.narrativ.com/migrating-from-redshift-to-snowflake-in-python-269bb57f8809
    """
    notification_id = os.getenv('ENV').upper() + '_' + tablename.upper() + '_NOTIFS'
    notification_config = {
        'Events': ['s3:ObjectCreated:*'],
        'Id': notification_id,
        'QueueArn': sqs_arn,
        'Filter': {'Key': {'FilterRules': [{
            'Name': 'prefix',
            'Value': '{}/schema_{}/'.format(os.getenv('ENV').lower(), tablename)}]}}
    }
    client = boto3.client('s3')
    existing_configs = client.get_bucket_notification_configuration(Bucket=os.getenv('S3_BUCKET')) \
        .get('QueueConfigurations', [])
    # Bug fix: the dedup search previously looked for `tablename` among the
    # existing Ids, which never matched the '<ENV>_<TABLE>_NOTIFS' Id format,
    # so every re-run appended a duplicate configuration instead of replacing
    # the existing one. Search for the computed notification Id instead.
    try:
        index = [config['Id'] for config in existing_configs].index(notification_id)
        existing_configs[index] = notification_config
    except ValueError:
        existing_configs.append(notification_config)
    # put_bucket_notification_configuration replaces the bucket's entire
    # notification config, so all existing queue configs must be sent back.
    # NOTE(review): this also drops any Topic/Lambda configurations the bucket
    # may have — confirm the bucket only uses queue notifications.
    client.put_bucket_notification_configuration(
        Bucket=os.getenv('S3_BUCKET'),
        NotificationConfiguration={'QueueConfigurations': existing_configs})
def remove_sqs_listener(tablename):
    """Delete the table's S3 event notification so its snowpipe stops being fed."""
    bucket = os.getenv('S3_BUCKET')
    target_id = os.getenv('ENV').upper() + '_' + tablename.upper() + '_NOTIFS'
    client = boto3.client('s3')
    current = client.get_bucket_notification_configuration(Bucket=bucket) \
        .get('QueueConfigurations', [])
    # Keep every queue configuration except the one for this table.
    remaining = [cfg for cfg in current if cfg['Id'] != target_id]
    client.put_bucket_notification_configuration(
        Bucket=bucket,
        NotificationConfiguration={'QueueConfigurations': remaining})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I would love to make a package out of this! Would you mind adding some license info to it?