Glue-Spark code to handle malformed records while reading CSV from S3
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
# Importing required field types to define schema
from pyspark.sql.types import StringType, StructType, StructField, IntegerType
# Importing required functions
from pyspark.sql.functions import input_file_name, col, concat_ws, when, lit
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
################## Input data read ##################
# The input data will be read using the Spark session instead of glueContext to allow better control over malformed data handling
# Input data used:
# id,name,mobile,address,pin
# 1,personA,23345345,dashadrone,756587
# 2,personB,,new town,756588
# 3,personC,56509808,amta,howrah,756589
# x,PersonD,34524650,sibpur,756580
# The 3rd and 4th rows are corrupted: the 3rd row has an extra delimiter and the 4th row has a string value in an integer field
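# For contrast, a minimal sketch (not used in this job, same path assumed) of the equivalent glueContext read;
# DynamicFrames do not offer the PERMISSIVE-mode corrupt record capture used below, which is why spark.read is preferred here:
# input_dyf = glueContext.create_dynamic_frame.from_options(
#     connection_type="s3",
#     connection_options={"paths": ["s3://glue-handson-data/RejectedRecordHandling/SRC/"]},
#     format="csv",
#     format_options={"withHeader": True},
# )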
# Define the schema of the source file explicitly
# Please note we have added one extra column - _corrupt_record to store the malformed records explicitly
# StructField signature -> StructField(ColumnName, DataType, Nullable)
customSchema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("mobile", StringType(), False),
    StructField("address", StringType(), True),
    StructField("pincode", IntegerType(), False),
    StructField("_corrupt_record", StringType(), True)
])
# Please note the Nullable flag in StructField is not a constraint, more of a hint:
# even if the file contains a null value in a nullable=False field, the row will not be considered malformed.
# To work around this we will put explicit null checks after reading the data
sc.setLogLevel("INFO")
# To accept and mark malformed records, mode is set to PERMISSIVE and the corrupt record column name is set to _corrupt_record
input_df = spark.read.format("csv") \
    .schema(customSchema) \
    .option("quote", "") \
    .option("header", "true") \
    .option("inferSchema", "false") \
    .option("sep", ",") \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .load("s3://glue-handson-data/RejectedRecordHandling/SRC/")
# Adding the source file name is optional; it is similar to the file-name column Informatica can add
input_df = input_df.withColumn("file_name", input_file_name())
# Caching the dataframe before filtering is mandatory here: Spark disallows queries on a raw CSV read that reference only the internal corrupt record column, and caching the parsed result works around that (see lazy evaluation in Spark)
# The count() below is an action, so it also materialises the cache (have a look at transformations vs actions in Spark)
input_df.cache()
total_row_count = input_df.count()
input_df.printSchema()
# Shows the actual enforced schema; notice nullability became true for the columns defined as not null
# root
# |-- id: integer (nullable = true)
# |-- name: string (nullable = true)
# |-- mobile: string (nullable = true)
# |-- address: string (nullable = true)
# |-- pincode: integer (nullable = true)
# |-- _corrupt_record: string (nullable = true)
# |-- file_name: string (nullable = false)
print("input_df :::")
input_df.show(truncate=False)
# Showing the state of the dataframe just after reading; notice the _corrupt_record column
# +----+-------+--------+----------+-------+-------------------------------------+-------------------------------------------------------------------------+
# |id |name |mobile |address |pincode|_corrupt_record |file_name |
# +----+-------+--------+----------+-------+-------------------------------------+-------------------------------------------------------------------------+
# |1 |personA|23345345|dashadrone|756587 |null |s3://glue-handson-data/RejectedRecordHandling/SRC/RejctRecordsTestCSV.csv|
# |2 |personB|null |new town |756588 |null |s3://glue-handson-data/RejectedRecordHandling/SRC/RejctRecordsTestCSV.csv|
# |null|null |null |null |null |3,personC,56509808,amta,howrah,756589|s3://glue-handson-data/RejectedRecordHandling/SRC/RejctRecordsTestCSV.csv|
# |null|null |null |null |null |x,PersonD,34524650,sibpur,756580 |s3://glue-handson-data/RejectedRecordHandling/SRC/RejctRecordsTestCSV.csv|
# +----+-------+--------+----------+-------+-------------------------------------+-------------------------------------------------------------------------+
print("total_row_count = " + str(total_row_count))
semi_valid_data_df = input_df.filter(col("_corrupt_record").isNull())
corrupted_data_df = input_df.filter(col("_corrupt_record").isNotNull()).select("_corrupt_record")
input_df_columns = input_df.drop("_corrupt_record", "file_name").columns
corrupted_data_df = corrupted_data_df.withColumnRenamed("_corrupt_record", ','.join(input_df_columns))
print("semi_valid_data_df :::")
semi_valid_data_df.show(truncate=False)
# Semi-Valid data (nulls are still to be handled)
# +---+-------+--------+----------+-------+---------------+-------------------------------------------------------------------------+
# |id |name |mobile |address |pincode|_corrupt_record|file_name |
# +---+-------+--------+----------+-------+---------------+-------------------------------------------------------------------------+
# |1 |personA|23345345|dashadrone|756587 |null |s3://glue-handson-data/RejectedRecordHandling/SRC/RejctRecordsTestCSV.csv|
# |2 |personB|null |new town |756588 |null |s3://glue-handson-data/RejectedRecordHandling/SRC/RejctRecordsTestCSV.csv|
# +---+-------+--------+----------+-------+---------------+-------------------------------------------------------------------------+
print("corrupted_data_df :::")
corrupted_data_df.show(truncate=False)
# Corrupt records; these can be written to any output system based on requirements
# +-------------------------------------+
# |id,name,mobile,address,pincode |
# +-------------------------------------+
# |3,personC,56509808,amta,howrah,756589|
# |x,PersonD,34524650,sibpur,756580 |
# +-------------------------------------+
################## Null values in not null columns handling ##################
# Detect the columns that are defined as not-null in the schema
defined_not_null_cols = []
for field in customSchema:
    if not field.nullable:
        defined_not_null_cols.append(field.name)
# if any not null columns are found...
if defined_not_null_cols:
    null_filter_query = ""
    not_null_filter_query = ""
    for ind, column_name in enumerate(defined_not_null_cols):
        if not ind:
            null_filter_query += f"{column_name} is null"
            not_null_filter_query += f"{column_name} is not null"
        else:
            null_filter_query += f" OR {column_name} is null"
            not_null_filter_query += f" AND {column_name} is not null"
else:
    null_filter_query = "1=2"
    not_null_filter_query = "1=1"
# Separating valid/corrupted data based on nullability
final_valid_data_df = semi_valid_data_df.where(not_null_filter_query)
null_corrupted_data_df = semi_valid_data_df.where(null_filter_query)
print("final_valid_data_df :::")
final_valid_data_df.show(truncate=False)
# +---+-------+--------+----------+-------+---------------+-------------------------------------------------------------------------+
# |id |name |mobile |address |pincode|_corrupt_record|file_name |
# +---+-------+--------+----------+-------+---------------+-------------------------------------------------------------------------+
# |1 |personA|23345345|dashadrone|756587 |null |s3://glue-handson-data/RejectedRecordHandling/SRC/RejctRecordsTestCSV.csv|
# +---+-------+--------+----------+-------+---------------+-------------------------------------------------------------------------+
print("null_corrupted_data_df :::")
null_corrupted_data_df.show(truncate=False)
# +---+-------+------+--------+-------+---------------+-------------------------------------------------------------------------+
# |id |name |mobile|address |pincode|_corrupt_record|file_name |
# +---+-------+------+--------+-------+---------------+-------------------------------------------------------------------------+
# |2 |personB|null |new town|756588 |null |s3://glue-handson-data/RejectedRecordHandling/SRC/RejctRecordsTestCSV.csv|
# +---+-------+------+--------+-------+---------------+-------------------------------------------------------------------------+
################## Output handling ##################
# Now both these dataframes can be written to the required output systems
# Dataframes can be written directly using the Spark DataFrame writer, but here we will convert to dynamic frames and write using glueContext
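# A minimal sketch (commented out so it does not duplicate the Glue write below, same target path assumed)
# of writing the valid data directly with the Spark DataFrame writer; this skips the DynamicFrame
# conversion and the transformation_ctx used for Glue job bookmarks:
# final_valid_data_df.drop("_corrupt_record", "file_name") \
#     .write.mode("overwrite") \
#     .option("header", "true") \
#     .csv("s3://glue-handson-data/RejectedRecordHandling/TGT/")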
# Converting the spark dataframes to glue dynamic frames to be able to use glue dynamic frame write APIs
final_valid_data_dyf = DynamicFrame.fromDF(final_valid_data_df.drop("_corrupt_record", "file_name"), glueContext, "final_valid_data_dyf")
corrupted_data_dyf = DynamicFrame.fromDF(corrupted_data_df, glueContext, "corrupted_data_dyf")
null_corrupted_data_dyf = DynamicFrame.fromDF(null_corrupted_data_df.drop("_corrupt_record", "file_name"), glueContext, "null_corrupted_data_dyf")
# Writing back to the s3 bucket
glueContext.write_dynamic_frame.from_options(
    frame=final_valid_data_dyf,
    connection_type="s3",
    format="csv",
    connection_options={
        "path": "s3://glue-handson-data/RejectedRecordHandling/TGT/",
        "partitionKeys": [],
    },
    transformation_ctx="final_valid_data_dyf_write",
)
# id,name,mobile,address,pincode
# 1,personA,23345345,dashadrone,756587
glueContext.write_dynamic_frame.from_options(
    frame=corrupted_data_dyf,
    connection_type="s3",
    format="csv",
    format_options={
        "quoteChar": -1,
    },
    connection_options={
        "path": "s3://glue-handson-data/RejectedRecordHandling/BAD/",
        "partitionKeys": [],
    },
    transformation_ctx="corrupted_data_dyf_write",
)
# id,name,mobile,address,pincode
# 3,personC,56509808,amta,howrah,756589
# x,PersonD,34524650,sibpur,756580
glueContext.write_dynamic_frame.from_options(
    frame=null_corrupted_data_dyf,
    connection_type="s3",
    format="csv",
    connection_options={
        "path": "s3://glue-handson-data/RejectedRecordHandling/NULL/",
        "partitionKeys": [],
    },
    transformation_ctx="null_corrupted_data_dyf_write",
)
# id,name,mobile,address,pincode
# 2,personB,,"new town",756588
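# Optional cleanup (an assumption, not part of the original job): release the cached input now
# that all downstream writes have run.
input_df.unpersist()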
job.commit()