Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@michelmilezzi
Last active August 29, 2023 22:08
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save michelmilezzi/fe7852ddc159a44ae7720193b3fd43d9 to your computer and use it in GitHub Desktop.
Save michelmilezzi/fe7852ddc159a44ae7720193b3fd43d9 to your computer and use it in GitHub Desktop.
AWS Glue script showing how to avoid duplicates during a job execution.
import sys
import pydevd
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col
from awsglue.job import Job
def main():
# Invoke pydevd to remote debug
pydevd.settrace('169.254.76.0', port=9001, stdoutToServer=True, stderrToServer=True)
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
gc = GlueContext(sc)
job = Job(gc)
job.init("MyJob", args)
stagingdatasource = gc.create_dynamic_frame.from_catalog(
database="stagingdatabase",
table_name="staging_source_table",
transformation_ctx="stagingdatasource")
targetdatasource = gc.create_dynamic_frame.from_catalog(
database="targetdatabase",
redshift_tmp_dir=args["TempDir"],
table_name="target_table",
transformation_ctx="targetdatasource")
columnmapping = ApplyMapping.apply(
frame=stagingdatasource,
mappings=[("description", "string", "description", "string"), ("id", "int", "id", "int")],
transformation_ctx="columnmapping")
ta = columnmapping.toDF().alias('ta')
tb = targetdatasource.toDF().alias('tb')
left_join = ta\
.join(tb, ta.value == tb.value, how='left')\
.filter(col('tb.value').isNull())\
.select('ta.*')
# Inspect left join
# left_join.show()
finaldf = DynamicFrame.fromDF(left_join, gc, "nested")
gc.write_dynamic_frame.from_catalog(
frame=finaldf,
database="targetdatabase",
redshift_tmp_dir=args["TempDir"],
table_name="target_table")
job.commit()
if __name__ == "__main__":
main()
@hoIIer
Copy link

hoIIer commented Jan 14, 2022

Hey @michelmilezzi, what does ta.value represent? When I tried to implement this I get an error that the dataframe has no attribute value

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment