Skip to content

Instantly share code, notes, and snippets.

@troyharvey
Created September 21, 2018 19:43
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save troyharvey/50dfab3b37ec61bdaa0225bcbd8ffc65 to your computer and use it in GitHub Desktop.
Save troyharvey/50dfab3b37ec61bdaa0225bcbd8ffc65 to your computer and use it in GitHub Desktop.
An Airflow operator that translates a Postgres table definition into a Redshift CREATE TABLE statement.
class CreateRedshiftTableFromSchemaOperator(BaseOperator):
    """
    Create a Redshift table from a Postgres table definition stored in S3.

    The definition is a JSON list of column dicts (``column_name``,
    ``data_type``, optional ``character_maximum_length`` — presumably dumped
    from Postgres ``information_schema.columns``; confirm against the
    producer). On every run the target table is dropped (CASCADE) and
    recreated with Redshift-compatible column types.

    :param redshift_conn_id: Airflow Redshift Connection ID
    :type redshift_conn_id: string
    :param schema: Redshift schema
    :type schema: string
    :param table: Redshift table
    :type table: string
    :param s3_bucket: S3 bucket containing the table definition
    :type s3_bucket: string
    :param s3_key: S3 key with table definition (templated)
    :type s3_key: string
    """

    template_fields = ["s3_key"]

    # Postgres data types that Redshift does not support, mapped to
    # Redshift-compatible replacements (keys are lower-cased types):
    #   - inet -> varchar(45)   https://stackoverflow.com/a/7477384
    #   - uuid -> char(36)      https://stackoverflow.com/a/18989006
    #   - array/json/jsonb/text -> wide varchar (16384 is the cap this
    #     operator chose; Redshift's true maximum is varchar(65535))
    _REDSHIFT_TYPE_OVERRIDES = {
        "array": "varchar(16384)",
        "inet": "varchar(45)",
        "json": "varchar(16384)",
        "jsonb": "varchar(16384)",
        "text": "varchar(16384)",
        "uuid": "char(36)",
    }

    @apply_defaults
    def __init__(
        self,
        redshift_conn_id,
        schema,
        table,
        s3_bucket,
        s3_key,
        *args,
        **kwargs
    ) -> None:
        # Bug fix: the original annotated __init__ as ``-> bool``; a
        # constructor always returns None.
        super().__init__(*args, **kwargs)
        self.redshift_conn_id = redshift_conn_id
        self.schema = schema
        self.table = table
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key

    def execute(self, context):
        """Run a Redshift CREATE TABLE script generated from the Postgres table definition."""
        self.redshift_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        conn = self.redshift_hook.get_conn()
        cursor = conn.cursor()
        try:
            sql = self._get_create_table_sql(
                self._get_postgres_table_definition()
            )
            cursor.execute(sql)
        finally:
            # Release the cursor even if fetching the definition or
            # executing the DDL raises (the original leaked it on error).
            cursor.close()
        conn.commit()
        return True

    def _get_postgres_table_definition(self):
        """Read the JSON table definition from S3 and parse it into a list of column dicts."""
        s3_hook = S3Hook()
        return json.loads(s3_hook.read_key(key=self.s3_key, bucket_name=self.s3_bucket))

    def _get_create_table_sql(self, postgres_table_definition):
        """Build the DROP + CREATE TABLE DDL for the given column definitions."""
        column_sql = [
            "{column_name} {data_type} {nullable}".format(
                column_name=column["column_name"],
                data_type=self._postgres_to_redshift_data_type(column),
                # NOTE(review): nullability is deliberately ignored — every
                # column is created nullable. The original carried a disabled
                # expression ('not null' if is_nullable == 'no') here.
                nullable=""
            )
            for column in postgres_table_definition
        ]
        return f"""
            DROP TABLE IF EXISTS {self.schema}.{self.table} CASCADE;
            CREATE TABLE IF NOT EXISTS {self.schema}.{self.table}
            (
                {", ".join(column_sql)}
            );
        """

    def _postgres_to_redshift_data_type(self, column):
        """
        Convert a Postgres data type into a Redshift data type.

        Unsupported Postgres types are replaced per _REDSHIFT_TYPE_OVERRIDES;
        types with an explicit length (e.g. character varying) keep it; all
        other types pass through unchanged.
        """
        data_type = column["data_type"].lower()
        if data_type in self._REDSHIFT_TYPE_OVERRIDES:
            return self._REDSHIFT_TYPE_OVERRIDES[data_type]
        # Preserve explicit lengths, e.g. "character varying(255)".
        # .get() also tolerates a missing key (original did the same via
        # an explicit membership test).
        if column.get("character_maximum_length"):
            return f"{column['data_type']}({column['character_maximum_length']})"
        return column["data_type"]
@chandnidewani
Copy link

Hey, could you explain in what format you saved your Postgres table schema?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment