Skip to content

Instantly share code, notes, and snippets.

@openfly
Created January 17, 2018 22:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save openfly/ec5c6e9d670e5386cb9b54df4d3f26d0 to your computer and use it in GitHub Desktop.
Save openfly/ec5c6e9d670e5386cb9b54df4d3f26d0 to your computer and use it in GitHub Desktop.
Example Python Sensu Check with Unit Test Flag
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Example Unit Tests using Sensu Plugin
'''
__author__ = 'Matt Joyce'
__email__ = 'matt@surly.bike'
import argparse
import json
import requests
import sys
import httpretty
import unittest
from sensu_plugin import SensuPluginCheck
requests.packages.urllib3.disable_warnings()
class Healthchecks(SensuPluginCheck):
def __init__(self, *args, **kwargs):
super(Healthchecks, self).__init__(*args, **kwargs)
def get_health_uri(self):
''' Get the aggregated healthcheck from the target node specified '''
URI = 'https://' + target + ':8443/HealthCheck'
try:
req_resp = requests.get(URI, verify='/etc/ssl/certs/ca-bundle.trust.crt')
assert req_resp.status_code == 200
except requests.exceptions.RequestException as exception_s:
self.critical(str(exception_s))
try:
status_dict = json.loads(req_resp.text)
failed = []
for key, value in status_dict.iteritems():
if value is False and key != '_directory':
failed.append(key)
except Exception as err:
print err
self.critical(str(err))
if 'failed' in dir():
return failed
return []
def run(self):
''' main application loop '''
try:
status = self.get_health_uri()
print 'status gotten'
print status
if len(status) > 0:
self.critical(str(status).strip('[]'))
else:
self.ok(str(status))
except Exception as runtime_error:
print runtime_error
sys.exit(2)
@httpretty.activate
class test_Healthchecks(unittest.TestCase):
''' test case '''
def __init__(self, *args, **kwargs):
unittest.TestCase.__init__(self, *args, **kwargs)
super(test_Healthchecks, self).__init__(*args, **kwargs)
self.__uri__ = "https://fake.endpoint:8443"
def test_success(self):
''' test successful healthcheck '''
# register response
httpretty.register_uri(httpretty.GET, self.__uri__ + "/HealthCheck",
body='{"value1"=true,"value2"=true,"value3"=true,"value4"=true}',
status=200,
content_type='text/json')
# run test query
# status_code, response = self.pod.member_add('stream_id', '123456')
with self.assertRaises(SystemExit) as cm:
Healthchecks()
self.assertEqual(cm.exception.code, 0)
# verify return
def test_failone(self):
''' test single failure in check '''
# register response
httpretty.register_uri(httpretty.GET, self.__uri__ + "/HealthCheck",
body='{"value1"=true,"value2"=true,"value3"=false,"value4"=true}',
status=200,
content_type='text/json')
# run test query
with self.assertRaises(SystemExit) as cm:
Healthchecks()
self.assertEqual(cm.exception.code, 2)
def test_failmany(self):
''' test many failure in check '''
# register response
httpretty.register_uri(httpretty.GET, self.__uri__ + "/HealthCheck",
body='{"value1"=false,"value2"=true,"value3"=false,"value4"=false}',
status=200,
content_type='text/json')
# run test query
with self.assertRaises(SystemExit) as cm:
Healthchecks()
self.assertEqual(cm.exception.code, 2)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Example HealthCheck')
parser.add_argument(
'target', type=str,
help='specify path of file to toss')
parser.add_argument(
'-t', '--test', action='store_true',
help='run test case')
args = parser.parse_args()
if args.test:
global target
target = 'fake.endpoint'
# unittest.test_Healthchecks()
suite = unittest.TestSuite()
suite.addTest(test_Healthchecks('test_failmany'))
suite.addTest(test_Healthchecks('test_failone'))
suite.addTest(test_Healthchecks('test_success'))
runner = unittest.TextTestRunner()
runner.run(suite)
else:
global target
target = args.target
try:
Healthchecks()
except Exception as healthcheck_exception:
print 'Exception: %s' % healthcheck_exception
sys.exit(2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment