ETL Code using AWS Glue.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

## Initialize the SparkContext and GlueContext
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## Read the source data from Amazon S3 via the table definitions in the Data Catalog.
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db_demo1", table_name = "tbl_syn_source_1_csv", transformation_ctx = "datasource1")
datasource2 = glueContext.create_dynamic_frame.from_catalog(database = "db_demo1", table_name = "tbl_syn_source_2_csv", transformation_ctx = "datasource2")

## Apply the transformation: join the two tables
join1 = Join.apply(frame1 = datasource1, frame2 = datasource2, keys1 = "statecode", keys2 = "code", transformation_ctx = "join1")

## Write the transformed data into Amazon Redshift
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = join1, catalog_connection = "my-redshift-1", connection_options = {"dbtable": "sch_demo_1.tbl_joined", "database": "dev"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1")

job.commit()
Hello,
Thanks for the code, really appreciate it. However, my use case is a little different and I am modifying your code to fit the purpose.
I have an S3 location where multiple tables are present in Parquet format. I am reading each location and writing it to Redshift using a dynamic frame. When I provide a single table as input, it runs without a problem. But when I try to loop over the S3 locations and write each one to Redshift separately, it doesn't work: the Glue job status shows Succeeded, but the Redshift table doesn't get created.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import logging
logging.basicConfig(format = '%(asctime)s %(levelname)s %(message)s')  # log format (assumed)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
#dictionary containing normalized table path and table name
normalized_table_dictionary = {"s3://test/test_data1/": "test_table1", "s3://test/test_data2/": "test_table2"}
#@params database,view,user
view_dbname = "test_view"
table_dbname = "test_table"
database = "test_database"
a_user="a_user"
b_user="b_user"
for s3_path, table_name in normalized_table_dictionary.items():
    normalized_table = table_name
    redshift_view = "test_" + normalized_table
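    ## Minimal sketch of the read/write step described above, assuming each S3 prefix
    ## holds Parquet files and that a Glue connection named "my-redshift-1" and a
    ## Redshift schema "sch_demo_1" exist (these names are placeholders, as in the gist).
    ## Read the Parquet data for this table directly from S3.
    dyf = glueContext.create_dynamic_frame.from_options(
        connection_type = "s3",
        connection_options = {"paths": [s3_path]},
        format = "parquet",
        transformation_ctx = "dyf_" + normalized_table)
    ## Write the table into Redshift through the JDBC connection, staging via TempDir.
    glueContext.write_dynamic_frame.from_jdbc_conf(
        frame = dyf,
        catalog_connection = "my-redshift-1",
        connection_options = {"dbtable": "sch_demo_1." + normalized_table, "database": "dev"},
        redshift_tmp_dir = args["TempDir"],
        transformation_ctx = "datasink_" + normalized_table)
## Commit the job once all tables have been written.
job.commit()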