@kmdouglass
Created July 4, 2024 15:24
A simple live image viewer for imaging experiments

Simple Live Image Viewer

This is a simple live image viewer that uses matplotlib to display the images.

Goals

  1. Display 512 x 512 images at 10 fps or faster
  2. Depend only on numpy and matplotlib
  3. Show only the most up-to-date image; we don't need to display every frame

Constraints

  1. Data acquisition and display should be independent because frames are grabbed and displayed at different rates.
  2. In-process concurrency in Python requires that the frame grabber library explicitly release the GIL in its C layer. We can only hope this is the case.
  3. Concurrency with multiple processes requires data de/serialization, which will likely be a bottleneck for large images at high frame rates.

Design

The design is based around the following components:

  1. A numpy array backed by a shared memory buffer to share data between processes
  2. A main process that puts camera frame buffers into the shared numpy array
  3. A display process that displays the images
  4. Two unidirectional pipes between the processes: one from the main process to the display process, and one in the opposite direction

The shared memory buffer is just a 1D numpy array with a custom put method. Calling put inserts data at the buffer's current write index; once the end of the buffer is reached, writing wraps around to the beginning. Shared memory avoids de/serializing image data; instead, only the image's start and stop indexes are sent through the main-to-display pipe.
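The wrap-around indexing convention can be sketched with a plain NumPy array. The put helper here is illustrative only, a simplified stand-in for the SharedBuffer.put method shown in full below:

```python
import numpy as np

def put(buf: np.ndarray, write_idx: int, data: np.ndarray) -> tuple[int, int]:
    """Insert data at write_idx, wrapping around the end of buf.

    Returns the (start, stop) indexes of the inserted data; a negative start
    signals that the block wraps around the end of the buffer.
    """
    start, stop = write_idx, write_idx + data.size
    if stop > buf.size:
        # A slice can't cross the boundary, so use an explicit list of indexes
        start -= buf.size
        stop -= buf.size
        buf[list(range(start, stop))] = data
    else:
        buf[start:stop] = data
    return start, stop

buf = np.zeros(4, dtype=np.uint16)
start, stop = put(buf, 0, np.array([1, 2, 3], dtype=np.uint16))     # (0, 3)
start, stop = put(buf, stop, np.array([4, 5, 6], dtype=np.uint16))  # (-1, 2): wrapped
print(buf.tolist())  # [5, 6, 3, 4]
```

A reader that receives (-1, 2) can recover the frame with buf[list(range(-1, 2))], since NumPy interprets negative indexes from the end of the array.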

The display process applies backpressure by sending a Ready message to indicate that it is ready to display a new frame. The main process polls its end of this pipe and sends index data only when a Ready message has arrived.
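The handshake can be sketched in a single process; the string "ready" stands in for the Ready dataclass that the actual script sends:

```python
import multiprocessing as mp

rx_main, tx_disp = mp.Pipe(duplex=False)  # display -> main direction

tx_disp.send("ready")  # the display signals once that it can draw

sent = 0
for indexes in [(0, 4), (4, 8), (-4, 0)]:  # pretend frame indexes from put()
    if rx_main.poll():   # non-blocking: is a Ready message waiting?
        rx_main.recv()   # consume it so the next frame isn't sent prematurely
        sent += 1        # here the real script would send DataIndexes(start, stop)
print(sent)  # 1 -- the other frames are simply dropped, by design
```

This is why only the most recent frame is displayed: frames produced while the display is busy never enter the pipe at all.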

There is no lock on the shared memory; instead, I assume there is only ever one writer (the main process). Any process that reads from this array must be fast enough to read a block of data before the main process arrives back at the same block after looping through the buffer. This means that you should set the capacity based on your use case to avoid this race condition.
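As a rough guide, a reader has capacity-in-frames divided by acquisition-rate seconds to consume a block before the writer overwrites it. A back-of-envelope sketch with assumed numbers:

```python
# Back-of-envelope capacity sizing (the rates here are assumptions, not measurements)
fps = 20                 # acquisition rate in frames per second
frames_in_buffer = 100   # buffer capacity expressed in frames
grace_period_s = frames_in_buffer / fps
print(grace_period_s)    # 5.0 seconds before a block can be overwritten
```

If your reader cannot keep this deadline, increase the capacity or lower the frame rate.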

Notes

Another process could be added to save the frames, but in this case care must be taken to ensure that every frame is saved. (The display process only shows the most recent frame.) The buffer capacity should therefore be large enough so that the data acquisition cannot loop around the buffer and pass the current location in the buffer of the save process.

You would also likely need to use a Queue to pass messages between the main and save processes since messages might need to be buffered as well.
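A minimal sketch of that message flow using a multiprocessing Queue, which buffers index messages instead of dropping them. The None shutdown sentinel is an assumption for this sketch, not part of the script above:

```python
import multiprocessing as mp

def drain(queue) -> list:
    """Consume (start, stop) index messages until a None sentinel arrives.

    A real save process would read the shared buffer at each index pair and
    write the frame to disk before taking the next message.
    """
    saved = []
    while (msg := queue.get()) is not None:  # get() blocks, so no message is missed
        saved.append(msg)
    return saved

queue = mp.Queue()
for indexes in [(0, 4), (4, 8), (-4, 0)]:  # index pairs as returned by put()
    queue.put(indexes)
queue.put(None)  # sentinel: acquisition finished

saved = drain(queue)
print(saved)  # [(0, 4), (4, 8), (-4, 0)] -- every frame accounted for
```

Unlike the display's poll-based handshake, nothing is skipped here, which is exactly the property a saver needs.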

Disclaimer

I have done minimal testing of this script as it was good enough for my purposes. On a Windows 11 machine, I could simulate 512 x 512 images at about 20 fps with one dropped frame every ~10 frames. Your mileage may vary.

"""Simulates image acquisiton and a live display of the images."""
from dataclasses import dataclass
import multiprocessing as mp
from multiprocessing.connection import PipeConnection
import time
from matplotlib import pyplot as plt
import numpy as np
from shared_buffer import SharedBuffer
@dataclass(frozen=True)
class Initialize:
"""Information required to initialize the state of the display process."""
buffer_name: str
buffer_capacity: int
num_rows: int
num_cols: int
@dataclass(frozen=True)
class DataIndexes:
"""The start and stop indexes of the most recent data in the shared buffer."""
start: int
stop: int
@dataclass(frozen=True)
class Ready():
"""Informs the main process that the display process is ready to receive data."""
@dataclass(frozen=True)
class Shutdown:
"""Signals that the display process should shutdown."""
READY = Ready()
SHUTDOWN = Shutdown()
def display(capacity: int, name: str, rx: PipeConnection, tx: PipeConnection):
# Receive the initialization data
init_message: Initialize = rx.recv()
data = SharedBuffer(init_message.buffer_capacity, name=init_message.buffer_name)
plt.ion()
fig, ax = plt.subplots(1, 1)
img = ax.imshow(np.zeros((init_message.num_rows, init_message.num_cols), dtype=np.uint16), vmin=0, vmax=2**16 - 1)
while True:
# Indicate that we are ready to draw
tx.send(READY)
try:
# Block the process until a message is received
msg = rx.recv()
print("Received message:", msg)
match msg:
case DataIndexes(start, end):
start_idx = start
end_idx = end
case SHUTDOWN:
break
except EOFError:
print("Pipe closed")
break
if start < 0:
# Handle negative indexes from buffer wrap-around
indexes = list(range(start_idx, end_idx))
img.set_data(data[indexes].reshape((init_message.num_rows, init_message.num_cols)))
else:
img.set_data(data[start_idx:end_idx].reshape((init_message.num_rows, init_message.num_cols)))
# Update the data in the image
fig.canvas.draw()
fig.canvas.flush_events()
def main():
# Setup the memory buffer
frames, rows, cols = 100, 512, 512
bytes_per_pixel = 2
num_bytes = frames * rows * cols * bytes_per_pixel
data = SharedBuffer(capacity=num_bytes, create=True)
# Spawn the process that will display the data
rx_disp, tx_main = mp.Pipe(duplex=False)
rx_main, tx_disp = mp.Pipe(duplex=False)
p = mp.Process(target=display, args=(num_bytes, data.name, rx_disp, tx_disp))
p.start()
# Initialize the display process
tx_main.send(Initialize(data.name, num_bytes, rows, cols))
# Generate a random image
img = np.random.randint(0, 2**16 - 1, size=(frames, rows, cols), dtype=np.uint16)
for frame in img:
# Put the data in the shared buffer
start, end = data.put(frame)
# Send the message to the display process only when it signals it is ready
if rx_main.poll():
tx_main.send(DataIndexes(start, end))
rx_main.recv() # Clear the pipe
time.sleep(0.05)
tx_main.send(SHUTDOWN)
p.join()
if __name__ == "__main__":
main()

The MIT License (MIT)

Copyright © 2024 Kyle M. Douglass

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

"""Shared circular memory buffers backed by NumPy arrays."""
import atexit
import logging
from multiprocessing import shared_memory
from typing import Any, Optional
import numpy as np
import numpy.typing as npt
logger = logging.getLogger(__name__)
def _bytes_needed(capacity: int, dtype: npt.DTypeLike) -> int:
"""Finds bytes needed to hold an integer number of items within a given capacity."""
arr_dtype = np.dtype(dtype)
if capacity < arr_dtype.itemsize:
raise ValueError(
"Requested buffer capacity %d is smaller than the size of an item: %s",
capacity,
arr_dtype.itemsize,
)
num_bytes = (capacity // dtype.itemsize) * dtype.itemsize
return num_bytes
class SharedBuffer(np.ndarray):
"""A shared circular memory buffer backed by a NumPy array.
The capacity value that is passed to create the buffer is the maximum size of the buffer in
bytes; the final capacity may be smaller than the requested size to fit an integer number of
items of type dtype into the buffer.
"""
def __new__(
subtype,
capacity: int,
name: Optional[str] = None,
create: bool = False,
dtype: npt.DTypeLike = np.uint16,
offset: int = 0,
strides: tuple[int, ...] = None,
order=None,
) -> "SharedBuffer":
dtype = np.dtype(dtype)
capacity = _bytes_needed(capacity, dtype)
shm = shared_memory.SharedMemory(name=name, create=create, size=capacity)
size = capacity // dtype.itemsize # maximum number of items in the buffer
obj = super().__new__(
subtype,
(size,),
dtype=dtype,
buffer=shm.buf,
offset=offset,
strides=strides,
order=order,
)
# Convert from np.ndarray to SharedBuffer
obj = obj.view(subtype)
obj._writeable = create
obj._shm = shm
obj._write_idx = 0
atexit.register(obj.close)
# Prevents consumers from modifying the data; don't touch this
obj.flags.writeable = create
return obj
def __array_finalize__(self, obj: None | npt.NDArray[Any], /) -> None:
if obj is None:
return
self._writeable = getattr(obj, "_writeable", False)
def close(self) -> None:
self._shm.close()
if self._writeable:
try:
self._shm.unlink()
except FileNotFoundError:
logger.debug(
"Shared memory buffer %s was already closed and unlinked", self._shm.name
)
@property
def name(self) -> str:
return self._shm.name
def put(self, data: npt.NDArray) -> tuple[int, int]:
"""Puts an existing ndarray into the buffer at the next available location.
Returns
-------
tuple[int, int]
The start and end indices of the inserted data. The start index is inclusive, and the
end index is exclusive.
"""
self._validate(data)
# If self.size = 8, self._write_idx = 5, and arr.size = 4, then current_write_idx is -3 and
# next_write_idx is 1.
current_write_idx = self._write_idx
quotient, mod = divmod(self._write_idx + data.size, self.size)
if quotient > 0:
# Array will be inserted partly at the end and partly at the beginning of the buffer
current_write_idx = self._write_idx - self.size
next_write_idx = mod
assert current_write_idx < 0
else:
next_write_idx = self._write_idx + data.size
if current_write_idx < 0:
# Integer ranges won't work when wrapping around the end of the array, so use an array of
# indexes instead
indexes = list(range(current_write_idx, next_write_idx))
self[indexes] = np.ravel(data)
else:
self[current_write_idx:next_write_idx] = np.ravel(data)
self._write_idx = next_write_idx
return (current_write_idx, next_write_idx)
def _validate(self, arr: np.ndarray):
"""Validates input array data."""
if arr.nbytes > self.nbytes:
raise ValueError("Item is too large to put into the buffer")
if arr.dtype != self.dtype:
raise ValueError("dtypes of input array and BufferedArrays do not match")
if __name__ == "__main__":
capacity = 16 # bytes
dtype = np.uint16 # 2 bytes / integer
arr = SharedBuffer(capacity=capacity, create=True, dtype=dtype)
item1 = np.array([1, 1, 1, 1, 1], dtype)
item2 = np.array([2, 2, 2, 2], dtype)
item3 = np.array([3], dtype)
arr.put(item1)
assert np.all(arr[0:5] == 1)
assert np.all(arr[5:] == 0)
arr.put(item2)
assert arr[0] == 2
assert np.all(arr[1:5] == 1)
assert np.all(arr[5:] == 2)
arr.put(item3)
assert arr[0] == 2
assert arr[1] == 3
assert np.all(arr[2:5] == 1)
assert np.all(arr[5:] == 2)
arr.close()