Last active
April 16, 2021 09:51
-
-
Save KaoruNishikawa/9a632e44a34125fbeed3724b68a74886 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Get board temperature | |
via UDP in IPv4 (SOCK_DGRAM) | |
from "localhost:16210" | |
send it to "xffts_board##" and "xffts_power_board##" topics | |
Get spectral data | |
via TCP in IPv4 (SOCK_STREAM) | |
from "localhost:25144" | |
send it to "XFFTS_TEMP" topic | |
""" | |
import sys | |
import signal | |
import queue | |
import socket | |
import threading | |
import csv | |
import time | |
import struct | |
import numpy as np | |
import psutil | |
import rospy | |
sys.path.append("/home/amigos/ros/src/NASCORX_XFFTS") | |
import udp_client # noqa: E402 | |
from nascorx_xffts.msg import XFFTS_spec_msg # noqa: E402 | |
from nascorx_xffts.msg import XFFTS_totalp_msg # noqa: E402 | |
from nascorx_xffts.msg import XFFTS_temp_msg # noqa: E402 | |
class DataServer: | |
HEADER_SIZE = 64 | |
BE_NUM_MAX = 20 # maximum number of IFs | |
# All 20 IFs with XFFTS? | |
_SOCK = None | |
_STOP_LOOP = False | |
def __init__(self, host="localhost", port=25144): | |
self.connect(host, port) # create socket at {host:port} | |
rospy.init_node("XFFTS_data_server") | |
self.queue = queue.Queue() # by default, queue_size = inf., and FIFO. | |
def connect(self, host, port): | |
print(f"Create new socket: {host}:{port}") | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
sock.connect((host, port)) | |
self._SOCK = sock | |
return | |
def stop_loop(self): | |
self._STOP_LOOP = True | |
return | |
def data_relaying_loop(self): | |
# data is acquired using {socket.socket(socket.AF_INET, socket.SOCK_STREAM)} | |
# @{localhost:25144} | |
print("*** Start : XFFTS Data Relaying Loop ***") | |
boards = {} | |
with open("beam_board.txt") as f: | |
content = csv.reader(f) | |
for data in content: | |
boards[data[0]] = data[1] | |
self.pub, self.pub2 = [], [] | |
for board in boards.keys(): | |
self.pub.append( | |
rospy.Publisher(f"xffts_{board}", XFFTS_spec_msg, queue_size=10) | |
) | |
self.pub2.append( | |
rospy.Publisher(f"xffts_power_{board}", XFFTS_totalp_msg, queue_size=10) | |
) | |
xffts_spec = [XFFTS_spec_msg() for i in range(self.BE_NUM_MAX)] | |
xffts_totalp = [XFFTS_totalp_msg() for i in range(self.BE_NUM_MAX)] | |
proc = psutil.Process() # get current process | |
proc.nice(-20) # set priority of the process | |
# where {-20} is of the highest priority, down to {20} the lowest. | |
while True: | |
if self._STOP_LOOP: | |
break | |
header = self.recv_header() | |
rawdata = self.recv_data(header.data_size) | |
timestamp = time.time() | |
spec, power = [], [] | |
counter = 0 | |
for if_num in range(1, 1 + self.BE_NUM_MAX): | |
if if_num <= header.BE_num: | |
BE_num_temp, ch_num = np.frombuffer( | |
rawdata[counter : counter + 8], ("<i", 2) | |
).tolist()[0] | |
# {BE_num_temp}: 'temp' may stand for temporary | |
data_temp = np.frombuffer( | |
rawdata[counter + 8 : counter + 8 + ch_num * 4], | |
("<f", 32768) | |
# is there no problem on hard-coding {32768}? | |
# isn't {ch_num} the number? | |
).tolist()[0] | |
spec.append(data_temp) | |
power.append(sum(data_temp)) | |
counter += 8 + ch_num * 4 | |
elif header.BE_num < if_num: | |
spec.append([0]) | |
power.append(0) | |
# attention not empty value | |
for i in range(self.BE_NUM_MAX): | |
xffts_spec[i].timestamp = timestamp | |
xffts_spec[i].spec = spec[i] | |
xffts_totalp[i].timestamp = timestamp | |
xffts_totalp[i].total_power = power[i] | |
self.queue.put([xffts_spec, xffts_totalp]) | |
time.sleep(0.001) # isn't this too short? | |
print("** Shut Down : XFFTS Data Relaying Loop **") | |
return | |
def publish(self): | |
while not rospy.is_shutdown(): | |
tmp = self.queue.get(block=True) # blocking (by default) is ok? | |
for i in range(self.BE_NUM_MAX): | |
self.pub[i].publish(tmp[0][i]) | |
self.pub2[i].publish(tmp[1][i]) | |
time.sleep(0.001) # isn't this too short? | |
def temp_relaying_loop(self): | |
"""TEMP is temperature here.""" | |
# data is acquired using {udp_client} which is equivalent to | |
# {socket.socket(socket.AF_INET, socket.SOCK_DGRAM)} | |
# @{localhost:16210} | |
pub3 = rospy.Publisher("XFFTS_TEMP", XFFTS_temp_msg, queue_size=10) | |
XFFTS_TEMP = XFFTS_temp_msg() | |
udp = udp_client.UdpClient() | |
# communicate up to 16384 bytes by default, is it ok? | |
udp.open() | |
while True: | |
if self._STOP_LOOP: | |
break | |
timestamp = time.time() | |
temps = udp.query_all_temperature() | |
for i, temp in enumerate(temps): | |
if temp is None: | |
temps[i] = [0] | |
XFFTS_TEMP.timestamp = timestamp | |
XFFTS_TEMP.TEMP_BE1 = temps[0] | |
XFFTS_TEMP.TEMP_BE2 = temps[1] | |
XFFTS_TEMP.TEMP_BE3 = temps[2] | |
XFFTS_TEMP.TEMP_BE4 = temps[3] | |
XFFTS_TEMP.TEMP_BE5 = temps[4] | |
XFFTS_TEMP.TEMP_BE6 = temps[5] | |
XFFTS_TEMP.TEMP_BE7 = temps[6] | |
XFFTS_TEMP.TEMP_BE8 = temps[7] | |
XFFTS_TEMP.TEMP_BE9 = temps[8] | |
XFFTS_TEMP.TEMP_BE10 = temps[9] | |
XFFTS_TEMP.TEMP_BE11 = temps[10] | |
XFFTS_TEMP.TEMP_BE12 = temps[11] | |
XFFTS_TEMP.TEMP_BE13 = temps[12] | |
XFFTS_TEMP.TEMP_BE14 = temps[13] | |
XFFTS_TEMP.TEMP_BE15 = temps[14] | |
XFFTS_TEMP.TEMP_BE16 = temps[15] | |
# inconsistency between {BE_NUM_MAX} and {TEMP_BE##}? | |
# XFFTS_TEMP.from_node = None | |
pub3.publish(XFFTS_TEMP) | |
time.sleep(1) | |
return | |
def recv_header(self): | |
""" | |
have data_size attr? | |
have BE_num attr? | |
""" | |
header = self._SOCK.recv(self.HEADER_SIZE) | |
# with no query? is xffts streaming the data? | |
# who is {localhost:25144}? port forwarded? | |
return DataHeader(header) | |
def recv_data(self, data_size): | |
""" | |
0-(8): int = (BE_num_temp, ch_num=32768) | |
8-(ch_num*4): float = (data)*32768 | |
...8+ch_num*4-(8): int = (BE_num_temp, ch_num=32768) | |
... | |
""" | |
data = self._SOCK.recv(data_size, socket.MSG_WAITALL) | |
# with no query? is xffts streaming the data? | |
# who is {localhost:25144}? port forwarded? | |
return data | |
def start_thread(self): | |
th = threading.Thread(target=self.data_relaying_loop) | |
th.setDaemon(True) | |
th.start() | |
th2 = threading.Thread(target=self.temp_relaying_loop) | |
th2.setDaemon(True) | |
th2.start() | |
th3 = threading.Thread(target=self.publish) | |
th3.setDaemon(True) | |
th3.start() | |
return | |
class DataHeader: | |
HEADER_SIZE = 64 | |
def __init__(self, header): | |
(self.ieee,) = struct.unpack("<4s", header[0:4]) | |
# {struct.unpack} returns tuple even if there's only one element exists | |
(self.data_format,) = struct.unpack("4s", header[4:8]) | |
(self.package_length,) = struct.unpack("I", header[8:12]) | |
(self.BE_name,) = struct.unpack("8s", header[12:20]) | |
(self.timestamp,) = struct.unpack("28s", header[20:48]) | |
(self.integration_time,) = struct.unpack("I", header[48:52]) | |
(self.phase_number,) = struct.unpack("I", header[52:56]) | |
(self.BE_num,) = struct.unpack("I", header[56:60]) | |
(self.blocking,) = struct.unpack("I", header[60:64]) | |
self.data_size = self.package_length - self.HEADER_SIZE | |
return | |
if __name__ == "__main__": | |
serv = DataServer() | |
# connected at "localhost:25144" (default IP:port) | |
# create ROS node named "XFFTS_data_server" | |
def signal_handler(num, frame): | |
serv.stop_loop() | |
sys.exit(1) | |
signal.signal(signal.SIGINT, signal_handler) | |
serv.start_thread() | |
# execute infinite loop in the following functions: | |
# {data_relaying_loop, temp_relaying_loop, publish} | |
rospy.spin() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment