Skip to content

Instantly share code, notes, and snippets.

@brookst
Last active June 25, 2024 12:26
Show Gist options
  • Save brookst/bdbede3a8d40eb8940a5b53e7ca1f6ce to your computer and use it in GitHub Desktop.
Save brookst/bdbede3a8d40eb8940a5b53e7ca1f6ce to your computer and use it in GitHub Desktop.
Serial interface to Bosean FS-5000 radiation detector
#!/usr/bin/env python3
__author__ = "Tim Brooks"
__email__ = "brooks@skoorb.net"
__date__ = "2024-04-23"
import datetime
from enum import Enum, Flag
import logging
import serial
import serial.tools.list_ports as list_ports
import struct
logging.addLevelName(5, "TRACE")
logging.TRACE = 5
COMMAND = {
"set_time": b"\x01",
"read_dose_curve": b"\x03",
"set_rate_limit": b"\x04",
"set_dose_limit": b"\x05",
"get_version": b"\x06",
"get_dose": b"\x07",
"set_alert": b"\x08",
"set_display": b"\x09",
"set_mode": b"\x0a",
"set_language": b"\x0b",
"timer_duration": b"\x0c",
"clear_dose": b"\x0d",
"read": b"\x0e",
"read_rate_curve": b"\x0f",
"read_alarms": b"\x10",
}
RESPONSE = {
"readback": b"\x04",
"success": b"\x06",
"read_starting": b"\x0e\x06\x01",
"read_stopping": b"\x0e\x06\x00",
}
VID_PID = (0x1A86, 0x7523)
class FS5000:
def __init__(self, port):
self.port = serial.Serial(port, 115200, timeout=2)
self.log = logging.getLogger("FS5000")
def log_bytes(self, data, purpose, level=logging.DEBUG):
"""Log raw hex bytes"""
# The stacklevel arg sets funcName to the caller, not this frame
if self.log.getEffectiveLevel() <= level:
title = f"{len(data)} bytes {purpose}: "
if len(title) + len(data) * 3 < 80:
for b in data:
title += f"{b:02x} "
self.log.log(level, title, stacklevel=2)
return
self.log.log(level, title, stacklevel=2)
string = ""
for b in data:
string += f"{b:02x} "
if len(string) >= 48:
self.log.log(level, string, stacklevel=2)
string = ""
if string:
self.log.log(level, string, stacklevel=2)
def write(self, data):
self.log_bytes(data, "written", logging.TRACE)
return self.port.write(data)
def read(self, length):
response = self.port.read(length)
self.log_bytes(response, "read", logging.TRACE)
return response
def checksum(self, data: bytes) -> bytes:
return bytes([sum(data) % 256])
def packet(self, payload: bytes):
self.log.log(logging.TRACE, f"{len(payload)=}")
data = b"\xaa"
# Length + checksum and trailer byte to follow:
data += bytes([len(payload) + 3])
data += payload
data += self.checksum(data)
data += b"\x55"
return data
def send(self, command: bytes):
data = self.packet(command)
self.write(data)
def recv(self):
header = self.read(2)
if len(header) == 0:
return None
if header[0] != 0xAA:
raise IOError(f"Read header 0x{header[0]:02x} not 0xaa")
length = header[1]
data = self.read(length - 1)
if data[-1] != 0x55:
raise IOError(f"Read trailer 0x{data[-1]:02x} not 0x55")
checksum = self.checksum(header + data[:-2])[0]
if checksum != data[-2]:
msg_checksum = data[-2]
self.log_bytes(data, "failed to verify")
raise IOError(f"Checksum failure {checksum:02x} != {msg_checksum:02x}")
return data[:-2]
def check_success(self, command):
response = self.recv()
expectation = bytes([command[0]]) + RESPONSE["success"]
if response[:2] != expectation:
raise RuntimeError(f"Received {response=}, expected {expectation}")
return response[2:]
def set_time(self, time: datetime.datetime = None):
if time is None:
time = datetime.datetime.now()
command = COMMAND["set_time"]
command += bytes([time.year % 100, time.month, time.day])
command += bytes([time.hour, time.minute, time.second])
self.send(command)
self.check_success(command)
def read_dose_log(self):
"""Fetch log of total dose"""
self.send(COMMAND["read_dose_curve"])
response = self.check_success(COMMAND["read_dose_curve"])
packets, records = struct.unpack("!BH", response)
log = b""
for packet in range(1, packets + 1):
response = self.recv()
if response[0] != COMMAND["read_dose_curve"][0]:
raise RuntimeError(f"Received {response[0]=} not {COMMAND['read_dose_curve'][0]}")
if response[1] != packet:
raise RuntimeError(f"Received {response[1]=} not {packet=}")
log += response[2:]
self.log_bytes(log, "logged")
raise NotImplementedError("TODO: Parse dose curve")
DIGITS = ".0123456789"
MSV_H = b"mSvh"
USV_H = b"uSvh"
RATE_UNIT = {
MSV_H: "mSv/h",
USV_H: "μSv/h",
}
def set_rate_limit(self, value: str, unit: bytes = b"uSvh"):
if type(value) is not str:
raise TypeError("Rate limit must be 4 characters e.g. '2.50'")
if len(value) != 4:
raise ValueError("Rate limit must be 4 characters e.g. '2.50'")
if any(c not in self.DIGITS for c in value):
raise ValueError(f"Rate limit must be of characters {self.DIGITS}")
if unit not in self.RATE_UNIT:
raise ValueError("Rate limit must have unit 'uSvh' or 'mSvh'")
self.log.debug(f"{value=} {self.RATE_UNIT[unit]}")
command = COMMAND["set_rate_limit"]
command += value.encode("ascii") + unit
self.send(command)
self.check_success(command)
SV = b" Sv"
MSV = b"mSv"
USV = b"uSv"
DOSE_UNIT = {
SV: "Sv",
MSV: "mSv",
USV: "μSv",
}
def set_dose_limit(self, limit: str, unit: bytes = b"uSv"):
# raise NotImplementedError("Dose rate limit unit not yet understood.")
if type(limit) is not str:
raise TypeError("Dose limit must be 4 characters e.g. '2.50'")
if len(limit) != 4:
raise ValueError("Dose limit must be 4 characters e.g. '2.50'")
if any(c not in self.DIGITS for c in limit):
raise ValueError(f"Dose limit must be of characters {self.DIGITS}")
if unit not in self.DOSE_UNIT:
raise ValueError("Dose limit must have unit 'uSv', 'mSv' or ' Sv'")
self.log.debug(limit.encode("ascii") + unit)
command = COMMAND["set_dose_limit"]
command += limit.encode("ascii") + unit
self.send(command)
self.check_success(command)
def get_version(self):
self.send(COMMAND["get_version"])
response = self.recv()
return response
def get_dose(self):
"""Fetch current total dose"""
self.send(COMMAND["get_dose"])
response = self.check_success(COMMAND["get_dose"])
_, dose, *date = struct.unpack("!II5B", response)
dose *= 0.01 # Convert to μSv
year, month, day, hour, minute = date
year += 2000 # We saved an extra byte! This will surely never cause problems...
date = datetime.datetime(year, month, day, hour, minute)
self.log.info(f"{dose:.2f} μSv starting {date}")
return (dose, date)
class Notify(Flag):
LAMP = 0x01
SOUND = 0x02
VIBE = 0x04
CLICK = 0x08
def set_alert(self, value: Notify):
if type(value) is not self.Notify:
raise ValueError("Alert setting must be of type Notify")
command = COMMAND["set_alert"]
command += bytes([value.value])
self.send(command)
self.check_success(command)
def set_display(self, brightness: int, timeout: int):
if brightness < 0 or brightness > 5:
raise ValueError("Brightness must be in range [0-5]")
if timeout < 0 or timeout > 9999:
raise ValueError("Timeout must be in range [0-9999]")
command = COMMAND["set_display"]
command += bytes([brightness, timeout // 256, timeout % 256])
self.send(command)
self.check_success(command)
def set_mode(self, mode=bool):
"""Set True to enable long endurance mode"""
command = COMMAND["set_mode"]
command += bytes([mode])
self.send(command)
self.check_success(command)
class Language(Enum):
CHINESE = 0x00
ENGLISH = 0x01
def set_language(self, value: Language):
if type(value) is not self.Language:
raise ValueError("Language setting must be of type Language")
command = COMMAND["set_language"]
command += bytes([value.value])
self.send(command)
self.check_success(command)
def get_duration(self):
"""Get the period of a 'timed dose' measurement"""
# This seems like a rejected command, but the current value gets read back
command = COMMAND["timer_duration"] + b"\x01"
self.send(command)
response = self.recv()
expectation = COMMAND["timer_duration"] + RESPONSE["readback"] + b"\x00"
if response[:3] != expectation:
raise RuntimeError(f"Received {response=}, expected {expectation}")
seconds = struct.unpack("!I", response[3:])[0]
self.log.info(f"Got timed duration {seconds} s")
return seconds
def set_duration(self, seconds):
"""Set the period of a 'timed dose' measurement in seconds"""
command = COMMAND["timer_duration"]
command += struct.pack("!BI", 0, seconds) # 0 to set value, non-zero gets it
self.send(command)
self.check_success()
def clear_dose(self):
"""Clear the accumulated dose total, returns date-time of reset"""
command = COMMAND["clear_dose"]
self.send(command)
response = self.check_success()
date = struct.unpack("!6B", response)
year, month, day, hour, minute = date
year += 2000 # We saved an extra byte! This will surely never cause problems...
date = datetime.datetime(year, month, day, hour, minute)
return date
def start_read(self):
"""Command start of continuous data readout"""
self.send(COMMAND["read"] + b"\x01")
message = self.recv()
if message != RESPONSE["read_starting"]:
raise RuntimeError(f"Expected start of counts, got {message}")
def stop_read(self):
"""Wait until the stop response is read back"""
self.send(COMMAND["read"] + b"\x00")
while True:
try:
message = self.recv()
if message != RESPONSE["read_stopping"]:
raise RuntimeError(f"Expected stopping of counts, got {message}")
break
except IOError:
pass
def yield_data(self):
"""Continuously yield a semicolon separated record of date-time, instantaneous dose rate, total dose,
counts per second, counts per minute, average dose rate, timer, timed dose and alarm status"""
self.start_read()
try:
while True:
message = self.recv()
if message is None:
continue
now = datetime.datetime.now()
if message[0] != COMMAND["read"][0]:
raise RuntimeError(f"Unexpected datum marker: {message[0]=} != {COMMAND['read'][0]}")
now = now.isoformat(timespec="seconds")
yield now + ";" + message[1:].decode()
finally:
self.stop_read()
def read_out(self):
"""Read out data continuously"""
try:
for datum in self.yield_data():
self.log.info(datum)
except KeyboardInterrupt:
pass
def read_rate_log(self):
"""Fetch log of dose rate"""
self.send(COMMAND["read_rate_curve"])
response = self.check_success(COMMAND["read_rate_curve"])
# No idea why there's a null in the centre here:
packets, _, records = struct.unpack("!HBH", response)
log = b""
for packet in range(1, packets + 1):
response = self.recv()
command, packet_id = struct.unpack("!BH", response[:3])
if command != COMMAND["read_rate_curve"][0]:
raise RuntimeError(f"Received {command=} not {COMMAND['read_rate_curve'][0]}")
if packet_id != packet:
raise RuntimeError(f"Received {packet_id=} not {packet=}")
log += response[3:]
self.log_bytes(log, "logged")
raise NotImplementedError("TODO: Parse dose rate curve")
def read_alarms(self):
self.send(COMMAND["read_alarms"])
response = self.check_success(COMMAND["read_alarms"])
_, packets, _, records = struct.unpack("!BBBH", response)
log = b""
for packet in range(1, packets + 1):
response = self.recv()
if response[0] != COMMAND["read_alarms"][0]:
raise RuntimeError(f"Received {response[0]=} not {COMMAND['read_alarms'][0]}")
if response[2] != packet:
raise RuntimeError(f"Received {response[2]=} not {packet=}")
log += response[3:]
self.log_bytes(log, "logged")
for record in range(records):
data = log[record * 16 : (record + 1) * 16]
values = struct.unpack("!BH5B4s4s", data)
alarm = values[0]
date = datetime.datetime(*values[1:7])
limit, unit = values[7:9]
if alarm == 0x01:
UNIT = self.RATE_UNIT
elif alarm == 0x02:
unit = unit[1:] # The log has an extra space in this case
UNIT = self.DOSE_UNIT
if unit not in UNIT:
self.log.error(f"Unknown unit: {unit}")
continue
self.log.log(logging.TRACE, f"{limit=} {unit=}")
self.log.info(f"#{record+1} {date} >={limit.decode()} {UNIT[unit]}")
class MockFS5000(FS5000):
def __init__(self, port):
self.last = None
self.log = logging.getLogger("MockFS5000")
self.outbox = b""
def write(self, value):
self.log_bytes(value, "written", logging.TRACE)
length = value[1]
self.last = command = value[2]
return len(value)
def read(self, length):
"""Just report that the previous command succeeded"""
if self.last is None:
return b""
# Partial message still waiting to be read
outbox = self.outbox
if len(outbox):
self.outbox = outbox[length:]
self.log_bytes(outbox[:length], "read", logging.TRACE)
return outbox[:length]
# Craft a new message to return to reader
response = self.packet(bytes([self.last]) + RESPONSE["success"])
self.outbox = response[length:]
response = response[:length]
self.log_bytes(response, "read", logging.TRACE)
return response
def get_port():
ports = list_ports.comports()
for port in ports:
if (port.vid, port.pid) == VID_PID:
return port.device
else:
raise FileNotFoundError()
def main():
form = "%(levelname)s:%(name)s:%(funcName)s: %(message)s"
logging.basicConfig(level=logging.INFO, format=form)
# logging.basicConfig(level=logging.TRACE, format=form)
port = get_port()
dev = FS5000(port)
# dev = MockFS5000('/dev/null')
# dev.set_time()
# dev.read_dose_log()
# dev.set_rate_limit("0.50", "uSvh")
# dev.set_rate_limit("0.50", FS5000.USV_H)
# dev.set_dose_limit("0.01", " Sv")
# dev.set_dose_limit("2.50", FS5000.USV)
# version = dev.get_version()
# version = version.split(b'\x00')
# logging.info(f"{version=}")
# dev.get_dose()
# dev.set_alert(FS5000.Notify.LAMP | FS5000.Notify.VIBE)
# dev.set_display(0, 60)
# dev.set_mode(False)
# dev.set_language(FS5000.Language.ENGLISH)
# dev.get_duration()
# dev.set_duration(2 * 60 * 60)
# dev.clear_dose()
# now = datetime.datetime.now().isoformat(timespec="minutes").translate({58:45})
# with open(f"fs5000_{now}.log", "w") as file:
# try:
# for datum in dev.yield_data():
# print(datum, file=file, flush=True)
# except KeyboardInterrupt:
# pass
dev.read_out()
# dev.read_rate_log()
# dev.read_alarms()
# dev.send(COMMAND['set_dose_limit'])
# dev.send(b'\x0c\x02')
# while True:
# response = dev.recv()
# if response is None:
# break
# if len(response) == 0:
# break
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment