Skip to content

Instantly share code, notes, and snippets.

@seanupton
Last active May 11, 2017 19:53
Show Gist options
  • Save seanupton/7489856 to your computer and use it in GitHub Desktop.
Save seanupton/7489856 to your computer and use it in GitHub Desktop.
Example of possible thread-based scanner and email notification for unpatched Plone... YMMV, this code is completely untested aside from passing flake8 checks I have in my editor, I just put this together for explanatory purposes of my intent.
import datetime
import pkg_resources
import re
import smtplib
import socket
import threading
import time
from App.config import getConfiguration
from plone.break_core import VulnerabilityCheck as VC
MSG = """
You have Plone {0} installed, for this version the following unapplied
patches should be installed:
* {1}
"""
MAILBODY = """
From: %s
To: %s
Subject: %s
You are receving this notice, because your email address is listed as a
systems administration contact for %s.
%s
""".strip()
_prod_cfg = {}
_prod_cfg_locked = False
def loadcfg(key):
"""Load product configuration; changes require Zope instance restart"""
global _prod_cfg
global _prod_cfg_locked
if _prod_cfg:
return _prod_cfg # naively return cached, already loaded.
cfg = getConfiguration()
if hasattr(cfg, 'product_config'):
prodcfg = cfg.product_config.get(key, {})
if not prodcfg:
return
while _prod_cfg_locked:
pass # wait to acquire lock, avoid race-condition.
_prod_cfg_locked = True
_prod_cfg = prodcfg
_prod_cfg_locked = False
def getcfg():
global _prod_cfg
if not _prod_cfg:
loadcfg('plone.break_core')
return _prod_cfg
def notify_administrator(self, host, mFrom, mTo, subject, message):
body = MAILBODY % (mFrom, mTo, subject, socket.getfqdn(), message)
delivery = smtplib.SMTP(self.smtp_server)
delivery.sendmail(mFrom, mTo, body)
delivery.quit()
def vuln_scanner(self, packages=pkg_resources.working_set):
"""Scans, returns None if no issues, message if unpatched"""
_hotfix = lambda pkg: pkg.project_name.startswith('Products.PloneHotfix')
hotfixes = [pkg.project_name for pkg in packages if _hotfix(pkg)]
plone_version = pkg_resources.get_distribution("Plone").version
vc = VC(plone_version, hotfixes)
if not vc.isSecure():
return MSG.format(plone_version, "\n * ".join(vc.getPatches()))
class ScannerThread(threading.Thread):
"""Executes package scans on interval, notifying on 24-hour interval"""
def __init__(self, mailhost=None, notify=(), scanner=None, interval=3600):
self.daemon = True
if not callable(scanner):
raise ValueError('no callable scanner passed')
self.scanner = scanner
self.notify = ()
self.last_sent = datetime.datetime(2000, 1, 1)
self.interval = interval
self.mailhost = mailhost or 'localhost'
def notify(self, message):
host = self.mailhost
subject = 'Plone security notification for %s' % socket.getfqdn()
if (datetime.datetime.now() - self.last_sent).days >= 1:
for address in self.notify:
notify_administrator(host, address.strip(), subject, message)
def scan(self):
message = self.scanner()
if message is None:
return
self.notify(message)
def run(self):
while 1:
self.scan()
time.sleep(self.interval)
def initialize(context):
"""
Put configuration in your buildout:
[instance]
recipe = plone.recipe.zope2instance
#... options here ...
zope-conf-additional =
<product-config plone.break_core>
mailhost localhost
notify root@localhost, webmaster@localhost, sysadmin@example.com
</product-config>
"""
COMMA = re.compile(',[ ]*')
config = getcfg()
mailhost = config.get('mailhost', 'localhost')
notify = COMMA.split(config.get('notify', 'root@localhost'))
scanner = ScannerThread(mailhost, notify, vuln_scanner, 3600)
scanner.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment