# Created October 28, 2019 19:34
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
# --- Job bootstrap ---------------------------------------------------------
# Glue hands job parameters to the script via sys.argv; JOB_NAME is the only
# one this script asks getResolvedOptions to resolve.
_REQUIRED_ARGS = ["JOB_NAME"]
args = getResolvedOptions(sys.argv, _REQUIRED_ARGS)

# A single SparkContext backs the GlueContext, and the Spark session is taken
# from the GlueContext rather than built separately.
sparkContext = SparkContext()
glueContext = GlueContext(sparkContext)
sparkSession = glueContext.spark_session

# Wrap the GlueContext in a Glue Job and initialise it with the resolved job
# name plus the full argument mapping.
job_name = args["JOB_NAME"]
glueJob = Job(glueContext)
glueJob.init(job_name, args)
# Comma-separated names of the MongoDB collections to export.
# NOTE(review): "COLLECTIONS_REPLACE" looks like a template placeholder that is
# substituted before deployment (e.g. "Customers,Orders") -- confirm.
collections_input = "COLLECTIONS_REPLACE"
# Trim whitespace around each name and drop empty entries, so an input such as
# "a, b," does not hand " b" or "" to the JDBC reader as a dbtable value.
collections = [name.strip() for name in collections_input.split(",") if name.strip()]

# JDBC connection string for the CData MongoDB driver.
# NOTE(review): this name was referenced but never defined in the script
# (NameError at runtime). The placeholder follows the same *_REPLACE convention
# as collections_input and must be replaced with the real connection string.
jdbc_url = "JDBC_URL_REPLACE"

dfs = []
# Read each collection over JDBC through the CData MongoDB driver, wrap the
# resulting Spark DataFrame in a Glue DynamicFrame, and keep the collection
# name next to it so the write stage can tell the frames apart.
for collection in collections:
    source_df = (
        sparkSession.read.format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", collection)
        .option("driver", "cdata.jdbc.mongodb.MongoDBDriver")
        .load()
    )
    dynamic_dframe = DynamicFrame.fromDF(
        source_df, glueContext, "dynamic_df_{}".format(collection)
    )
    dfs.append({"dynamic_frame": dynamic_dframe, "collection": collection})
# Write each collection's DynamicFrame to S3 as CSV.
# Every collection gets its own sub-prefix under the target path, so rows from
# different collections (which may have different columns) are not mixed
# together under a single prefix. Each sink also gets its own
# transformation_ctx instead of sharing one name across all writes.
# NOTE(review): "TARGET_BUCKET" looks like a template placeholder for an S3 URL
# such as "s3://bucket/prefix" -- confirm.
target_path = "TARGET_BUCKET".rstrip("/")
for df in dfs:
    collection = df["collection"]
    glueContext.write_dynamic_frame.from_options(
        frame=df["dynamic_frame"],
        connection_type="s3",
        connection_options={"path": "{}/{}".format(target_path, collection)},
        format="csv",
        transformation_ctx="datasink_{}".format(collection),
    )

# Mark the Glue job run as successfully completed; per the AWS Glue docs this
# is also what persists job-bookmark state.
glueJob.commit()
