Skip to content

Instantly share code, notes, and snippets.

@gvanem
Created September 25, 2021 17:19
Show Gist options
  • Save gvanem/1184e11fab3fea4511122094fa7e1f3a to your computer and use it in GitHub Desktop.
Save gvanem/1184e11fab3fea4511122094fa7e1f3a to your computer and use it in GitHub Desktop.
RTL_433 test script
#!/usr/bin/env python3
"""Compare actual output Json-formatted output lines of
rtl_433 with reference json."""
import sys
import os
import argparse
import fnmatch
import subprocess
import json
import inspect
from deepdiff import DeepDiff
from pprint import pprint
os.putenv ("WSOCK_TRACE_LEVEL", "0")
try:
import colorama
colorama.init()
have_colorama = True
class colour:
GREEN = colorama.Fore.GREEN + colorama.Style.BRIGHT
YELLOW = colorama.Fore.YELLOW + colorama.Style.BRIGHT
RED = colorama.Fore.RED + colorama.Style.BRIGHT
RESET = colorama.Style.RESET_ALL
except ImportError:
have_colorama = False
class colour:
GREEN = YELLOW = RED = RESET = ""
if os.sys.platform == "win32":
exe = ".exe"
dev_null = "NUL"
else:
exe = ""
dev_null = "/dev/null"
#
# 'my_root' is according this layout:
# <rtl_433_tests install-root>/bin/run_test.py
# <rtl_433_tests install-root>/tests/*
#
my_root = None
opt = None
def no_colour():
if have_colorama:
colorama.deinit()
colour.GREEN = colour.RED = colour.YELLOW = colour.RESET = ""
def trace(level, s):
if opt.verbose >= level:
frame = sys._getframe(0)
line = frame.f_back.f_lineno
file = inspect.getsourcefile (frame.f_back)
print("%s%s(%d):%s %s" % (colour.GREEN, os.path.basename(file), line, colour.RESET, s))
#
# This should be in current directory or on $PATH
# unless specified with the '-c' option.
#
def find_rtl_433():
prog = "rtl_433" + exe
path = os.getenv('PATH').split(os.pathsep)
for p in path:
fn = os.path.join(p, prog)
if os.path.exists(fn):
return fn, True
return prog, False
def run_rtl433(input_fn, samplerate=None, protocol=None):
"""Run rtl_433 and return output."""
args = ["-c", dev_null, "-M", "newmodel"]
if protocol:
args.extend(['-R', str(protocol)])
if samplerate:
args.extend(['-s', str(samplerate)])
args.extend(['-F', 'json', '-r', input_fn])
cmd = [opt.rtl_433_cmd] + args
if opt.dry_run:
print ("%s%s %s%s%s" % (colour.GREEN, opt.rtl_433_cmd, colour.YELLOW, ' '.join(args[::]), colour.RESET))
return (b"", 0, 0)
trace(1, " ".join(cmd))
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
# Pass warning messages through
for line in err.decode("utf-8").split("\n"):
if "WARNING:" in line:
print_warn(line, False)
return (out, err, p.returncode)
def find_json():
"""Find all reference json files recursively."""
matches = []
tests_dir = os.path.join(my_root, 'tests')
for root, _dirnames, filenames in os.walk(tests_dir):
for filename in fnmatch.filter(filenames, '*.json'):
full_name = os.path.join(root, filename)
dir_spec = os.path.dirname(full_name) [len(tests_dir)+1:] + "/*"
trace(2, "dir_spec: %s" % dir_spec)
if fnmatch.fnmatch(os.path.basename(filename), opt.spec) or fnmatch.fnmatch(dir_spec, opt.spec):
trace(1, "Adding file: %s" % full_name)
matches.append(full_name)
else:
trace(2, "Ignoring file: %s" % full_name)
return matches
def remove_fields(data, fields):
"""Remove all data fields to be ignored."""
for outline in data:
for field in fields:
if field in outline:
del outline[field]
return data
def print_warn(s, warn_prefix=True):
if warn_prefix:
print("%sWARNING%s: %s" % (colour.RED, colour.RESET, s))
else:
print("%s%s%s" % (colour.RED, s, colour.RESET))
def examples():
return '''
Some examples:
run_test.py ford*
run_test.py acurite/Acurite_00275rm/*
'''
def list_files (files):
prev_dir = ""
for f in files:
base = os.path.basename(f)
this_dir = os.path.dirname(f)
if this_dir != prev_dir:
print ("\n%s:" % this_dir.replace("\\", "/"))
note = ""
cu8 = os.path.splitext(f)[0] + ".cu8"
if not os.path.isfile(cu8):
note = " Missing %s%s%s" % (colour.RED, os.path.basename(cu8), colour.RESET)
elif not opt.no_ignore and os.path.isfile("%s/ignore" % this_dir):
note = " %sIgnoring%s" % (colour.GREEN, colour.RESET)
print (" %s%s" % (base, note))
prev_dir = this_dir
def main():
"""Check all reference json files vs actual output."""
parser = argparse.ArgumentParser(description="Test rtl_433",
epilog=examples(),
formatter_class=argparse.RawDescriptionHelpFormatter)
rtl433_prog, rtl433_found = find_rtl_433()
parser.add_argument("-c", dest="rtl_433_cmd", default=rtl433_prog,
help='rtl_433 program to use (default: %s, %sfound)' % \
(rtl433_prog, ["not ", ""][rtl433_found]))
parser.add_argument("-I", dest="ignore_fields", default=[], action="append",
help="Field to ignore in JSON data")
parser.add_argument("-f", "--first-line", dest="first_line", default=False, action="store_true",
help="Only compare the first outputed line of rtl433 "
"with first line of reference json")
parser.add_argument("-l", "--list", dest="list", default=False, action="store_true",
help="print a list of input files matching <spec>")
parser.add_argument("-n", "--dry-run", dest="dry_run", default=False, action="store_true",
help="just list what input files would have been tested")
parser.add_argument("-N", "--no-ignore", dest="no_ignore", default=False, action="store_true",
help="Do not ignore tests with a 'ignore' file in it's directory")
parser.add_argument("-v", "--verbose", dest="verbose", action="count", default=0,
help="increase verbose mode")
parser.add_argument("--no-colors", dest="no_colours", action="store_true",
help="Do not print in colors (if 'colorama' was found)")
parser.add_argument("spec", nargs=argparse.REMAINDER,
help="test only input-files or sub-directories matching <spec>\n"
"E.g. 'ford*' or 'emont/01/*'")
global opt
opt = parser.parse_args()
global my_root
my_root, _ = os.path.split (sys.argv[0])
my_root = os.path.normpath (my_root + os.sep + '..')
trace(1, "my_root: %s" % my_root)
if opt.no_colours:
no_colour()
if opt.spec:
opt.spec = opt.spec[0]
else:
opt.spec = '*'
trace(1, "spec: %s" % opt.spec)
expected_json = find_json()
if opt.list:
opt.no_ignore = True
list_files (expected_json)
return
nb_ok = 0
nb_fail = 0
nb_ignored = 0
false_positives = dict()
for output_fn in expected_json:
input_fn = os.path.splitext(output_fn)[0] + ".cu8"
if not os.path.isfile(input_fn):
print_warn("Missing '%s'" % input_fn)
continue
input_fn = os.path.normpath (input_fn)
ignore_fn = os.path.join(os.path.dirname(output_fn), "ignore")
if not opt.no_ignore and os.path.isfile(ignore_fn):
print_warn("Ignoring '%s'" % input_fn)
nb_ignored += 1
continue
samplerate = 250000
samplerate_fn = os.path.join(os.path.dirname(output_fn), "samplerate")
if os.path.isfile(samplerate_fn):
with open(samplerate_fn, "r") as samplerate_file:
samplerate = int(samplerate_file.readline())
protocol = None
protocol_fn = os.path.join(os.path.dirname(output_fn), "protocol")
if os.path.isfile(protocol_fn):
with open(protocol_fn, "r") as protocol_file:
protocol = protocol_file.readline().strip()
# Open expected data
expected_data = []
with open(output_fn, "r") as output_file:
try:
for json_line in output_file.readlines():
if not json_line.strip():
continue
expected_data.append(json.loads(json_line))
except ValueError as _err:
print("%sERROR: invalid json:%s '%s'" % (colour.RED, colour.RESET, output_fn))
continue
expected_data = remove_fields(expected_data, opt.ignore_fields)
# Run rtl_433
rtl433out, _err, exitcode = run_rtl433(input_fn, samplerate, protocol)
if exitcode:
print("ERROR: Exited with %d '%s'" % (exitcode, input_fn))
# get JSON results
rtl433out = rtl433out.decode('utf8').strip()
results = []
for json_line in rtl433out.split("\n"):
if not json_line.strip():
continue
try:
data = json.loads(json_line)
if "model" in data:
expected_model = expected_data[0]["model"]
actual_model = data["model"]
if actual_model != expected_model:
if actual_model not in false_positives:
false_positives[actual_model] = dict()
false_positives[actual_model]["count"] = 1
false_positives[actual_model]["models"] = set()
false_positives[actual_model]["models"].add(expected_model)
else:
false_positives[actual_model]["count"] += 1
false_positives[actual_model]["models"].add(expected_model)
continue
results.append(data)
except ValueError:
nb_fail += 1
# TODO: factorise error print
print("%s## Failed%s: %s: invalid json output" % (colour.RED, colour.RESET, input_fn))
print("%s" % json_line)
continue
results = remove_fields(results, opt.ignore_fields)
if opt.first_line:
if len(results) == 0:
results.append({})
if len(expected_data) == 0:
expected_data.append({})
expected_data, results = expected_data[0], results[0]
# Compute the diff
diff = DeepDiff(expected_data, results)
if diff and not opt.dry_run:
nb_fail += 1
print("%s## Failed%s: %s" % (colour.RED, colour.RESET, input_fn))
if 0:
pprint (diff, indent=2)
else:
for error, details in diff.items():
print(" %s" % error)
for detail in details:
print(" * %s:" % detail)
print(" +%s%s%s" % (colour.GREEN, detail[1], colour.RESET))
print(" -%s%s%s" % (colour.RED, detail[2], colour.RESET))
else:
nb_ok += 1
print("%s## Okay%s: %s" % (colour.GREEN, colour.RESET, input_fn))
for model, values in false_positives.items():
count = values["count"]
models = values["models"]
print_warn(f"{model} generated {count} false positive(s) in other decoders: {models}")
# print some summary
if opt.dry_run:
print("%d records tested, %d ignored." % (nb_ok, nb_ignored))
else:
print("%d records tested, %d have failed, %d ignored." % (nb_ok+nb_fail, nb_fail, nb_ignored))
return nb_fail
if __name__ == '__main__':
try:
rc = main()
except KeyboardInterrupt:
print ("^C")
rc = 0
sys.exit(rc)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment