- Assumes that the cases with single-file sequences all have the same series description.
- Assumes that the patient IDs are unique for all cases in the folder.
- The intention is to extend this to folders with mixed file types.
Created
January 22, 2023 11:42
-
-
Save zemmyang/f2d7ac4130ca813ecfb3a824cb2ad9d8 to your computer and use it in GitHub Desktop.
Sort DICOM files with sequence descriptions + selected metadata and save as CSV
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import List | |
from pathlib import Path | |
import SimpleITK as sitk | |
from enum import Enum | |
from tqdm import tqdm | |
class DICOMTag(Enum):
    """DICOM attributes this module works with, identified by ``(group, element)``.

    Every member's value is the pair of integers ``(group, element)``.
    ``Unspecified`` is the fallback member returned for any key that does not
    correspond to one of the listed tags.
    """

    Unspecified = 0xFFFF, 0xFFFF
    SeriesDescription = 0x0008, 0x103e
    SeriesInstanceUID = 0x0020, 0x000E
    SeriesNumber = 0x0020, 0x0011
    PatientID = 0x0010, 0x0020
    FileSetID = 0x0004, 0x1130
    StudyTime = 0x0008, 0x0030
    StudyDescription = 0x0008, 0x1030
    AcquisitionDate = 0x0008, 0x0022
    AcquisitionTime = 0x0008, 0x0032
    PatientName = 0x0010, 0x0010
    PatientAge = 0x0010, 0x1010
    PatientSex = 0x0010, 0x0040
    PatientPosition = 0x0018, 0x5100
    Manufacturer = 0x0008, 0x0070
    ManufacturerModelName = 0x0008, 0x1090
    ProtocolName = 0x0018, 0x1030

    @staticmethod
    def from_sitk_string(sitk_string: str) -> "DICOMTag":
        """Translate a ``"gggg|eeee"`` metadata key into a :class:`DICOMTag`.

        Args:
            sitk_string: metadata key made of two hexadecimal fields separated
                by ``|`` (for example ``"0010|0020"``).

        Returns:
            The matching member, or ``DICOMTag.Unspecified`` when the key is
            not of that form, is not valid hexadecimal, or names a tag that is
            not listed in this enum.
        """
        try:
            # The unpacking lives inside the ``try`` on purpose: keys without
            # exactly one "|" raise ValueError too and must fall back to
            # Unspecified instead of aborting the whole scan.
            group, element = sitk_string.split("|")
            return DICOMTag((int(group, 16), int(element, 16)))
        except ValueError:
            return DICOMTag.Unspecified
class SequenceType(Enum):
    """Storage formats a sequence can have; DICOM is the only one defined."""

    DICOM = "DICOM"
class Sequence:
    """A group of files that belong to one image sequence.

    Attributes (plain slots, normally filled in by the code that builds the
    sequence):
        filetype: format marker of the sequence.
        files: the files that make up the sequence.
        metadata: mapping of tags to their values for this sequence.
    """

    __slots__ = (
        "filetype",
        "files",
        "metadata",
    )

    def __init__(self, filetype=None, files=None, metadata=None):
        """Create a sequence; every argument is optional.

        Initialising the slots here keeps ``repr()`` usable on a sequence that
        has not been populated yet (unset slots raise ``AttributeError``).
        """
        self.filetype = filetype
        self.files = [] if files is None else files
        self.metadata = {} if metadata is None else metadata

    @property
    def image_orientation(self):
        """Placeholder: currently always an empty string."""
        return ""

    def get_sitk_image(self):
        """Placeholder, not implemented yet; returns ``None``."""
        pass

    def get_numpy_array(self):
        """Placeholder, not implemented yet; returns ``None``."""
        pass

    def __repr__(self):
        return f"{self.filetype} with {len(self.files)} | {self.metadata}"
class Case:
    """All files found for one case, plus the sequences they were split into.

    Attributes (plain slots, normally filled in by the code that builds the
    case):
        metadata: mapping of tags to their values for this case.
        files: every file assigned to the case.
        sequences: the sequences the case's files were grouped into.
    """

    __slots__ = (
        "metadata",
        "files",
        "sequences",
    )

    def __init__(self, metadata=None, files=None, sequences=None):
        """Create a case; every argument is optional.

        Initialising the slots here keeps ``repr()`` usable on a case whose
        sequences have not been read yet (unset slots raise ``AttributeError``).
        """
        self.metadata = {} if metadata is None else metadata
        self.files = [] if files is None else files
        self.sequences = [] if sequences is None else sequences

    def __repr__(self):
        return f"Case with {len(self.files)} files, {len(self.sequences)} sequences | {self.metadata}"
class Database: | |
__slots__ = ( | |
"root_folder", | |
"cases", | |
"_verbose", | |
"_pbar" | |
) | |
def __init__(self, root_folder: str | Path, **kwargs): | |
self.root_folder = Path(root_folder) if isinstance(root_folder, str) else root_folder | |
assert self.root_folder.is_dir(), "Root folder provided is not a directory" | |
self._verbose = kwargs["verbose"] if "verbose" in kwargs else False | |
self._pbar = kwargs["pbar"] if "pbar" in kwargs else False | |
self.cases: List[Case] = [] | |
self._read_dicom_cases( | |
sort_by=kwargs["sort_dicom_by"] if "sort_dicom_by" in kwargs else DICOMTag.PatientID | |
) | |
if self._verbose: | |
print("Reading sequences") | |
pbar = tqdm(self.cases) if self._pbar else self.cases | |
for case in pbar: | |
self._read_dicom_sequences(case) | |
def _read_dicom_cases(self, sort_by: DICOMTag): | |
files: List[Path] = [f for f in self.root_folder.rglob("*")] | |
if self._verbose: | |
print("Identifying DICOM cases") | |
cases_found = {} | |
pbar = tqdm(files) if self._pbar else files | |
for f in pbar: | |
if f.is_dir(): | |
continue | |
else: | |
for k, v in self.get_metadata_dict_from_itk(str(f), verbose=self._verbose).items(): | |
if DICOMTag.from_sitk_string(k) == sort_by: | |
if v not in cases_found: | |
cases_found[v] = [] | |
cases_found[v].append(f) | |
if self._verbose: | |
print("Reading case metadata") | |
pbar = tqdm(enumerate(cases_found.items())) if self._pbar else enumerate(cases_found.items()) | |
for idx, (case_sorter, files) in pbar: | |
c = Case() | |
c.files = files | |
c.metadata = {} | |
for k, v in self.get_metadata_dict_from_itk(str(files[0])).items(): | |
tag = DICOMTag.from_sitk_string(k) | |
if tag != DICOMTag.Unspecified: | |
if not tag.name.startswith("Series"): | |
c.metadata[tag] = v | |
self.cases.append(c) | |
def _read_dicom_sequences(self, case: Case): | |
case.sequences = [] | |
sequences_found = {} | |
series_uid = None | |
series_desc, series_num = None, None | |
for file in case.files: | |
for k, v in self.get_metadata_dict_from_itk(str(file)).items(): | |
match DICOMTag.from_sitk_string(k): | |
case DICOMTag.SeriesInstanceUID: | |
series_uid = v | |
case DICOMTag.SeriesNumber: | |
series_num = v | |
case DICOMTag.SeriesDescription: | |
series_desc = v | |
ids = (series_uid, series_num, series_desc) | |
if ids not in sequences_found: | |
sequences_found[ids] = [] | |
sequences_found[ids].append(file) | |
sequences_with_one_file = [] | |
for _, files in sequences_found.items(): | |
if len(files) > 1: | |
seq = Sequence() | |
seq.filetype = SequenceType.DICOM | |
seq.files = files | |
seq.metadata = {} | |
for k, v in self.get_metadata_dict_from_itk(str(files[0])).items(): | |
tag = DICOMTag.from_sitk_string(k) | |
if tag.name.startswith("Series"): | |
seq.metadata[tag] = v | |
case.sequences.append(seq) | |
else: | |
sequences_with_one_file.append(files) | |
if len(sequences_with_one_file): | |
if self._verbose: | |
print(f"Single-file sequences found with {len(sequences_with_one_file)} files") | |
seq = Sequence() | |
seq.filetype = SequenceType.DICOM | |
seq.files = sequences_with_one_file | |
seq.metadata = {DICOMTag.Unspecified: "Single-file sequence"} | |
for k, v in self.get_metadata_dict_from_itk(str(sequences_with_one_file[0])).items(): | |
tag = DICOMTag.from_sitk_string(k) | |
if tag.name.startswith("Series"): | |
seq.metadata[tag] = v | |
case.sequences.append(seq) | |
@staticmethod | |
def get_metadata_dict_from_itk(f, verbose: bool = False): | |
out = {} | |
try: | |
file_reader = sitk.ImageFileReader() | |
file_reader.SetFileName(str(f)) | |
file_reader.ReadImageInformation() | |
except RuntimeError as runtime_error: | |
if "Unable to determine ImageIO reader" in str(runtime_error): | |
if verbose: | |
print(f"sitk cannot read file {f}") | |
except Exception as e: | |
if verbose: | |
print(f"{f} error: {e}") | |
else: | |
out = {k: file_reader.GetMetaData(k) for k in file_reader.GetMetaDataKeys()} | |
return out | |
def generate_csv_report(self, file_destination: str | Path): | |
import csv | |
with open(file_destination, 'w', newline='') as csv_file: | |
# populate field names | |
field_names = ["RelativePath", "NumberOfSequences", | |
"(SeriesDescription, SeriesNumber, NumberOfFiles)"] \ | |
+ [k.name for k in self.cases[0].metadata.keys()] | |
writer = csv.DictWriter(csv_file, fieldnames=field_names) | |
writer.writeheader() | |
for c in self.cases: | |
clean_dict = { | |
"RelativePath": set([str(f.parent.relative_to(self.root_folder)) for f in c.files]), | |
"NumberOfSequences": len(c.sequences) | |
} | |
# metadata | |
clean_dict |= {k.name: v for k, v in c.metadata.items()} | |
# sequence data | |
clean_dict |= { | |
"(SeriesDescription, SeriesNumber, NumberOfFiles)": [( | |
s.metadata[DICOMTag.SeriesDescription] if DICOMTag.SeriesDescription in s.metadata else "", | |
s.metadata[DICOMTag.SeriesNumber] if DICOMTag.SeriesNumber in s.metadata else "", | |
len(s.files)) for s in c.sequences] | |
} | |
writer.writerow(clean_dict) | |
if __name__ == "__main__":
    # Example run: index the DICOM folder and write the per-case CSV report.
    ROOT_FOLDER = r"E:\DICOM_FILES"
    REPORT_FILE = r"E:\out.csv"

    db = Database(ROOT_FOLDER, pbar=True)
    db.generate_csv_report(REPORT_FILE)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment