Skip to content

Instantly share code, notes, and snippets.

@nitinmlvya
Created April 23, 2019 06:08
Show Gist options
  • Save nitinmlvya/ba4626e8ec40dc546119bb14a8349b45 to your computer and use it in GitHub Desktop.
Save nitinmlvya/ba4626e8ec40dc546119bb14a8349b45 to your computer and use it in GitHub Desktop.
ETL Code using AWS Glue.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
## Initialize the GlueContext and SparkContext
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## Read the data from Amazon S3 and have their structure in the data catalog.
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db_demo1", table_name = "tbl_syn_source_1_csv", transformation_ctx = "datasource1")
datasource2 = glueContext.create_dynamic_frame.from_catalog(database = "db_demo1", table_name = "tbl_syn_source_2_csv", transformation_ctx = "datasource2")
## Apply transformation, join the tables
join1 = Join.apply(frame1 = datasource1, frame2 = datasource2, keys1 = "statecode", keys2 = "code", transformation_ctx = "join1")
## Write the transformed data into Amazon Redshift
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = join1, catalog_connection = "my-redshift-1", connection_options = {"dbtable": "sch_demo_1.tbl_joined", "database": "dev"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1")
job.commit()
@sumitsingh2415
Copy link

Hello,

Thanks for the code, really appreciate it. However my use case is little diiferent and I am modifying your code to fit the purpose.
I have an S3 location where multiple tables are present in parquet format. I am reading each location and writing it to redshift using dynamic frame. When I provided a single table as input it runs without a doubt. But when I am trying to loop the s3 location and write in redhsift separately its not working. Glue status shows succeeded but still redshift table doesn't gets created.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import logging

logging.basicConfig(format = format)
logger = logging.getLogger(name)
logger.setLevel(logging.INFO)

@params: [TempDir, JOB_NAME]

args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

#dictionary containing normalized table path and table name
normalized_table_dictionary={"s3://test/test_data1/":"test_table1","s3://test/test_data1/":"test_table2"}

#@params database,view,user
view_dbname = "test_view"
table_dbname = "test_table"
database = "test_database"
a_user="a_user"
b_user="b_user"

for s3_path,table_name in normalized_table_dictionary.items():
normalized_table = ''
redshift_view = ''
normalized_table = table_name
redshift_view = "test_"+normalized_table

inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": [f"{s3_path}"]}, format = "parquet")

pre_query = f"truncate table {table_dbname}.{normalized_table} if exists;"

logger.info(pre_query)


post_query = f"begin;drop materialized view if exists {view_dbname}.{redshift_view};create materialized view {view_dbname}.{redshift_view} as select * from {table_dbname}.{normalized_table};grant select on {view_dbname}.{redshift_view} to {b_user};end;"

logger.info(post_query)
## Write data to redshift
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = inputGDF, catalog_connection =f"{a_user}", connection_options = {"preactions":pre_query,"dbtable": f"{table_dbname}.{normalized_table}", "database": f"{database}"}, redshift_tmp_dir = args["TempDir"],transformation_ctx = "datasink1")
logger.info('datasink1',datasink1)

datasink2 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = inputGDF, catalog_connection = f"{a_user}", connection_options = {"dbtable": f"{table_dbname}.{normalized_table}", "database": f"{database}","postactions":post_query}, redshift_tmp_dir = args["TempDir"],transformation_ctx = "datasink2")

logger.info('datasink1',datasink2)
job.commit()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment