Skip to content

Instantly share code, notes, and snippets.

@JonathanThorpe
Created November 5, 2016 23:06
Show Gist options
  • Save JonathanThorpe/82274ea39930fcfee86e248a8a3a5a16 to your computer and use it in GitHub Desktop.
Quick and dirty OpenWebRX Nagios Plugin
#!/usr/bin/env python3
import time
import websocket
import re
import requests
import math
import statistics
import threading
import sys
import argparse
from datetime import datetime
class OpenWebRXStats:
    """Collect signal-level samples from an OpenWebRX server over a websocket.

    The client fetches the server's front page to obtain a client id, opens
    ``ws://<host>/ws/<client id>``, tunes to ``freq`` once the server sends
    its setup message, and stores every ``MSG s=<value>`` reading (converted
    with ``10 * log10(value)``) in ``samples`` until ``timeout`` seconds
    have elapsed.
    """

    # Raw bytes patterns: frames are matched as bytes.  The exponent marker
    # is a literal "e"; the former "\e" escape is rejected by the re module
    # on Python >= 3.6.
    re_message_smeter = re.compile(rb"MSG s=([0-9.\-e]+)")
    re_message_setup = re.compile(
        rb"MSG center_freq=([0-9]+) bandwidth=([0-9]+) fft_size=([0-9]+) fft_fps=([0-9]+).* setup")

    # Patterns applied to the HTML returned by the server's front page.
    re_client_id = re.compile(r'var client_id="([a-z0-9]+)";')
    re_starting_offset = re.compile(r"var starting_offset_frequency = (-?[0-9]+);")

    freq = 0
    ws = None

    def __init__(self, freq, host, timeout=10):
        """Store the settings for one measurement run.

        Args:
            freq: Frequency to tune to, in the same unit as the server's
                ``center_freq`` value (the offset sent is ``freq - center_freq``).
            host: ``host:port`` of the OpenWebRX server.
            timeout: Seconds to collect samples before closing the websocket.
        """
        self.freq = freq
        self.host = host
        self.timeout = timeout
        # Per-instance list; a class-level list would be shared by all instances.
        self.samples = []

    def start(self):
        """Run one measurement; blocks until the websocket is closed.

        Does nothing when no client id can be obtained from the server.
        """
        client_id, _offset = self.getId("http://%s/" % self.host)
        if client_id is None:
            return

        websocket.enableTrace(False)
        self.ws = websocket.WebSocketApp("ws://%s/ws/%s" % (self.host, client_id),
                                         on_message=self.on_message,
                                         on_open=self.on_open,
                                         on_error=self.on_error)
        # The timer closes the socket after `timeout` seconds so that
        # run_forever() returns.
        timer = threading.Timer(self.timeout, self.endStats)
        timer.start()
        try:
            self.ws.run_forever()
        finally:
            # If the connection ended early, do not keep the process alive
            # waiting for a timer that has nothing left to close.
            timer.cancel()

    def endStats(self):
        """Close the websocket, ending the sampling run."""
        if self.ws is not None:
            self.ws.close()

    def getId(self, url):
        """Fetch ``url`` and extract the client id and starting offset.

        Returns:
            ``(client_id, offset)`` with ``offset`` as an int when the
            response has status 200 and both values are present in the
            page; ``(None, None)`` otherwise.
        """
        headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
        r = requests.get(url, timeout=5, headers=headers)
        if r.status_code != 200:
            return None, None

        m_client = self.re_client_id.search(r.text)
        m_offset = self.re_starting_offset.search(r.text)
        if m_client and m_offset:
            return m_client.group(1), int(m_offset.group(1))
        return None, None

    def on_message(self, ws, message):
        """Handle one websocket frame.

        A setup message triggers a ``SET`` command that tunes to
        ``self.freq``; a ``MSG s=`` message adds one reading to
        ``self.samples``.  Anything else is ignored.
        """
        if isinstance(message, str):
            # The patterns are bytes; a text frame would otherwise raise TypeError.
            message = message.encode("utf-8", "replace")

        match_setup = self.re_message_setup.match(message)
        if match_setup:
            ws.send("SET low_cut=-4000 high_cut=4000 offset_freq=%s" % (self.freq - int(match_setup.group(1))))
            return

        match_smeter = self.re_message_smeter.match(message)
        if not match_smeter:
            return
        try:
            level = float(match_smeter.group(1))
        except ValueError:
            # The character class can match non-numbers such as "-" or "e".
            return
        if level > 0:
            # log10 is undefined for zero and negative values; skip those readings.
            self.samples.append(10 * math.log10(level))

    def on_error(self, ws, error):
        """Print a websocket error."""
        print("error: %s" % error)

    def on_open(self, ws):
        """Send the initial handshake and start commands once connected."""
        ws.send("SERVER DE CLIENT openwebrx.js")
        ws.send("SET output_rate=11025 action=start")
def classify_carrier(mean, warning, critical):
    """Map a mean level to an exit status and message label.

    Returns ``(2, "Carrier Critical")`` when ``mean`` is below ``critical``,
    ``(1, "Carrier Low")`` when it is below ``warning``, and ``(0, "OK")``
    otherwise.  The critical threshold is checked first.
    """
    if mean < critical:
        return 2, "Carrier Critical"
    if mean < warning:
        return 1, "Carrier Low"
    return 0, "OK"


if __name__ == '__main__':
    # Command-line options: server address, frequency, sampling time and
    # the warning/critical thresholds compared against the mean level.
    parser = argparse.ArgumentParser()
    parser.add_argument('-s', '--server', default="127.0.0.1:8073")
    parser.add_argument('-f', '--freq', default=1638000, type=int)
    parser.add_argument('-t', '--timeout', default=5, type=int)
    parser.add_argument('-w', '--warning', default=-30, type=int)
    parser.add_argument('-c', '--critical', default=-45, type=int)
    args = parser.parse_args()

    owrx = OpenWebRXStats(args.freq, args.server, args.timeout)
    try:
        owrx.start()
    except Exception as err:
        # Top-level boundary: report the failure on stdout as a single line
        # instead of a traceback.  Exit status 1 matches what an uncaught
        # exception produced before.
        print("Error querying %s: %s" % (args.server, err))
        sys.exit(1)

    if not owrx.samples:
        print("No samples received")
        sys.exit(1)

    mean = statistics.mean(owrx.samples)
    status, label = classify_carrier(mean, args.warning, args.critical)
    print("%s: %d KHz average over %d samples %.2f dB" % (label, owrx.freq / 1000, len(owrx.samples), mean))
    # sys.exit rather than the site-provided exit(), which is not always defined.
    sys.exit(status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment