Last active
August 7, 2020 17:29
-
-
Save dgobbi/c5e9bfbe0459e3ca0471bbaaec35d4c5 to your computer and use it in GitHub Desktop.
Parse the output from dicomdump and write it out as JSON.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
""" | |
This program reads the output from "dicomdump" and converts it to json. | |
The latest version of this code can be found at gist.github.com/dgobbi | |
Note that this code is incomplete, incorrect, and may destroy your data. | |
It comes with absolutely no warranties. Use at your own risk. | |
""" | |
import argparse | |
import sys | |
import re | |
import json | |
import collections | |
# Dict type used for all generated datasets: OrderedDict preserves the
# attribute insertion order in the emitted json.
dicttype = collections.OrderedDict

# Regular expressions that recognize each kind of line in dicomdump output.
# a dataset header, e.g. ==== filename.dcm ====
re_dataset = re.compile(r'==== (.*) ====')
# an ordinary element: (gggg,eeee) VR "Keyword" : [value] {fixed} (length)
re_data = re.compile(r'( *)\((....),(....)\) (..|) "([^"]*)" : \[(.*)\] (\{[^}]*\}|) *\(([^)]*)\)')
# an element whose value differs across a series of dumped files
re_multi = re.compile(r'( *)\((....),(....)\) (..|) "([^"]*)" : \(multiple values\)')
# one per-file value for a "multiple values" element
re_instance = re.compile(r'( *) 0*([0-9]*) \[(.*)\] (\{[^}]*\}|) *\(([^)]*)\)')
# one per-file value for a "multiple values" element that is a sequence
re_instance_sq = re.compile(r'( *) 0*([0-9]*) \(([0-9]*) item[s]?([^)]*)\)')
# a sequence (SQ) element header giving its item count
re_sequence = re.compile(r'( *)\((....),(....)\) (..|) "([^"]*)" : \(([0-9]*) item[s]?([^)]*)\)')
# an item delimiter inside a sequence
re_item = re.compile(r'( *)---- SQ Item 0*([0-9]*) at offset ([0-9]*) ----')
# a "VR mismatch!" warning emitted by dicomdump
re_mismatch = re.compile(r'( *)VR mismatch! (..|) != (..) (.*)')
# captures the leading indentation of any line
re_indent = re.compile(r'( *).*')
def build_value(vr, vl, value):
    """Convert one dicomdump attribute value to DICOM's json model.

    Parameters:
        vr : two-letter value representation, e.g. "DA", "IS" (may be "").
        vl : the value length as printed by dicomdump, e.g. "8 bytes".
        value : the raw text between the [] brackets in the dump.

    Returns a dicttype with a "vr" key and, when a value is present,
    either a "Value" list or a "BulkDataURI" key, following the DICOM
    PS3.18 json representation.
    """
    # if VL is zero, then no value is given
    if vl == '0 bytes':
        return dicttype([("vr", vr)])
    # if bulk data, use empty BulkDataURI (TODO: InlineBinary)
    if vr in ('OB', 'OD', 'OF', 'OL', 'OV', 'OW'):
        return dicttype([("vr", vr), ("BulkDataURI", "")])
    if vr in ('LT', 'ST', 'UT'):
        # these text VRs are single-valued: backslash is not a delimiter
        value_list = [value]
    elif vr == 'AT':
        # AT: convert dicomdump's "(gggg,eeee)" to json's "ggggeeee"
        value_list = [ptr[1:5] + ptr[6:10] for ptr in value.split('\\')]
    elif vr in ('IS', 'SS', 'US', 'SL', 'UL', 'SV', 'UV'):
        # integer VRs: convert each value to int
        value_list = _convert_values(value, int)
    elif vr in ('DS', 'FL', 'FD'):
        # decimal VRs: convert to float (inexact for some DS strings)
        value_list = _convert_values(value, float)
    elif vr == 'PN':
        # person names get "Alphabetic"/"Ideographic"/"Phonetic" groups
        value_list = [_build_person_name(name) for name in value.split('\\')]
    else:
        # all other VRs keep their values as strings
        value_list = value.split('\\')
    # DICOM json represents empty values as null
    value_list = [None if v == "" else v for v in value_list]
    return dicttype([("vr", vr), ("Value", value_list)])

def _convert_values(value, convert):
    """Split a backslash-delimited value string and apply *convert* to
    each part, silently dropping parts that fail to parse (TODO: warn).
    """
    value_list = []
    for v in value.split('\\'):
        try:
            value_list.append(convert(v))
        except ValueError:
            # TODO: warn
            pass
    return value_list

def _build_person_name(name):
    """Split one PN value on '=' into its Alphabetic, Ideographic and
    Phonetic component groups (later groups are optional).
    """
    name_attrs = {}
    groups = ('Alphabetic', 'Ideographic', 'Phonetic')
    for key, part in zip(groups, name.split('=')):
        name_attrs[key] = part
    return name_attrs
def handle_instances(sequence, instances):
    """Handle "multiple values" by recreating multiple datasets.

    When dicomdump summarizes a whole series, elements that differ
    between files are printed as "(multiple values)" with one line per
    file.  Those per-file values are collected in *instances* (a dict
    mapping tag -> list of value dicts).  This function expands them
    back out: the last dataset in *sequence* is removed and used as a
    template, and one copy per file is appended with that file's values
    filled in.  *instances* is cleared afterwards.  No-op when
    *instances* is empty.
    """
    if instances:
        # make copies of last sequence
        last_dataset = sequence[-1]
        sequence.pop()
        # the number of datasets equals the longest per-tag value list
        n = 0
        for tag in instances:
            n = max(n, len(instances[tag]))
        for i in range(n):
            # shallow copy: elements common to all files are shared
            dataset = dicttype(last_dataset)
            for tag in instances:
                try:
                    dataset[tag] = instances[tag][i]
                except IndexError:
                    # a tag had fewer values than others; the template
                    # value (if any) is kept for this dataset
                    # TODO: warning
                    pass
            sequence.append(dataset)
        instances.clear()
def skip_tag(tag):
    """Return True for data elements that should be left out of the output.

    Skipped are group length elements (element number 0000) and all
    elements in groups below 0008 (the file meta group 0002, the
    directory group 0004, etc).  *tag* is an 8-character hex string.
    """
    group, element = tag[0:4], tag[-4:]
    return element == '0000' or group < '0008'
def read_dicomdump(lines):
    """Parse a dicomdump file that has been read with "readlines".

    Returns a list of datasets (dicttype objects in the DICOM json
    model), one per file that was dumped (usually just one).  Nested SQ
    elements are reconstructed from the dump's indentation, and the
    "(multiple values)" notation that dicomdump uses for a series of
    files is expanded back into one dataset per file.
    """
    # a sequence of datasets will be read (usually just one)
    sequence = []
    dataset = None
    # a stack of (sequence, dataset) pairs for handling the tree depth
    stack = []
    # state for dicomdump's "multiple values" across a series
    instance_tag = None
    instance_vr = None
    instances = {}
    # for dealing with a bug in dicomdump for series where
    # the first dataset is missing elements
    vr_mismatch = ("", "")
    # go through the dump line-by-line
    for line in lines:
        line = line.rstrip()
        # empty line: ignore
        if len(line) == 0:
            continue
        # mismatched VR warning: remember the VRs, but otherwise ignore
        m = re_mismatch.match(line)
        if m:
            groups = m.groups()
            vr_mismatch = (groups[1], groups[2])
            continue
        # check the indentation, which indicates depth
        if re_instance.match(line):
            # always at the root, depth of zero
            depth = 0
        elif re_instance_sq.match(line):
            # always within a sequence at the root, hence depth is 1
            depth = 1
        else:
            # the depth is given by the indentation (two spaces per
            # level); floor division keeps depth an integer on Python 3
            depth = len(re_indent.match(line).group(1))//2
        # check for extra indentation that isn't in a sequence
        if depth > len(stack):
            sys.stderr.write("Improper indentation:\n" + line + "\n")
            continue
        # check for decreasing indentation (marks end of a block)
        while len(stack) > depth:
            sequence, dataset = stack[-1]
            stack.pop()
        # new dataset (indicated by "====" in the file)
        m = re_dataset.match(line)
        if m:
            handle_instances(sequence, instances)
            # start a fresh dataset
            dataset = dicttype()
            sequence.append(dataset)
            continue
        # new item (indicated by "----" in the file)
        m = re_item.match(line)
        if m:
            dataset = dicttype()
            sequence.append(dataset)
            continue
        # sequence value (increase depth)
        m = re_sequence.match(line)
        if m:
            groups = m.groups()
            tag = "".join(groups[1:3]).upper()
            vr = groups[3]
            # push the current position; items will fill the new sequence
            stack.append((sequence, dataset))
            sequence = []
            dataset[tag] = dicttype([("vr", vr), ("Value", sequence)])
            dataset = None
            continue
        # any other value
        m = re_data.match(line)
        if m:
            groups = m.groups()
            tag = "".join(groups[1:3]).upper()
            # skip group length tags
            if skip_tag(tag):
                continue
            vr = groups[3]
            vl = groups[7]
            value = groups[5]
            dataset[tag] = build_value(vr, vl, value)
            continue
        # ----
        # special code for the dicomdump "multiple values" lines
        m = re_multi.match(line)
        if m:
            groups = m.groups()
            instance_tag = "".join(groups[1:3]).upper()
            instance_vr = groups[3]
            # if the VR was omitted, fall back to the "VR mismatch!" VR
            if instance_vr == "" and vr_mismatch[0] == "":
                instance_vr = vr_mismatch[1]
            # skip group length tags
            if skip_tag(instance_tag):
                continue
            dataset[instance_tag] = dicttype([("vr", instance_vr)])
            instances[instance_tag] = []
            if instance_vr == 'SQ':
                stack.append((sequence, dataset))
            continue
        # one instance of a "multiple value" attribute
        m = re_instance.match(line)
        if m:
            groups = m.groups()
            # skip group length tags
            if skip_tag(instance_tag):
                continue
            value = groups[2]
            vl = groups[4]
            instances[instance_tag].append(build_value(instance_vr, vl, value))
            continue
        # one instance of a "multiple value" attribute that is SQ
        m = re_instance_sq.match(line)
        if m:
            sequence = []
            dataset = None
            instances[instance_tag].append(dicttype([("vr", instance_vr), ("Value", sequence)]))
            continue
        # none of the regular expressions matched!
        sys.stderr.write("Unrecognized syntax:\n" + line + "\n")
    # at end of dump, pop back to root
    while len(stack) > 0:
        sequence, dataset = stack[-1]
        stack.pop()
    # change out "multiple value" data elements into a series of datasets
    handle_instances(sequence, instances)
    return sequence
def main():
    """Command-line entry point: parse a dicomdump file, emit json.

    Writes to the file given with -o/--output, or to stdout if no
    output file was specified.
    """
    arg_parser = argparse.ArgumentParser(description="Read dicomdump output.")
    arg_parser.add_argument('input', help="Input file (DICOM).")
    arg_parser.add_argument('-o', '--output', required=False,
                            help="Output file (json).")
    opts = arg_parser.parse_args()

    with open(opts.input) as fp:
        datasets = read_dicomdump(fp.readlines())

    # pretty-print with two-space indent and spaced key separators
    json_settings = {"indent": 2, "separators": (",", " : ")}
    if opts.output:
        with open(opts.output, 'w') as fp:
            json.dump(datasets, fp, **json_settings)
    else:
        json.dump(datasets, sys.stdout, **json_settings)
# run the converter only when executed as a script, not when imported
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment