Skip to content

Instantly share code, notes, and snippets.

@KaoruNishikawa
Last active April 16, 2021 09:51
Show Gist options
  • Save KaoruNishikawa/9a632e44a34125fbeed3724b68a74886 to your computer and use it in GitHub Desktop.
Save KaoruNishikawa/9a632e44a34125fbeed3724b68a74886 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Get board temperature
via UDP in IPv4 (SOCK_DGRAM)
from "localhost:16210"
send it to "xffts_board##" and "xffts_power_board##" topics
Get spectral data
via TCP in IPv4 (SOCK_STREAM)
from "localhost:25144"
send it to "XFFTS_TEMP" topic
"""
import sys
import signal
import queue
import socket
import threading
import csv
import time
import struct
import numpy as np
import psutil
import rospy
sys.path.append("/home/amigos/ros/src/NASCORX_XFFTS")
import udp_client # noqa: E402
from nascorx_xffts.msg import XFFTS_spec_msg # noqa: E402
from nascorx_xffts.msg import XFFTS_totalp_msg # noqa: E402
from nascorx_xffts.msg import XFFTS_temp_msg # noqa: E402
class DataServer:
HEADER_SIZE = 64
BE_NUM_MAX = 20 # maximum number of IFs
# All 20 IFs with XFFTS?
_SOCK = None
_STOP_LOOP = False
def __init__(self, host="localhost", port=25144):
self.connect(host, port) # create socket at {host:port}
rospy.init_node("XFFTS_data_server")
self.queue = queue.Queue() # by default, queue_size = inf., and FIFO.
def connect(self, host, port):
print(f"Create new socket: {host}:{port}")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
self._SOCK = sock
return
def stop_loop(self):
self._STOP_LOOP = True
return
def data_relaying_loop(self):
# data is acquired using {socket.socket(socket.AF_INET, socket.SOCK_STREAM)}
# @{localhost:25144}
print("*** Start : XFFTS Data Relaying Loop ***")
boards = {}
with open("beam_board.txt") as f:
content = csv.reader(f)
for data in content:
boards[data[0]] = data[1]
self.pub, self.pub2 = [], []
for board in boards.keys():
self.pub.append(
rospy.Publisher(f"xffts_{board}", XFFTS_spec_msg, queue_size=10)
)
self.pub2.append(
rospy.Publisher(f"xffts_power_{board}", XFFTS_totalp_msg, queue_size=10)
)
xffts_spec = [XFFTS_spec_msg() for i in range(self.BE_NUM_MAX)]
xffts_totalp = [XFFTS_totalp_msg() for i in range(self.BE_NUM_MAX)]
proc = psutil.Process() # get current process
proc.nice(-20) # set priority of the process
# where {-20} is of the highest priority, down to {20} the lowest.
while True:
if self._STOP_LOOP:
break
header = self.recv_header()
rawdata = self.recv_data(header.data_size)
timestamp = time.time()
spec, power = [], []
counter = 0
for if_num in range(1, 1 + self.BE_NUM_MAX):
if if_num <= header.BE_num:
BE_num_temp, ch_num = np.frombuffer(
rawdata[counter : counter + 8], ("<i", 2)
).tolist()[0]
# {BE_num_temp}: 'temp' may stand for temporary
data_temp = np.frombuffer(
rawdata[counter + 8 : counter + 8 + ch_num * 4],
("<f", 32768)
# is there no problem on hard-coding {32768}?
# isn't {ch_num} the number?
).tolist()[0]
spec.append(data_temp)
power.append(sum(data_temp))
counter += 8 + ch_num * 4
elif header.BE_num < if_num:
spec.append([0])
power.append(0)
# attention not empty value
for i in range(self.BE_NUM_MAX):
xffts_spec[i].timestamp = timestamp
xffts_spec[i].spec = spec[i]
xffts_totalp[i].timestamp = timestamp
xffts_totalp[i].total_power = power[i]
self.queue.put([xffts_spec, xffts_totalp])
time.sleep(0.001) # isn't this too short?
print("** Shut Down : XFFTS Data Relaying Loop **")
return
def publish(self):
while not rospy.is_shutdown():
tmp = self.queue.get(block=True) # blocking (by default) is ok?
for i in range(self.BE_NUM_MAX):
self.pub[i].publish(tmp[0][i])
self.pub2[i].publish(tmp[1][i])
time.sleep(0.001) # isn't this too short?
def temp_relaying_loop(self):
"""TEMP is temperature here."""
# data is acquired using {udp_client} which is equivalent to
# {socket.socket(socket.AF_INET, socket.SOCK_DGRAM)}
# @{localhost:16210}
pub3 = rospy.Publisher("XFFTS_TEMP", XFFTS_temp_msg, queue_size=10)
XFFTS_TEMP = XFFTS_temp_msg()
udp = udp_client.UdpClient()
# communicate up to 16384 bytes by default, is it ok?
udp.open()
while True:
if self._STOP_LOOP:
break
timestamp = time.time()
temps = udp.query_all_temperature()
for i, temp in enumerate(temps):
if temp is None:
temps[i] = [0]
XFFTS_TEMP.timestamp = timestamp
XFFTS_TEMP.TEMP_BE1 = temps[0]
XFFTS_TEMP.TEMP_BE2 = temps[1]
XFFTS_TEMP.TEMP_BE3 = temps[2]
XFFTS_TEMP.TEMP_BE4 = temps[3]
XFFTS_TEMP.TEMP_BE5 = temps[4]
XFFTS_TEMP.TEMP_BE6 = temps[5]
XFFTS_TEMP.TEMP_BE7 = temps[6]
XFFTS_TEMP.TEMP_BE8 = temps[7]
XFFTS_TEMP.TEMP_BE9 = temps[8]
XFFTS_TEMP.TEMP_BE10 = temps[9]
XFFTS_TEMP.TEMP_BE11 = temps[10]
XFFTS_TEMP.TEMP_BE12 = temps[11]
XFFTS_TEMP.TEMP_BE13 = temps[12]
XFFTS_TEMP.TEMP_BE14 = temps[13]
XFFTS_TEMP.TEMP_BE15 = temps[14]
XFFTS_TEMP.TEMP_BE16 = temps[15]
# inconsistency between {BE_NUM_MAX} and {TEMP_BE##}?
# XFFTS_TEMP.from_node = None
pub3.publish(XFFTS_TEMP)
time.sleep(1)
return
def recv_header(self):
"""
have data_size attr?
have BE_num attr?
"""
header = self._SOCK.recv(self.HEADER_SIZE)
# with no query? is xffts streaming the data?
# who is {localhost:25144}? port forwarded?
return DataHeader(header)
def recv_data(self, data_size):
"""
0-(8): int = (BE_num_temp, ch_num=32768)
8-(ch_num*4): float = (data)*32768
...8+ch_num*4-(8): int = (BE_num_temp, ch_num=32768)
...
"""
data = self._SOCK.recv(data_size, socket.MSG_WAITALL)
# with no query? is xffts streaming the data?
# who is {localhost:25144}? port forwarded?
return data
def start_thread(self):
th = threading.Thread(target=self.data_relaying_loop)
th.setDaemon(True)
th.start()
th2 = threading.Thread(target=self.temp_relaying_loop)
th2.setDaemon(True)
th2.start()
th3 = threading.Thread(target=self.publish)
th3.setDaemon(True)
th3.start()
return
class DataHeader:
HEADER_SIZE = 64
def __init__(self, header):
(self.ieee,) = struct.unpack("<4s", header[0:4])
# {struct.unpack} returns tuple even if there's only one element exists
(self.data_format,) = struct.unpack("4s", header[4:8])
(self.package_length,) = struct.unpack("I", header[8:12])
(self.BE_name,) = struct.unpack("8s", header[12:20])
(self.timestamp,) = struct.unpack("28s", header[20:48])
(self.integration_time,) = struct.unpack("I", header[48:52])
(self.phase_number,) = struct.unpack("I", header[52:56])
(self.BE_num,) = struct.unpack("I", header[56:60])
(self.blocking,) = struct.unpack("I", header[60:64])
self.data_size = self.package_length - self.HEADER_SIZE
return
if __name__ == "__main__":
serv = DataServer()
# connected at "localhost:25144" (default IP:port)
# create ROS node named "XFFTS_data_server"
def signal_handler(num, frame):
serv.stop_loop()
sys.exit(1)
signal.signal(signal.SIGINT, signal_handler)
serv.start_thread()
# execute infinite loop in the following functions:
# {data_relaying_loop, temp_relaying_loop, publish}
rospy.spin()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment