Last active
May 11, 2017 19:53
-
-
Save seanupton/7489856 to your computer and use it in GitHub Desktop.
Example of a possible thread-based scanner and email notifier for unpatched Plone installations. YMMV: this code is completely untested aside from passing the flake8 checks in my editor; I put it together only to explain my intent.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import pkg_resources | |
import re | |
import smtplib | |
import socket | |
import threading | |
import time | |
from App.config import getConfiguration | |
from plone.break_core import VulnerabilityCheck as VC | |
# Warning text template, filled in by vuln_scanner() via str.format():
# {0} receives the installed Plone version and {1} the names of the
# outstanding patches joined with "\n * ".
MSG = """
You have Plone {0} installed, for this version the following unapplied
patches should be installed:
* {1}
"""
# Raw mail message template, filled with the %-operator using, in order:
# sender address, recipient address, subject, the host's FQDN, and the
# notification text.  The blank line after the "Subject:" header is
# required: it is what separates the header section from the message body.
# The trailing .strip() removes the leading newline so that "From:" is the
# very first line of the message.
MAILBODY = """
From: %s
To: %s
Subject: %s

You are receving this notice, because your email address is listed as a
systems administration contact for %s.
%s
""".strip()
# Cached product configuration mapping; empty until successfully loaded.
_prod_cfg = {}
# Legacy flag from the former busy-wait "lock"; kept defined for backward
# compatibility but no longer consulted.
_prod_cfg_locked = False
# Real mutex guarding assignment of the cached configuration.
_prod_cfg_lock = threading.Lock()


def loadcfg(key):
    """Load product configuration; changes require Zope instance restart.

    The configuration is read once via ``getConfiguration()`` and cached in
    the module-level ``_prod_cfg``.  Note that the cache is not keyed: once
    any non-empty configuration has been loaded it is returned for every
    subsequent call, whatever ``key`` is passed.

    :param key: name of the ``<product-config>`` section to read.
    :returns: the cached/loaded configuration mapping, or ``None`` when no
        (non-empty) configuration exists for ``key``.
    """
    global _prod_cfg
    if _prod_cfg:
        return _prod_cfg  # naively return cached, already loaded.
    cfg = getConfiguration()
    # The attribute may be missing or unset (None); treat both as "empty".
    product_config = getattr(cfg, 'product_config', None) or {}
    prodcfg = product_config.get(key, {})
    if not prodcfg:
        return None
    with _prod_cfg_lock:
        # Another thread may have populated the cache while we were
        # reading; the first writer wins so all callers see one object.
        if not _prod_cfg:
            _prod_cfg = prodcfg
    return _prod_cfg
def getcfg():
    """Return the cached product configuration, loading it on first use.

    When nothing has been cached yet, ``loadcfg('plone.break_core')`` is
    asked to populate the module-level cache; whatever the cache holds
    afterwards (possibly still empty) is returned.
    """
    global _prod_cfg
    if _prod_cfg:
        return _prod_cfg
    loadcfg('plone.break_core')
    return _prod_cfg
def notify_administrator(self, host, mFrom, mTo, subject, message):
    """Send one notification mail through an SMTP server.

    :param self: object whose ``smtp_server`` attribute names the SMTP
        server; may be ``None`` or lack the attribute, in which case
        ``host`` is used instead.
    :param host: fallback SMTP server name (``'localhost'`` if also empty).
    :param mFrom: envelope and header sender address.
    :param mTo: envelope and header recipient address.
    :param subject: text for the ``Subject:`` header.
    :param message: notification text appended to the mail body.
    :raises smtplib.SMTPException: if the message cannot be delivered.
    """
    body = MAILBODY % (mFrom, mTo, subject, socket.getfqdn(), message)
    server = getattr(self, 'smtp_server', None) or host or 'localhost'
    delivery = smtplib.SMTP(server)
    try:
        delivery.sendmail(mFrom, mTo, body)
    finally:
        # Always release the connection, even when sending failed.
        try:
            delivery.quit()
        except smtplib.SMTPServerDisconnected:
            pass  # connection already gone; nothing left to close.
def vuln_scanner(self=None, packages=None):
    """Scans, returns None if no issues, message if unpatched.

    :param self: unused; retained (now optional) so existing callers that
        pass a leading positional argument keep working, while the function
        can also be invoked with no arguments as a plain scanner callable.
    :param packages: iterable of distributions exposing ``project_name``;
        defaults to ``pkg_resources.working_set``, looked up at call time.
    :returns: ``None`` when the vulnerability check reports the install as
        secure, otherwise ``MSG`` formatted with the Plone version and the
        patch names returned by the check.
    """
    if packages is None:
        packages = pkg_resources.working_set
    hotfixes = [
        pkg.project_name for pkg in packages
        if pkg.project_name.startswith('Products.PloneHotfix')
    ]
    plone_version = pkg_resources.get_distribution("Plone").version
    vc = VC(plone_version, hotfixes)
    if vc.isSecure():
        return None
    return MSG.format(plone_version, "\n * ".join(vc.getPatches()))
class ScannerThread(threading.Thread):
    """Executes package scans on interval, notifying on 24-hour interval.

    The thread is a daemon, so it never keeps the host process alive.
    """

    def __init__(self, mailhost=None, notify=(), scanner=None, interval=3600,
                 sender=None):
        """Configure the scanner thread (call ``start()`` to run it).

        :param mailhost: SMTP server name; defaults to ``'localhost'``.
        :param notify: address, or iterable of addresses, to be mailed.
        :param scanner: zero-argument callable returning ``None`` when all
            is well, or a message string describing the problem.
        :param interval: seconds to sleep between scans.
        :param sender: From address; defaults to ``root@<fqdn>`` at send
            time.
        :raises ValueError: if ``scanner`` is not callable.
        """
        if not callable(scanner):
            raise ValueError('no callable scanner passed')
        # Thread.__init__ must run before any Thread attribute (such as
        # ``daemon``) is assigned.
        super(ScannerThread, self).__init__()
        self.daemon = True
        self.scanner = scanner
        # Stored under a name that does not shadow the notify() method.
        if isinstance(notify, str):
            notify = (notify,)
        self.recipients = tuple(notify or ())
        # Far in the past so the first finding is mailed immediately.
        self.last_sent = datetime.datetime(2000, 1, 1)
        self.interval = interval
        self.mailhost = mailhost or 'localhost'
        # notify_administrator() reads the server name from this attribute
        # of the object passed as its first argument.
        self.smtp_server = self.mailhost
        self.sender = sender

    def notify(self, message):
        """Mail ``message`` to every recipient, at most once per 24 hours."""
        now = datetime.datetime.now()
        if (now - self.last_sent).days < 1:
            return
        addresses = [addr.strip() for addr in self.recipients if addr.strip()]
        if not addresses:
            return
        fqdn = socket.getfqdn()
        subject = 'Plone security notification for %s' % fqdn
        sender = self.sender or ('root@%s' % fqdn)
        for address in addresses:
            notify_administrator(
                self, self.mailhost, sender, address, subject, message)
        # Record the send so the 24-hour throttle actually takes effect.
        self.last_sent = now

    def scan(self):
        """Run the scanner once and notify if it reports a problem."""
        message = self.scanner()
        if message is None:
            return
        self.notify(message)

    def run(self):
        """Scan forever, sleeping ``interval`` seconds between scans."""
        while True:
            self.scan()
            time.sleep(self.interval)
def initialize(context):
    """
    Start the background vulnerability scanner thread.

    Put configuration in your buildout:

        [instance]
        recipe = plone.recipe.zope2instance
        #... options here ...
        zope-conf-additional =
            <product-config plone.break_core>
                mailhost localhost
                notify root@localhost, webmaster@localhost, sysadmin@example.com
            </product-config>

    ``mailhost`` defaults to ``localhost`` and ``notify`` (a comma-separated
    address list) defaults to ``root@localhost`` when not configured.
    """
    config = getcfg()
    mailhost = config.get('mailhost', 'localhost')
    raw_notify = config.get('notify', 'root@localhost')
    # Split on commas, tolerate surrounding whitespace, drop empty entries.
    notify = [
        addr for addr in re.split(r'\s*,\s*', raw_notify.strip()) if addr
    ]
    # The thread invokes its scanner with no arguments, whereas
    # vuln_scanner takes an (unused) leading positional; adapt the call.
    scanner = ScannerThread(
        mailhost, notify, lambda: vuln_scanner(None), 3600)
    scanner.start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment