Image processing helper class for Octopi

ProcessingHandler class

A helper class for arbitrary per-XY-coordinate, post-acquisition image processing in Octopi.

ProcessingHandler is initialized as an attribute of the MultiPointController (and aliased as an attribute of the MultiPointWorker object as MultiPointWorker.processingHandler). It maintains two queues, each serviced by its own thread, started whenever a multipoint acquisition begins. When the acquisition ends, termination signals are automatically enqueued and the threads run to completion. Both queues are Python queue.Queue objects, and tasks are added to them via their put methods. While the multipoint acquisition is running, the user may enqueue processing tasks at any time (this documentation assumes multipoint_custom_script_entry is being used to add custom processing, so the relevant MultiPointWorker object is accessed as multiPointWorker).
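
For orientation, here is a minimal sketch of the lifecycle that the controller manages automatically; the calls are ProcessingHandler's actual methods, but invoking them by hand like this is only needed outside a multipoint acquisition:

handler = ProcessingHandler()
handler.start_processing()  # spawns the processing-queue thread
handler.start_uploading()   # spawns the upload-queue thread
# ... enqueue task dicts via handler.processing_queue.put(...) ...
handler.end_processing()    # enqueues termination signals for both threads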

Enqueueing tasks

The handler thread for each queue expects objects in the queue to be dicts of the form {'function':(a callable), 'args':(list of positional arguments), 'kwargs':(dict of keyword arguments)}, and executes each task represented by a dict c by running c['function'](*c['args'],**c['kwargs']). The upload queue handler places no requirements on what its functions return; it is assumed that the user will have each upload function pass the data to be uploaded (supplied to the function as an arg in the task dictionary) to some internal or external data handler accessible from the multiPointWorker. However, the function in any task queued into processingHandler.processing_queue must itself return a task, i.e. a dictionary of the form described previously. This returned task is assumed to contain data to upload and a method for uploading it, and it is automatically enqueued in the upload queue. Thus,

  • End users should ordinarily only queue tasks directly into multiPointWorker.processingHandler.processing_queue by, given a task dict task, running multiPointWorker.processingHandler.processing_queue.put(task).
  • The function (the callable at the key 'function' in the task) in any task queued in the processing_queue should return a task dict of the form {'function':(callable), 'args':(list of positional arguments), 'kwargs':(dict of keyword arguments)} to be enqueued in the upload_queue; a minimal sketch follows this list.

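For instance, the following hedged sketch illustrates the chaining convention, including keyword arguments; process_data and send_to_handler are hypothetical stand-ins, not part of Octopi:

def send_to_handler(result, label=''):
    # hypothetical stand-in for a real data handler or cloud upload
    print(label, result)

def process_data(x, scale=1.0):
    result = x * scale
    # the return value is itself a task dict, which the processing
    # thread automatically enqueues in the upload_queue
    return {'function':send_to_handler, 'args':[result], 'kwargs':{'label':'demo'}}

multiPointWorker.processingHandler.processing_queue.put(
    {'function':process_data, 'args':[42], 'kwargs':{'scale':0.5}})
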
Example use in multipoint_custom_script_entry

Suppose we have a function process_image that takes an image ndarray I as its sole positional argument and returns a scalar indicating the probability that a malaria parasite is present in the image (an oversimplified model). Also suppose we have a function upload that takes a scalar as its sole positional argument and passes it on to some data handler or cloud service. To use the processing handler in the custom script, the user first has to write a function process_image_wrapper as follows (to make sure a task is returned):

def process_image_wrapper(I):
  score = process_image(I)
  return {'function':upload, 'args':[score], 'kwargs':{}}

Then, in their custom script entry, after the step in which they acquire an FOV I at a given Z-level at the current XY coordinate, the user can simply add the code

task_dict = {'function':process_image_wrapper, 'args':[I.copy()], 'kwargs':{}}
multiPointWorker.processingHandler.processing_queue.put(task_dict)

and the processing and uploading will take place in the background while the microscope moves on to the next acquisition. Note: if working with image ndarrays, remember to pass them using ndarray.copy() to prevent them from being overwritten before processing, as sketched below.
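
To make the copy caveat concrete, here is a hedged sketch; camera_buffer stands in for a hypothetical acquisition buffer that the camera driver reuses between frames:

# WRONG: the task may run only after camera_buffer has been overwritten
# by the next FOV, so the wrong frame's pixels would be processed
# multiPointWorker.processingHandler.processing_queue.put(
#     {'function':process_image_wrapper, 'args':[camera_buffer], 'kwargs':{}})

# RIGHT: snapshot the pixels at enqueue time
multiPointWorker.processingHandler.processing_queue.put(
    {'function':process_image_wrapper, 'args':[camera_buffer.copy()], 'kwargs':{}})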

import threading
import queue

import numpy as np
import pandas as pd


def default_image_preprocessor(image, callable_list):
    """
    :param image: ndarray representing an image
    :param callable_list: list of dictionaries in the form {'func': callable,
        'args': list of positional args, 'kwargs': dict of keyword args}. The
        function should take an image ndarray as its first positional argument,
        and the image should not be included in the collection of args/kwargs
    :return: image with the elements of callable_list applied in sequence
    """
    output_image = np.copy(image)
    for c in callable_list:
        output_image = c['func'](output_image, *c['args'], **c['kwargs'])
    return output_image


def default_upload_fn(I, score, dataHandler):
    """
    :brief: designed to be called by default_process_fn when it wraps the
        pre-existing process_fov method
    """
    if len(I) == 0:
        return
    images = I * 255
    score_df = pd.DataFrame(score, columns=["output"])
    if dataHandler.images is None:
        dataHandler.load_images(images)
        dataHandler.load_predictions(score_df)
    else:
        dataHandler.add_data(images, score_df)


def default_process_fn(process_fn, *process_args, **process_kwargs):
    """
    :brief: meant to be queued with args being [process_fov, (all args for
        process_fov)] and kwargs being {'dataHandler': self.microscope.dataHandler,
        'upload_fn': default_upload_fn}
    :return: a process task, i.e. a dict of 'function' (callable), 'args' (list),
        and 'kwargs' (dict)
    """
    dataHandler = process_kwargs.pop('dataHandler')  # a DataHandler instance
    upload_fn = process_kwargs.pop('upload_fn')      # a callable
    I, score = process_fn(*process_args, **process_kwargs)
    return {'function': upload_fn, 'args': [I, score, dataHandler], 'kwargs': {}}


class ProcessingHandler:
    """
    :brief: Handler class for parallelizing FOV processing. GENERAL NOTE:
        REMEMBER TO PASS COPIES OF IMAGES WHEN QUEUEING THEM FOR PROCESSING
    """

    def __init__(self):
        # Elements in both queues are dicts in the form
        # {'function': callable, 'args': list of positional arguments,
        #  'kwargs': dict of keyword arguments}; a dict in the form
        # {'function': 'end'} causes the handler thread to terminate.
        # A function queued in processing_queue must return a dict in the
        # same form it received, suitable for passing to the upload queue.
        self.processing_queue = queue.Queue()
        self.upload_queue = queue.Queue()
        self.processing_thread = None
        self.uploading_thread = None

    def processing_queue_handler(self, queue_timeout=None):
        while True:
            try:
                processing_task = self.processing_queue.get(timeout=queue_timeout)
            except queue.Empty:
                break
            if processing_task['function'] == 'end':
                self.processing_queue.task_done()
                break
            else:
                # Run the processing task and forward its return value,
                # itself a task dict, to the upload queue.
                upload_task = processing_task['function'](
                    *processing_task['args'],
                    **processing_task['kwargs'])
                self.upload_queue.put(upload_task)
                self.processing_queue.task_done()

    def upload_queue_handler(self, queue_timeout=None):
        while True:
            try:
                upload_task = self.upload_queue.get(timeout=queue_timeout)
            except queue.Empty:
                break
            if upload_task['function'] == 'end':
                self.upload_queue.task_done()
                break
            else:
                upload_task['function'](*upload_task['args'],
                                        **upload_task['kwargs'])
                self.upload_queue.task_done()

    def start_processing(self, queue_timeout=None):
        self.processing_thread = \
            threading.Thread(target=self.processing_queue_handler,
                             args=[queue_timeout])
        self.processing_thread.start()

    def start_uploading(self, queue_timeout=None):
        self.uploading_thread = \
            threading.Thread(target=self.upload_queue_handler,
                             args=[queue_timeout])
        self.uploading_thread.start()

    def end_uploading(self, *args, **kwargs):
        # Queued as a processing task; its return value is the termination
        # signal that the processing handler forwards to the upload queue.
        return {'function': 'end'}

    def end_processing(self):
        # First queue a task whose result terminates the upload thread,
        # then terminate the processing thread itself.
        self.processing_queue.put({'function': self.end_uploading,
                                   'args': [], 'kwargs': {}})
        self.processing_queue.put({'function': 'end'})
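
As a quick sanity check, the classes and defaults above can be exercised without a microscope. The sketch below assumes it runs in the same module as the code above; MockDataHandler is a hypothetical stand-in exposing only the attributes and methods default_upload_fn touches, and fake_process_fov is a stub for Octopi's process_fov:

class MockDataHandler:
    # hypothetical stand-in for Octopi's DataHandler
    def __init__(self):
        self.images = None
        self.predictions = None

    def load_images(self, images):
        self.images = images

    def load_predictions(self, score_df):
        self.predictions = score_df

    def add_data(self, images, score_df):
        self.images = np.concatenate([self.images, images])
        self.predictions = pd.concat([self.predictions, score_df])


def fake_process_fov(I):
    # stub: returns (image stack, one score per image)
    return I, np.random.rand(len(I))


dh = MockDataHandler()
handler = ProcessingHandler()
handler.start_processing()
handler.start_uploading()
I = np.random.rand(4, 64, 64)  # a small stack of fake FOV crops
handler.processing_queue.put({'function': default_process_fn,
                              'args': [fake_process_fov, I.copy()],
                              'kwargs': {'dataHandler': dh,
                                         'upload_fn': default_upload_fn}})
handler.end_processing()          # signals both threads to finish
handler.processing_thread.join()
handler.uploading_thread.join()
print(dh.predictions)             # the four scores, uploaded in the background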