Created
May 19, 2015 11:29
-
-
Save gwillem/55d31f6b0f4f7f367843 to your computer and use it in GitHub Desktop.
Functional testing of config management (Ansible, Puppet, Chef, etc)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Various helper functions to aid in node testing (aka provisioning testing aka unit testing) | |
""" | |
from functools import wraps | |
import hashlib | |
import os | |
from pwd import getpwuid | |
from grp import getgrgid | |
import re | |
from stat import S_IMODE | |
import subprocess | |
import socket | |
import requests | |
def assert_file_contains(path, string):
    """Assert that the file at ``path`` contains ``string``.

    The whole file is read into memory in text mode before the check.

    Raises:
        AssertionError: if ``string`` does not occur in the file.
    """
    # The context manager closes the handle even when the assertion fails.
    with open(path) as handle:
        content = handle.read()
    assert string in content, \
        "File %s does NOT contain %s" % (path, string)
def assert_file_does_not_contain(path, string):
    """Assert that the file at ``path`` does not contain ``string``.

    The whole file is read into memory in text mode before the check.

    Raises:
        AssertionError: if ``string`` occurs anywhere in the file.
    """
    # The context manager closes the handle even when the assertion fails.
    with open(path) as handle:
        content = handle.read()
    assert string not in content, \
        "File %s unexpectedly contains %s" % (path, string)
def assert_file_size(path, size):
    """Assert that the file at ``path`` is exactly ``size`` bytes long.

    Raises:
        AssertionError: if the size reported by the filesystem differs.
    """
    # Stat once so the size shown in the message is the one that was compared.
    actual = os.path.getsize(path)
    assert actual == size, \
        "File %s is not %d bytes but %d bytes" % (path, size, actual)
def assert_executable(program):
    """Assert that ``program`` exists and is executable.

    If ``program`` contains a directory component it is checked as given;
    otherwise every entry of the ``PATH`` environment variable is searched.

    Raises:
        AssertionError: if no matching executable file is found.
    """
    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    directory, _ = os.path.split(program)
    if directory:
        success = is_exe(program)
    else:
        # An unset PATH used to raise KeyError; fall back to the platform
        # default search path instead so the outcome is a plain assertion.
        search_path = os.environ.get("PATH", os.defpath)
        # any() stops at the first hit instead of scanning every entry.
        success = any(
            is_exe(os.path.join(entry.strip('"'), program))
            for entry in search_path.split(os.pathsep)
        )
    assert success, "Program %s doesn't exist or isn't executable" % program
def assert_port(port, address='127.0.0.1'):
    """Assert that a TCP connection to ``address``:``port`` can be opened.

    Args:
        port: TCP port number to connect to.
        address: host to connect to; defaults to the loopback address.

    Raises:
        AssertionError: if the connection attempt fails.
    """
    sock = None
    try:
        sock = socket.socket()
        sock.connect((address, port))
        success = True
    except (socket.error, OverflowError):
        # socket.error covers refused/unreachable/resolution failures;
        # OverflowError is raised for port numbers outside 0-65535.
        success = False
    finally:
        # Always release the descriptor, whether or not connect succeeded.
        if sock is not None:
            sock.close()
    assert success, "Port %d is apparently not open but should be!" % port
def assert_symlink(src, dst):
    """Assert that ``src`` is a symlink whose fully resolved target is ``dst``.

    The comparison is made against ``os.path.realpath(src)``, so ``dst`` has
    to be given as the canonical path.

    Raises:
        AssertionError: if ``src`` is not a symlink or resolves elsewhere.
    """
    is_link = os.path.islink(src)
    assert is_link, "%s is not a symlink!" % src
    resolved = os.path.realpath(src)
    assert resolved == dst, "%s does not point to %s" % (src, dst)
def assert_file(fpath, mode=None, user=None, group=None):
    """Assert that ``fpath`` is a regular file, then check its metadata.

    ``mode``, ``user`` and ``group`` are forwarded unchanged to
    ``assert_stat``.

    Raises:
        AssertionError: if ``fpath`` is not a file or ``assert_stat`` fails.
    """
    is_regular_file = os.path.isfile(fpath)
    assert is_regular_file, "Path %s is not a file, but should be!" % fpath
    assert_stat(fpath, group=group, user=user, mode=mode)
def assert_path_does_not_exist(fpath):
    """Assert that nothing exists at ``fpath``.

    Raises:
        AssertionError: if ``os.path.exists`` reports the path as present.
    """
    present = os.path.exists(fpath)
    assert not present, "Path %s exists, but shouldn't!" % fpath
def assert_command(cmd, required_kw=None, required_re=None, illegal_kw=None, illegal_re=None, code=0):
    """Run ``cmd`` through the shell and assert on its exit code and output.

    Args:
        cmd: shell command line, executed via ``run_cmd(cmd, shell=True)``.
        required_kw: substring that must occur in the output.
        required_re: regular expression that must match somewhere in the output.
        illegal_kw: substring that must not occur in the output.
        illegal_re: regular expression that must not match the output.
        code: expected exit code (default 0).

    Returns:
        The ``(returncode, output)`` tuple obtained from ``run_cmd``.

    Raises:
        AssertionError: if any of the requested checks fails.
    """
    returncode, output = run_cmd(cmd, shell=True)
    assert returncode == code, "%s: exit code is %s instead of %s:\n%s" % (cmd, returncode, code, output)
    if required_kw:
        assert required_kw in output, "%s: Can't find '%s' in output:\n%s" % (cmd, required_kw, output)
    if required_re:
        assert re.search(required_re, output), "%s: output does not match regex /%s/:\n%s" % (cmd, required_re, output)
    if illegal_kw:
        assert illegal_kw not in output, "%s: found '%s' in output:\n%s" % (cmd, illegal_kw, output)
    if illegal_re:
        # Report the pattern that actually matched (was: required_re).
        assert not re.search(illegal_re, output), "%s: output unexpectedly matches regex /%s/:\n%s" % (cmd, illegal_re, output)
    return returncode, output
def assert_dir(directory, mode=None, user=None, group=None):
    """Assert that ``directory`` is a directory, then check its metadata.

    ``mode``, ``user`` and ``group`` are forwarded unchanged to
    ``assert_stat``.

    Raises:
        AssertionError: if ``directory`` is not a directory or
            ``assert_stat`` fails.
    """
    is_directory = os.path.isdir(directory)
    assert is_directory, "Dir '%s' doesn't exist, but should!" % directory
    assert_stat(directory, group=group, user=user, mode=mode)
def assert_stat(path, mode=None, user=None, group=None):
    """Assert permission bits and/or ownership of ``path``.

    Args:
        path: filesystem path to inspect.
        mode: expected permission bits as an int (e.g. ``0o644``), compared
            against ``S_IMODE`` of the path's mode; ``None`` skips the check.
        user: expected owner, either a numeric uid or a user name string;
            ``None`` skips the check.
        group: expected group, either a numeric gid or a group name string;
            ``None`` skips the check.

    Raises:
        AssertionError: if a requested attribute does not match.
    """
    # Compare against None explicitly: 0 is a legitimate mode, and uid/gid 0
    # is root, so truthiness checks would silently skip those assertions.
    if mode is None and user is None and group is None:
        return

    info = os.stat(path)

    if mode is not None:
        fmode = S_IMODE(info.st_mode)
        assert fmode == mode, "Mode for %s should be %o, but is %o" % (path, mode, fmode)

    if user is not None:
        real_user = info.st_uid
        if isinstance(user, str):
            # A name was given, so translate the uid before comparing.
            real_user = getpwuid(real_user).pw_name
        assert real_user == user, "Owner for %s should be %s, but is %s" % (path, user, real_user)

    if group is not None:
        real_group = info.st_gid
        if isinstance(group, str):
            # A name was given, so translate the gid before comparing.
            real_group = getgrgid(real_group).gr_name
        assert real_group == group, "Group should be %s, but is %s" % (group, real_group)
def assert_package(pkgname):
    """Assert that the Debian package ``pkgname`` is installed.

    Queries ``dpkg-query -W -f '${Status}'`` through ``run_cmd`` and expects
    exit code 0 with the exact status ``install ok installed``.

    Raises:
        AssertionError: if dpkg reports any other state.
    """
    query = "dpkg-query -W -f '${Status}' " + pkgname
    exitcode, output = run_cmd(query, shell=True)
    installed = exitcode == 0 and output == 'install ok installed'
    assert installed, \
        "Package %s is not installed, but should! dpkg says: %s" % (pkgname, output)
def assert_package_purged(pkgname):
    """Assert that the Debian package ``pkgname`` is purged or unknown to dpkg.

    Accepts either exit code 0 with status ``unknown ok not-installed`` or
    exit code 1 with dpkg's "no packages found matching <pkgname>" message.

    Raises:
        AssertionError: if dpkg reports any other state.
    """
    exitcode, output = run_cmd("dpkg-query -W -f '${Status}' " + pkgname, shell=True)
    not_installed = exitcode == 0 and output == 'unknown ok not-installed'
    # The output is lowercased for the comparison, so the package name must
    # be lowercased too or a mixed-case argument can never match.
    needle = "no packages found matching " + pkgname.lower()
    unknown_to_dpkg = exitcode == 1 and needle in output.lower()
    assert not_installed or unknown_to_dpkg, \
        "Package %s does not seem to be purged! dpkg says: %s" % (pkgname, output)
def assert_url_get(url, status=200, required_kw=None, illegal_kw=None, host=None):
    """GET ``url`` and assert on the response status and body.

    Args:
        url: URL to request.
        status: expected HTTP status code (default 200).
        required_kw: keyword that must occur in the raw response body.
        illegal_kw: keyword that must not occur in the raw response body.
        host: optional value for the ``host`` request header.

    Returns:
        The ``requests`` response object.

    Raises:
        AssertionError: if the status code or a keyword check fails.
    """
    def as_bytes(keyword):
        # r.content is a byte string; on Python 3 a text keyword has to be
        # encoded first or the ``in`` test raises TypeError.
        if isinstance(keyword, bytes):
            return keyword
        return keyword.encode('utf-8')

    headers = {}
    if host:
        headers['host'] = host

    r = requests.get(url, headers=headers)
    assert r.status_code == status, r.status_code

    if required_kw is not None:
        assert as_bytes(required_kw) in r.content, r.content
    if illegal_kw is not None:
        assert as_bytes(illegal_kw) not in r.content, r.content
    return r
def assert_query_result(cursor, query, test=1, hint=None):
    """Execute ``query`` and assert on the first column of the first row.

    Args:
        cursor: object providing ``execute(query)`` and ``fetchone()``.
        query: statement to execute.
        test: value to compare with, or None if no rows should be returned.
            If test is a list, the result must match ANY of its items.
        hint: text appended to failure messages; defaults to ``query``.

    Raises:
        AssertionError: if the result does not meet the expectation,
            including when a value was expected but no row came back.
    """
    cursor.execute(query)
    if hint is None:
        hint = query
    row = cursor.fetchone()

    if test is None:
        assert row is None, "Mysql didnt return ZERO rows (%s)" % hint
        return

    if not isinstance(test, list):
        test = [test]

    # Without this guard an empty result crashed with TypeError on row[0]
    # instead of producing a readable assertion failure.
    assert row is not None, \
        "Mysql returned ZERO rows instead of one of %s! (%s)" % (test, hint)
    assert any(k == row[0] for k in test), \
        "Mysql value is '%s' instead of one of %s! (%s)" % \
        (row[0], test, hint)
def assert_sha256_checksum(fpath, checksum):
    """Assert that the SHA-256 hex digest of the file ``fpath`` is ``checksum``.

    Args:
        fpath: path of the file to hash (read in binary mode).
        checksum: expected digest as a hexadecimal string.

    Raises:
        AssertionError: if the digests differ.
    """
    digest = hashlib.sha256()
    # Hash in chunks so large files are not loaded into memory at once, and
    # let the context manager close the handle.
    with open(fpath, 'rb') as handle:
        for chunk in iter(lambda: handle.read(65536), b''):
            digest.update(chunk)
    actual = digest.hexdigest()
    assert checksum == actual, \
        "SHA256 of %s should be %s, but is %s" % (fpath, checksum, actual)
def run_cmd(cmd, shell=False):
    """Run ``cmd`` and return ``(returncode, output)``.

    Args:
        cmd: command as an argument list, or a string when ``shell`` is True.
        shell: passed through to ``subprocess.check_output``.

    Returns:
        A tuple of the exit code (0 on success) and the combined
        stdout/stderr output as text.
    """
    try:
        # universal_newlines=True yields text rather than bytes on Python 3,
        # which the str comparisons made by callers in this module need.
        output = subprocess.check_output(
            cmd, stderr=subprocess.STDOUT, shell=shell, universal_newlines=True)
    except subprocess.CalledProcessError as e:
        # A non-zero exit is an expected outcome here, not an error.
        return e.returncode, e.output
    return 0, output
def skip_if_lxc(f):
    """Decorator: do not run ``f`` when PID 1's cgroup file has the LXC marker.

    The wrapped function is skipped (returning None) when ``/proc/1/cgroup``
    contains ``/hypernode-control_``; otherwise ``f`` is called and its
    return value is passed through. An unreadable cgroup file is treated as
    "marker absent", so ``f`` runs.
    """
    @wraps(f)
    def wrapped(*args, **kwargs):
        # Read the file directly instead of using an assert for control
        # flow: asserts are stripped under ``python -O``, which made every
        # decorated function skip silently.
        try:
            with open('/proc/1/cgroup') as handle:
                in_lxc = '/hypernode-control_' in handle.read()
        except (IOError, OSError):
            in_lxc = False
        if in_lxc:
            return None
        # Called outside any except handler so failures raised by f are not
        # chained to an unrelated exception, and its result is returned.
        return f(*args, **kwargs)
    return wrapped
def assert_python_module(module):
    """Assert that the Python module named ``module`` can be imported.

    Raises:
        AssertionError: if importing the module raises ImportError.
    """
    try:
        __import__(module)
    except ImportError:
        importable = False
    else:
        importable = True
    assert importable, "Could not load python module '%s'" % module
def assert_dir_permissions(pathname, uid, gid, mode):
    """Assert the owner, group and permission bits of ``pathname``.

    Args:
        pathname: path to stat.
        uid: expected numeric owner id.
        gid: expected numeric group id.
        mode: expected permissions as a four-character octal string,
            compared with the last four octal digits of ``st_mode``
            (e.g. ``'0755'``).

    Raises:
        AssertionError: if any of the three attributes differs.
    """
    info = os.stat(pathname)
    actual = (info.st_uid, info.st_gid, oct(info.st_mode)[-4:])
    expected = (uid, gid, mode)
    assert actual == expected, \
        "Path %s doesnt have right permissions" % pathname
def assert_file_permissions(pathname, uid, gid, mode):
    """Assert the owner, group and permission bits of ``pathname``.

    Args:
        pathname: path to stat.
        uid: expected numeric owner id.
        gid: expected numeric group id.
        mode: expected permissions as a four-character octal string,
            compared with the last four octal digits of ``st_mode``
            (e.g. ``'0644'``).

    Raises:
        AssertionError: if any of the three attributes differs.
    """
    info = os.stat(pathname)
    actual = (info.st_uid, info.st_gid, oct(info.st_mode)[-4:])
    expected = (uid, gid, mode)
    assert actual == expected, \
        "Path %s doesnt have right permissions" % pathname
def send_smtp_mail(receipt):
    """Send a test mail to ``receipt`` through the SMTP server on 127.0.0.1.

    The sender address and message body embed the output of
    ``/bin/hostname -f``.

    Returns:
        True if the mail was handed to the server, False on any failure.
    """
    import smtplib

    # Text mode plus strip(): the raw output ends with a newline (and is
    # bytes on Python 3), which otherwise ends up inside the sender address.
    h = subprocess.Popen("/bin/hostname -f", stdout=subprocess.PIPE,
                         shell=True, universal_newlines=True)
    stdout, _ = h.communicate()
    hostname = stdout.strip()

    server = '127.0.0.1'
    recipients = [receipt, ]
    sender = 'mailtest_1nodedb@%s' % hostname
    message = 'Testing mailflow for %s @ one-node.net' % hostname

    try:
        session = smtplib.SMTP(server)
        try:
            session.sendmail(sender, recipients, message)
        finally:
            # Close the connection even when sendmail() raised.
            session.quit()
        return True
    except Exception:
        # Deliberate best effort: callers only get a success flag.
        return False
def send_sendmail_mail(receipt):
    """Send a fixed test message to ``receipt`` via ``/usr/sbin/sendmail -t``.

    Returns:
        The result of ``Popen.communicate()`` for the sendmail process.
    """
    from email.mime.text import MIMEText

    msg = MIMEText("Here is the body of my message")
    msg["From"] = "mailtest_1nodedb@cloud.byte.nl"
    msg["To"] = receipt
    msg["Subject"] = "Testing mailflow for one node db cloud db node"

    # Text mode: msg.as_string() is str, and writing str to a binary stdin
    # pipe raises TypeError on Python 3.
    p = subprocess.Popen(["/usr/sbin/sendmail", "-t"], stdin=subprocess.PIPE,
                         universal_newlines=True)
    return p.communicate(msg.as_string())
def assert_process_running(procname):
    """Assert that at least one process matched by ``pgrep procname`` is alive.

    ``procname`` is interpolated into a ``/usr/bin/pgrep`` shell command
    line; each PID printed is probed with signal 0.

    Raises:
        AssertionError: if pgrep prints no PID of a live process.
    """
    import errno

    command = '/usr/bin/pgrep %s' % procname
    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    stdout, _ = process.communicate()

    success = False
    # pgrep prints one PID per line; converting the whole output with int()
    # failed as soon as more than one process matched.
    for token in stdout.split():
        try:
            os.kill(int(token), 0)
        except ValueError:
            continue
        except OSError as exc:
            # EPERM means the process exists but belongs to another user.
            if exc.errno != errno.EPERM:
                continue
        success = True
        break
    assert success, "Process %s is not running but should!" % procname
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment