Created
August 10, 2023 12:45
-
-
Save costastf/0f0cf6e1ab3aa9379f0c90430323a3a7 to your computer and use it in GitHub Desktop.
Redact text from a PDF
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Inline pyproject-style metadata kept as a plain string: it declares the
# minimum Python version and the third-party packages this script imports.
__pyproject__ = """
[project]
requires-python = ">=3.9"
dependencies = [
"coloredlogs>=15.0.1,<16.0",
"pypdf>=3.15.0,<4.0",
]
"""
import logging | |
import os | |
import sys | |
from typing import Union | |
import coloredlogs | |
import pypdf | |
from pypdf import PdfReader, PdfWriter | |
from pypdf import filters | |
from pypdf.constants import StreamAttributes as SA | |
from pypdf.generic import DecodedStreamObject, EncodedStreamObject as EncodedStreamObjectToPatch | |
# Logger shared by everything in this script; it is named after this file's path.
LOGGER = logging.getLogger(__file__)
class EncodedStreamObject(EncodedStreamObjectToPatch):
    """Stream object that blanks out configured phrases while decoding.

    The decoded stream data is passed through :meth:`redact` before it is
    cached and returned by :meth:`get_data`, so every consumer of the stream
    sees the redacted text.
    """

    # Phrases to overwrite; shared by all instances and expected to be assigned
    # by the caller before any stream is decoded.
    phrases = []
    # Running total of phrase occurrences replaced across all decoded streams.
    matches = 0

    @staticmethod
    def redact(text):
        """Return ``text`` as ``str`` with every configured phrase overwritten.

        Each occurrence of a phrase is replaced by spaces of the same length,
        so the overall length of the text is unchanged.

        Args:
            text: Decoded stream data, either ``bytes`` or ``str``.

        Returns:
            The redacted text as ``str``.

        Side effects:
            Adds the number of replaced occurrences to
            ``EncodedStreamObject.matches``.
        """
        if isinstance(text, bytes):
            try:
                text = text.decode('utf-8')
            except UnicodeDecodeError:
                # latin-1 maps every byte to exactly one code point, so this
                # fallback can never fail and never rewrites the data. The
                # previous 'unicode_escape' fallback interpreted backslash
                # sequences and could raise again on a malformed escape.
                text = text.decode('latin-1')
        for phrase in EncodedStreamObject.phrases:
            if not phrase:
                # An empty phrase is "found" in every text but replaces
                # nothing; skipping it keeps the match counter honest.
                continue
            LOGGER.debug(f'Looking for phrase "{phrase}" to redact.')
            # str.count and str.replace both work on non-overlapping
            # occurrences, so the counter equals the number of replacements.
            occurrences = text.count(phrase)
            if occurrences:
                EncodedStreamObject.matches += occurrences
                text = text.replace(phrase, ' ' * len(phrase))
        return text

    def get_data(self) -> Union[None, str, bytes]:
        """Return the decoded and redacted stream data.

        The decoded object is cached on ``self.decoded_self``; later calls
        return the cached object's data without decoding or redacting again.
        """
        if self.decoded_self is not None:
            # cached version of decoded object
            return self.decoded_self.get_data()
        decoded = DecodedStreamObject()
        decoded._data = self.redact(filters.decode_stream_data(self))
        # Carry over every dictionary entry except the SA.LENGTH, SA.FILTER
        # and SA.DECODE_PARMS keys, which are not copied to the decoded object.
        for key, value in list(self.items()):
            if key not in (SA.LENGTH, SA.FILTER, SA.DECODE_PARMS):
                decoded[key] = value
        self.decoded_self = decoded
        return decoded._data
def calculate_redacted_filename(filename):
    """Return the output name for ``filename``: its stem plus ``_redacted.pdf``.

    Only the final extension is removed, and a name without any extension is
    kept whole (``'doc'`` becomes ``'doc_redacted.pdf'``).

    Args:
        filename: Name (or path) of the input file.

    Returns:
        The name with its last extension replaced by ``_redacted.pdf``.
    """
    # splitext only looks at the last path component, so dots in directory
    # names are left alone and a missing extension does not empty the stem.
    stem, _ = os.path.splitext(filename)
    return f'{stem}_redacted.pdf'
def redact_and_save(filepath):
    """Redact every page of the PDF at ``filepath`` and write a redacted copy.

    The copy is named by ``calculate_redacted_filename`` from the input's base
    name and is opened relative to the current working directory. Pages that
    fail to merge or to be added are logged and processing continues.

    Args:
        filepath: Path of the PDF file to read.

    Returns:
        ``0`` when the output file was written, ``1`` when writing it failed.
    """
    filename = os.path.basename(filepath)
    writer = PdfWriter()
    # Keep the input open until the output has been written, as the original
    # did for the whole run (presumably pypdf reads page data on demand —
    # confirm); the context manager guarantees the handle is closed afterwards.
    with open(filepath, "rb") as source:
        reader = PdfReader(source)
        for index, page in enumerate(reader.pages):
            try:
                # Force the redaction by merging with itself
                page.merge_page(reader.pages[index])
                LOGGER.debug(f'Redacted page number :{index}')
            except pypdf.errors.PdfStreamError:
                LOGGER.warning(f'Errors trying to redact page :{index}')
            try:
                LOGGER.debug(f'Adding page number :{index} to output file.')
                writer.add_page(page)
            except pypdf.errors.PdfReadError:
                LOGGER.warning(f'Errors trying to add page :{index} to output file.')
        try:
            # The context manager closes the output even when write() raises.
            with open(calculate_redacted_filename(filename), "wb") as output_file:
                writer.write(output_file)
        except Exception:
            LOGGER.exception('Something broke.')
            return 1
    return 0
def manage_arguments(arguments):
    """Split the raw command line into file path, phrases and logging level.

    Args:
        arguments: The full argument vector, program name first. It must hold
            the PDF path and the pipe-delimited phrases, optionally followed
            by a logging level.

    Returns:
        A ``(filename_path, phrases, logging_level)`` tuple where ``phrases``
        is the list of non-empty pipe-separated phrases and ``logging_level``
        defaults to ``'info'``.

    Raises:
        SystemExit: With status 1 when the number of arguments is wrong.
    """
    if not 2 < len(arguments) <= 4:
        LOGGER.error('Usage: redact.py pdf_file_path.pdf "pipe|delimited|text to replace" "OPTIONAL_LOGGING_LEVEL"')
        raise SystemExit(1)
    _, filename_path, raw_phrases, *optional = arguments
    logging_level = optional[0] if optional else 'info'
    # A doubled or trailing pipe yields empty strings; an empty phrase cannot
    # redact anything, so it is dropped here.
    phrases = [phrase for phrase in raw_phrases.split('|') if phrase]
    return filename_path, phrases, logging_level
def main(arguments):
    """Run the redaction described by the command-line ``arguments``.

    Parses the arguments, installs coloured logging at the requested level,
    swaps pypdf's ``EncodedStreamObject`` for the redacting subclass and then
    redacts and saves the file.

    Args:
        arguments: The full argument vector, program name first.

    Returns:
        The status returned by ``redact_and_save``.

    Raises:
        SystemExit: With status 1 when the arguments or the logging level are
            invalid.
    """
    filename_path, phrases, logging_level = manage_arguments(arguments)
    # Resolve the level name up front so a typo produces a clear error and a
    # non-zero exit instead of an AttributeError traceback.
    level = getattr(logging, logging_level.upper(), None)
    if not isinstance(level, int):
        LOGGER.error(f'Unknown logging level "{logging_level}".')
        raise SystemExit(1)
    coloredlogs.install(level=level)
    EncodedStreamObject.phrases = phrases
    # Replace the class inside pypdf's own module so that code there which
    # looks the name up afterwards gets the redacting subclass.
    pypdf.generic._data_structures.EncodedStreamObject = EncodedStreamObject  # noqa
    result = redact_and_save(filename_path)
    LOGGER.info(f'Found {EncodedStreamObject.matches} occurrences and replaced them.')
    return result
# Script entry point: pass the raw command line to main() and use the value it
# returns as the process exit status.
if __name__ == '__main__':
    sys.exit(main(sys.argv))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment