Skip to content

Instantly share code, notes, and snippets.

@tomasiser
Last active April 22, 2021 12:18
Show Gist options
  • Save tomasiser/5e3bacd72df30f7efc3037cb95a039d3 to your computer and use it in GitHub Desktop.
Save tomasiser/5e3bacd72df30f7efc3037cb95a039d3 to your computer and use it in GitHub Desktop.
Python tev/IPC wrapper

A simple Python wrapper to communicate with tev over IPC

This wrapper allows remotely creating, updating, and closing images in the tev image viewer. It is quite fast and is even suitable for semi-real-time usage like sending a raw video feed from a camera. See this tev issue for more information about its IPC.

Example usage:

with TevIpc() as tev:
    image_data = np.full((100,100,3), 1.0)
    image_data[40:61,:,0] = 0.0
    image_data[:,40:61,1] = 0.0
    image_data[50:71,50:71,2] = 0.0

    bonus_data = image_data[:,:,0] + image_data[:,:,1] + image_data[:,:,2]

    tev.create_image("Test Image", width=100, height=100, channel_names=["R","G","B","Bonus"])
    tev.update_image("Test Image", "R", image_data[:,:,0])
    tev.update_image("Test Image", "G", image_data[:,:,1])
    tev.update_image("Test Image", "B", image_data[:,:,2])
    tev.update_image("Test Image", "Bonus", bonus_data)

# At some later time...
with TevIpc() as tev:
    tev.close_image("Test Image")

example_usage_image

import socket
import struct
import numpy as np
class TevIpc:
def __init__(self, hostname = "localhost", port = 14158):
self._hostname = hostname
self._port = port
self._socket = None
def __enter__(self):
if self._socket is not None:
raise Exception("Communication already started")
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # SOCK_STREAM means a TCP socket
self._socket.__enter__()
self._socket.connect((self._hostname, self._port))
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._socket is None:
raise Exception("Communication was not started")
self._socket.__exit__(exc_type, exc_val, exc_tb)
def create_image(self, name: str, *, grab_focus = True, width: int, height: int, channel_names):
if self._socket is None:
raise Exception("Communication was not started")
data_bytes = bytearray()
data_bytes.extend(struct.pack("<I", 0)) # reserved for length
data_bytes.extend(struct.pack("<b", 4)) # create image
data_bytes.extend(struct.pack("<b", grab_focus)) # grab focus
data_bytes.extend(bytes(name, "ascii")) # image name
data_bytes.extend(struct.pack("<b", 0)) # string terminator
data_bytes.extend(struct.pack("<i", width)) # width
data_bytes.extend(struct.pack("<i", height)) # height
data_bytes.extend(struct.pack("<i", len(channel_names))) # number of channels
for cname in channel_names:
data_bytes.extend(bytes(cname, "ascii")) # channel name
data_bytes.extend(struct.pack("<b", 0)) # string terminator
data_bytes[0:4] = struct.pack("<I", len(data_bytes))
self._socket.sendall(data_bytes)
def close_image(self, name: str):
if self._socket is None:
raise Exception("Communication was not started")
data_bytes = bytearray()
data_bytes.extend(struct.pack("<I", 0)) # reserved for length
data_bytes.extend(struct.pack("<b", 2)) # close image
data_bytes.extend(bytes(name, "ascii")) # image name
data_bytes.extend(struct.pack("<b", 0)) # string terminator
data_bytes[0:4] = struct.pack("<I", len(data_bytes))
self._socket.sendall(data_bytes)
def update_image(self, name: str, channel_name: str, image_data, *, grab_focus = False, x = 0, y = 0):
if self._socket is None:
raise Exception("Communication was not started")
image = np.full(image_data.shape, 0.0, dtype="<f")
image[:,:] = image_data
data_bytes = bytearray()
data_bytes.extend(struct.pack("<I", 0)) # reserved for length
data_bytes.extend(struct.pack("<b", 3)) # update image
data_bytes.extend(struct.pack("<b", grab_focus)) # grab focus
data_bytes.extend(bytes(name, "ascii")) # image name
data_bytes.extend(struct.pack("<b", 0)) # string terminator
data_bytes.extend(bytes(channel_name, "ascii")) # channel name
data_bytes.extend(struct.pack("<b", 0)) # string terminator
data_bytes.extend(struct.pack("<i", x)) # x
data_bytes.extend(struct.pack("<i", y)) # y
data_bytes.extend(struct.pack("<i", image.shape[1])) # width
data_bytes.extend(struct.pack("<i", image.shape[0])) # height
data_bytes.extend(image.tobytes()) # data
data_bytes[0:4] = struct.pack("<I", len(data_bytes))
self._socket.sendall(data_bytes)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment