-
-
Save nitinmlvya/ba4626e8ec40dc546119bb14a8349b45 to your computer and use it in GitHub Desktop.
import sys
# NOTE: `Join` (used below) is brought into scope by this star import.
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Resolve the job arguments: JOB_NAME is handed to job.init() and TempDir is
# passed as the Redshift temporary directory for the write at the end.
args = getResolvedOptions(sys.argv, ['TempDir', 'JOB_NAME'])

## Initialize the GlueContext and SparkContext
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## Read the data from Amazon S3 and have their structure in the data catalog.
# Both tables are looked up in the "db_demo1" catalog database.
datasource1 = glueContext.create_dynamic_frame.from_catalog(
    database="db_demo1",
    table_name="tbl_syn_source_1_csv",
    transformation_ctx="datasource1",
)
datasource2 = glueContext.create_dynamic_frame.from_catalog(
    database="db_demo1",
    table_name="tbl_syn_source_2_csv",
    transformation_ctx="datasource2",
)

## Apply transformation, join the tables
# Rows are matched where datasource1.statecode equals datasource2.code.
join1 = Join.apply(
    frame1=datasource1,
    frame2=datasource2,
    keys1="statecode",
    keys2="code",
    transformation_ctx="join1",
)

## Write the transformed data into Amazon Redshift
# "my-redshift-1" is the name of the Glue catalog connection to use; the
# joined rows go to sch_demo_1.tbl_joined in the "dev" database.
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=join1,
    catalog_connection="my-redshift-1",
    connection_options={"dbtable": "sch_demo_1.tbl_joined", "database": "dev"},
    redshift_tmp_dir=args["TempDir"],
    transformation_ctx="datasink1",
)

job.commit()
Hello,
Thanks for the code, I really appreciate it. However, my use case is a little different, and I am modifying your code to fit the purpose.
I have an S3 location where multiple tables are present in Parquet format. I am reading each location and writing it to Redshift using a dynamic frame. When I provide a single table as input, it runs without any problem. But when I loop over the S3 locations and write each one to Redshift separately, it does not work: the Glue job status shows succeeded, but the Redshift table does not get created.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import logging
# Configure logging with an explicit format string. The original passed the
# builtin `format` function and an undefined `name` here (the surrounding
# underscores of __name__ were lost), neither of which is usable.
logging.basicConfig(format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir', 'JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Dictionary mapping each normalized table's S3 path to its table name.
# Keys MUST be unique: a repeated key in a dict literal silently keeps only
# the last entry, so the earlier table would never be loaded.
# NOTE(review): the second path was a duplicate of the first; "test_data2"
# is a placeholder guess -- replace with the real location.
normalized_table_dictionary = {
    "s3://test/test_data1/": "test_table1",
    "s3://test/test_data2/": "test_table2",
}

# @params database, view, user
view_dbname = "test_view"
table_dbname = "test_table"
database = "test_database"
a_user = "a_user"  # used below as the Glue catalog connection name
b_user = "b_user"  # receives SELECT on the materialized view

for s3_path, table_name in normalized_table_dictionary.items():
    normalized_table = table_name
    redshift_view = "test_" + normalized_table

    # Read every parquet file under this table's S3 prefix.
    inputGDF = glueContext.create_dynamic_frame_from_options(
        connection_type="s3",
        connection_options={"paths": [s3_path]},
        format="parquet",
    )

    # Empty the target table before loading. Redshift's TRUNCATE has no
    # "IF EXISTS" clause, so it is not used here.
    # NOTE(review): assumes the connector creates the target table before
    # running preactions on a first run -- confirm.
    pre_query = f"truncate table {table_dbname}.{normalized_table};"
    logger.info(pre_query)

    # After loading: rebuild the materialized view over the table and grant
    # read access on it to b_user.
    post_query = (
        f"begin;"
        f"drop materialized view if exists {view_dbname}.{redshift_view};"
        f"create materialized view {view_dbname}.{redshift_view} "
        f"as select * from {table_dbname}.{normalized_table};"
        f"grant select on {view_dbname}.{redshift_view} to {b_user};"
        f"end;"
    )
    logger.info(post_query)

    ## Write data to redshift
    # A single write carries both preactions and postactions. The original
    # wrote the same frame twice (once per action), which loads every row
    # twice. transformation_ctx is made unique per table so iterations do
    # not share one context name.
    datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(
        frame=inputGDF,
        catalog_connection=a_user,
        connection_options={
            "preactions": pre_query,
            "dbtable": f"{table_dbname}.{normalized_table}",
            "database": database,
            "postactions": post_query,
        },
        redshift_tmp_dir=args["TempDir"],
        transformation_ctx=f"datasink1_{normalized_table}",
    )
    # Use a %s placeholder; passing an extra positional argument without one
    # makes the logging module report a formatting error.
    logger.info("datasink1: %s", datasink1)

job.commit()
Hey @nitinmalvya! I came across your blog while reading about redshift and glue connectivity. Could you tell me what
catalog_connection
param means exactly?