Skip to content

Instantly share code, notes, and snippets.

@bennihepp
Created January 19, 2012 09:51
Show Gist options
  • Save bennihepp/1639109 to your computer and use it in GitHub Desktop.
Save bennihepp/1639109 to your computer and use it in GitHub Desktop.
CellProfiler module to execute small python scripts
'''<b>RunScript</b> - an easy way to write scripts for CellProfiler
<hr>
This module allows you to write small python scripts that are run as part
of the CellProfiler pipeline.
'''
#################################
#
# Imports from useful Python libraries
#
#################################
import os
import itertools
import numpy as np
import scipy as sp
#################################
#
# Imports from CellProfiler
#
# The package aliases are the standard ones we use
# throughout the code.
#
##################################
import cellprofiler.cpimage as cpi
import cellprofiler.cpmodule as cpm
import cellprofiler.measurements as cpmeas
import cellprofiler.objects as cpo
import cellprofiler.settings as cps
from cellprofiler.preferences import \
DEFAULT_INPUT_FOLDER_NAME, DEFAULT_OUTPUT_FOLDER_NAME, NO_FOLDER_NAME, \
ABSOLUTE_FOLDER_NAME
# Directory choices offered by the "Name of the script file directory" setting.
DIR_ALL = [DEFAULT_INPUT_FOLDER_NAME, DEFAULT_OUTPUT_FOLDER_NAME,
NO_FOLDER_NAME, ABSOLUTE_FOLDER_NAME]
# Choices for what an input-object variable holds in the script: the object
# returned by the workspace's object set itself, or one of its attributes
# (segmented, unedited_segmented, small_removed_segmented respectively).
WC_OBJECTS = 'CellProfiler Objects'
WC_FILTERED_OBJECTS = 'Filtered segmentation'
WC_UNFILTERED_OBJECTS = 'Unfiltered segmentation'
WC_SMALLREMOVED_OBJECTS = 'Segmentation with small objects removed'
# Choices for the "Run script in debug mode?" setting; every choice except
# WD_NO makes run() prepend a debugger break to the script.
WD_NO = "No"
WD_PDB = "Use pdb"
WD_WINGDB = "Use wingdbstub"
WD_RPDB2 = "Use rpdb2"
# Types that the text of an input constant can be converted to before it is
# handed to the script.
WT_FLOAT = "Float"
WT_INT = "Integer"
WT_LIST = "List"
WT_DICT = "Dictionary"
WT_NPARRAY = "Numpy float array"
WT_FLOATDICT = "Dictionary with float values"
WT_STRING = "String"
# NOTE(review): not referenced anywhere in the visible source; possibly a
# leftover -- confirm before removing.
SETTINGS_OFFSET = 5
# Column types offered for output measurements.
MT_TYPES = [
cpmeas.COLTYPE_FLOAT,
cpmeas.COLTYPE_INTEGER,
]
# Source lines prepended to the script for each debugger choice.  Every
# preamble ends with a newline so that the first line of the user script is
# never glued onto the debugger call.
_DEBUG_PREAMBLES = {
    WD_PDB: u"import pdb\npdb.set_trace()\n",
    WD_WINGDB: u"import wingdbstub\nwingdbstub.debugger.Break()\n",
    WD_RPDB2: u"import rpdb2\nrpdb2.set_trace()\n",
}


class _AttributeDict(dict):
    '''Dictionary whose items can also be read and written as attributes.

    Used for the containers that are created when the Python name of an
    input contains dots (e.g. "cp.image").
    '''

    def __getattr__(self, name):
        # Raise AttributeError rather than KeyError for missing names so
        # that hasattr(), getattr() with a default and the copy module
        # see the exception type they expect.
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)

    __setattr__ = dict.__setitem__


def _set_dotted_name(namespace, dotted_name, value):
    '''Store value in namespace under a possibly dotted name.

    Each leading name part becomes (or re-uses) an _AttributeDict nested
    inside the previous one; the last part is the key of the value.
    '''
    parts = dotted_name.split('.')
    target = namespace
    for part in parts[:-1]:
        # Look the part up in the *current* container, not in the top-level
        # namespace, so that "a.b.c" and "a.b.d" share the same "a.b".
        if part not in target:
            target[part] = _AttributeDict()
        target = target[part]
    target[parts[-1]] = value


def _parse_constant(text, type_choice):
    '''Convert the text of an input constant to the chosen type.

    Lists and arrays are comma separated; dictionaries are comma separated
    "key:value" pairs.  Any other type choice returns the text unchanged.
    Raises ValueError if the text does not match the chosen type.
    '''
    if type_choice == WT_FLOAT:
        return float(text)
    if type_choice == WT_INT:
        return int(text)
    if type_choice == WT_LIST:
        return text.split(',')
    if type_choice == WT_DICT:
        return dict([item.split(':') for item in text.split(',')])
    if type_choice == WT_NPARRAY:
        return np.array([float(item) for item in text.split(',')])
    if type_choice == WT_FLOATDICT:
        pairs = [item.split(':') for item in text.split(',')]
        return dict([(key, float(val)) for key, val in pairs])
    return text


def _unique(values):
    '''Return the values as a list without duplicates, keeping their order.'''
    seen = set()
    result = []
    for value in values:
        if value not in seen:
            seen.add(value)
            result.append(value)
    return result


def _measurement_object_name(group):
    '''Return the object name a measurement group refers to.

    This is cpmeas.IMAGE for image measurements and the selected object
    name otherwise.
    '''
    if group.wants_image.value:
        return cpmeas.IMAGE
    return group.use_object_name.value


def _measurement_feature_name(group):
    '''Return the "<category>_<name>_<image>" feature of an output group.'''
    return '%s_%s_%s' % (group.measurement_category.value,
                         group.measurement_name.value,
                         group.image.value)


def _matches(group, object_name, category=None, measurement=None):
    '''Tell whether an output measurement group matches the given query.

    category and measurement are only compared when they are not None.
    '''
    if object_name != _measurement_object_name(group):
        return False
    if category is not None \
            and category != group.measurement_category.value:
        return False
    if measurement is not None \
            and measurement != group.measurement_name.value:
        return False
    return True


def _new_group():
    '''Create a settings group that starts with a divider.'''
    group = cps.SettingsGroup()
    group.append("divider", cps.Divider())
    return group


def _script_output(user_ns, py_name, kind):
    '''Fetch an output variable that the script was expected to define.

    Raises KeyError with an explanatory message if the script did not
    define the variable.
    '''
    try:
        return user_ns[py_name]
    except KeyError:
        raise KeyError(
            "RunScript: the script did not define the output %s "
            "variable %r" % (kind, py_name))


def _exec_script(codeobj, global_ns, local_ns):
    '''Execute compiled script code in the given namespaces.

    Kept in a function of its own, without nested scopes, so that the
    tuple form of exec is accepted by Python 2 as well as Python 3.
    '''
    exec(codeobj, global_ns, local_ns)


class RunScript(cpm.CPModule):
    '''CellProfiler module that runs a user supplied Python script.

    Images, objects, measurements and constants can be handed to the script
    as named variables; images, objects and measurements can be read back
    from variables the script defines.

    NOTE(review): the indentation of this class was reconstructed from a
    copy of the source that had lost all leading whitespace.  The only
    place where the nesting is not implied by the code itself is marked in
    visible_settings().
    '''

    module_name = "RunScript"
    category = "Other"
    variable_revision_number = 1

    def create_settings(self):
        '''Create the fixed settings and the (initially empty) groups.'''
        # choice of debugging mode
        self.wants_debug_mode = cps.Choice(
            "Run script in debug mode?",
            [WD_NO, WD_PDB, WD_WINGDB, WD_RPDB2])

        # containers for the repeatable groups
        self.input_image_groups = []
        self.input_object_groups = []
        self.input_measurement_groups = []
        self.input_constant_groups = []
        self.output_image_groups = []
        self.output_object_groups = []
        self.output_measurement_groups = []

        # hidden counts, one per container
        self.input_image_count = cps.HiddenCount(
            self.input_image_groups, 'Input image count')
        self.input_object_count = cps.HiddenCount(
            self.input_object_groups, 'input object count')
        self.input_measurement_count = cps.HiddenCount(
            self.input_measurement_groups, 'Input measurement count')
        self.input_constant_count = cps.HiddenCount(
            self.input_constant_groups, 'Input constant count')
        self.output_image_count = cps.HiddenCount(
            self.output_image_groups, 'Output image count')
        self.output_object_count = cps.HiddenCount(
            self.output_object_groups, 'Output object count')
        self.output_measurement_count = cps.HiddenCount(
            self.output_measurement_groups, 'Output measurement count')

        # buttons for adding inputs and outputs
        self.add_input_image = cps.DoSomething(
            "", "Add another input image",
            self.add_input_image_cb)
        self.add_input_object = cps.DoSomething(
            "", "Add another input object",
            self.add_input_object_cb)
        self.add_input_measurement = cps.DoSomething(
            "", "Add another input measurement",
            self.add_input_measurement_cb)
        self.add_input_constant = cps.DoSomething(
            "", "Add another input constant",
            self.add_input_constant_cb)
        self.input_output_divider = cps.Divider()
        self.add_output_image = cps.DoSomething(
            "", "Add another output image",
            self.add_output_image_cb)
        self.add_output_object = cps.DoSomething(
            "", "Add another output object",
            self.add_output_object_cb)
        self.add_output_measurement = cps.DoSomething(
            "", "Add another output measurement",
            self.add_output_measurement_cb)

        # the script is either loaded from a file or typed into a text box
        self.script_choice = cps.Binary(
            "Load script from a file?",
            True,
            doc="""You can either load the script from """
                """a file or enter it directly into a text box."""
        )
        self.script_dir = cps.DirectoryPath(
            "Name of the script file directory",
            dir_choices=DIR_ALL,
            allow_metadata=False
        )
        self.script_file = cps.FilenameText(
            "Name of the script file",
            "None",
            exts=[("Script file (*.py)", "*.py"), ("Any file (*)", "*")]
        )

        # text field for entering the script directly
        text_kwargs = {'doc': "Enter the python script to be run",
                       'multiline': True}
        # only request a size if this cps.Text advertises support for it
        if getattr(cps.Text, '__change_size_support__', False):
            text_kwargs['size'] = (600, 400)
        self.script_text = cps.Text(
            "Script to run:",
            "results = 5",
            **text_kwargs
        )

        # One row per kind of group:
        #   (add button, group list, attribute names that are saved,
        #    attribute names that are displayed, add callback)
        # The row order matches the order of the hidden counts returned
        # by settings().
        self.__settings = (
            (
                self.add_input_image, self.input_image_groups,
                ("image", "py_name", "wants_raw"),
                ("divider", "image", "py_name", "wants_raw", "remover"),
                self.add_input_image_cb
            ),
            (
                self.add_input_object, self.input_object_groups,
                ("objects", "py_name", "objects_choice"),
                ("divider", "objects", "py_name", "objects_choice",
                 "remover"),
                self.add_input_object_cb
            ),
            (
                self.add_input_measurement, self.input_measurement_groups,
                ("wants_image", "use_object_name", "measurement",
                 "py_name"),
                ("divider", "wants_image", "use_object_name",
                 "measurement", "py_name", "remover"),
                self.add_input_measurement_cb
            ),
            (
                self.add_input_constant, self.input_constant_groups,
                ("constant", "type_choice", "py_name"),
                ("divider", "constant", "type_choice", "py_name",
                 "remover"),
                self.add_input_constant_cb
            ),
            (
                self.add_output_image, self.output_image_groups,
                ("image_name", "py_name"),
                ("divider", "image_name", "py_name", "remover"),
                self.add_output_image_cb
            ),
            (
                self.add_output_object, self.output_object_groups,
                ("objects_name", "py_name"),
                ("divider", "objects_name", "py_name", "remover"),
                self.add_output_object_cb
            ),
            (
                self.add_output_measurement, self.output_measurement_groups,
                ("image", "wants_image", "use_object_name", "type_choice",
                 "measurement_category", "measurement_name", "py_name"),
                ("divider", "image", "wants_image", "use_object_name",
                 "type_choice", "measurement_category", "measurement_name",
                 "py_name", "remover"),
                self.add_output_measurement_cb
            ),
        )

    def settings(self):
        '''Return all settings in the order in which they are saved.

        The seven hidden counts come first, then the fixed settings, then
        the saved attributes of every group.
        '''
        result = [
            self.input_image_count,
            self.input_object_count,
            self.input_measurement_count,
            self.input_constant_count,
            self.output_image_count,
            self.output_object_count,
            self.output_measurement_count,
            self.wants_debug_mode,
            self.script_choice,
            self.script_dir,
            self.script_file,
            self.script_text,
        ]
        for _btn, groups, attr_names, _visible, _add_cb in self.__settings:
            for group in groups:
                for attr_name in attr_names:
                    result.append(getattr(group, attr_name))
        return result

    def visible_settings(self):
        '''Return the settings to display, in display order.'''
        result = [self.wants_debug_mode]
        for group_btn, groups, _names, visible_attr_names, _add_cb in \
                self.__settings:
            for group in groups:
                # The object name is hidden once a preceding 'wants_image'
                # setting of the same group is switched on.
                hide_object_name = False
                for attr_name in visible_attr_names:
                    setting = getattr(group, attr_name)
                    if attr_name == 'wants_image' and setting.value:
                        hide_object_name = True
                    if attr_name == 'use_object_name' and hide_object_name:
                        continue
                    result.append(setting)
            # NOTE(review): assumed nesting -- the add button follows the
            # groups of its kind and a single divider follows all kinds.
            result.append(group_btn)
        result.append(cps.Divider())
        result.append(self.script_choice)
        if self.script_choice.value:
            result += [self.script_dir, self.script_file]
        else:
            result.append(self.script_text)
        return result

    def prepare_settings(self, setting_values):
        '''Prepare the module to receive the settings

        setting_values - one string per setting to be initialized

        Adjust the number of input and output groups to match the counts
        stored in the leading setting values.
        '''
        counts = [int(value)
                  for value in setting_values[:len(self.__settings)]]
        for count, (_btn, groups, _names, _visible, add_cb) in \
                zip(counts, self.__settings):
            del groups[count:]
            while len(groups) < count:
                add_cb()

    def add_input_image_cb(self):
        '''Add an image to the input_image_groups collection'''
        group = _new_group()
        group.append('image', cps.ImageNameSubscriber(
            "Select an input image to use", None,
            doc="""Select an image you want to use as input in your script"""
        ))
        group.append('py_name', cps.Text(
            "Name for the image-variable in Python",
            "cp_image_in",
            doc="""Select the name of the variable that """
                """can be used to access the image. The pixels can """
                """then be accessed in python as <py_name>.pixel_data."""
        ))
        group.append('wants_raw', cps.Binary(
            "Access raw pixel data?", False,
            doc="You can choose to access the raw pixel data "
                "instead of the CPImage object, where the pixels "
                "can be accessed as 'image.pixel_data'."
        ))
        group.append('remover', cps.RemoveSettingButton(
            "", "Remove this image", self.input_image_groups, group
        ))
        self.input_image_groups.append(group)

    def add_input_object_cb(self):
        '''Add an object to the input_object_groups collection'''
        group = _new_group()
        group.append('objects', cps.ObjectNameSubscriber(
            "Select an input object to use", None,
            doc="""Select an object you want to use as input in your script"""
        ))
        group.append('py_name', cps.Text(
            "Name for the object-variable in Python",
            "cp_object_in",
            doc="""Select the name of the variable that """
                """can be used to access the object"""
        ))
        group.append('objects_choice', cps.Choice(
            'What part of the objects do you want to access?',
            [WC_OBJECTS,
             WC_FILTERED_OBJECTS,
             WC_UNFILTERED_OBJECTS,
             WC_SMALLREMOVED_OBJECTS]
        ))
        group.append("remover", cps.RemoveSettingButton(
            "", "Remove this object", self.input_object_groups, group
        ))
        self.input_object_groups.append(group)

    def add_input_measurement_cb(self):
        '''Add a measurement to the input_measurement_groups collection'''
        group = _new_group()
        wants_image_measurement = cps.Binary(
            "Use an image measurement?", True)
        group.append('wants_image', wants_image_measurement)
        use_object_name = cps.ObjectNameSubscriber("Object name:")
        group.append('use_object_name', use_object_name)

        def object_fn():
            # Tells the measurement setting which object's measurements
            # to offer.
            if wants_image_measurement.value:
                return cpmeas.IMAGE
            return use_object_name.value

        group.append('measurement', cps.Measurement(
            "Select an input measurement to use", object_fn,
            doc="""Select a measurement you want to use as input """
                """in your script"""
        ))
        group.append('py_name', cps.Text(
            "Name for the measurement-variable in Python",
            "cp_measurement_in",
            doc="""Select the name of the variable that """
                """can be used to access the measurement"""
        ))
        group.append("remover", cps.RemoveSettingButton(
            "", "Remove this measurement",
            self.input_measurement_groups, group
        ))
        self.input_measurement_groups.append(group)

    def add_input_constant_cb(self):
        '''Add a constant to the input_constant_groups collection'''
        group = _new_group()
        group.append('constant', cps.Text(
            "Enter the constant input to use",
            "1.0",
            doc="""Enter a constant you want to use as input in your script"""
        ))
        group.append('type_choice', cps.Choice(
            "Choose the type of the constant",
            [WT_FLOAT,
             WT_INT,
             WT_LIST,
             WT_DICT,
             WT_NPARRAY,
             WT_FLOATDICT,
             WT_STRING]
        ))
        group.append('py_name', cps.Text(
            "Name for the constant-variable in Python",
            "cp_constant_in",
            doc="""Select the name of the variable that """
                """can be used to access the constant"""
        ))
        group.append("remover", cps.RemoveSettingButton(
            "", "Remove this constant", self.input_constant_groups, group
        ))
        self.input_constant_groups.append(group)

    def add_output_image_cb(self):
        '''Add an image to the output_image_groups collection'''
        group = _new_group()
        group.append('image_name', cps.ImageNameProvider(
            "Output image name:",
            "OutputImage"
        ))
        group.append('py_name', cps.Text(
            "Name for the image-variable in Python",
            "cp_image_out",
            doc="""Select the name of the variable that """
                """can be used to access the image"""
        ))
        group.append("remover", cps.RemoveSettingButton(
            "", "Remove this image", self.output_image_groups, group
        ))
        self.output_image_groups.append(group)

    def add_output_object_cb(self):
        '''Add an object to the output_object_groups collection'''
        group = _new_group()
        group.append('objects_name', cps.ObjectNameProvider(
            "Name the output objects:",
            "OutputObject"
        ))
        group.append('py_name', cps.Text(
            "Name for the object-variable in Python",
            "cp_object_out",
            doc="""Select the name of the variable that """
                """can be used to access the object"""
        ))
        group.append("remover", cps.RemoveSettingButton(
            "", "Remove this object", self.output_object_groups, group
        ))
        self.output_object_groups.append(group)

    def add_output_measurement_cb(self):
        '''Add a measurement to output_measurement_groups collection'''
        group = _new_group()
        group.append('image', cps.ImageNameSubscriber(
            "Image to measure", None,
            doc="""Select an image on which to perform the measurement"""
        ))
        group.append('wants_image', cps.Binary(
            "Create an image measurement?", False
        ))
        group.append('use_object_name', cps.ObjectNameSubscriber(
            "Object name",
            doc="""Select an object on which to perform the measurement"""
        ))
        group.append('measurement_category', cps.Text(
            "Measurement category:",
            "RunScript"
        ))
        group.append('type_choice', cps.Choice(
            "Choose the type of the measurement",
            MT_TYPES
        ))
        group.append('measurement_name', cps.Text(
            "Measurement name:",
            "OutputMeasurement"
        ))
        group.append('py_name', cps.Text(
            "Name for the measurement-variable in Python",
            "cp_measurement_out",
            doc="""Select the name of the variable that """
                """can be used to access the measurement"""
        ))
        group.append("remover", cps.RemoveSettingButton(
            "", "Remove this measurement",
            self.output_measurement_groups, group
        ))
        self.output_measurement_groups.append(group)

    def get_measurement_columns(self, pipeline):
        '''Return column definitions for measurements made by this module

        Each column is an (object name, feature name, column type) tuple.
        '''
        return [(_measurement_object_name(group),
                 _measurement_feature_name(group),
                 group.type_choice.value)
                for group in self.output_measurement_groups]

    def get_categories(self, pipeline, object_name):
        '''Return the measurement categories produced for object_name'''
        return _unique([group.measurement_category.value
                        for group in self.output_measurement_groups
                        if _matches(group, object_name)])

    def get_measurements(self, pipeline, object_name, category):
        '''Return the measurement names produced for object_name/category'''
        return _unique([group.measurement_name.value
                        for group in self.output_measurement_groups
                        if _matches(group, object_name, category)])

    def get_measurement_images(self, pipeline, object_name,
                               category, measurement):
        '''Return the image names used by the given measurement'''
        return _unique([group.image.value
                        for group in self.output_measurement_groups
                        if _matches(group, object_name, category,
                                    measurement)])

    #
    # CellProfiler calls "run" on each image set in your pipeline.
    # This is where you do the real work.
    #
    def run(self, workspace):
        '''Run the script for the current image set.

        The inputs are placed in the script's local namespace, the script
        is compiled and executed, and the configured outputs are read back
        from that namespace and stored in the workspace.
        '''
        # module aliases the script can use without importing them
        global_ns = {
            'cpi': cpi,
            'cpm': cpm,
            'cpmeas': cpmeas,
            'cpo': cpo,
            'cps': cps,
            'np': np,
            'sp': sp,
        }
        user_ns = self._script_inputs(workspace)
        script, script_name = self._script_source()
        script = self._with_debugger(script)
        print('script: %s' % (script,))
        codeobj = compile(script, script_name, 'exec')
        # NOTE: the script is arbitrary Python code that runs with the
        # privileges of CellProfiler -- only run pipelines you trust.
        # NOTE(review): globals and locals are distinct dictionaries, so
        # functions defined inside the script cannot see names assigned at
        # the top level of the script.
        _exec_script(codeobj, global_ns, user_ns)
        self._publish_outputs(workspace, user_ns)

    def _script_inputs(self, workspace):
        '''Build the script's local namespace from the configured inputs.'''
        user_ns = {}

        for group in self.input_image_groups:
            image = workspace.image_set.get_image(group.image.value)
            if group.wants_raw.value:
                image = image.pixel_data
            _set_dotted_name(user_ns, group.py_name.value, image)

        for group in self.input_object_groups:
            objects = workspace.object_set.get_objects(group.objects.value)
            choice = group.objects_choice.value
            if choice == WC_FILTERED_OBJECTS:
                objects = objects.segmented
            elif choice == WC_UNFILTERED_OBJECTS:
                objects = objects.unedited_segmented
            elif choice == WC_SMALLREMOVED_OBJECTS:
                objects = objects.small_removed_segmented
            _set_dotted_name(user_ns, group.py_name.value, objects)

        for group in self.input_measurement_groups:
            measurement = workspace.measurements.get_current_measurement(
                _measurement_object_name(group), group.measurement.value)
            _set_dotted_name(user_ns, group.py_name.value, measurement)

        for group in self.input_constant_groups:
            constant = _parse_constant(group.constant.value,
                                       group.type_choice.value)
            _set_dotted_name(user_ns, group.py_name.value, constant)

        return user_ns

    def _script_source(self):
        '''Return (script text, name to compile it under).'''
        if not self.script_choice.value:
            return self.script_text.value, '<runscript script>'
        script_name = self.script_file.value
        path = os.path.join(self.script_dir.get_absolute_path(),
                            script_name)
        print('script_name: %s' % (path,))
        with open(path) as f:
            return f.read(), script_name

    def _with_debugger(self, script):
        '''Prepend the debugger break of the chosen debug mode, if any.'''
        preamble = _DEBUG_PREAMBLES.get(self.wants_debug_mode.value)
        if preamble is None:
            return script
        return preamble + script

    def _publish_outputs(self, workspace, user_ns):
        '''Store the script's output variables in the workspace.'''
        for group in self.output_image_groups:
            image = _script_output(user_ns, group.py_name.value, 'image')
            # plain pixel data is wrapped in an image object
            if not isinstance(image, cpi.Image):
                image = cpi.Image(image)
            workspace.image_set.add(group.image_name.value, image)

        for group in self.output_object_groups:
            objects = _script_output(user_ns, group.py_name.value, 'object')
            # anything that is not an Objects instance is used as the
            # segmentation of a new one
            if not isinstance(objects, cpo.Objects):
                segmentation = objects
                objects = cpo.Objects()
                objects.segmented = segmentation
            workspace.object_set.add_objects(
                objects, group.objects_name.value)

        for group in self.output_measurement_groups:
            feature_name = _measurement_feature_name(group)
            measurement = _script_output(
                user_ns, group.py_name.value, 'measurement')
            if group.wants_image.value:
                workspace.measurements.add_image_measurement(
                    feature_name, measurement)
            else:
                workspace.measurements.add_measurement(
                    group.use_object_name.value, feature_name, measurement)

    ################################
    #
    # DISPLAY
    #
    def is_interactive(self):
        '''This module is not interactive.'''
        return False

    def display(self, workspace):
        '''Nothing is displayed by this module.'''
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment