Skip to content

Instantly share code, notes, and snippets.

@brookst
Last active March 4, 2025 18:31
Show Gist options
  • Save brookst/bdbede3a8d40eb8940a5b53e7ca1f6ce to your computer and use it in GitHub Desktop.
Save brookst/bdbede3a8d40eb8940a5b53e7ca1f6ce to your computer and use it in GitHub Desktop.
Serial interface to Bosean FS-5000 radiation detector
#!/usr/bin/env python3
__author__ = "Tim Brooks"
__email__ = "brooks@skoorb.net"
__date__ = "2024-04-23"
import datetime
from enum import Enum, Flag
import logging
import serial
import serial.tools.list_ports as list_ports
import struct
logging.addLevelName(5, "TRACE")
logging.TRACE = 5
COMMAND = {
"set_time": b"\x01",
"read_dose_curve": b"\x03",
"set_rate_limit": b"\x04",
"set_dose_limit": b"\x05",
"get_version": b"\x06",
"get_dose": b"\x07",
"set_alert": b"\x08",
"set_display": b"\x09",
"set_mode": b"\x0a",
"set_language": b"\x0b",
"timer_duration": b"\x0c",
"clear_dose": b"\x0d",
"read": b"\x0e",
"read_rate_curve": b"\x0f",
"read_alarms": b"\x10",
}
RESPONSE = {
"readback": b"\x04",
"success": b"\x06",
"read_starting": b"\x0e\x06\x01",
"read_stopping": b"\x0e\x06\x00",
}
VID_PID = (0x1A86, 0x7523)
class FS5000:
def __init__(self, port):
self.port = serial.Serial(port, 115200, timeout=2)
self.log = logging.getLogger("FS5000")
def log_bytes(self, data, purpose, level=logging.DEBUG):
"""Log raw hex bytes"""
# The stacklevel arg sets funcName to the caller, not this frame
if self.log.getEffectiveLevel() <= level:
title = f"{len(data)} bytes {purpose}: "
if len(title) + len(data) * 3 < 80:
for b in data:
title += f"{b:02x} "
self.log.log(level, title, stacklevel=2)
return
self.log.log(level, title, stacklevel=2)
string = ""
for b in data:
string += f"{b:02x} "
if len(string) >= 48:
self.log.log(level, string, stacklevel=2)
string = ""
if string:
self.log.log(level, string, stacklevel=2)
def write(self, data):
self.log_bytes(data, "written", logging.TRACE)
return self.port.write(data)
def read(self, length):
response = self.port.read(length)
self.log_bytes(response, "read", logging.TRACE)
return response
def checksum(self, data: bytes) -> bytes:
return bytes([sum(data) % 256])
def packet(self, payload: bytes):
self.log.log(logging.TRACE, f"{len(payload)=}")
data = b"\xaa"
# Length + checksum and trailer byte to follow:
data += bytes([len(payload) + 3])
data += payload
data += self.checksum(data)
data += b"\x55"
return data
def send(self, command: bytes):
data = self.packet(command)
self.write(data)
def recv(self):
header = self.read(2)
if len(header) == 0:
return None
if header[0] != 0xAA:
raise IOError(f"Read header 0x{header[0]:02x} not 0xaa")
length = header[1]
data = self.read(length - 1)
if data[-1] != 0x55:
raise IOError(f"Read trailer 0x{data[-1]:02x} not 0x55")
checksum = self.checksum(header + data[:-2])[0]
if checksum != data[-2]:
msg_checksum = data[-2]
self.log_bytes(data, "failed to verify")
raise IOError(f"Checksum failure {checksum:02x} != {msg_checksum:02x}")
return data[:-2]
def check_success(self, command):
response = self.recv()
expectation = bytes([command[0]]) + RESPONSE["success"]
if response[:2] != expectation:
raise RuntimeError(f"Received {response=}, expected {expectation}")
return response[2:]
def set_time(self, time: datetime.datetime = None):
if time is None:
time = datetime.datetime.now()
command = COMMAND["set_time"]
command += bytes([time.year % 100, time.month, time.day])
command += bytes([time.hour, time.minute, time.second])
self.send(command)
self.check_success(command)
def read_dose_log(self):
"""Fetch log of total dose"""
self.send(COMMAND["read_dose_curve"])
response = self.check_success(COMMAND["read_dose_curve"])
packets, records = struct.unpack("!BH", response)
log = b""
for packet in range(1, packets + 1):
response = self.recv()
if response[0] != COMMAND["read_dose_curve"][0]:
raise RuntimeError(f"Received {response[0]=} not {COMMAND['read_dose_curve'][0]}")
if response[1] != packet:
raise RuntimeError(f"Received {response[1]=} not {packet=}")
log += response[2:]
self.log_bytes(log, "logged")
raise NotImplementedError("TODO: Parse dose curve")
DIGITS = ".0123456789"
MSV_H = b"mSvh"
USV_H = b"uSvh"
RATE_UNIT = {
MSV_H: "mSv/h",
USV_H: "μSv/h",
}
def set_rate_limit(self, value: str, unit: bytes = b"uSvh"):
if type(value) is not str:
raise TypeError("Rate limit must be 4 characters e.g. '2.50'")
if len(value) != 4:
raise ValueError("Rate limit must be 4 characters e.g. '2.50'")
if any(c not in self.DIGITS for c in value):
raise ValueError(f"Rate limit must be of characters {self.DIGITS}")
if unit not in self.RATE_UNIT:
raise ValueError("Rate limit must have unit 'uSvh' or 'mSvh'")
self.log.debug(f"{value=} {self.RATE_UNIT[unit]}")
command = COMMAND["set_rate_limit"]
command += value.encode("ascii") + unit
self.send(command)
self.check_success(command)
SV = b" Sv"
MSV = b"mSv"
USV = b"uSv"
DOSE_UNIT = {
SV: "Sv",
MSV: "mSv",
USV: "μSv",
}
def set_dose_limit(self, limit: str, unit: bytes = b"uSv"):
# raise NotImplementedError("Dose rate limit unit not yet understood.")
if type(limit) is not str:
raise TypeError("Dose limit must be 4 characters e.g. '2.50'")
if len(limit) != 4:
raise ValueError("Dose limit must be 4 characters e.g. '2.50'")
if any(c not in self.DIGITS for c in limit):
raise ValueError(f"Dose limit must be of characters {self.DIGITS}")
if unit not in self.DOSE_UNIT:
raise ValueError("Dose limit must have unit 'uSv', 'mSv' or ' Sv'")
self.log.debug(limit.encode("ascii") + unit)
command = COMMAND["set_dose_limit"]
command += limit.encode("ascii") + unit
self.send(command)
self.check_success(command)
def get_version(self):
self.send(COMMAND["get_version"])
response = self.recv()
return response
def get_dose(self):
"""Fetch current total dose"""
self.send(COMMAND["get_dose"])
response = self.check_success(COMMAND["get_dose"])
_, dose, *date = struct.unpack("!II5B", response)
dose *= 0.01 # Convert to μSv
year, month, day, hour, minute = date
year += 2000 # We saved an extra byte! This will surely never cause problems...
date = datetime.datetime(year, month, day, hour, minute)
self.log.info(f"{dose:.2f} μSv starting {date}")
return (dose, date)
class Notify(Flag):
LAMP = 0x01
SOUND = 0x02
VIBE = 0x04
CLICK = 0x08
def set_alert(self, value: Notify):
if type(value) is not self.Notify:
raise ValueError("Alert setting must be of type Notify")
command = COMMAND["set_alert"]
command += bytes([value.value])
self.send(command)
self.check_success(command)
def set_display(self, brightness: int, timeout: int):
if brightness < 0 or brightness > 5:
raise ValueError("Brightness must be in range [0-5]")
if timeout < 0 or timeout > 9999:
raise ValueError("Timeout must be in range [0-9999]")
command = COMMAND["set_display"]
command += bytes([brightness, timeout // 256, timeout % 256])
self.send(command)
self.check_success(command)
def set_mode(self, mode=bool):
"""Set True to enable long endurance mode"""
command = COMMAND["set_mode"]
command += bytes([mode])
self.send(command)
self.check_success(command)
class Language(Enum):
CHINESE = 0x00
ENGLISH = 0x01
def set_language(self, value: Language):
if type(value) is not self.Language:
raise ValueError("Language setting must be of type Language")
command = COMMAND["set_language"]
command += bytes([value.value])
self.send(command)
self.check_success(command)
def get_duration(self):
"""Get the period of a 'timed dose' measurement"""
# This seems like a rejected command, but the current value gets read back
command = COMMAND["timer_duration"] + b"\x01"
self.send(command)
response = self.recv()
expectation = COMMAND["timer_duration"] + RESPONSE["readback"] + b"\x00"
if response[:3] != expectation:
raise RuntimeError(f"Received {response=}, expected {expectation}")
seconds = struct.unpack("!I", response[3:])[0]
self.log.info(f"Got timed duration {seconds} s")
return seconds
def set_duration(self, seconds):
"""Set the period of a 'timed dose' measurement in seconds"""
command = COMMAND["timer_duration"]
command += struct.pack("!BI", 0, seconds) # 0 to set value, non-zero gets it
self.send(command)
self.check_success()
def clear_dose(self):
"""Clear the accumulated dose total, returns date-time of reset"""
command = COMMAND["clear_dose"]
self.send(command)
response = self.check_success()
date = struct.unpack("!6B", response)
year, month, day, hour, minute = date
year += 2000 # We saved an extra byte! This will surely never cause problems...
date = datetime.datetime(year, month, day, hour, minute)
return date
def start_read(self):
"""Command start of continuous data readout"""
self.send(COMMAND["read"] + b"\x01")
message = self.recv()
if message != RESPONSE["read_starting"]:
raise RuntimeError(f"Expected start of counts, got {message}")
def stop_read(self):
"""Wait until the stop response is read back"""
self.send(COMMAND["read"] + b"\x00")
while True:
try:
message = self.recv()
if message != RESPONSE["read_stopping"]:
raise RuntimeError(f"Expected stopping of counts, got {message}")
break
except IOError:
pass
def yield_data(self):
"""Continuously yield a semicolon separated record of date-time, instantaneous dose rate, total dose,
counts per second, counts per minute, average dose rate, timer, timed dose and alarm status"""
self.start_read()
try:
while True:
message = self.recv()
if message is None:
continue
now = datetime.datetime.now()
if message[0] != COMMAND["read"][0]:
raise RuntimeError(f"Unexpected datum marker: {message[0]=} != {COMMAND['read'][0]}")
now = now.isoformat(timespec="seconds")
yield now + ";" + message[1:].decode()
finally:
self.stop_read()
def read_out(self):
"""Read out data continuously"""
try:
for datum in self.yield_data():
self.log.info(datum)
except KeyboardInterrupt:
pass
def read_rate_log(self):
"""Fetch log of dose rate"""
self.send(COMMAND["read_rate_curve"])
response = self.check_success(COMMAND["read_rate_curve"])
# No idea why there's a null in the centre here:
packets, _, records = struct.unpack("!HBH", response)
log = b""
for packet in range(1, packets + 1):
response = self.recv()
command, packet_id = struct.unpack("!BH", response[:3])
if command != COMMAND["read_rate_curve"][0]:
raise RuntimeError(f"Received {command=} not {COMMAND['read_rate_curve'][0]}")
if packet_id != packet:
raise RuntimeError(f"Received {packet_id=} not {packet=}")
log += response[3:]
self.log_bytes(log, "logged")
raise NotImplementedError("TODO: Parse dose rate curve")
def read_alarms(self):
self.send(COMMAND["read_alarms"])
response = self.check_success(COMMAND["read_alarms"])
_, packets, _, records = struct.unpack("!BBBH", response)
log = b""
for packet in range(1, packets + 1):
response = self.recv()
if response[0] != COMMAND["read_alarms"][0]:
raise RuntimeError(f"Received {response[0]=} not {COMMAND['read_alarms'][0]}")
if response[2] != packet:
raise RuntimeError(f"Received {response[2]=} not {packet=}")
log += response[3:]
self.log_bytes(log, "logged")
for record in range(records):
data = log[record * 16 : (record + 1) * 16]
values = struct.unpack("!BH5B4s4s", data)
alarm = values[0]
date = datetime.datetime(*values[1:7])
limit, unit = values[7:9]
if alarm == 0x01:
UNIT = self.RATE_UNIT
elif alarm == 0x02:
unit = unit[1:] # The log has an extra space in this case
UNIT = self.DOSE_UNIT
if unit not in UNIT:
self.log.error(f"Unknown unit: {unit}")
continue
self.log.log(logging.TRACE, f"{limit=} {unit=}")
self.log.info(f"#{record+1} {date} >={limit.decode()} {UNIT[unit]}")
class MockFS5000(FS5000):
def __init__(self, port):
self.last = None
self.log = logging.getLogger("MockFS5000")
self.outbox = b""
def write(self, value):
self.log_bytes(value, "written", logging.TRACE)
length = value[1]
self.last = command = value[2]
return len(value)
def read(self, length):
"""Just report that the previous command succeeded"""
if self.last is None:
return b""
# Partial message still waiting to be read
outbox = self.outbox
if len(outbox):
self.outbox = outbox[length:]
self.log_bytes(outbox[:length], "read", logging.TRACE)
return outbox[:length]
# Craft a new message to return to reader
response = self.packet(bytes([self.last]) + RESPONSE["success"])
self.outbox = response[length:]
response = response[:length]
self.log_bytes(response, "read", logging.TRACE)
return response
def get_port():
ports = list_ports.comports()
for port in ports:
if (port.vid, port.pid) == VID_PID:
return port.device
else:
raise FileNotFoundError()
def main():
form = "%(levelname)s:%(name)s:%(funcName)s: %(message)s"
logging.basicConfig(level=logging.INFO, format=form)
# logging.basicConfig(level=logging.TRACE, format=form)
port = get_port()
dev = FS5000(port)
# dev = MockFS5000('/dev/null')
# dev.set_time()
# dev.read_dose_log()
# dev.set_rate_limit("0.50", "uSvh")
# dev.set_rate_limit("0.50", FS5000.USV_H)
# dev.set_dose_limit("0.01", " Sv")
# dev.set_dose_limit("2.50", FS5000.USV)
# version = dev.get_version()
# version = version.split(b'\x00')
# logging.info(f"{version=}")
# dev.get_dose()
# dev.set_alert(FS5000.Notify.LAMP | FS5000.Notify.VIBE)
# dev.set_display(0, 60)
# dev.set_mode(False)
# dev.set_language(FS5000.Language.ENGLISH)
# dev.get_duration()
# dev.set_duration(2 * 60 * 60)
# dev.clear_dose()
# now = datetime.datetime.now().isoformat(timespec="minutes").translate({58:45})
# with open(f"fs5000_{now}.log", "w") as file:
# try:
# for datum in dev.yield_data():
# print(datum, file=file, flush=True)
# except KeyboardInterrupt:
# pass
dev.read_out()
# dev.read_rate_log()
# dev.read_alarms()
# dev.send(COMMAND['set_dose_limit'])
# dev.send(b'\x0c\x02')
# while True:
# response = dev.recv()
# if response is None:
# break
# if len(response) == 0:
# break
if __name__ == "__main__":
main()
@davidwowa
Copy link

Hi, is this code tested with FS500 ? Thank you.

@samm-git
Copy link

Works just fine on macos, thank you! where did you got these protocol defs? Windows program sniffing?

@brookst
Copy link
Author

brookst commented Jan 17, 2025

Hi, is this code tested with FS500 ? Thank you.

I only have the FS-5000 and I have now switched to RadPro firmware. As far as I know, there are only a FS600, FS1000 and FS5000 model - at least those are the only ones on Boseans website right now. I developed this from testing with the FS-5000.

@brookst
Copy link
Author

brookst commented Jan 17, 2025

Works just fine on macos, thank you! where did you got these protocol defs? Windows program sniffing?

I started from this comment on the RadPro repo. I just went through each command seeing what it sent back or changed in the UI. I think there's probably some commands missing there (command 0x02?) but I think I cover all the normal features exposed in the UI. Figuring out the format used was a fun challenge - it took me a while to realise the units were just ASCII strings, they just get rendered differently on the device (which does no input validation).

@samm-git
Copy link

thank you again, work just great for me (macOS, fs5000). BTW, is RadPro better? I think to switch too, but will need to order stm programmer.

@davidwowa
Copy link

Common question which I have: is needed specific firmware from RadPro if I will receive serial data ? Or it is possible to connect like Arduino and hear on Serial port ? Thank you

@brookst
Copy link
Author

brookst commented Jan 18, 2025

thank you again, work just great for me (macOS, fs5000). BTW, is RadPro better? I think to switch too, but will need to order stm programmer.

The RadPro firmware is nicer but it doesn't really add any new functionality. I got a cheap programmer, soldered a 4 pin header to the board and the flashing was really easy. There's a python tool that will pull logs and stream live data over USB with a much nicer protocol that actually has documentation: https://github.com/Gissio/radpro/blob/main/docs/comm.md

@brookst
Copy link
Author

brookst commented Jan 18, 2025

Common question which I have: is needed specific firmware from RadPro if I will receive serial data ? Or it is possible to connect like Arduino and hear on Serial port ? Thank you

I'm not sure what you're asking there. RadPro has a serial protocol that works over USB as a serial COM or TTY port. This is the protocol: https://github.com/Gissio/radpro/blob/main/docs/comm.md

There's a tool with RadPro that pulls recorded or live data from the device to a PC. Or you can use GigerLog to read it.

This script does pretty much the same thing with the original manufacturer's firmware though it's up to you to format it for graphing or sending to a service like radmon.org.

@davidwowa
Copy link

I want to measure radiation with FS5000.

I want to regularly query data via USB and then share the data with radmon.org. How exactly can I regularly query data via USB? Thanks for the help

@davidwowa
Copy link

fs5000.py works

(myenv) david@mac FS5000 % python3 fs5000.py INFO:FS5000:read_out: 2025-01-18T22:33:26;DR:0.23uSv/h;D:3.70uSv;CPS:0001;CPM:000029;AVG:0.16uSv/h;DT:0000000;S:0.00uSv;W:0 INFO:FS5000:read_out: 2025-01-18T22:33:27;DR:0.23uSv/h;D:3.70uSv;CPS:0000;CPM:000029;AVG:0.16uSv/h;DT:0000000;S:0.00uSv;W:0 INFO:FS5000:read_out: 2025-01-18T22:33:28;DR:0.23uSv/h;D:3.70uSv;CPS:0001;CPM:000030;AVG:0.17uSv/h;DT:0000000;S:0.00uSv;W:0

but how is possible to get this data in normal shell, for example in screen ? I mean linux screen command:

with screen /dev/tty.usbserial-1440 115200 should be possible to read data, but I don't know how.

Thx.

@samm-git
Copy link

@davidwowa take a look on the script. its not a dumb protocol, you need to initiate transfer. easiest way would be to modify py script for your needs

@samm-git
Copy link

thank you again, work just great for me (macOS, fs5000). BTW, is RadPro better? I think to switch too, but will need to order stm programmer.

The RadPro firmware is nicer but it doesn't really add any new functionality. I got a cheap programmer, soldered a 4 pin header to the board and the flashing was really easy. There's a python tool that will pull logs and stream live data over USB with a much nicer protocol that actually has documentation: https://github.com/Gissio/radpro/blob/main/docs/comm.md

Thank you again ) ordered stm32 programmer @ ali, will flash once it arrive. The RadPro protocol seems to be much easier, yes.

@davidwowa
Copy link

My solution which was needed for me in Java ->

https://github.com/davidwowa/tech.cybersword.sensors/blob/main/src/main/java/tech/cybersword/FS5000.java

if helps for others.

Thank's for supporting

@mouldybread
Copy link

mouldybread commented Mar 4, 2025

I wanted to send the output from an FS-5000 to MQTT/Home Assistant for monitoring but I can't write python. I haven't been able to find a solution so as an experiment I asked an AI to modify this script to suit my needs. Much to my surprise it worked. I connected the FS-5000 to an old Raspberry Pi via USB. Here's the modified script, in case somebody else wishes to do the same:
https://gist.github.com/mouldybread/b1bf8f6bcd047192b21fa78e08a9e6c5

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment