Skip to content

Instantly share code, notes, and snippets.

@tessus
Last active October 18, 2022 15:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tessus/cdbd2367f02211be9224b756b1ecddae to your computer and use it in GitHub Desktop.
Save tessus/cdbd2367f02211be9224b756b1ecddae to your computer and use it in GitHub Desktop.
opnsense_check: check via API call, if OPNsense update/upgrade is available
#!/usr/bin/env python3
###################################################################
# This script makes an API connection to OPNsense #
# and checks if there are any pending updates #
# if there are, it sends an email with details #
# #
# Authors: Bart J. Smit, 'ObecalpEffect' and Franco Fichtner #
# Helmut K. C. Tessarek #
# #
# Version 1.2 07/05/2016 #
# Version 1.3 2022-05-25 #
# Version 1.4 2022-08-11 #
# Version 1.5 2022-10-18 #
###################################################################
# import libraries
import sys, os, string, argparse, json, configparser, re, subprocess, signal, pprint, pytz, datetime
from dateutil.parser import parse
import json
import requests
import smtplib
import time
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
def cleanup(signal, frame):
    """Signal handler: release resources found in the interrupted frame, then exit 0.

    Args:
        signal: number of the delivered signal (unused). The name shadows the
            ``signal`` module inside this function only.
        frame: stack frame that was executing when the signal arrived; the
            signal machinery may also pass ``None``.

    The handler looks for locals named ``fp`` and ``client`` in that frame and
    calls ``fp.terminate()`` / ``client.close()`` when they are present. Which
    locals exist depends on where execution was interrupted, so missing names
    are skipped rather than raising ``KeyError`` before the exit is reached.
    """
    interrupted_locals = frame.f_locals if frame is not None else {}

    fp = interrupted_locals.get('fp')
    if fp is not None:
        fp.terminate()

    client = interrupted_locals.get('client')
    if client is not None:
        client.close()

    sys.exit(0)
# Route both an interactive interrupt and a termination request through cleanup().
for _signum in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_signum, cleanup)
def pmsg(msg):
    """Print *msg* on stdout, prefixed with this script's file name and ': '."""
    script_name = os.path.basename(__file__)
    print("{name}: {text}".format(name=script_name, text=msg))
def cfgerr():
    """Report that no config file could be found and terminate with exit status 1."""
    pmsg("config file not found")
    raise SystemExit(1)
def mydate(date, timezone=None):
    """Format *date* as ``YYYY-MM-DD HH:MM:SS +ZZZZ``.

    Args:
        date: datetime that is tagged as UTC via ``pytz.UTC.localize`` before
            formatting (presumably a naive datetime - confirm against callers).
        timezone: optional time zone name. When omitted, the ``timezone``
            attribute of the module-level ``ctx`` is used if both exist.

    Returns:
        The formatted timestamp, converted to *timezone* when that name is
        listed in ``pytz.all_timezones``; otherwise the timestamp stays in UTC.

    The module-level ``ctx`` is only created when the file runs as a script,
    and the argument parser does not define a ``timezone`` option, so both are
    looked up tolerantly instead of raising NameError / AttributeError.
    """
    if timezone is None:
        timezone = getattr(globals().get('ctx'), 'timezone', None)

    stamp = pytz.UTC.localize(date)
    if timezone in pytz.all_timezones:
        stamp = stamp.astimezone(pytz.timezone(timezone))
    return stamp.strftime("%Y-%m-%d %H:%M:%S %z")
def _config_section(config, name):
    """Return section *name* of *config*, or an empty dict when the file lacks it."""
    return config[name] if config.has_section(name) else {}


def _package_section(title, packages, line_format):
    """Render one titled HTML list of packages for the notification body.

    *packages* may be a dict of package records or a list of package records;
    each record is rendered with *line_format* using the record's own keys.
    Returns an empty string when there are no records.
    """
    records = list(packages.values()) if isinstance(packages, dict) else list(packages)
    if not records:
        return ''
    parts = ['\r\n<br><b>' + title + ':</b><br>\r\n']
    parts.extend(line_format.format(**record) + '<br>\r\n' for record in records)
    return ''.join(parts)


def opnsense_check(ctx):
    """Query the OPNsense firmware status API and mail a report of pending updates.

    Args:
        ctx: parsed command-line namespace. The attributes read here are
            ``configfile``, ``api_key``, ``api_secret``, ``host``,
            ``tls_verify``, ``send_email`` and ``d`` (debug output).

    Settings come from the first config file found (explicit argument,
    ``./opnsense_check.ini``, ``~/.config/opnsense_check.rc``); command-line
    values for key, secret and host take precedence over the file.

    Exits with status 1 (through ``cfgerr``) when no config file is found and
    with status 2 when the API key or secret is missing or empty. A non-200
    API reply is printed and the function returns without sending mail. Mail
    is handed to the SMTP server on ``localhost`` only when the reported
    status is ``update`` or ``upgrade`` and ``ctx.send_email`` is true.
    """
    # Imported locally so this function does not depend on edits to the
    # file-level import block.
    from email.message import EmailMessage
    from email.utils import formataddr

    config = configparser.ConfigParser(interpolation=configparser.ExtendedInterpolation())
    dotrc = os.path.expanduser('~/.config/opnsense_check.rc')

    # config file precedence: argument, ./opnsense_check.ini, ~/.config/opnsense_check.rc
    if ctx.configfile:
        if not os.path.isfile(ctx.configfile):
            cfgerr()
        config.read(ctx.configfile)
    elif os.path.isfile('./opnsense_check.ini'):
        config.read('./opnsense_check.ini')
    elif os.path.isfile(dotrc):
        config.read(dotrc)
    else:
        cfgerr()

    # A section may be absent when everything it holds is given on the command
    # line or has a default, so a missing section is treated as empty.
    api = _config_section(config, 'API')
    opnsense = _config_section(config, 'OPNsense')
    mail = _config_section(config, 'Email')

    # An empty value (as in the shipped template) counts as "not set".
    api_key = ctx.api_key or api.get('key')
    if not api_key:
        pmsg("API key not set")
        sys.exit(2)

    api_secret = ctx.api_secret or api.get('secret')
    if not api_secret:
        pmsg("API secret not set")
        sys.exit(2)

    host = ctx.host or opnsense.get('host') or 'localhost'
    url = 'https://' + host + '/api/core/firmware/status'
    sender = mail.get('sender', 'notifications@opnsense')
    recipient = mail.get('recipient', 'root')

    if ctx.d:
        # NOTE: debug output deliberately includes the credentials in clear text.
        print("[Debug] API key: {0}".format(api_key))
        print("[Debug] API secret: {0}".format(api_secret))
        print("[Debug] OPNsense host: {0}".format(host))
        print("[Debug] TLS verify: {0}".format(ctx.tls_verify))
        print("[Debug] API URL: {0}".format(url))
        print("[Debug] Email sender: {0}".format(sender))
        print("[Debug] Email recipient: {0}".format(recipient))
        print("[Debug] Send email: {0}".format(ctx.send_email))

    # request data, POST (required for synchronous check)
    reply = requests.post(url, verify=ctx.tls_verify, auth=(api_key, api_secret))
    if reply.status_code != 200:
        print('Connection / Authentication issue, response received:')
        print(reply.text)
        return

    response = json.loads(reply.text)
    if ctx.d:
        print(json.dumps(response))

    fetch_link = ('<br>Click <a href="https://' + host
                  + '/ui/core/firmware#checkupdate">here</a> to fetch them.<br>\r\n')

    subject = None  # stays None when there is nothing to report
    body = ''
    status = response['status']
    if status == 'update':
        subject = 'Updates for OPNsense'
        body += '<h2>Firewall Updates Available</h2>'
        body += '<br>' + response['status_msg'] + '<br>\r\n'
        body += _package_section('New', response['new_packages'],
                                 '{name} version {version}')
        body += _package_section('Upgrade', response['upgrade_packages'],
                                 '{name} from {current_version} to {new_version}')
        body += _package_section('Reinstall', response['reinstall_packages'],
                                 '{name} version {version}')
        body += fetch_link
        if response['needs_reboot'] == '1':
            body += '<h3>This update requires a reboot.</h3>'
    elif status == 'upgrade':
        from_to = response['product_version'] + ' -> ' + response['upgrade_major_version']
        subject = 'Upgrade for OPNsense ' + from_to
        body += '<h2>Firewall Upgrade Available: ' + from_to + '</h2>'
        body += '<br>' + response['upgrade_major_message'] + '<br>\r\n'
        body += fetch_link
        if response['upgrade_needs_reboot'] == '1':
            body += '<h3>This upgrade requires a reboot.</h3>'

    # EmailMessage keeps headers and body properly separated and handles
    # non-ASCII text, which hand-concatenated header/body strings did not.
    message = EmailMessage()
    message['From'] = formataddr((mail.get('sender_name', 'OPNsense Firewall'), sender))
    message['To'] = formataddr((mail.get('recipient_name', ''), recipient))
    if subject is not None:
        message['Subject'] = subject
    message.set_content(body, subtype='html')

    if ctx.d:
        print(message)

    if ctx.send_email and subject is not None:
        # The context manager closes the SMTP session even if sending fails.
        with smtplib.SMTP('localhost') as smtp:
            smtp.send_message(message, from_addr=sender, to_addrs=recipient)
if __name__ == "__main__":
    # Script entry point: parse the command line and run the check once.
    parser = argparse.ArgumentParser(description='checks for OPNsense updates/upgrades')
    # TODO: add email arguments (an 'Email' option group for sending mail)
    api = parser.add_argument_group('API', 'Options that can be set for API')

    parser.add_argument('-c', '--config', dest='configfile',
                        help='read config from CONFIGFILE')
    api.add_argument('--api-key', dest='api_key',
                     help='API key for OPNsense')
    api.add_argument('--api-secret', dest='api_secret',
                     help='API secret for OPNsense')
    # store_false flags default to True, store_true flags default to False.
    parser.add_argument('-n', '--no-email', dest='send_email', action='store_false',
                        help='do not send an email')
    parser.add_argument('--host', dest='host',
                        help='OPNsense host')
    parser.add_argument('-v', '--verbose', dest='v', action='store_true',
                        help='display info output')
    parser.add_argument('--tls-no-verify', dest='tls_verify', action='store_false',
                        help='do not verify the TLS certificate')
    parser.add_argument('-d', '--debug', dest='d', action='store_true',
                        help='display debug output')
    parser.add_argument('-V', '--version', action='version', version='%(prog)s 1.5')

    # ctx stays a module-level name because mydate() reads it as a global.
    ctx = parser.parse_args()
    opnsense_check(ctx)
@tessus
Copy link
Author

tessus commented Oct 18, 2022

opnsense_check.ini.template

[OPNsense]
host =

[API]
key    =
secret =

[Email]
sender         = notifications@${OPNsense:host}
sender_name    = OPNsense Firewall
recipient      =
recipient_name =

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment