Skip to content

Instantly share code, notes, and snippets.

Created May 24, 2023 16:17
Show Gist options
  • Save EtsuNDmA/55ec5f998d79510469d2fbe7e1c0d524 to your computer and use it in GitHub Desktop.
Save EtsuNDmA/55ec5f998d79510469d2fbe7e1c0d524 to your computer and use it in GitHub Desktop.
import argparse
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from datetime import timedelta
from functools import partial
from pathlib import Path
import pydicom
import warnings
# Suppress every UserWarning for the whole process.
# NOTE(review): presumably aimed at noisy pydicom warnings -- confirm, since
# this also hides UserWarnings raised by any other library.
warnings.filterwarnings("ignore", category=UserWarning)
# Module-wide logger shared by all functions in this script.
logger = logging.getLogger("anonymizer")
def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the anonymizer script.

    Returns:
        A namespace with the attributes ``input_dicom_dir`` and
        ``output_dicom_dir`` (both ``Path``), ``verbose`` (``bool``) and
        ``concurrency`` (``int`` or ``None``).
    """
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    # NOTE(review): the two path arguments are declared as positionals typed
    # as Path; confirm this matches the intended command-line interface.
    parser.add_argument(
        "input_dicom_dir",
        type=Path,
        help="path to the directory with dicom files",
    )
    parser.add_argument(
        "output_dicom_dir",
        type=Path,
        help="path to the directory for anonymized files",
    )
    parser.add_argument("-v", "--verbose", help="increase output verbosity", action="store_true")
    parser.add_argument("-c", "--concurrency", help="number of processes to run in parallel", type=int, default=None)
    return parser.parse_args()
def configure_logging(is_verbose: bool = False) -> None:
    """Attach a formatted stream handler to the script logger and set its level.

    Args:
        is_verbose: When true the logger level is DEBUG, otherwise INFO.
    """
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(asctime)-15s %(levelname)-8s %(message)s"))
    # The handler must be registered on the logger, otherwise the formatter
    # configured above is never used.
    logger.addHandler(handler)
    logger.setLevel(level=logging.DEBUG if is_verbose else logging.INFO)
def scan_input_dir_or_path(path_to_dicoms: Path) -> list[Path]:
    """Collect the ``.dcm`` files designated by ``path_to_dicoms``.

    Args:
        path_to_dicoms: Either a directory, which is walked recursively, or
            the path of a single ``.dcm`` file.

    Returns:
        The paths of all files whose name ends with ``.dcm``. The list is
        empty when the path is neither a directory nor a ``.dcm`` path.
    """
    dicom_paths: list[Path] = []
    if path_to_dicoms.is_dir():
        logger.info("Scanning dicoms in %s", path_to_dicoms)
        for dir_path, _, filenames in os.walk(path_to_dicoms):
            for filename in filenames:
                if filename.endswith(".dcm"):
                    dicom_paths.append(Path(dir_path) / filename)
    # ``elif`` so that a directory that happens to be named "*.dcm" is only
    # scanned and never appended to the result as if it were a file.
    elif path_to_dicoms.suffix == ".dcm":
        dicom_paths.append(path_to_dicoms)
    logger.info("Found %s files", len(dicom_paths))
    return dicom_paths
def _anonymize(dataset):
    """Overwrite or remove the patient-identifying attributes of ``dataset``.

    ``PatientID`` and ``PatientName`` are set to ``"ANON"``,
    ``PatientBirthDate`` is set to ``"19000101"``, and ``OtherPatientIDs`` /
    ``OtherPatientIDsSequence`` are deleted when present.

    Args:
        dataset: Object supporting attribute assignment, attribute deletion
            and ``in`` membership tests by keyword (such as a pydicom
            dataset). It is modified in place.

    Returns:
        None.
    """
    # NOTE: only the five attributes handled here are touched; any other
    # identifying element stored in the dataset is left unchanged.
    dataset.PatientID = "ANON"
    dataset.PatientName = "ANON"
    dataset.PatientBirthDate = "19000101"
    for optional_keyword in ("OtherPatientIDs", "OtherPatientIDsSequence"):
        if optional_keyword in dataset:
            delattr(dataset, optional_keyword)
def anonymize(filename: Path, input_dicom_dir: Path, output_dicom_dir: Path):
    """Anonymize one DICOM file and write the result under ``output_dicom_dir``.

    The position of ``filename`` relative to ``input_dicom_dir`` is preserved
    below ``output_dicom_dir``; missing parent directories are created.

    Args:
        filename: Path of the DICOM file to read.
        input_dicom_dir: Root the input was scanned from. May be equal to
            ``filename`` when a single file was given as input.
        output_dicom_dir: Root directory for the anonymized copy.
    """
    dataset = pydicom.dcmread(filename)
    _anonymize(dataset)
    if filename == input_dicom_dir:
        # Single-file input: relative_to() would yield "." and the output
        # path would collapse onto the output directory itself.
        relative_path = Path(filename.name)
    else:
        relative_path = filename.relative_to(input_dicom_dir)
    output_filename = output_dicom_dir / relative_path
    output_filename.parent.mkdir(parents=True, exist_ok=True)
    dataset.save_as(output_filename)
def main(args: argparse.Namespace) -> None:
    """Anonymize every DICOM file found under the input path in parallel.

    Args:
        args: Parsed command-line arguments; ``input_dicom_dir``,
            ``output_dicom_dir`` and ``concurrency`` are read from it.
    """
    tic = time.monotonic()
    input_dicom_dir = args.input_dicom_dir
    output_dicom_dir = args.output_dicom_dir
    dicom_paths = scan_input_dir_or_path(input_dicom_dir)
    with ProcessPoolExecutor(args.concurrency) as executor:
        results = executor.map(
            partial(anonymize, input_dicom_dir=input_dicom_dir, output_dicom_dir=output_dicom_dir), dicom_paths
        )
        # Executor.map() only re-raises a worker's exception when its result
        # is retrieved, so drain the iterator to avoid silently losing errors.
        for _ in results:
            pass
    toc = time.monotonic()
    logger.info("Anonymized dicoms saved to %s", output_dicom_dir)
    logger.debug("Anonymization of %s files finished at %s", len(dicom_paths), timedelta(seconds=toc - tic))
# Script entry point: parse the CLI, set up logging, then run the anonymizer.
if __name__ == "__main__":
    args = parse_args()
    configure_logging(args.verbose)
    main(args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment