Skip to content

Instantly share code, notes, and snippets.

@zemmyang
Created January 22, 2023 11:42
Show Gist options
  • Save zemmyang/f2d7ac4130ca813ecfb3a824cb2ad9d8 to your computer and use it in GitHub Desktop.
Save zemmyang/f2d7ac4130ca813ecfb3a824cb2ad9d8 to your computer and use it in GitHub Desktop.
Sort DICOM files with sequence descriptions + selected metadata and save as CSV
from typing import List
from pathlib import Path
import SimpleITK as sitk
from enum import Enum
from tqdm import tqdm
class DICOMTag(Enum):
    """DICOM attributes this module knows about.

    Each member's value is the ``(group, element)`` pair of integers that
    identifies the attribute. ``Unspecified`` is the fallback returned for
    any tag that is not listed here.
    """

    Unspecified = 0xFFFF, 0xFFFF
    SeriesDescription = 0x0008, 0x103e
    SeriesInstanceUID = 0x0020, 0x000E
    SeriesNumber = 0x0020, 0x0011
    PatientID = 0x0010, 0x0020
    FileSetID = 0x0004, 0x1130
    StudyTime = 0x0008, 0x0030
    StudyDescription = 0x0008, 0x1030
    AcquisitionDate = 0x0008, 0x0022
    AcquisitionTime = 0x0008, 0x0032
    PatientName = 0x0010, 0x0010
    PatientAge = 0x0010, 0x1010
    PatientSex = 0x0010, 0x0040
    PatientPosition = 0x0018, 0x5100
    Manufacturer = 0x0008, 0x0070
    ManufacturerModelName = 0x0008, 0x1090
    ProtocolName = 0x0018, 0x1030

    @staticmethod
    def from_sitk_string(sitk_string: str) -> "DICOMTag":
        """Translate a metadata key of the form ``"gggg|eeee"`` into a member.

        Both halves are parsed as hexadecimal (case-insensitive).

        Args:
            sitk_string: Metadata key, e.g. ``"0010|0020"``.

        Returns:
            The matching member, or ``DICOMTag.Unspecified`` when the key is
            not listed in this enum or does not have the ``group|element``
            shape at all.
        """
        try:
            # The unpacking lives inside the ``try`` on purpose: a key with no
            # "|" (or more than one) raises ValueError just like a non-hex
            # half or an unknown (group, element) pair does.
            group, element = sitk_string.split("|")
            return DICOMTag((int(group, 16), int(element, 16)))
        except ValueError:
            return DICOMTag.Unspecified
class SequenceType(Enum):
    """Kinds of file a sequence can be made of; only DICOM is defined."""

    DICOM = "DICOM"
class Sequence:
    """A group of files that belong to one image sequence.

    There is no initializer: ``filetype``, ``files`` and ``metadata`` are
    assigned by whoever builds the object, and must be set before ``repr``
    is called.
    """

    __slots__ = ("filetype", "files", "metadata")

    @property
    def image_orientation(self):
        """Placeholder; currently always the empty string."""
        return str()

    def get_sitk_image(self):
        """Placeholder; not implemented yet, returns ``None``."""
        return None

    def get_numpy_array(self):
        """Placeholder; not implemented yet, returns ``None``."""
        return None

    def __repr__(self):
        file_count = len(self.files)
        return f"{self.filetype} with {file_count} | {self.metadata}"
class Case:
    """One case: its files, the sequences found in them, and its metadata.

    There is no initializer: ``metadata``, ``files`` and ``sequences`` are
    assigned by whoever builds the object, and must be set before ``repr``
    is called.
    """

    __slots__ = ("metadata", "files", "sequences")

    def __repr__(self):
        file_count = len(self.files)
        sequence_count = len(self.sequences)
        return f"Case with {file_count} files, {sequence_count} sequences | {self.metadata}"
class Database:
    """Index a folder tree of image files into cases and sequences.

    Construction walks ``root_folder`` recursively, groups every readable
    file into a ``Case`` by the value of one DICOM tag, then splits each
    case's files into ``Sequence`` objects by series identifiers.
    """

    __slots__ = (
        "root_folder",
        "cases",
        "_verbose",
        "_pbar",
    )

    def __init__(self, root_folder: str | Path, **kwargs):
        """Scan ``root_folder`` and build ``self.cases``.

        Args:
            root_folder: Directory to scan recursively.
            **kwargs: Optional settings:
                ``verbose`` (default ``False``) prints progress messages;
                ``pbar`` (default ``False``) wraps the loops in ``tqdm``;
                ``sort_dicom_by`` (default ``DICOMTag.PatientID``) is the tag
                whose value groups files into cases.

        Raises:
            AssertionError: If ``root_folder`` is not a directory.
        """
        self.root_folder = Path(root_folder)
        assert self.root_folder.is_dir(), "Root folder provided is not a directory"
        self._verbose = kwargs.get("verbose", False)
        self._pbar = kwargs.get("pbar", False)
        self.cases: List[Case] = []

        self._read_dicom_cases(sort_by=kwargs.get("sort_dicom_by", DICOMTag.PatientID))

        if self._verbose:
            print("Reading sequences")
        cases = tqdm(self.cases) if self._pbar else self.cases
        for case in cases:
            self._read_dicom_sequences(case)

    def _read_dicom_cases(self, sort_by: DICOMTag):
        """Group files under the root folder into cases by the ``sort_by`` tag.

        Files whose metadata cannot be read, or that lack the ``sort_by`` tag,
        are left out. Each case's metadata is taken from its first file and
        keeps every known tag whose name does not start with ``"Series"``.
        """
        files: List[Path] = list(self.root_folder.rglob("*"))
        if self._verbose:
            print("Identifying DICOM cases")

        cases_found = {}
        for f in tqdm(files) if self._pbar else files:
            if f.is_dir():
                continue
            metadata = self.get_metadata_dict_from_itk(str(f), verbose=self._verbose)
            for k, v in metadata.items():
                if DICOMTag.from_sitk_string(k) == sort_by:
                    cases_found.setdefault(v, []).append(f)

        if self._verbose:
            print("Reading case metadata")
        # Iterate the dict values directly (not through enumerate) so tqdm can
        # see the length and show a real total.
        grouped = cases_found.values()
        for case_files in tqdm(grouped) if self._pbar else grouped:
            c = Case()
            c.files = case_files
            c.metadata = {}
            for k, v in self.get_metadata_dict_from_itk(str(case_files[0])).items():
                tag = DICOMTag.from_sitk_string(k)
                if tag != DICOMTag.Unspecified and not tag.name.startswith("Series"):
                    c.metadata[tag] = v
            self.cases.append(c)

    def _series_metadata(self, file):
        """Return the ``Series*`` tags of ``file`` as a ``{DICOMTag: value}`` dict."""
        series_tags = {}
        for k, v in self.get_metadata_dict_from_itk(str(file)).items():
            tag = DICOMTag.from_sitk_string(k)
            if tag.name.startswith("Series"):
                series_tags[tag] = v
        return series_tags

    def _read_dicom_sequences(self, case: Case):
        """Split ``case.files`` into sequences and store them on ``case.sequences``.

        Files are grouped by (SeriesInstanceUID, SeriesNumber,
        SeriesDescription). Every group with more than one file becomes its
        own ``Sequence``. All groups with exactly one file are merged into a
        single extra ``Sequence`` whose ``files`` is a flat list of those
        files and whose Series metadata comes from the first of them.
        """
        case.sequences = []

        sequences_found = {}
        for file in case.files:
            # Build the identifiers afresh for every file so that a file
            # missing one of the tags gets None for it, rather than silently
            # inheriting the value read from the previous file.
            tags = {
                DICOMTag.from_sitk_string(k): v
                for k, v in self.get_metadata_dict_from_itk(str(file)).items()
            }
            ids = (
                tags.get(DICOMTag.SeriesInstanceUID),
                tags.get(DICOMTag.SeriesNumber),
                tags.get(DICOMTag.SeriesDescription),
            )
            sequences_found.setdefault(ids, []).append(file)

        single_files = []
        for files in sequences_found.values():
            if len(files) > 1:
                seq = Sequence()
                seq.filetype = SequenceType.DICOM
                seq.files = files
                seq.metadata = self._series_metadata(files[0])
                case.sequences.append(seq)
            else:
                # extend, not append: keep a flat list of paths so that the
                # first entry is a real file that can be read back below.
                single_files.extend(files)

        if single_files:
            if self._verbose:
                print(f"Single-file sequences found with {len(single_files)} files")
            seq = Sequence()
            seq.filetype = SequenceType.DICOM
            seq.files = single_files
            seq.metadata = {DICOMTag.Unspecified: "Single-file sequence"}
            seq.metadata.update(self._series_metadata(single_files[0]))
            case.sequences.append(seq)

    @staticmethod
    def get_metadata_dict_from_itk(f, verbose: bool = False):
        """Read the metadata dictionary of file ``f`` with SimpleITK.

        Args:
            f: Path of the file to inspect (converted with ``str``).
            verbose: When true, print a message for files that cannot be read.

        Returns:
            ``{key: value}`` for every metadata key the reader reports, or an
            empty dict when reading the image information fails.
        """
        try:
            file_reader = sitk.ImageFileReader()
            file_reader.SetFileName(str(f))
            file_reader.ReadImageInformation()
        except Exception as error:
            # Best effort: an unreadable file is skipped, never fatal.
            if verbose:
                no_reader = (
                    isinstance(error, RuntimeError)
                    and "Unable to determine ImageIO reader" in str(error)
                )
                if no_reader:
                    print(f"sitk cannot read file {f}")
                else:
                    print(f"{f} error: {error}")
            return {}
        return {k: file_reader.GetMetaData(k) for k in file_reader.GetMetaDataKeys()}

    def generate_csv_report(self, file_destination: str | Path):
        """Write one CSV row per case to ``file_destination``.

        Columns are the relative folders holding the case's files, the number
        of sequences, a list of (SeriesDescription, SeriesNumber,
        NumberOfFiles) tuples, and one column per case metadata tag. With no
        cases, only the three fixed columns are written as a header.
        """
        import csv

        sequence_column = "(SeriesDescription, SeriesNumber, NumberOfFiles)"
        field_names = ["RelativePath", "NumberOfSequences", sequence_column]
        # Collect tag columns from every case, in first-seen order: a tag
        # present only in a later case would otherwise make DictWriter raise.
        for c in self.cases:
            for tag in c.metadata:
                if tag.name not in field_names:
                    field_names.append(tag.name)

        with open(file_destination, 'w', newline='') as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=field_names)
            writer.writeheader()

            for c in self.cases:
                clean_dict = {
                    "RelativePath": {
                        str(f.parent.relative_to(self.root_folder)) for f in c.files
                    },
                    "NumberOfSequences": len(c.sequences),
                }
                # metadata
                clean_dict |= {k.name: v for k, v in c.metadata.items()}
                # sequence data
                clean_dict[sequence_column] = [
                    (
                        s.metadata.get(DICOMTag.SeriesDescription, ""),
                        s.metadata.get(DICOMTag.SeriesNumber, ""),
                        len(s.files),
                    )
                    for s in c.sequences
                ]
                writer.writerow(clean_dict)
if __name__ == "__main__":
    # Example run: index everything below ROOT_FOLDER with progress bars,
    # then write one CSV row per case.
    ROOT_FOLDER = r"E:\DICOM_FILES"
    REPORT_PATH = r"E:\out.csv"

    db = Database(ROOT_FOLDER, pbar=True)
    db.generate_csv_report(REPORT_PATH)
  • Assumes that all single-file sequences within a case share the same series description (they are merged into one sequence whose series metadata is read from the first such file).

  • Assumes that each patient ID identifies exactly one case in the folder (files are grouped into cases by PatientID by default).

  • The intention is to extend this to folders with mixed file types.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment