Created
October 17, 2024 14:37
-
-
Save vanossj/0ac53c843f0be0f6932e28a12c47de46 to your computer and use it in GitHub Desktop.
patches dicom files so that `itkimage2segimage` will stop complaining
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import re | |
import subprocess | |
from pathlib import Path | |
from tempfile import TemporaryDirectory | |
import pydicom | |
class DcmError: | |
def __init__(self, msg): | |
tokens = msg.split(" - ") | |
if len(tokens) == 3: | |
self.t, self.path_to_attribute, self.message = tokens | |
self.value = None | |
elif len(tokens) == 4: | |
self.t, self.path_to_attribute, self.message, self.value = tokens | |
elif len(tokens) == 5: | |
self.t, self.path_to_attribute, a, b, self.value = tokens | |
self.message = f"{a} - {b}" | |
else: | |
raise ValueError(f"Error parsing error message: {msg}") | |
def __repr__(self): | |
return f"DcmError({self.path_to_attribute}, {self.message}, {self.value})" | |
class DcmBundle: | |
def __init__(self, dicom_path: Path): | |
self.dicom_path = dicom_path | |
self.ds = pydicom.dcmread(dicom_path) | |
# find errors | |
cmd = ["dciodvfy", "-new", str(dicom_path)] | |
proc = subprocess.run(cmd, capture_output=True) | |
errs = proc.stderr.decode("utf-8").splitlines() | |
self.errs = [DcmError(x) for x in errs if x.startswith("Error")] | |
def fix(self): | |
for err in self.errs: | |
self._fix(err) | |
def _fix(self, err: DcmError): | |
if err.path_to_attribute.startswith("</ProcedureCodeSequence(0008,1032)"): | |
# might have been removed already | |
if hasattr(self.ds, "ProcedureCodeSequence"): | |
del self.ds.ProcedureCodeSequence # remove the sequence | |
elif err.path_to_attribute == "</Laterality(0020,0060)>": | |
self.ds.Laterality = "" | |
elif err.path_to_attribute == "</PatientSex(0010,0040)[1]>": | |
self.ds.PatientSex = "" | |
elif err.path_to_attribute == "</PatientAge(0010,1010)[1]>": | |
try: | |
self.ds.PatientAge = f"{int(self.ds.PatientAge):03d}Y" | |
except ValueError: | |
del self.ds.PatientAge | |
elif err.path_to_attribute == "</PregnancyStatus(0010,21c0)>": | |
del self.ds.PregnancyStatus | |
elif err.path_to_attribute == "</Manufacturer(0008,0070)>": | |
self.ds.Manufacturer = "" | |
elif err.path_to_attribute == "</ImageType(0008,0008)>": | |
if self.ds.Modality == "CT": | |
self.ds.ImageType = "" | |
else: | |
raise NotImplementedError( | |
f"fix for {err.path_to_attribute} not implemented for modality: {self.ds.Modality}" | |
) | |
elif ( | |
err.path_to_attribute | |
== "</LongitudinalTemporalInformationModified(0028,0303)>" | |
): | |
if hasattr(self.ds, "LongitudinalTemporalInformationModified"): | |
del self.ds.LongitudinalTemporalInformationModified | |
elif err.path_to_attribute.startswith("</RequestAttributesSequence(0040,0275)"): | |
if hasattr(self.ds, "RequestAttributesSequence"): | |
del self.ds.RequestAttributesSequence | |
elif err.path_to_attribute == "</ClinicalTrialSponsorName(0012,0010)>": | |
self.ds.ClinicalTrialSponsorName = "UNKNOWN" | |
elif err.path_to_attribute == "</ClinicalTrialSubjectID(0012,0040)>": | |
self.ds.ClinicalTrialSubjectID = "UNKNOWN" | |
elif err.path_to_attribute == "</ClinicalTrialSubjectReadingID(0012,0042)>": | |
self.ds.ClinicalTrialSubjectReadingID = "UNKNOWN" | |
elif err.path_to_attribute.startswith( | |
"</PatientOrientationCodeSequence(0054,0410)[1]" | |
): | |
self.ds.PatientOrientationCodeSequence = [] | |
elif ( | |
err.path_to_attribute == "</PatientPosition(0018,5100)>" | |
and err.message | |
== "Shall not be present when PatientOrientationCodeSequence is present" | |
and hasattr(self.ds, "PatientOrientationCodeSequence") | |
and len(self.ds.PatientOrientationCodeSequence) > 0 | |
): | |
del self.ds.PatientPosition | |
elif ( | |
err.path_to_attribute == "</NumberOfTimeSlices(0054,0101)>" | |
and err.message | |
== "Attribute present when condition unsatisfied (which may not be present otherwise) for Type 1C Conditional" | |
): | |
del self.ds.NumberOfTimeSlices | |
elif ( | |
err.path_to_attribute == "</FrameTime(0018,1063)>" | |
and err.message | |
== "Attribute present when condition unsatisfied (which may not be present otherwise) for Type 1C Conditional" | |
): | |
del self.ds.FrameTime | |
elif ( | |
err.path_to_attribute == "</TriggerTime(0018,1060)>" | |
and err.message | |
== "Attribute present when condition unsatisfied (which may not be present otherwise) for Type 1C Conditional" | |
): | |
del self.ds.TriggerTime | |
elif err.path_to_attribute.startswith( | |
"</AcquisitionContextSequence(0040,0555)" | |
): | |
if hasattr(self.ds, "AcquisitionContextSequence"): | |
del self.ds.AcquisitionContextSequence | |
elif err.path_to_attribute.startswith( | |
"</RadiopharmaceuticalInformationSequence(0054,0016)[1]/RadiopharmaceuticalCodeSequence(0054,0304)" | |
): | |
if hasattr(self.ds, "RadiopharmaceuticalInformationSequence") and hasattr( | |
self.ds.RadiopharmaceuticalInformationSequence[0], | |
"RadiopharmaceuticalCodeSequence", | |
): | |
del self.ds.RadiopharmaceuticalInformationSequence[ | |
0 | |
].RadiopharmaceuticalCodeSequence | |
elif err.path_to_attribute == "</DecayFactor(0054,1321)>": | |
# fake data is not ideal, but since we are using this for segmentation dicom metadata, it should be fine | |
self.ds.DecayFactor = 1.0 | |
elif err.path_to_attribute == "</RescaleSlope(0028,1053)[1]>": | |
# fake data is not ideal, but since we are using this for segmentation dicom metadata, it should be fine | |
self.ds.RescaleSlope = 1.0 | |
elif err.path_to_attribute == "</PhotometricInterpretation(0028,0004)[1]>": | |
# value shouldn't matter since this is just a reference for the segmentation dicom | |
self.ds.PhotometricInterpretation = "MONOCHROME2" | |
# troublesome private tags | |
elif err.path_to_attribute == '</(0013,1010,"CTP")>': | |
del self.ds[0x0013, 0x1010] | |
elif m := re.match( | |
r'<\/\(([\da-f]{4}),([\da-f]{4}),"GEMS_PETD_01"\)>', err.path_to_attribute | |
): | |
a = int(m.group(1), 16) | |
b = int(m.group(2), 16) | |
del self.ds[a, b] | |
else: | |
raise NotImplementedError( | |
f"fix for '{err.t} - {err.path_to_attribute} - {err.message} - {err.value}' not implemented for modality: {self.ds.Modality}" | |
) | |
def test(self): | |
with TemporaryDirectory() as tempdir: | |
tempdir = Path(tempdir) | |
tempdir.mkdir(exist_ok=True) | |
temp_dcm = tempdir / "temp.dcm" | |
self.ds.save_as(temp_dcm) | |
cmd = ["dciodvfy", "-new", str(temp_dcm)] | |
proc = subprocess.run(cmd, capture_output=True) | |
errs = proc.stderr.decode("utf-8").split("\n") | |
errs = [DcmError(x) for x in errs if x.startswith("Error")] | |
return errs | |
def fix_dicom_dir(dicom_dir: Path, output_dir: Path): | |
"""Scan the dicom files in dicom_dir and check with dciodvfy for errors. If any errors are found, attempt to fix them and write the fixed dicom files to output_dir. | |
Args: | |
dicom_dir (Path): input directory of dicom files | |
output_dir (Path): writes fixed dicom files to this directory if any fixes are required | |
Returns: | |
_type_: output_dir if any fixes were required, otherwise dicom_dir | |
""" | |
dcm_files = [x for x in dicom_dir.rglob("*") if pydicom.misc.is_dicom(x)] | |
dss = [DcmBundle(dcm_file) for dcm_file in dcm_files] | |
error_cnt = sum([len(x.errs) for x in dss]) | |
if error_cnt == 0: | |
return dicom_dir | |
for x in dss: | |
out_file = output_dir / x.dicom_path.relative_to(dicom_dir) | |
x.fix() | |
if len(x.errs): | |
out_file.parent.mkdir(exist_ok=True, parents=True) | |
x.ds.save_as(out_file) | |
return output_dir | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser( | |
description="Uses 'dciodvfy' to check dicom files for non-compliance and attempts to *fix* them enough that 'itkimage2segimage' won't complain" | |
) | |
parser.add_argument("dicom_dir", type=Path) | |
parser.add_argument("output_dir", type=Path) | |
args = parser.parse_args() | |
fix_dicom_dir(args.dicom_dir, args.output_dir) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment