Skip to content

Instantly share code, notes, and snippets.

@EtsuNDmA
Created October 28, 2022 09:58
Show Gist options
  • Save EtsuNDmA/dc23d8c1ef4d869cff123bdaa2ca58eb to your computer and use it in GitHub Desktop.
Save EtsuNDmA/dc23d8c1ef4d869cff123bdaa2ca58eb to your computer and use it in GitHub Desktop.
Simple script to anonymize dicoms in parallel
import argparse
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from datetime import timedelta
from functools import partial
from pathlib import Path
import pydicom
import warnings
# Suppress every warning of category UserWarning for the whole process.
# NOTE(review): presumably aimed at warnings raised while reading DICOM files - confirm.
warnings.filterwarnings("ignore", category=UserWarning)
# Script-wide logger; its handler and level are attached in configure_logging().
logger = logging.getLogger("anonymizer")
def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the script.

    Reads the arguments from ``sys.argv``.

    Returns:
        A namespace with ``input_dicom_dir`` and ``output_dicom_dir`` (both
        converted to ``Path``), ``verbose`` (bool, ``False`` unless ``-v`` is
        given) and ``concurrency`` (int, or ``None`` when ``-c`` is omitted).
    """
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        "input_dicom_dir",
        type=Path,
        help="path to the directory with dicom files",
    )
    parser.add_argument(
        "output_dicom_dir",
        type=Path,
        # Fixed typo in the user-facing help text ("fro" -> "for").
        help="path to the directory for anonymized files",
    )
    parser.add_argument("-v", "--verbose", help="increase output verbosity", action="store_true")
    parser.add_argument("-c", "--concurrency", help="number of processes to run in parallel", type=int, default=None)
    return parser.parse_args()
def configure_logging(is_verbose: bool = False):
    """Attach a stream handler to the script logger and set its level.

    Args:
        is_verbose: when true the logger level is DEBUG, otherwise INFO.
    """
    level = logging.DEBUG if is_verbose else logging.INFO
    formatter = logging.Formatter("%(asctime)-15s %(levelname)-8s %(message)s")

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)

    logger.addHandler(stream_handler)
    logger.setLevel(level)
def scan_input_dir_or_path(path_to_dicoms: Path) -> list[Path]:
    """Collect the ``.dcm`` files designated by *path_to_dicoms*.

    When the path is a directory it is walked recursively and every file whose
    name ends with ``.dcm`` is collected. Otherwise the path itself is taken
    if its suffix is ``.dcm``. The extension is compared case-insensitively so
    that files named ``*.DCM`` are not silently skipped.

    Args:
        path_to_dicoms: a directory to scan, or a path to a single file.

    Returns:
        The list of matching paths; empty when nothing matches.
    """
    dicom_paths: list[Path] = []
    if path_to_dicoms.is_dir():
        logger.info("Scanning dicoms in %s", path_to_dicoms)
        for dir_path, _, filenames in os.walk(path_to_dicoms):
            dicom_paths.extend(
                Path(dir_path) / filename for filename in filenames if filename.lower().endswith(".dcm")
            )
    elif path_to_dicoms.suffix.lower() == ".dcm":
        dicom_paths.append(path_to_dicoms)
    logger.info("Found %s files", len(dicom_paths))
    return dicom_paths
def _anonymize(dataset):
dataset.PatientID = "ANON"
dataset.PatientName = "ANON"
dataset.PatientBirthDate = "19000101"
if "OtherPatientIDs" in dataset:
del dataset.OtherPatientIDs
if "OtherPatientIDsSequence" in dataset:
del dataset.OtherPatientIDsSequence
def anonymize(filename: Path, input_dicom_dir: Path, output_dicom_dir: Path):
    """Read one DICOM file, anonymize it and save the result under *output_dicom_dir*.

    The output keeps the position that ``filename`` has relative to
    ``input_dicom_dir``; missing parent directories are created.

    Args:
        filename: path of the DICOM file to read.
        input_dicom_dir: path that ``filename`` is made relative to. It equals
            ``filename`` when a single file (not a directory) was given as input.
        output_dicom_dir: root directory for the anonymized file.

    Raises:
        ValueError: if ``filename`` is not located under ``input_dicom_dir``
            (raised by ``Path.relative_to``).
    """
    dataset = pydicom.dcmread(filename)
    _anonymize(dataset)

    relative_path = filename.relative_to(input_dicom_dir)
    if not relative_path.parts:
        # Single-file input: filename == input_dicom_dir, so the relative path
        # is "." and the file would be written *as* output_dicom_dir instead of
        # inside it. Fall back to the bare file name.
        relative_path = Path(filename.name)

    output_filename = output_dicom_dir / relative_path
    output_filename.parent.mkdir(parents=True, exist_ok=True)
    dataset.save_as(output_filename)
def main(args: argparse.Namespace) -> None:
    """Anonymize all DICOM files found at ``args.input_dicom_dir`` in parallel.

    The files are processed by a process pool of ``args.concurrency`` workers
    and written under ``args.output_dicom_dir``. A failure on one file is
    logged and does not stop the processing of the others.

    Args:
        args: parsed command-line arguments; ``input_dicom_dir``,
            ``output_dicom_dir`` and ``concurrency`` are read.
    """
    tic = time.monotonic()
    input_dicom_dir = args.input_dicom_dir
    output_dicom_dir = args.output_dicom_dir
    dicom_paths = scan_input_dir_or_path(input_dicom_dir)

    worker = partial(anonymize, input_dicom_dir=input_dicom_dir, output_dicom_dir=output_dicom_dir)
    with ProcessPoolExecutor(args.concurrency) as executor:
        # Keep the futures: the results of a bare executor.map() were never
        # consumed, so exceptions raised in the workers were silently lost.
        submitted = [(dicom_path, executor.submit(worker, dicom_path)) for dicom_path in dicom_paths]
    # Leaving the "with" block waits for all submitted tasks to finish.
    toc = time.monotonic()

    failed_count = 0
    for dicom_path, future in submitted:
        error = future.exception()
        if error is not None:
            failed_count += 1
            logger.error("Failed to anonymize %s: %r", dicom_path, error)
    if failed_count:
        logger.warning("%s of %s files could not be anonymized", failed_count, len(dicom_paths))

    logger.info("Anonymized dicoms saved to %s", output_dicom_dir)
    logger.debug("Anonymization of %s files finished at %s", len(dicom_paths), timedelta(seconds=toc - tic))
if __name__ == "__main__":
    # Script entry point: parse the CLI, set up logging, then run the job.
    cli_args = parse_args()
    configure_logging(is_verbose=cli_args.verbose)
    main(cli_args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment