Glue-Spark code to handle malformed records while reading CSV from S3
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
# Importing required field types to define the schema
from pyspark.sql.types import StringType, StructType, StructField, IntegerType
# Importing required functions
from pyspark.sql.functions import input_file_name, col, concat_ws, when, lit

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
################## Input data read ##################
# The input data will be read using the Spark session (spark.read) instead of glueContext to ensure better control over malformed data handling
# Input data used:
# id,name,mobile,address,pin
# 1,personA,23345345,dashadrone,756587
# 2,personB,,new town,756588
# 3,personC,56509808,amta,howrah,756589
# x,PersonD,34524650,sibpur,756580
# The 3rd and 4th rows are corrupted: the 3rd row has an extra delimiter, and the 4th row has a string value in an integer field
# Define the schema of the source file explicitly
# Please note we have added one extra column - _corrupt_record - to store the malformed records explicitly
# StructField signature -> StructField(ColumnName, DataType, Nullable)
customSchema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("mobile", StringType(), False),
    StructField("address", StringType(), True),
    StructField("pincode", IntegerType(), False),
    StructField("_corrupt_record", StringType(), True)
])
# Please note the Nullable flag in StructField is not a constraint, more of a hint,
# meaning even if the file contains a null value in a nullable=False field, it will not be considered malformed.
# To work around this we will put explicit checks after reading the data
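# Side note (an illustrative sketch, not executed in this job): nullability IS enforced when
# building a DataFrame in memory, just not on file reads. For example,
#   spark.createDataFrame([(None,)], StructType([StructField("id", IntegerType(), False)]))
# raises an error because id is not nullable, whereas the CSV read below loads nulls into
# the not-null fields without complaint.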
sc.setLogLevel("INFO")
# To accept and mark malformed records, mode is set to PERMISSIVE and the corrupt record column name is assigned to _corrupt_record
input_df = spark.read.format("csv") \
    .schema(customSchema) \
    .option("quote", "") \
    .option("header", "true") \
    .option("inferSchema", "false") \
    .option("sep", ",") \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .load("s3://glue-handson-data/RejectedRecordHandling/SRC/")
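# For comparison, the other CSV parser modes handle bad rows differently (an illustrative
# sketch, not used here; dataSchema is assumed to be customSchema without _corrupt_record):
# dropped_df = spark.read.format("csv") \
#     .schema(dataSchema) \
#     .option("header", "true") \
#     .option("mode", "DROPMALFORMED") \
#     .load("s3://glue-handson-data/RejectedRecordHandling/SRC/")  # silently drops the two bad rows
# "FAILFAST" would instead raise an exception on the first malformed record.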
# Adding the source file name is optional; it is similar to capturing the source file name in Informatica
input_df = input_df.withColumn("file_name", input_file_name())
# Caching the dataframe before filtering is mandatory: since Spark 2.3, queries that reference only the
# internal _corrupt_record column of a raw CSV read are disallowed unless the data is cached first.
# The count() below is the action that actually materializes the cache (see transformations vs actions in Spark).
input_df.cache()
total_row_count = input_df.count()
input_df.printSchema()
# Shows the actual enforced schema; notice nullability became true for most of the columns defined as not null
# root
#  |-- id: integer (nullable = true)
#  |-- name: string (nullable = true)
#  |-- mobile: string (nullable = true)
#  |-- address: string (nullable = true)
#  |-- pincode: integer (nullable = true)
#  |-- _corrupt_record: string (nullable = true)
#  |-- file_name: string (nullable = false)
print("input_df :::")
input_df.show(truncate=False)
# Showing the state of the dataframe just after reading; notice the _corrupt_record column
# +----+-------+--------+----------+-------+-------------------------------------+-------------------------------------------------------------------------+
# |id  |name   |mobile  |address   |pincode|_corrupt_record                      |file_name                                                                |
# +----+-------+--------+----------+-------+-------------------------------------+-------------------------------------------------------------------------+
# |1   |personA|23345345|dashadrone|756587 |null                                 |s3://glue-handson-data/RejectedRecordHandling/SRC/RejctRecordsTestCSV.csv|
# |2   |personB|null    |new town  |756588 |null                                 |s3://glue-handson-data/RejectedRecordHandling/SRC/RejctRecordsTestCSV.csv|
# |null|null   |null    |null      |null   |3,personC,56509808,amta,howrah,756589|s3://glue-handson-data/RejectedRecordHandling/SRC/RejctRecordsTestCSV.csv|
# |null|null   |null    |null      |null   |x,PersonD,34524650,sibpur,756580     |s3://glue-handson-data/RejectedRecordHandling/SRC/RejctRecordsTestCSV.csv|
# +----+-------+--------+----------+-------+-------------------------------------+-------------------------------------------------------------------------+
print("total_row_count = " + str(total_row_count))
semi_valid_data_df = input_df.filter(col("_corrupt_record").isNull())
corrupted_data_df = input_df.filter(col("_corrupt_record").isNotNull()).select("_corrupt_record")
input_df_columns = input_df.drop("_corrupt_record", "file_name").columns
# Renaming the lone corrupt-record column to the comma-joined source header, so the rejected-records file carries the original header line
corrupted_data_df = corrupted_data_df.withColumnRenamed("_corrupt_record", ','.join(input_df_columns))
print("semi_valid_data_df :::")
semi_valid_data_df.show(truncate=False)
# Semi-valid data (nulls are still to be handled)
# +---+-------+--------+----------+-------+---------------+-------------------------------------------------------------------------+
# |id |name   |mobile  |address   |pincode|_corrupt_record|file_name                                                                |
# +---+-------+--------+----------+-------+---------------+-------------------------------------------------------------------------+
# |1  |personA|23345345|dashadrone|756587 |null           |s3://glue-handson-data/RejectedRecordHandling/SRC/RejctRecordsTestCSV.csv|
# |2  |personB|null    |new town  |756588 |null           |s3://glue-handson-data/RejectedRecordHandling/SRC/RejctRecordsTestCSV.csv|
# +---+-------+--------+----------+-------+---------------+-------------------------------------------------------------------------+
print("corrupted_data_df :::")
corrupted_data_df.show(truncate=False)
# Corrupt records; these can be written to any output system based on requirements
# +-------------------------------------+
# |id,name,mobile,address,pincode       |
# +-------------------------------------+
# |3,personC,56509808,amta,howrah,756589|
# |x,PersonD,34524650,sibpur,756580     |
# +-------------------------------------+
################## Null values in not null columns handling ##################
# Detecting the columns that are defined as not null in the schema
defined_not_null_cols = []
for field in customSchema:
    if not field.nullable:
        defined_not_null_cols.append(field.name)
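# The same list could be built with a comprehension (equivalent, just a style alternative):
# defined_not_null_cols = [f.name for f in customSchema if not f.nullable]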
# if any not null columns are found...
if defined_not_null_cols:
    null_filter_query = ""
    not_null_filter_query = ""
    for ind, column_name in enumerate(defined_not_null_cols):
        if not ind:
            null_filter_query += f"{column_name} is null"
            not_null_filter_query += f"{column_name} is not null"
        else:
            null_filter_query += f" OR {column_name} is null"
            not_null_filter_query += f" AND {column_name} is not null"
else:
    null_filter_query = "1=2"
    not_null_filter_query = "1=1"
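# The same filters could also be built as Column expressions instead of SQL strings
# (a minimal sketch using functools.reduce, assuming defined_not_null_cols is non-empty;
# equivalent behaviour, just a style choice):
# from functools import reduce
# not_null_cond = reduce(lambda a, b: a & b, [col(c).isNotNull() for c in defined_not_null_cols])
# null_cond = reduce(lambda a, b: a | b, [col(c).isNull() for c in defined_not_null_cols])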
# Separating valid/corrupted data based on nullability
final_valid_data_df = semi_valid_data_df.where(not_null_filter_query)
null_corrupted_data_df = semi_valid_data_df.where(null_filter_query)
print("final_valid_data_df :::")
final_valid_data_df.show(truncate=False)
# +---+-------+--------+----------+-------+---------------+-------------------------------------------------------------------------+
# |id |name   |mobile  |address   |pincode|_corrupt_record|file_name                                                                |
# +---+-------+--------+----------+-------+---------------+-------------------------------------------------------------------------+
# |1  |personA|23345345|dashadrone|756587 |null           |s3://glue-handson-data/RejectedRecordHandling/SRC/RejctRecordsTestCSV.csv|
# +---+-------+--------+----------+-------+---------------+-------------------------------------------------------------------------+
print("null_corrupted_data_df :::")
null_corrupted_data_df.show(truncate=False)
# +---+-------+------+--------+-------+---------------+-------------------------------------------------------------------------+
# |id |name   |mobile|address |pincode|_corrupt_record|file_name                                                                |
# +---+-------+------+--------+-------+---------------+-------------------------------------------------------------------------+
# |2  |personB|null  |new town|756588 |null           |s3://glue-handson-data/RejectedRecordHandling/SRC/RejctRecordsTestCSV.csv|
# +---+-------+------+--------+-------+---------------+-------------------------------------------------------------------------+
################## Output handling ##################
# Now both these dataframes can be written to the required output system
# Dataframes can be written directly with the Spark DataFrame writer, but here we will convert to dynamic frames and write using glueContext
# Converting the Spark dataframes to Glue dynamic frames to be able to use the Glue dynamic frame write APIs
final_valid_data_dyf = DynamicFrame.fromDF(final_valid_data_df.drop("_corrupt_record", "file_name"), glueContext, "final_valid_data_dyf")
corrupted_data_dyf = DynamicFrame.fromDF(corrupted_data_df, glueContext, "corrupted_data_dyf")
null_corrupted_data_dyf = DynamicFrame.fromDF(null_corrupted_data_df.drop("_corrupt_record", "file_name"), glueContext, "null_corrupted_data_dyf")
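# For reference, the same writes could be done with the plain Spark DataFrame writer instead
# of dynamic frames (a sketch; coalesce(1) is only an assumption to mimic a single output file):
# final_valid_data_df.drop("_corrupt_record", "file_name") \
#     .coalesce(1) \
#     .write.mode("append") \
#     .option("header", "true") \
#     .csv("s3://glue-handson-data/RejectedRecordHandling/TGT/")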
# Writing back to the s3 bucket
glueContext.write_dynamic_frame.from_options(
    frame=final_valid_data_dyf,
    connection_type="s3",
    format="csv",
    connection_options={
        "path": "s3://glue-handson-data/RejectedRecordHandling/TGT/",
        "partitionKeys": [],
    },
    transformation_ctx="final_valid_data_dyf_write",
)
# id,name,mobile,address,pincode
# 1,personA,23345345,dashadrone,756587
glueContext.write_dynamic_frame.from_options(
    frame=corrupted_data_dyf,
    connection_type="s3",
    format="csv",
    format_options={
        # quoteChar -1 disables quoting, so the raw corrupt lines (which contain commas) are written as-is
        "quoteChar": -1,
    },
    connection_options={
        "path": "s3://glue-handson-data/RejectedRecordHandling/BAD/",
        "partitionKeys": [],
    },
    transformation_ctx="corrupted_data_dyf_write",
)
# id,name,mobile,address,pincode
# 3,personC,56509808,amta,howrah,756589
# x,PersonD,34524650,sibpur,756580
glueContext.write_dynamic_frame.from_options(
    frame=null_corrupted_data_dyf,
    connection_type="s3",
    format="csv",
    connection_options={
        "path": "s3://glue-handson-data/RejectedRecordHandling/NULL/",
        "partitionKeys": [],
    },
    transformation_ctx="null_corrupted_data_dyf_write",
)
# id,name,mobile,address,pincode
# 2,personB,,"new town",756588
job.commit()