from __future__ import print_function
import csv
from subprocess import call
try:
import pathlib
except ImportError:
import pathlib2 as pathlib
import argparse
import os
from bin_rcpnl_module import *
import warnings
from ashlar import reg, thumbnail
def text_to_bool(text):
    return False \
        if not str(text) \
        else str(text).lower() in '1,yes,y,true,t'.split(',')
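# Illustrative behaviour (examples added here, not part of the original gist):
#   text_to_bool('Yes') -> True
#   text_to_bool('0')   -> False
#   text_to_bool('')    -> False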
def path_to_date(path):
return os.path.getmtime(str(path))
parser = argparse.ArgumentParser(
description='Read a csv config file and run ashlar'
)
parser.add_argument(
'csv_filepath', metavar='CSVFILE',
help='a csv file with header row: Directory, Correction, Pyramid'
)
parser.add_argument(
'-f', '--from-dir', dest='from_dir', default=0, type=int, metavar='FROM',
help=('starting directory; numbering starts at 0')
)
parser.add_argument(
'-t', '--to-dir', dest='to_dir', default=None, type=int, metavar='TO',
help=('ending directory; numbering starts at 0')
)
args = parser.parse_args()
csv_path = pathlib.Path(args.csv_filepath)
if not csv_path.is_file() or csv_path.suffix != '.csv':
print('A csv file is required to proceed.')
parser.print_usage()
parser.exit()
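# The config file is expected to look roughly like the sketch below (column names
# taken from the help text above; the rows are made-up examples). Only the
# Directory column is read by this script.
#
#   Directory,Correction,Pyramid
#   D:\data\exp_0061,1,1
#   D:\data\exp_0058,0,1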
# EXTRA COMMAND
# cmd_str = 'xcopy Z:\\Connor\\Topacio_P2_AF\\ashlar\\0061\\*.rcpnl D:\\20190225_topocio2\\0061'
# print(cmd_str)
# call(cmd_str, shell=True)
# cmd_str = 'xcopy Z:\\Connor\\Topacio_P2_AF\\ashlar\\0058\\*.rcpnl D:\\20190225_topocio2\\0058'
# print(cmd_str)
# call(cmd_str, shell=True)
with open(str(csv_path)) as exp_config:
exp_config_reader = csv.DictReader(exp_config)
exps = [dict(row) for row in exp_config_reader]
for exp in exps[args.from_dir : args.to_dir]:
path_exp = pathlib.Path(exp['Directory'])
files_exp = sorted(path_exp.glob('*rcpnl'))
files_exp.sort(key=path_to_date)
if len(files_exp) == 0:
print('No rcpnl files found in', str(path_exp))
continue
print('Processing files in', str(path_exp))
print('\r Looking for metadata.xml')
xml_paths = path_exp.glob('*metadata.xml')
xml_path = next(xml_paths, False)
if not xml_path:
        print('\r Extracting and formatting metadata.xml')
xml_path = files_exp[0].with_suffix('.metadata.xml')
extract_xml_metadata(str(files_exp[0]), str(xml_path))
format_xml(str(files_exp[0]), str(xml_path))
print('\r ', str(xml_path.name), 'is ready')
print('\r Generating ome-tiff')
for rcpnl in files_exp:
ometif_file = rcpnl.with_suffix('.ome.tif')
if ometif_file.exists():
print('\r ', str(ometif_file.name), 'already exists')
continue
bin_rcpnl(str(rcpnl))
inject_xml_metadata(str(ometif_file), str(xml_path))
with warnings.catch_warnings():
warnings.filterwarnings(
'ignore', r'.* is a low contrast image', UserWarning,
            r'^skimage\.io'
)
thumbnail.thumbnail(reg.BioformatsReader(str(ometif_file)))
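# Example invocation (the script's file name is not recorded in the gist, so
# "run_bin_rcpnl.py" below is only a placeholder):
#
#   python run_bin_rcpnl.py experiments.csv -f 0 -t 2
#
# which processes the first two Directory entries listed in experiments.csv.
# The definitions that follow presumably make up bin_rcpnl_module, which is
# star-imported at the top of this script.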
from __future__ import print_function, division
from ashlar import reg
from subprocess import call
try:
import pathlib
except ImportError:
import pathlib2 as pathlib
def tiffdata_el_str(channels, starting_plane, filename, img_uuid):
base_str = (
'<TiffData FirstC="{channel}" FirstT="0" FirstZ="0"'
' IFD="{ifd}" PlaneCount="1">'
'<UUID FileName="{filename}">{uuid}</UUID>'
'</TiffData>'
)
planes = [(i + starting_plane) for i in range(channels)]
output = []
for j in range(channels):
output.append(base_str.format(
channel=j, ifd=planes[j], filename=filename, uuid=img_uuid
))
return ''.join(output)
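# For illustration, tiffdata_el_str(2, 4, 'a.ome.tif', 'urn:uuid:...') would emit
# two TiffData elements, FirstC="0"/IFD="4" and FirstC="1"/IFD="5", each carrying
# the same file name and UUID.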
def extract_xml_metadata(img_filepath, out_xml_path):
img_filepath = pathlib.Path(img_filepath)
out_xml_path = pathlib.Path(out_xml_path)
showinf_path = pathlib.Path('bftools/showinf')
basic_call = str(showinf_path) + ' -omexml -nopix -novalid '
call(basic_call + str(img_filepath) + ' > ' + str(out_xml_path), shell=True)
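# The call above is equivalent to running, from the shell:
#   bftools/showinf -omexml -nopix -novalid <input.rcpnl> > <output.metadata.xml>
# i.e. Bio-Formats dumps the OME-XML metadata without reading any pixel data.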
def format_xml(img_filepath, xml_filepath):
img_filepath = pathlib.Path(img_filepath)
xml_filepath = pathlib.Path(xml_filepath)
metadata = reg.BioformatsMetadata(str(img_filepath))
num_images = metadata.num_images
num_channels = metadata.num_channels
img_size = metadata.size
filename = 'placeholder_filename'
with open(str(xml_filepath)) as f:
xml_str = f.read()
start = xml_str.find('<?xml')
end = xml_str.find('<StructuredAnnotations>')
xml_str = xml_str[start:end]
xml_str += '</OME>'
img_uuid = 'placeholder_uuid'
xml_str = (xml_str
.replace('<OME ', '<OME UUID="' + img_uuid + '" ')
.replace(img_filepath.name, 'placeholder_filename')
.replace(str(img_size[1]), 'placeholder_sizeX')
.replace(str(img_size[0]), 'placeholder_sizeY')
.split('<MetadataOnly/>')
)
starting_planes = list(range(0, num_channels * num_images, num_channels))
output = [(xml_str[i] + tiffdata_el_str(num_channels, starting_planes[i], filename, img_uuid)) for i in range(num_images)]
output = ''.join(output) + xml_str[-1]
# output = xml_str
with open(str(xml_filepath), 'w') as f:
f.write(output)
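# format_xml leaves placeholder_filename, placeholder_uuid, placeholder_sizeX and
# placeholder_sizeY in the saved XML; inject_xml_metadata (below) substitutes the
# real values for each binned ome.tif before the XML is appended to that file.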
from ashlar import reg
try:
import pathlib
except ImportError:
import pathlib2 as pathlib
import numpy as np
import warnings
import skimage.io
import sys
def bin_rcpnl(img_filepath, factor=2):
img_filepath = pathlib.Path(img_filepath)
reader = reg.BioformatsReader(str(img_filepath))
binned_filepath = img_filepath.with_suffix('.ome.tif')
img_size = reader.metadata.size
print('\r ', img_filepath.name, '>', binned_filepath.name)
count = 0
for i in range(reader.metadata.num_images):
for j in range(reader.metadata.num_channels):
img = reader.read(i, j)
img = np.clip(
# img.reshape(1080, 2, 1280, 2).sum(-1).sum(1),
img.reshape(int(img_size[0] / factor), factor, int(img_size[1] / factor), factor).sum(-1).sum(1),
0,
65535
).astype(reader.metadata.pixel_dtype)
with warnings.catch_warnings():
warnings.filterwarnings(
'ignore', r'.* is a low contrast image', UserWarning,
                    r'^skimage\.io'
)
skimage.io.imsave(
str(binned_filepath), img,
bigtiff=True, photometric='minisblack',
metadata=None, description='!!xml!!',
append=True
)
count += 1
sys.stdout.write('\r saving tile %d/%d'
% (count,
reader.metadata.num_images * reader.metadata.num_channels))
sys.stdout.flush()
print('')
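# Example (illustrative file name): bin_rcpnl('scan_001.rcpnl', factor=2) writes
# scan_001.ome.tif next to the input, with every tile 2x2-binned by summation and
# clipped to the 0-65535 range before casting back to the original pixel dtype.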
import io
import uuid
from ashlar import reg
try:
import pathlib
except ImportError:
import pathlib2 as pathlib
import re
import struct
def inject_xml_metadata(img_filepath, xml_filepath):
img_filepath = pathlib.Path(img_filepath)
xml_filepath = pathlib.Path(xml_filepath)
img_uuid = uuid.uuid4().urn
img_filename = img_filepath.name
img_size = reg.BioformatsMetadata(str(img_filepath)).size
with open(str(xml_filepath)) as f:
xml_str = f.read()
xml_str = (xml_str
.replace('placeholder_filename', img_filename)
.replace('placeholder_uuid', img_uuid)
.replace('placeholder_sizeX', str(img_size[1]))
.replace('placeholder_sizeY', str(img_size[0]))
.replace('0.32499998807907104', '0.6499999761581421')
.replace('\xb5', '\xc2\xb5')
)
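    # unicode() on the next line only exists on Python 2: the byte string read
    # from disk is decoded to text so it can be written into io.StringIO.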
xml_str = unicode(xml_str, 'utf-8')
xml = io.StringIO()
xml.write(xml_str)
xml_bytes = xml.getvalue().encode('utf-8') + b'\x00'
with open(str(img_filepath), 'rb+') as f:
f.seek(0, io.SEEK_END)
xml_offset = f.tell()
f.write(xml_bytes)
f.seek(0)
ifd_block = f.read(500)
match = re.search(b'!!xml!!\x00', ifd_block)
if match is None:
raise RuntimeError("Did not find placeholder string in IFD")
f.seek(match.start() - 8)
f.write(struct.pack('<Q', len(xml_bytes)))
f.write(struct.pack('<Q', xml_offset))
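# How the injection works: skimage.io.imsave wrote an ImageDescription tag whose
# value is the placeholder "!!xml!!\0". The two struct.pack('<Q', ...) writes
# above overwrite that tag's 8-byte count and 8-byte value/offset fields (BigTIFF
# layout), so the tag now points at the OME-XML block appended to the end of the
# file.
#
# The remaining code appears to be an earlier, self-contained variant of the same
# workflow that uses bfconvert/tiffcomment instead of showinf to obtain the
# reference OME-XML.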
from __future__ import print_function
import csv
from subprocess import call
try:
import pathlib
except ImportError:
import pathlib2 as pathlib
import argparse
import os
from ashlar import reg
import skimage.io
import io
import numpy as np
import struct
import re
import xml.etree.ElementTree as ET
import uuid
import sys
import warnings
def text_to_bool(text):
    return False \
        if not str(text) \
        else str(text).lower() in '1,yes,y,true,t'.split(',')
def path_to_date(path):
return os.path.getmtime(str(path))
def tiffdata_el_str(channels, starting_plane, filename, img_uuid):
base_str = (
'<TiffData FirstC="{channel}" FirstT="0" FirstZ="0"'
' IFD="{ifd}" PlaneCount="1">'
'<UUID FileName="{filename}">{uuid}</UUID>'
'</TiffData>'
)
planes = [(i + starting_plane) for i in range(channels)]
output = []
for j in range(channels):
output.append(base_str.format(
channel=j, ifd=planes[j], filename=filename, uuid=img_uuid
))
return ''.join(output)
parser = argparse.ArgumentParser(
description='Read a csv config file and run ashlar'
)
parser.add_argument(
'csv_filepath', metavar='CSVFILE',
help='a csv file with header row: Directory, Correction, Pyramid'
)
parser.add_argument(
'-f', '--from-dir', dest='from_dir', default=0, type=int, metavar='FROM',
help=('starting directory; numbering starts at 0')
)
parser.add_argument(
'-t', '--to-dir', dest='to_dir', default=None, type=int, metavar='TO',
help=('ending directory; numbering starts at 0')
)
args = parser.parse_args()
csv_path = pathlib.Path(args.csv_filepath)
if not csv_path.is_file() or csv_path.suffix != '.csv':
print('A csv file is required to proceed.')
parser.print_usage()
parser.exit()
with open(str(csv_path)) as exp_config:
exp_config_reader = csv.DictReader(exp_config)
exps = [dict(row) for row in exp_config_reader]
bftools_path = pathlib.Path('bftools')
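# bftools (the Bio-Formats command-line tools providing bfconvert and tiffcomment)
# is expected in a "bftools" directory relative to the current working directory;
# the existence check below does nothing if it is missing.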
if not bftools_path.exists():
pass
for exp in exps[args.from_dir : args.to_dir]:
path_exp = pathlib.Path(exp['Directory'])
files_exp = sorted(path_exp.glob('*rcpnl'))
files_exp.sort(key=path_to_date)
if len(files_exp) == 0:
print('No rcpnl files found in', str(path_exp))
continue
print('Processing files in', str(path_exp))
print('\r Generating reference ome-tiff')
ref_ome_tiff_path = files_exp[0].with_suffix('.ref.ome.tif')
xml_path = files_exp[0].with_suffix('.metadata.xml')
if not xml_path.exists():
call(str(bftools_path / 'bfconvert') + ' ' + str(files_exp[0]) + ' ' + str(ref_ome_tiff_path) + ' > ' + str(ref_ome_tiff_path.parent / 'bfconvertlog.txt'), shell=True)
call(str(bftools_path / 'tiffcomment') + ' ' + str(ref_ome_tiff_path) + ' > ' + str(xml_path), shell=True)
with open(str(xml_path), 'rb') as f:
f_str = f.read()
f_str = f_str.replace('\xb5', '\xc2\xb5')
end_pos = f_str.find('<StructuredAnnotations>')
f_str = f_str[:end_pos]
f_str += '</OME>'
# f.write(f_str)
f.close()
with open(str(xml_path), 'wb') as f:
f.write(f_str)
f.close()
img_uuid = uuid.uuid4().urn
tree = ET.parse(str(xml_path))
root = tree.getroot()
prefix = '{http://www.openmicroscopy.org/Schemas/OME/2016-06}'
ref_uuid = root.get('UUID')
# tree.write(str(xml_path), encoding='utf-8')
for rcpnl in files_exp:
reader = reg.BioformatsReader(str(rcpnl))
binned_filepath = rcpnl.with_suffix('.ome.tif')
print('\r ', rcpnl.name, '>', binned_filepath.name)
count = 0
for i in range(reader.metadata.num_images):
for j in range(reader.metadata.num_channels):
img = reader.read(i, j)
img = np.clip(
img.reshape(1080, 2, 1280, 2).sum(-1).sum(1),
# img.reshape(540, 2, 640, 2).sum(-1).sum(1),
0,
65535
).astype(reader.metadata.pixel_dtype)
with warnings.catch_warnings():
warnings.filterwarnings(
'ignore', r'.* is a low contrast image', UserWarning,
                    r'^skimage\.io'
)
skimage.io.imsave(
str(binned_filepath), img,
bigtiff=True, photometric='minisblack',
metadata=None, description='!!xml!!',
append=True
)
count += 1
sys.stdout.write('\r saving tile %d/%d'
% (count,
reader.metadata.num_images * reader.metadata.num_channels))
sys.stdout.flush()
print('')
xml_file = open(str(xml_path))
xml_str = xml_file.read()
xml_file.close()
xml_str = (
xml_str
.replace('2560', '1280')
.replace('2160', '1080')
# .replace('1280', '640')
# .replace('1080', '540')
.replace('0.32499998807907104', '0.6499999761581421')
.replace(files_exp[0].name, rcpnl.name.replace('rcpnl', 'ome.tif'))
.replace(ref_ome_tiff_path.name, rcpnl.name.replace('rcpnl', 'ome.tif'))
.replace('ns0:', '').replace(':ns0', '')
.replace(ref_uuid, img_uuid)
)
xml_str = unicode(xml_str, 'utf-8')
xml = io.StringIO()
xml.write(u'<?xml version="1.0" encoding="UTF-8"?>')
xml.write(xml_str)
xml_bytes = xml.getvalue().encode('utf-8') + b'\x00'
with open(str(binned_filepath), 'rb+') as f:
f.seek(0, io.SEEK_END)
xml_offset = f.tell()
f.write(xml_bytes)
f.seek(0)
ifd_block = f.read(500)
match = re.search(b'!!xml!!\x00', ifd_block)
if match is None:
raise RuntimeError("Did not find placeholder string in IFD")
f.seek(match.start() - 8)
f.write(struct.pack('<Q', len(xml_bytes)))
f.write(struct.pack('<Q', xml_offset))
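# Example invocation of this standalone variant (file name not recorded in the
# gist; "bin_rcpnl_standalone.py" is only a placeholder):
#
#   python bin_rcpnl_standalone.py experiments.csv --from-dir 0 --to-dir 2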