Skip to content

Instantly share code, notes, and snippets.

@gwillem
Created May 19, 2015 11:29
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gwillem/55d31f6b0f4f7f367843 to your computer and use it in GitHub Desktop.
Save gwillem/55d31f6b0f4f7f367843 to your computer and use it in GitHub Desktop.
Functional testing of config management (Ansible, Puppet, Chef, etc)
"""
Various helper functions to aid in node testing (aka provisioning testing aka unit testing)
"""
from functools import wraps
import hashlib
import os
from pwd import getpwuid
from grp import getgrgid
import re
from stat import S_IMODE
import subprocess
import socket
import requests
def assert_file_contains(path, string):
    """Assert that the text of the file at *path* contains *string*.

    Raises AssertionError when the substring is missing. Errors from
    opening or reading the file propagate unchanged.
    """
    # Read through a context manager so the handle is closed right away
    # instead of lingering until garbage collection.
    with open(path) as handle:
        content = handle.read()
    assert string in content, \
        "File %s does NOT contain %s" % (path, string)
def assert_file_does_not_contain(path, string):
    """Assert that the text of the file at *path* does not contain *string*.

    Raises AssertionError when the substring is present. Errors from
    opening or reading the file propagate unchanged.
    """
    # Read through a context manager so the handle is closed right away
    # instead of lingering until garbage collection.
    with open(path) as handle:
        content = handle.read()
    assert string not in content, \
        "File %s unexpectedly contains %s" % (path, string)
def assert_file_size(path, size):
    """Assert that the file at *path* is exactly *size* bytes long.

    Raises AssertionError on a mismatch. OSError from os.path.getsize
    (e.g. a missing file) propagates unchanged.
    """
    # Query the size once so the number in the failure message is the very
    # number that was compared, even if the file changes in between.
    actual = os.path.getsize(path)
    assert actual == size, \
        "File %s is not %d bytes but %d bytes" % (path, size, actual)
def assert_executable(program):
    """Assert that *program* exists and is executable.

    If *program* contains a directory component it is checked as given.
    Otherwise every directory listed in the PATH environment variable is
    searched for an executable file with that name.

    Raises AssertionError when no executable file is found.
    """
    def is_exe(fpath):
        # A candidate counts only if it is a regular file with execute access.
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    if os.path.dirname(program):
        candidates = [program]
    else:
        # Fall back to os.defpath when PATH is unset instead of dying with a
        # KeyError that hides the real "not found" assertion.
        search_path = os.environ.get("PATH", os.defpath)
        candidates = [
            # Entries may be wrapped in double quotes; strip them.
            os.path.join(directory.strip('"'), program)
            for directory in search_path.split(os.pathsep)
        ]

    # any() stops at the first hit rather than scanning the rest of PATH.
    success = any(is_exe(candidate) for candidate in candidates)
    assert success, "Program %s doesn't exist or isn't executable" % program
def assert_port(port, address='127.0.0.1'):
    """Assert that a TCP connection to (*address*, *port*) can be opened.

    Raises AssertionError when the connection attempt fails.
    """
    s = None
    try:
        s = socket.socket()
        s.connect((address, port))
        success = True
    except (socket.error, OverflowError):
        # socket.error covers refused/unreachable/resolution failures;
        # OverflowError is raised for a port number outside 0-65535.
        # Anything else (e.g. KeyboardInterrupt) is no longer swallowed.
        success = False
    finally:
        # Always release the descriptor; the probe connection is not needed
        # once we know whether the port answered.
        if s is not None:
            s.close()
    assert success, "Port %d is apparently not open but should be!" % port
def assert_symlink(src, dst):
    """Assert that *src* is a symlink whose fully resolved path equals *dst*.

    The comparison uses os.path.realpath(src), so *dst* must be given as the
    final resolved path. Raises AssertionError if *src* is not a symlink or
    resolves elsewhere.
    """
    is_link = os.path.islink(src)
    assert is_link, "%s is not a symlink!" % src

    resolved = os.path.realpath(src)
    assert resolved == dst, "%s does not point to %s" % (src, dst)
def assert_file(fpath, mode=None, user=None, group=None):
    """Assert that *fpath* is a regular file, then check its ownership/mode.

    *mode*, *user* and *group* are handed to assert_stat unchanged.
    Raises AssertionError if the path is not a file or assert_stat fails.
    """
    is_regular_file = os.path.isfile(fpath)
    assert is_regular_file, "Path %s is not a file, but should be!" % fpath

    assert_stat(fpath, group=group, user=user, mode=mode)
def assert_path_does_not_exist(fpath):
    """Assert that os.path.exists reports nothing at *fpath*.

    Raises AssertionError when the path exists.
    """
    present = os.path.exists(fpath)
    assert not present, "Path %s exists, but shouldn't!" % fpath
def assert_command(cmd, required_kw=None, required_re=None, illegal_kw=None, illegal_re=None, code=0):
    """Run *cmd* through the shell and assert on its exit code and output.

    cmd         -- command line, executed via run_cmd(cmd, shell=True)
    required_kw -- substring that must occur in the output
    required_re -- regex that must match somewhere in the output
    illegal_kw  -- substring that must not occur in the output
    illegal_re  -- regex that must not match anywhere in the output
    code        -- expected exit code (default 0)

    Checks left at their default (or otherwise falsy) are skipped.
    Returns the (returncode, output) pair from run_cmd so callers can run
    further checks. Raises AssertionError on the first failed check.
    """
    returncode, output = run_cmd(cmd, shell=True)
    assert returncode == code, "%s: exit code is %s instead of %s:\n%s" % (cmd, returncode, code, output)
    if required_kw:
        assert required_kw in output, "%s: Can't find '%s' in output:\n%s" % (cmd, required_kw, output)
    if required_re:
        assert re.search(required_re, output), "%s: output does not match regex /%s/:\n%s" % (cmd, required_re, output)
    if illegal_kw:
        assert illegal_kw not in output, "%s: found '%s' in output:\n%s" % (cmd, illegal_kw, output)
    if illegal_re:
        # Report the pattern that actually matched; the message used to print
        # required_re here, which is usually None and always misleading.
        assert not re.search(illegal_re, output), "%s: output unexpectedly matches regex /%s/:\n%s" % (cmd, illegal_re, output)
    return returncode, output
def assert_dir(directory, mode=None, user=None, group=None):
    """Assert that *directory* is a directory, then check its ownership/mode.

    *mode*, *user* and *group* are handed to assert_stat unchanged.
    Raises AssertionError if the path is not a directory or assert_stat fails.
    """
    is_directory = os.path.isdir(directory)
    assert is_directory, "Dir '%s' doesn't exist, but should!" % directory

    assert_stat(directory, group=group, user=user, mode=mode)
def assert_stat(path, mode=None, user=None, group=None):
    """Assert permission bits and ownership of *path*.

    mode  -- expected permission bits as an integer (e.g. 0o644)
    user  -- expected owner: a user name (str) or a numeric uid
    group -- expected group: a group name (str) or a numeric gid

    A check is skipped only when its argument is None. The values 0 (mode
    0o000, uid 0 / root, gid 0) are real expectations and are verified;
    they used to be skipped silently because they are falsy.

    Raises AssertionError on the first mismatch. When no check is requested
    the path is not touched at all.
    """
    if mode is None and user is None and group is None:
        return

    # One stat call gives a consistent snapshot for all three checks.
    info = os.stat(path)

    if mode is not None:
        fmode = S_IMODE(info.st_mode)
        assert fmode == mode, "Mode for %s should be %o, but is %o" % (path, mode, fmode)

    if user is not None:
        real_user = info.st_uid
        if isinstance(user, str):
            # A name was given, so compare names instead of numeric ids.
            real_user = getpwuid(real_user).pw_name
        assert real_user == user, "Owner for %s should be %s, but is %s" % (path, user, real_user)

    if group is not None:
        real_group = info.st_gid
        if isinstance(group, str):
            # A name was given, so compare names instead of numeric ids.
            real_group = getgrgid(real_group).gr_name
        assert real_group == group, "Group should be %s, but is %s" % (group, real_group)
def assert_package(pkgname):
    """Assert that dpkg reports *pkgname* as fully installed.

    Runs dpkg-query through run_cmd and requires exit code 0 together with
    the exact status string 'install ok installed'.
    """
    query = "dpkg-query -W -f '${Status}' " + pkgname
    exitcode, output = run_cmd(query, shell=True)

    installed = (exitcode == 0) and (output == 'install ok installed')
    assert installed, \
        "Package %s is not installed, but should! dpkg says: %s" % (pkgname, output)
def assert_package_purged(pkgname):
    """Assert that dpkg no longer knows *pkgname* as installed.

    Two dpkg-query outcomes are accepted:
      * exit code 0 with the status 'unknown ok not-installed', or
      * exit code 1 with a "no packages found matching <pkgname>" message.
    """
    exitcode, output = run_cmd("dpkg-query -W -f '${Status}' " + pkgname, shell=True)

    known_but_absent = exitcode == 0 and output == 'unknown ok not-installed'
    # The output is lowercased for the comparison, so the needle has to be
    # lowercased as well; otherwise a name with capitals can never match.
    needle = ("no packages found matching " + pkgname).lower()
    unknown_to_dpkg = exitcode == 1 and needle in output.lower()

    assert known_but_absent or unknown_to_dpkg, \
        "Package %s does not seem to be purged! dpkg says: %s" % (pkgname, output)
def assert_url_get(url, status=200, required_kw=None, illegal_kw=None, host=None):
    """GET *url* and assert on the response status and body.

    status      -- expected HTTP status code (default 200)
    required_kw -- keyword that must occur in the body (None = no check)
    illegal_kw  -- keyword that must not occur in the body (None = no check)
    host        -- if given, sent as the 'host' request header

    Keywords may be bytes or text: bytes are matched against the raw payload
    (r.content), text against the decoded body (r.text). This avoids the
    TypeError Python 3 raises when a str is searched for in bytes.

    Returns the requests response object. Raises AssertionError on a failed
    check; the assertion message is the offending status code or body.
    """
    headers = {}
    if host:
        headers['host'] = host
    r = requests.get(url, headers=headers)
    assert r.status_code == status, r.status_code

    def body_for(keyword):
        # Pick the body representation whose type matches the keyword's.
        return r.content if isinstance(keyword, bytes) else r.text

    if required_kw is not None:
        body = body_for(required_kw)
        assert required_kw in body, body
    if illegal_kw is not None:
        body = body_for(illegal_kw)
        assert illegal_kw not in body, body
    return r
def assert_query_result(cursor, query, test=1, hint=None):
    """
    Execute query on cursor and assert on the first column of the first row.

    test = value to compare with, or None if no rows should be returned
    if test is a list, then assert that query result matches ANY of the list
    hint = text appended to failure messages (defaults to the query itself)

    Raises AssertionError when the expectation is not met, including when
    a value was expected but the query returned no rows at all.
    """
    cursor.execute(query)
    if hint is None:
        hint = query
    row = cursor.fetchone()

    if test is None:
        assert row is None, "Mysql didnt return ZERO rows (%s)" % hint
        return

    expected = test if isinstance(test, list) else [test]

    # Without this guard an empty result crashed with "'NoneType' object is
    # not subscriptable" instead of producing a readable assertion failure.
    assert row is not None, \
        "Mysql returned ZERO rows instead of one of %s! (%s)" % (expected, hint)

    assert any(k == row[0] for k in expected), \
        "Mysql value is '%s' instead of one of %s! (%s)" % \
        (row[0], expected, hint)
def assert_sha256_checksum(fpath, checksum):
    """Assert that the SHA-256 hex digest of the file at *fpath* equals *checksum*.

    The comparison is an exact string match against hashlib's lowercase hex
    digest. Raises AssertionError on a mismatch.
    """
    digest = hashlib.sha256()
    # Hash in fixed-size chunks: the handle is closed deterministically and
    # large files are not pulled into memory in one piece.
    with open(fpath, 'rb') as handle:
        for chunk in iter(lambda: handle.read(65536), b''):
            digest.update(chunk)
    actual = digest.hexdigest()
    assert checksum == actual, \
        "SHA-256 of %s should be %s, but is %s" % (fpath, checksum, actual)
def run_cmd(cmd, shell=False):
    """Run *cmd* and return (exit_code, output).

    cmd   -- argument list, or a command string when shell=True
    shell -- passed straight to subprocess.check_output

    stderr is merged into stdout. A non-zero exit status is reported through
    the returned exit code instead of an exception. The output is always
    returned as text so callers can compare it with ordinary string
    literals. Failure to start the command (e.g. a missing executable with
    shell=False) still raises OSError.
    """
    ret = 0
    try:
        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=shell)
    except subprocess.CalledProcessError as e:
        ret = e.returncode
        output = e.output
    if not isinstance(output, str):
        # Python 3 hands back bytes; decode leniently so odd bytes in the
        # command output cannot crash the test helper. On Python 2 the
        # output already is a str and is returned untouched.
        output = output.decode('utf-8', 'replace')
    return ret, output
def skip_if_lxc(f):
    """Decorator: silently skip *f* when the container marker is present.

    The wrapped function is not called (and None is returned) when
    /proc/1/cgroup contains the string '/hypernode-control_'. Going by this
    decorator's name, that marker identifies the LXC environment.
    Otherwise *f* is called and its return value is passed through.
    """
    @wraps(f)
    def wrapped(*args, **kwargs):
        try:
            with open('/proc/1/cgroup') as handle:
                in_lxc = '/hypernode-control_' in handle.read()
        except (IOError, OSError):
            # No readable cgroup file means the marker cannot be present.
            # NOTE(review): treating this as "not LXC" is an assumption;
            # previously the error propagated to the caller.
            in_lxc = False

        if in_lxc:
            return None
        # Call f outside any exception handler so that its own failures are
        # not reported as chained to an unrelated internal AssertionError.
        return f(*args, **kwargs)
    return wrapped
def assert_python_module(module):
    """Assert that the Python module named *module* can be imported.

    Only ImportError counts as "not loadable"; any other exception raised
    while importing propagates unchanged.
    """
    try:
        __import__(module)
    except ImportError:
        importable = False
    else:
        importable = True
    assert importable, "Could not load python module '%s'" % module
def assert_dir_permissions(pathname, uid, gid, mode):
    """Assert owner uid, group gid and permission bits of *pathname*.

    uid, gid -- expected numeric owner and group ids
    mode     -- expected permission bits, either as a four-digit octal
                string such as '0755' or as an integer such as 0o755

    Raises AssertionError when any of the three does not match.
    """
    s = os.stat(pathname)
    # Format the permission bits explicitly instead of slicing oct(), whose
    # textual form differs between Python versions.
    dir_mode = "%04o" % S_IMODE(s.st_mode)
    # Integer modes used to compare unequal to the string unconditionally.
    expected = "%04o" % mode if isinstance(mode, int) else mode
    assert (s.st_uid == uid and s.st_gid == gid and dir_mode == expected), \
        "Path %s doesnt have right permissions" % pathname
def assert_file_permissions(pathname, uid, gid, mode):
    """Assert owner uid, group gid and permission bits of *pathname*.

    uid, gid -- expected numeric owner and group ids
    mode     -- expected permission bits, either as a four-digit octal
                string such as '0644' or as an integer such as 0o644

    Raises AssertionError when any of the three does not match.
    """
    s = os.stat(pathname)
    # Format the permission bits explicitly instead of slicing oct(), whose
    # textual form differs between Python versions.
    file_mode = "%04o" % S_IMODE(s.st_mode)
    # Integer modes used to compare unequal to the string unconditionally.
    expected = "%04o" % mode if isinstance(mode, int) else mode
    assert (s.st_uid == uid and s.st_gid == gid and file_mode == expected), \
        "Path %s doesnt have right permissions" % pathname
def send_smtp_mail(receipt):
    """Send a test mail to *receipt* through the SMTP server on 127.0.0.1.

    The sender address and message text embed the output of
    `/bin/hostname -f`.

    Returns True when the mail was handed over and the session closed
    cleanly, False on any error (best effort; nothing is raised).
    """
    import smtplib
    h = subprocess.Popen("/bin/hostname -f", stdout=subprocess.PIPE, shell=True)
    stdout, stderr = h.communicate()
    # The command output ends in a newline (and is bytes on Python 3); left
    # as is, that would corrupt the sender address and the message text.
    hostname = stdout.strip()
    if not isinstance(hostname, str):
        hostname = hostname.decode('utf-8', 'replace')
    server = '127.0.0.1'
    recipients = [receipt, ]
    sender = 'mailtest_1nodedb@%s' % hostname
    message = 'Testing mailflow for %s @ one-node.net' % hostname
    try:
        session = smtplib.SMTP(server)
        try:
            session.sendmail(sender, recipients, message)
        finally:
            # Close the session even when sendmail fails, so the connection
            # is not left dangling.
            session.quit()
        return True
    except Exception:
        # Deliberate best effort: callers only look at the boolean.
        return False
def send_sendmail_mail(receipt):
    """Send a fixed test mail to *receipt* by piping it into /usr/sbin/sendmail -t.

    Returns the tuple produced by Popen.communicate().
    """
    from email.mime.text import MIMEText
    msg = MIMEText("Here is the body of my message")
    msg["From"] = "mailtest_1nodedb@cloud.byte.nl"
    msg["To"] = receipt
    msg["Subject"] = "Testing mailflow for one node db cloud db node"
    payload = msg.as_string()
    if not isinstance(payload, bytes):
        # The stdin pipe is binary; on Python 3 as_string() yields text,
        # which communicate() would reject with a TypeError.
        payload = payload.encode('utf-8')
    p = subprocess.Popen(["/usr/sbin/sendmail", "-t"], stdin=subprocess.PIPE)
    return p.communicate(payload)
def assert_process_running(procname):
    """Assert that at least one process matching *procname* is running.

    *procname* is handed to /usr/bin/pgrep through the shell. Every PID
    pgrep prints is probed with signal 0, and one live process is enough.

    Raises AssertionError when no live matching process is found.
    """
    import errno

    command = '/usr/bin/pgrep %s' % procname
    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    stdout, _ = process.communicate()

    success = False
    # pgrep prints one PID per line. Converting the whole output with int()
    # failed whenever more than one process matched (e.g. master + workers),
    # which was misreported as "not running".
    for token in stdout.split():
        try:
            os.kill(int(token), 0)  # signal 0: existence check only
        except ValueError:
            continue  # not a PID; ignore the token
        except OSError as e:
            if e.errno != errno.EPERM:
                continue  # process is gone (or another error): try the next
            # EPERM: the process exists but belongs to another user.
        success = True
        break
    assert success, "Process %s is not running but should!" % procname
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment