|
import socket |
|
import struct |
|
import numpy as np |
|
|
|
class TevIpc:
    """Context-managed TCP client that sends image commands as binary packets.

    Every packet is little-endian and laid out as: a 4-byte unsigned total
    length (which counts the length field itself), a 1-byte command code,
    then the command-specific fields. Strings are ASCII-encoded and
    NUL-terminated.

    NOTE(review): the class name and default port suggest the peer is the
    "tev" image viewer -- confirm against the server's IPC documentation.

    Example::

        with TevIpc() as ipc:
            ipc.create_image("img", width=4, height=4, channel_names=["R"])
            ipc.update_image("img", "R", pixels)
    """

    # Command codes: the byte that follows the length prefix.
    _CMD_CLOSE_IMAGE = 2
    _CMD_UPDATE_IMAGE = 3
    _CMD_CREATE_IMAGE = 4

    def __init__(self, hostname="localhost", port=14158):
        """Store the server address; no connection is made until ``__enter__``.

        Args:
            hostname: Host name or IPv4 address of the server.
            port: TCP port of the server.
        """
        self._hostname = hostname
        self._port = port
        self._socket = None  # set only while a connection is open

    def __enter__(self):
        """Open the TCP connection and return ``self``.

        Raises:
            RuntimeError: If a connection is already open.
            OSError: If the connection attempt fails.
        """
        if self._socket is not None:
            raise RuntimeError("Communication already started")

        # SOCK_STREAM means a TCP socket.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self._hostname, self._port))
        except BaseException:
            # Do not leak the descriptor, and leave the instance re-enterable.
            sock.close()
            raise

        # Publish the socket only once it is actually connected.
        self._socket = sock
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection; exceptions from the ``with`` body propagate.

        Raises:
            RuntimeError: If no connection is open.
        """
        if self._socket is None:
            raise RuntimeError("Communication was not started")

        # Clear the attribute first so the instance can be re-entered and so
        # later calls report "not started" instead of using a closed socket.
        sock, self._socket = self._socket, None
        sock.close()

    def _connected_socket(self):
        """Return the open socket or raise ``RuntimeError`` if there is none."""
        if self._socket is None:
            raise RuntimeError("Communication was not started")
        return self._socket

    @staticmethod
    def _cstring(text):
        """Encode ``text`` as ASCII bytes followed by a NUL terminator."""
        return bytes(text, "ascii") + b"\x00"

    @staticmethod
    def _frame(command, body):
        """Prefix ``body`` with the total packet length and the command byte."""
        # 4 bytes of length + 1 byte of command code + the body itself.
        return struct.pack("<Ib", 5 + len(body), command) + body

    def create_image(self, name: str, *, grab_focus=True, width: int, height: int, channel_names):
        """Send a "create image" packet.

        Args:
            name: Image name (ASCII only).
            grab_focus: Focus flag, packed as one signed byte.
            width: Image width, packed as a 32-bit signed integer.
            height: Image height, packed as a 32-bit signed integer.
            channel_names: Iterable of channel names (ASCII only).

        Raises:
            RuntimeError: If no connection is open.
            UnicodeEncodeError: If a name contains non-ASCII characters.
        """
        sock = self._connected_socket()

        # Materialise once so iterators/generators are accepted as well.
        channel_names = list(channel_names)

        body = b"".join([
            struct.pack("<b", grab_focus),
            self._cstring(name),
            struct.pack("<iii", width, height, len(channel_names)),
            *(self._cstring(cname) for cname in channel_names),
        ])
        sock.sendall(self._frame(self._CMD_CREATE_IMAGE, body))

    def close_image(self, name: str):
        """Send a "close image" packet for the image called ``name``.

        Raises:
            RuntimeError: If no connection is open.
            UnicodeEncodeError: If ``name`` contains non-ASCII characters.
        """
        sock = self._connected_socket()
        sock.sendall(self._frame(self._CMD_CLOSE_IMAGE, self._cstring(name)))

    def update_image(self, name: str, channel_name: str, image_data, *, grab_focus=False, x=0, y=0):
        """Send an "update image" packet carrying one channel of pixel data.

        Args:
            name: Image name (ASCII only).
            channel_name: Name of the channel being written (ASCII only).
            image_data: Array-like of shape (height, width); trailing
                singleton dimensions are tolerated. Values are converted to
                little-endian 32-bit floats and sent in row-major order.
            grab_focus: Focus flag, packed as one signed byte.
            x: X field of the packet, packed as a 32-bit signed integer.
            y: Y field of the packet, packed as a 32-bit signed integer.

        Raises:
            RuntimeError: If no connection is open.
            ValueError: If ``image_data`` does not hold exactly
                height * width values.
            UnicodeEncodeError: If a name contains non-ASCII characters.
        """
        sock = self._connected_socket()

        pixels = np.asarray(image_data, dtype="<f4")
        # The packet declares width * height floats; sending any other amount
        # would produce a malformed packet, so reject it up front.
        if pixels.ndim < 2 or pixels.size != pixels.shape[0] * pixels.shape[1]:
            raise ValueError("image_data must be a 2D (height, width) array")
        height, width = pixels.shape[:2]

        body = b"".join([
            struct.pack("<b", grab_focus),
            self._cstring(name),
            self._cstring(channel_name),
            struct.pack("<iiii", x, y, width, height),
            pixels.tobytes(),  # always C (row-major) order
        ])
        sock.sendall(self._frame(self._CMD_UPDATE_IMAGE, body))