Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anand086/974fe2fd3f0edcbbd9622aadcb28e63d to your computer and use it in GitHub Desktop.
Save anand086/974fe2fd3f0edcbbd9622aadcb28e63d to your computer and use it in GitHub Desktop.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglueml.transforms import EntityDetector
from pyspark.sql.types import StringType
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import *
# Resolve the job arguments from the command line; only JOB_NAME is requested.
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
# Create the Spark context, wrap it in a Glue context, and expose the Glue
# context's Spark session under the conventional name `spark`.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Initialise the job under the resolved name, passing the resolved arguments;
# the matching job.commit() call is the last statement of the script.
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node Amazon S3
# Source frame: CSV input (comma separated, double-quote quoting, header row
# present) read recursively from the raw_data S3 prefix.
AmazonS3_node1662315984254 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "recurse": True,
        "paths": ["s3://learn-share-repeat-us-west-2/pii-glue/patients/raw_data/"],
    },
    format="csv",
    format_options={
        "separator": ",",
        "quoteChar": '"',
        "withHeader": True,
        "optimizePerformance": False,
    },
    transformation_ctx="AmazonS3_node1662315984254",
)
# Script generated for node Detect PII
# Entity types the detector is asked to look for in the source frame.
pii_entity_types = [
    "PERSON_NAME",
    "EMAIL",
    "CREDIT_CARD",
    "IP_ADDRESS",
    "MAC_ADDRESS",
    "PHONE_NUMBER",
    "USA_PASSPORT_NUMBER",
    "USA_SSN",
    "USA_ITIN",
    "BANK_ACCOUNT",
    "USA_DRIVING_LICENSE",
    "USA_HCPCS_CODE",
    "USA_NATIONAL_DRUG_CODE",
    "USA_NATIONAL_PROVIDER_IDENTIFIER",
    "USA_DEA_NUMBER",
    "USA_HEALTH_INSURANCE_CLAIM_NUMBER",
    "USA_MEDICARE_BENEFICIARY_IDENTIFIER",
]
entity_detector = EntityDetector()
# Run detection over the source frame; findings are requested under the
# "DetectedEntities" column name, which the masking step below reads.
detected_df = entity_detector.detect(
    AmazonS3_node1662315984254,
    pii_entity_types,
    "DetectedEntities",
)
def replace_cell(original_cell_value, sorted_reverse_start_end_tuples):
    """Mask every detected span of a cell value with the ``"*#"`` marker.

    Args:
        original_cell_value: The cell text to redact. ``None`` is returned
            unchanged.
        sorted_reverse_start_end_tuples: Sequence of ``(start, end)``
            character offsets into ``original_cell_value`` (``end``
            exclusive). Callers pass them ordered by ``end`` descending; the
            order is re-established here so a mis-ordered input cannot cause
            a span to be skipped. May be empty or ``None``.

    Returns:
        The text with each span replaced by a single ``"*#"`` marker.
        Overlapping or duplicate spans are collapsed rather than applied
        twice, and empty spans are ignored.
    """
    if original_cell_value is None or not sorted_reverse_start_end_tuples:
        return original_cell_value

    spans = sorted(
        sorted_reverse_start_end_tuples,
        key=lambda span: span[1],
        reverse=True,
    )
    masked = original_cell_value
    # Everything left of `untouched_end` is still identical to the input, so
    # offsets below it remain valid while we work from right to left.
    untouched_end = len(masked)
    for span in spans:
        start = span[0]
        # Clip to the untouched region so a span overlapping (or repeating)
        # an already masked one does not eat into the marker or trailing text.
        end = min(span[1], untouched_end)
        if start >= end:
            continue  # empty span, or fully covered by an earlier mask
        # Splice by position instead of str.replace(): replace() rewrites
        # every occurrence of the substring, which shifts the offsets of the
        # spans still to be processed and can leave sensitive text exposed.
        masked = masked[:start] + "*#" + masked[end:]
        untouched_end = start
    return masked
def row_pii(column_name, original_cell_value, detected_entities):
    """Return the cell value with the entities detected for its column masked.

    Args:
        column_name: Name of the column the cell belongs to; used as the
            lookup key into ``detected_entities``.
        original_cell_value: The cell text. ``None`` is returned unchanged.
        detected_entities: Mapping from column name to a list of entities,
            each exposing ``"start"`` and ``"end"`` offsets. Presumably the
            row's "DetectedEntities" value; a missing (``None``) or empty
            mapping means nothing is masked.

    Returns:
        The value produced by ``replace_cell`` for the column's spans, or
        the original value when the column has no detected entities.
    """
    # Null cells and null/empty detection results must not raise inside the
    # UDF; there is simply nothing to mask in those cases.
    if original_cell_value is None or not detected_entities:
        return original_cell_value

    entities = detected_entities.get(column_name)
    if not entities:
        return original_cell_value

    # Right-most span first, so masking one span never moves the offsets of
    # the spans that are still to be processed.
    sorted_reverse_start_end_tuples = sorted(
        ((entity["start"], entity["end"]) for entity in entities),
        key=lambda start_end: start_end[1],
        reverse=True,
    )
    return replace_cell(original_cell_value, sorted_reverse_start_end_tuples)
# Wrap row_pii as a Spark UDF that yields a string column.
row_pii_udf = udf(f=row_pii, returnType=StringType())
def recur(df, remaining_keys):
    """Apply the PII-masking UDF to each named column of ``df`` in turn.

    Args:
        df: DataFrame-like object exposing ``withColumn``; it is expected to
            carry the "DetectedEntities" column that the UDF reads.
        remaining_keys: Names of the columns to mask, processed in order.

    Returns:
        The frame with every listed column replaced by its masked version,
        or ``df`` itself when ``remaining_keys`` is empty.
    """
    # Iterate rather than recurse: the recursive form used one Python stack
    # frame and one list copy per column, which is quadratic in the number of
    # columns and can hit the interpreter's recursion limit on wide tables.
    for column in remaining_keys:
        df = df.withColumn(
            column, row_pii_udf(lit(column), column, "DetectedEntities")
        )
    return df
# Mask every column that exists in the source frame.
keys = AmazonS3_node1662315984254.toDF().columns
# Apply the masking UDF column by column, then discard the helper column
# holding the detection results.
updated_masked_df = recur(detected_df.toDF(), keys).drop("DetectedEntities")
# Convert the masked Spark DataFrame back into a DynamicFrame for the sink.
DetectPII_node1662317206797 = DynamicFrame.fromDF(
    updated_masked_df,
    glueContext,
    "updated_masked_df",
)
# Script generated for node Amazon S3
# Sink: write the masked frame to S3 in "glueparquet" format with snappy
# compression and no partition keys.
AmazonS3_node1662317241498 = glueContext.write_dynamic_frame.from_options(
    frame=DetectPII_node1662317206797,
    connection_type="s3",
    connection_options={
        "partitionKeys": [],
        "path": "s3://learn-share-repeat-us-west-2/pii-glue/patients/glue-pii/glue_find_column_sensitive_data_redacted/",
    },
    format="glueparquet",
    format_options={"compression": "snappy"},
    transformation_ctx="AmazonS3_node1662317241498",
)
# Commit the job; pairs with the job.init() call at the top of the script.
job.commit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment