@sleroy, created November 7, 2024 09:54
AWS Glue example using a concurrent thread pool to parallelize JDBC writes
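The script reads a source SQL Server table over JDBC, splits the rows into write_threads buckets by taking the primary key modulo the thread count, and submits each bucket to a thread pool so the JDBC writes to the target table run in parallel.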
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col
from awsglue.dynamicframe import DynamicFrame
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
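# Tuning knobs: number of JDBC read partitions, rows per bulk write, number of
# writer threads, a row cap for the test run, and the primary key column used
# both for hash partitioning the reads and for bucketing the writes.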
num_partitions = 40
bulk_size = 30000
write_threads = 200
num_rows = 10000000
pk = "id"
## Read data from an RDS database through the JDBC driver
connection_options = {
    "useConnectionProperties": "true",
    "dbtable": "new_table",
    "connectionName": "source-sqlserver",
    "hashexpression": pk,
    "hashpartitions": num_partitions,
    "bulkSize": str(bulk_size),
}
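# useConnectionProperties/connectionName make Glue take the JDBC URL and
# credentials from the named Glue connection; hashexpression/hashpartitions
# split the source query into num_partitions parallel reads keyed on pk.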
source_df = glueContext.create_dynamic_frame.from_options(
    connection_type="sqlserver",
    connection_options=connection_options,
)
# Convert the DynamicFrame to a DataFrame and cap the run at num_rows rows
df = source_df.toDF().limit(num_rows)
limited_source_df = DynamicFrame.fromDF(df, glueContext, "limited_source_df")
# limited_source_df = source_df  # uncomment to process the full table instead of the capped sample
# Split the data into write_threads buckets by primary key so each thread
# writes a disjoint slice. Bind i as a default argument so every lambda keeps
# its own bucket index instead of closing over the shared loop variable.
dfList = [
    limited_source_df.filter(f=lambda item, i=i: (item[pk] % write_threads) == i)
    for i in range(write_threads)
]
def write_partition_df(repart_df):
    # Write one bucket to the target table through the Glue catalog connection.
    # from awsglue.dynamicframe import DynamicFrame
    # dynframe = DynamicFrame.fromDF(repart_df, glueContext)
    glueContext.write_dynamic_frame.from_jdbc_conf(
        frame=repart_df,
        catalog_connection="target-rds-sqlserver",
        connection_options={
            "dbtable": "replicated",
            "database": "example",
            "hashexpression": pk,
            "hashpartitions": num_partitions,
            "bulkSize": bulk_size,
            "mode": "append",
        },
    )
import concurrent.futures

# Submit one write task per bucket and wait for all of them to finish (and
# surface any exception) before committing the job.
with concurrent.futures.ThreadPoolExecutor(max_workers=write_threads) as executor:
    futures = [executor.submit(write_partition_df, df) for df in dfList]
    for future in concurrent.futures.as_completed(futures):
        future.result()

job.commit()
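Note: the job assumes two existing Glue connections, source-sqlserver for the read side and target-rds-sqlserver for the write side; adjust dbtable, database, and the tuning values at the top for your own environment.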