Skip to content

Instantly share code, notes, and snippets.

@nehiljain
Created November 19, 2019 12:18
Show Gist options
  • Save nehiljain/e7e39751bf9dcecbc83940a6fcab080b to your computer and use it in GitHub Desktop.
Snowpipe Alembic Utils
'''This is the extension on the alembic API to facilitate the custom autogenerate script'''
import json
import logging
import os
import boto3
from alembic.autogenerate import comparators, renderers
from alembic.operations import MigrateOperation, Operations
from alembic.operations.ops import CreateTableOp, ModifyTableOps
from update_s3_schemas import get_schemas
from utils.file_compare_utils import traverse
@Operations.register_operation("build_snowpipe")
class CreateSnowpipeOp(MigrateOperation):
    """Migration operation that provisions the stage + pipe pair for a table."""

    def __init__(self, table_name, metadata_cols=None, json_cols=None):
        # Column lists may be omitted when the op only exists to be reversed.
        self.table_name = table_name
        self.metadata_cols = metadata_cols
        self.json_cols = json_cols

    @classmethod
    def build_snowpipe(cls, operations, table_name, metadata_cols, json_cols):
        """Entry point registered on ``op`` as ``op.build_snowpipe(...)``."""
        return operations.invoke(cls(table_name, metadata_cols, json_cols))

    def reverse(self):
        # Autogenerate support: undoing a create is a drop.
        return DropSnowpipeOp(self.table_name)
@Operations.register_operation("drop_snowpipe")
class DropSnowpipeOp(MigrateOperation):
    """Migration operation that removes a table's snowpipe and stage."""

    def __init__(self, table_name):
        self.table_name = table_name

    @classmethod
    def drop_snowpipe(cls, operations, table_name):
        """Entry point registered on ``op`` as ``op.drop_snowpipe(...)``."""
        return operations.invoke(cls(table_name))

    def reverse(self):
        # Autogenerate support: undoing a drop is a create.
        return CreateSnowpipeOp(self.table_name)
@Operations.register_operation("modify_snowpipe")
class ModifySnowpipeOp(MigrateOperation):
    """Migration operation that recreates a pipe with updated column lists."""

    def __init__(self, table_name, metadata_cols, json_cols, copy_command):
        self.table_name = table_name
        self.metadata_cols = metadata_cols
        self.json_cols = json_cols
        # Snapshot of the pre-change COPY statement so reverse() can restore it.
        self.copy_command = copy_command

    @classmethod
    def modify_snowpipe(cls, operations, table_name, metadata_cols, json_cols, copy_command):
        """Entry point registered on ``op`` as ``op.modify_snowpipe(...)``."""
        return operations.invoke(cls(table_name, metadata_cols, json_cols, copy_command))

    def reverse(self):
        # Undoing a modify restores the saved COPY definition.
        return RevertSnowpipeOp(self.table_name, self.metadata_cols, self.json_cols, self.copy_command)
@Operations.register_operation("revert_snowpipe")
class RevertSnowpipeOp(MigrateOperation):
    """Migration operation that restores a pipe from a saved COPY statement."""

    def __init__(self, table_name, metadata_cols, json_cols, copy_command):
        self.table_name = table_name
        self.metadata_cols = metadata_cols
        self.json_cols = json_cols
        # The previously captured COPY statement to recreate the pipe from.
        self.copy_command = copy_command

    @classmethod
    def revert_snowpipe(cls, operations, table_name, metadata_cols, json_cols, copy_command):
        """Entry point registered on ``op`` as ``op.revert_snowpipe(...)``."""
        return operations.invoke(cls(table_name, metadata_cols, json_cols, copy_command))

    def reverse(self):
        # Undoing a revert re-applies the modification.
        return ModifySnowpipeOp(self.table_name, self.metadata_cols, self.json_cols, self.copy_command)
@Operations.implementation_for(CreateSnowpipeOp)
def build_snowpipe(operations, operation):
    '''Emits the SQL that creates the external stage and its auto-ingest pipe.'''
    # Stage pointing at the table's S3 prefix for the current environment.
    stage_sql = """
CREATE OR REPLACE STAGE SCHEMA.{upper_tablename}_STAGE URL = 's3://{bucket}/{env}/schema_{tablename}/'
CREDENTIALS = (AWS_KEY_ID = '{key_id}' AWS_SECRET_KEY='{secret_key}')
FILE_FORMAT=SCHEMA.GZIP_JSON_UEL_FORMAT""".format(
        bucket=os.getenv('S3_BUCKET'),
        env=os.getenv('ENV').lower(),
        upper_tablename=operation.table_name.upper(),
        tablename=operation.table_name,
        key_id=os.getenv('AWS_ACCESS_KEY_ID'),
        secret_key=os.getenv('AWS_SECRET_ACCESS_KEY'))
    stage_result = operations.execute(stage_sql)

    # Flattened JSON paths become SELECT expressions like $1:"a":"b".
    select_cols = ','.join('$1:"' + col.replace(':', '":"') + '"'
                           for col in operation.json_cols)
    # NOTE(review): this pipe uses SCHEMA_UNIFIED_EVENTS.GZIP_JSON_UEL_FORMAT
    # while modify_snowpipe uses SCHEMA.GZIP_JSON_UEL_FORMAT for the same pipe —
    # confirm which schema is intended.
    pipe_sql = """
CREATE OR REPLACE PIPE SCHEMA.{tablename}_PIPE
AUTO_INGEST=TRUE AS COPY INTO {schema_name}.{tablename}
({metadata_cols}) FROM
(SELECT {json_cols} FROM @SCHEMA.{tablename}_STAGE)
FILE_FORMAT=SCHEMA_UNIFIED_EVENTS.GZIP_JSON_UEL_FORMAT""".format(
        tablename=operation.table_name.upper(),
        schema_name=os.getenv('SNOWFLAKE_SCHEMA').upper(),
        metadata_cols=','.join(operation.metadata_cols),
        json_cols=select_cols)
    pipe_result = operations.execute(pipe_sql)
    return [stage_result, pipe_result]
@Operations.implementation_for(DropSnowpipeOp)
def drop_snowpipe(operations, operation):
    '''Emits the SQL that removes a table's stage and pipe, if they exist.'''
    upper_name = operation.table_name.upper()
    stage_result = operations.execute(
        """DROP STAGE IF EXISTS SCHEMA.{table_name}_STAGE""".format(table_name=upper_name))
    pipe_result = operations.execute(
        """DROP PIPE IF EXISTS SCHEMA.{table_name}_PIPE""".format(table_name=upper_name))
    return [stage_result, pipe_result]
@Operations.implementation_for(ModifySnowpipeOp)
def modify_snowpipe(operations, operation):
    '''Emits the SQL that recreates an existing pipe with fresh column lists.'''
    # Flattened JSON paths become SELECT expressions like $1:"a":"b".
    json_select = ','.join('$1:"' + col.replace(':', '":"') + '"'
                           for col in operation.json_cols)
    statement = """
CREATE OR REPLACE PIPE SCHEMA.{tablename}_PIPE
AUTO_INGEST=TRUE AS COPY INTO {schema_name}.{tablename}
({metadata_cols}) FROM
(SELECT {json_cols} FROM @SCHEMA.{tablename}_STAGE)
FILE_FORMAT=SCHEMA.GZIP_JSON_UEL_FORMAT""".format(
        tablename=operation.table_name.upper(),
        schema_name=os.getenv('SNOWFLAKE_SCHEMA').upper(),
        metadata_cols=','.join(operation.metadata_cols),
        json_cols=json_select)
    return [operations.execute(statement)]
@Operations.implementation_for(RevertSnowpipeOp)
def revert_snowpipe(operations, operation):
    '''Emits the SQL that restores a pipe from its previously saved COPY statement.'''
    statement = """
CREATE OR REPLACE PIPE SCHEMA.{tablename}_PIPE
AUTO_INGEST=TRUE AS {copy_command}""".format(
        tablename=operation.table_name.upper(),
        copy_command=operation.copy_command)
    return [operations.execute(statement)]
@comparators.dispatch_for("schema")
def compare_pipes(autogen_context, upgrade_ops, schemas): # pylint: disable=unused-argument
    '''Compares the pipes in snowflake vs json and models object'''
    # Pipes that currently exist in snowflake: {PIPE_NAME: COPY definition}
    all_conn_pipes = {
        row[0]: row[1] for row in autogen_context.connection.execute(
            "SELECT PIPE_NAME,DEFINITION FROM INFORMATION_SCHEMA.PIPES WHERE PIPE_SCHEMA='SCHEMA';")
    }
    logging.info("testings the logs")
    # Retrieves pipe names we have in our metadata object
    pipe_names = {key.upper() + "_PIPE" for key in autogen_context.metadata.tables.keys()}
    # NOTE(review): this rebinds the `schemas` parameter (passed in by alembic)
    # with the JSON schema file paths — consider renaming one of them.
    schemas = get_schemas()
    json_col_mapping = {}
    # Build {table_name: sorted flattened JSON column paths} from the schema files.
    for filename in schemas:
        schema_filename = os.path.basename(filename)
        schema_name, _ = os.path.splitext(schema_filename)
        with open(filename, 'r') as f:
            schema = json.loads(f.read())
        json_col_mapping[schema_name] = sorted([col['name'] for col in traverse(schema, '', [])])
    # Attach a snowpipe op to every table create/modify that alembic detected.
    for op in upgrade_ops.ops:
        try:
            table_name = op.table_name
            json_cols = json_col_mapping[table_name]
            metadata_table = autogen_context.metadata.tables[table_name]
            metadata_cols = sorted(metadata_table.columns.keys())
        except KeyError as e:
            logging.info('%s This table was not checked in through alembic, or there is not json for it', str(e))
            continue
        if isinstance(op, CreateTableOp):
            upgrade_ops.ops.append(
                CreateSnowpipeOp(table_name, metadata_cols, json_cols))
        elif isinstance(op, ModifyTableOps):
            # Preserve the current pipe definition so a downgrade can restore it.
            copy_command = all_conn_pipes[table_name.upper() + '_PIPE']
            upgrade_ops.ops.append(
                ModifySnowpipeOp(table_name, metadata_cols, json_cols, copy_command)
            )
    # Pipes that are going to be removed
    # NOTE(review): `pipe` is a PIPE name (e.g. FOO_PIPE), but DropSnowpipeOp
    # expects a table name and appends _PIPE/_STAGE itself, which would produce
    # FOO_PIPE_PIPE / FOO_PIPE_STAGE — confirm this is the intended behavior.
    for pipe in set(all_conn_pipes.keys()).difference(pipe_names):
        upgrade_ops.ops.append(
            DropSnowpipeOp(pipe))
@renderers.dispatch_for(CreateSnowpipeOp)
def render_build_snowpipe(autogen_context, op): # pylint: disable=unused-argument
    '''Renders the build_snowpipe call plus its SQS wiring into the migration script.'''
    return (f"op.build_snowpipe('{op.table_name}', {op.metadata_cols}, {op.json_cols})\n"
            f"sqs_arn = get_snowpipe_arn(op, '{op.table_name}')\n"
            f"add_sqs_listener(sqs_arn, '{op.table_name}')")
@renderers.dispatch_for(DropSnowpipeOp)
def render_drop_snowpipe(autogen_context, op): # pylint: disable=unused-argument
    '''Renders the drop_snowpipe call plus its SQS teardown into the migration script.'''
    return (f"op.drop_snowpipe('{op.table_name}')\n"
            f"remove_sqs_listener('{op.table_name}')")
@renderers.dispatch_for(ModifySnowpipeOp)
def render_modify_snowpipe(autogen_context, op): # pylint: disable=unused-argument
    '''Renders the modify_snowpipe call into the migration script.'''
    # The saved COPY statement is emitted inside triple quotes since it spans lines.
    return (f"op.modify_snowpipe('{op.table_name}', {op.metadata_cols}, "
            f"{op.json_cols}, '''{op.copy_command}''')")
@renderers.dispatch_for(RevertSnowpipeOp)
def render_revert_snowpipe(autogen_context, op): # pylint: disable=unused-argument
    '''Renders the revert_snowpipe call into the migration script.'''
    # The saved COPY statement is emitted inside triple quotes since it spans lines.
    return (f"op.revert_snowpipe('{op.table_name}', {op.metadata_cols}, "
            f"{op.json_cols}, '''{op.copy_command}''')")
def get_snowpipe_arn(op, tablename):
    '''Returns the pipe's SQS notification channel ARN, used for S3 alerting.'''
    query = """
SELECT
NOTIFICATION_CHANNEL_NAME
FROM INFORMATION_SCHEMA.pipes
WHERE PIPE_NAME = '{tablename}_PIPE'
""".format(tablename=tablename.upper())
    row = op.get_bind().execute(query).fetchone()
    return row[0]
def add_sqs_listener(sqs_arn, tablename):
    '''Adds the SQS listener that notifies the snowpipe when a new file object is created.

    Registers (or replaces) an S3 bucket notification for the table's key
    prefix so new objects trigger the pipe's SQS queue.
    Code taken from here https://blog.narrativ.com/migrating-from-redshift-to-snowflake-in-python-269bb57f8809
    '''
    # Id under which the notification is stored; remove_sqs_listener matches on
    # this same '<ENV>_<TABLE>_NOTIFS' format.
    notification_id = os.getenv('ENV').upper() + '_' + tablename.upper() + '_NOTIFS'
    notification_config = {
        'Events': ['s3:ObjectCreated:*'],
        'Id': notification_id,
        'QueueArn': sqs_arn,
        'Filter': {'Key': {'FilterRules': [{'Name': 'prefix',
                                            'Value': '{}/schema_{}/'.format(os.getenv('ENV').lower(),
                                                                            tablename)}]}}
    }
    client = boto3.client('s3')
    existing_configs = client.get_bucket_notification_configuration(Bucket=os.getenv('S3_BUCKET')) \
        .get('QueueConfigurations', [])
    try:
        # BUG FIX: the lookup previously searched for the bare table name, but
        # configs are stored under the '<ENV>_<TABLE>_NOTIFS' id, so it never
        # matched and a duplicate configuration was appended on every run.
        index = [config['Id'] for config in existing_configs].index(notification_id)
        existing_configs[index] = notification_config
    except ValueError:
        # Not registered yet: add it.
        existing_configs.append(notification_config)
    client.put_bucket_notification_configuration(
        Bucket=os.getenv('S3_BUCKET'),
        NotificationConfiguration={'QueueConfigurations': existing_configs})
def remove_sqs_listener(tablename):
    '''Removes the S3 bucket notification that feeds the table's snowpipe queue.'''
    # Id format matches the one written by add_sqs_listener.
    target_id = os.getenv('ENV').upper() + '_' + tablename.upper() + '_NOTIFS'
    client = boto3.client('s3')
    current = client.get_bucket_notification_configuration(Bucket=os.getenv('S3_BUCKET')) \
        .get('QueueConfigurations', [])
    kept = [cfg for cfg in current if cfg['Id'] != target_id]
    client.put_bucket_notification_configuration(
        Bucket=os.getenv('S3_BUCKET'),
        NotificationConfiguration={'QueueConfigurations': kept})
@fernandocollova-endava
Copy link

I would love to make a package out of this! Would you mind adding some license info to it?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment