@vdparikh
Created September 25, 2023 19:38
import os
import fnmatch
import pandas as pd
from presidio_analyzer import AnalyzerEngine, BatchAnalyzerEngine, RecognizerResult, PatternRecognizer, Pattern
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import EngineResult
from presidio_image_redactor import ImageRedactorEngine, ImageAnalyzerEngine
from PIL import Image
import docx2txt
from pdf2image import convert_from_path
# Define your location list
location_list = pd.read_csv("us_cities_states_counties.csv", sep='|').reset_index(drop=True)
# Initialize the analyzers. The batch analyzer shares the single AnalyzerEngine,
# so custom recognizers only need to be registered once.
analyzer = AnalyzerEngine()
batch_analyzer = BatchAnalyzerEngine(analyzer_engine=analyzer)
image_analyzer = ImageAnalyzerEngine(analyzer_engine=analyzer)
redactor = ImageRedactorEngine()
# Define custom patterns and recognizers and register them on the shared AnalyzerEngine
zip_pattern = Pattern(name="zip_pattern", regex='(\\b\\d{5}(?:\\-\\d{4})?\\b)', score=0.5)
zip_recognizer = PatternRecognizer(supported_entity="ZIPCODE", patterns=[zip_pattern], context=["zip", "zipcode"])
analyzer.registry.add_recognizer(zip_recognizer)

state_recognizer = PatternRecognizer(supported_entity="STATE", deny_list=list(location_list['State short'].dropna().unique()), context=["state", "address"])
analyzer.registry.add_recognizer(state_recognizer)

city_recognizer = PatternRecognizer(supported_entity="CITY", deny_list=list(location_list['City'].dropna().unique()), context=["city", "address"])
analyzer.registry.add_recognizer(city_recognizer)

password_pattern = Pattern(name="password_pattern", regex='^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$%^&*-]).{8,}$', score=0.5)
password_recognizer = PatternRecognizer(supported_entity="PASSWORD", patterns=[password_pattern], context=["password"])
analyzer.registry.add_recognizer(password_recognizer)
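
# Quick sanity check (optional): run the analyzer over a short made-up sentence to confirm
# the custom recognizers fire. The sample text below is purely illustrative.
sample_text = "Jane moved to Austin, TX 78701"  # hypothetical example value
for result in analyzer.analyze(text=sample_text, language="en"):
    print(result.entity_type, result.start, result.end, result.score)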
# Process files matching a pattern. Tabular/text files are loaded into a DataFrame and
# passed to the batch analyzer; PDF pages and image files are passed to the image analyzer.
def process_files(file_pattern, file_extension, analyze_func):
    for filename in os.listdir('.'):
        if not fnmatch.fnmatch(filename, file_pattern):
            continue

        # Image-based inputs: analyze each page/image directly with the image analyzer.
        if file_extension in ('pdf', 'jpg', 'png'):
            if file_extension == 'pdf':
                # Render each PDF page to an image so it can go through OCR-based analysis.
                images = convert_from_path(filename)
                for i, img in enumerate(images):
                    img.save(f'made_{filename}{i}.jpg', 'JPEG')
            else:
                images = [Image.open(filename)]
            rows = [{'page': i, 'recognizer_results': analyze_func(img)}
                    for i, img in enumerate(images)]
            image_df = pd.DataFrame(rows)
            image_df.insert(0, 'filename', filename, True)
            image_df.to_csv(f"result_{file_extension}_{filename}.csv", index=False)
            continue

        # Structured / text inputs: load into a DataFrame.
        if file_extension == 'csv':
            data = pd.read_csv(filename, index_col=0).reset_index(drop=True)
        elif file_extension == 'txt':
            # Assumes tab-separated text, mirroring how the DOCX text dump is read below.
            data = pd.read_csv(filename, sep="\t")
        elif file_extension == 'xlsx':
            data = pd.read_excel(filename, engine='openpyxl')
        elif file_extension == 'docx':
            # Extract the document text, write it to a temp file, and read it back as tab-separated columns.
            doc_text = docx2txt.process(filename)
            with open("pii_docx_made.txt", "w") as text_file:
                print(doc_text, file=text_file)
            data = pd.read_csv("pii_docx_made.txt", sep="\t")
        else:
            continue

        # Run the batch analyzer over every column and flatten the results to one row per column.
        data = data.astype(str).replace('nan', pd.NA)
        data_dict = data.to_dict(orient="list")
        analyzer_results = analyze_func(data_dict, language="en")
        analyzer_df = pd.DataFrame(analyzer_results)
        presidio_df = pd.DataFrame(list(analyzer_df['recognizer_results']), analyzer_df['key']).reset_index()
        presidio_df.insert(0, 'filename', filename, True)
        presidio_df.to_csv(f"result_{file_extension}_{filename}.csv", index=False)
# Process text files
process_files('s_pii_*.txt', 'txt', batch_analyzer.analyze_dict)
# Process CSV files
process_files('s_pii_*.csv', 'csv', batch_analyzer.analyze_dict)
# Process XLSX files
process_files('s_pii_*.xlsx', 'xlsx', batch_analyzer.analyze_dict)
# Process DOCX files
process_files('s_pii_*.docx', 'docx', batch_analyzer.analyze_dict)
# Process PDF files (pages are rendered to images and analyzed with the image analyzer)
process_files('s_pii_*.pdf', 'pdf', image_analyzer.analyze)
# Process image files (JPG and PNG)
for image_extension in ['jpg', 'png']:
    process_files(f's_pii_*.{image_extension}', image_extension, image_analyzer.analyze)
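
# Optional: the ImageRedactorEngine initialized above can also write redacted copies of
# images. A minimal sketch, assuming a hypothetical input file name:
sample_image = "s_pii_sample.jpg"  # hypothetical file name; adjust to a real input
if os.path.exists(sample_image):
    redacted = redactor.redact(image=Image.open(sample_image), fill=(0, 0, 0))
    redacted.save(f"redacted_{sample_image}")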
# Merge all results into a single CSV
result_files = [f for f in os.listdir('.') if f.startswith('result_')]
result_dfs = [pd.read_csv(f) for f in result_files]
final = pd.concat(result_dfs, ignore_index=True)
final.to_csv("result_structured.csv", index=False)
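
# Optional: the AnonymizerEngine imported above can mask detected entities in free text.
# A minimal sketch over a made-up sentence; replace it with real text as needed.
anonymizer = AnonymizerEngine()
demo_text = "Contact Jane in Austin, TX 78701"  # hypothetical example value
demo_results = analyzer.analyze(text=demo_text, language="en")
print(anonymizer.anonymize(text=demo_text, analyzer_results=demo_results).text)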