Skip to content

Instantly share code, notes, and snippets.

@vdboor
Created April 7, 2016 13:08
Show Gist options
  • Save vdboor/d04696af89bc478a9fd0581783ebfd62 to your computer and use it in GitHub Desktop.
Save vdboor/d04696af89bc478a9fd0581783ebfd62 to your computer and use it in GitHub Desktop.
Manage the `userlist.txt` for PgBouncer in ansible. Place this file in the `library` folder.
#!/usr/bin/env python
#
# (c) 2015, Edoburu, GPLv3+ licensed
#
# Based on https://github.com/ansible/ansible-modules-core/blob/devel/web_infrastructure/htpasswd.py
#
# Ansible module documentation (YAML). Nesting matters here: every option must
# be indented below ``options:`` for the text to parse as a mapping.
# NOTE(review): ``module:`` should match this file's name inside ``library/``;
# ``htpasswd`` looks inherited from the module this one is based on - confirm.
DOCUMENTATION = """
module: htpasswd
short_description: manage PgBouncer userlist.txt entries
description:
  - Add and remove username/password entries in a password file for PgBouncer.
options:
  path:
    required: false
    default: "/etc/pgbouncer/userlist.txt"
    aliases: [ dest, destfile ]
    description:
      - Path to the file that contains the usernames and passwords
  name:
    required: true
    aliases: [ username ]
    description:
      - User name to add or remove
  password:
    required: false
    description:
      - Password associated with user.
      - Must be specified if user does not exist yet.
  crypt_scheme:
    required: false
    choices: ["md5", "plaintext"]
    default: "md5"
    description:
      - Encryption scheme to be used.
  state:
    required: false
    choices: [ present, absent ]
    default: "present"
    description:
      - Whether the user entry should be present or not
  create:
    required: false
    choices: [ "yes", "no" ]
    default: "yes"
    description:
      - Used with C(state=present). If specified, the file will be created
        if it does not already exist. If set to "no", will fail if the
        file does not exist
author: Diederik van der Boor
"""
from ansible.module_utils.basic import *
import hashlib
import re
# Matches one user entry of the form: "username" "password"
# Group 1 is the user name, group 2 the stored password; both must be non-empty
# and must not contain a double quote. ``$`` also matches just before a trailing
# newline, so ``group(0)`` never includes the line terminator.
RE_LINE = re.compile(r'^"([^"]+)" "([^"]+)"$')
class ParsedLine(object):
    """
    A single line of the user list file.

    ``line`` holds the text that is written back on save. ``username`` and
    ``password`` are only set for lines that represent a user entry; for any
    other line (comment, blank, unrecognised) they stay ``None``.
    """

    def __init__(self, line, username=None, password=None):
        self.line, self.username, self.password = line, username, password

    @classmethod
    def create(cls, username, password):
        """Build a new user entry, rendering its ``"user" "password"`` text."""
        return cls('"{0}" "{1}"'.format(username, password), username, password)

    @property
    def is_entry(self):
        """True when this line carries a user name, i.e. is a user entry."""
        if self.username is None:
            return False
        return True

    def __repr__(self):
        return 'ParsedLine({0})'.format(self.line)
class PostgresPasswdFile(object):
    """
    In-memory view of a PgBouncer user list file.

    The file is read once on construction. Lines that are not user entries
    (comments, blank lines, anything ``RE_LINE`` does not match) are kept and
    written back unchanged by :meth:`save`.

    :param filename: Path of the user list file. A file that cannot be opened
        is treated as empty.
    :param default_scheme: ``'md5'`` to store hashed passwords; any other
        value stores the password as plain text.
    """

    def __init__(self, filename, default_scheme='md5'):
        self.filename = filename
        self.default_scheme = default_scheme
        self._lines = self._load()

    def _encrypt(self, password, username, scheme=None):
        """
        Return ``password`` in the form it is stored in the file.

        :param scheme: Overrides ``self.default_scheme`` when given.
        :raises ValueError: if ``password`` is ``None``; there is nothing
            meaningful to store, and formatting it would write the literal
            text ``None`` into the file.
        """
        if password is None:
            raise ValueError("A password is required for user %s" % username)
        if scheme is None:
            scheme = self.default_scheme
        if scheme != 'md5':
            # Plain text!
            return password

        # "md5" + md5(password + username). hashlib needs bytes, so text is
        # encoded first (on Python 2 a plain ``str`` already is bytes).
        salted = password + username
        if not isinstance(salted, bytes):
            salted = salted.encode('utf-8')
        return "md5" + hashlib.md5(salted).hexdigest()

    def _load(self):
        """Read the file into a list of ``ParsedLine`` objects."""
        try:
            fp = open(self.filename, 'r')
        except IOError:
            return []

        lines = []
        with fp:
            for raw_line in fp:
                # Drop the line terminator: save() adds one per line again, so
                # keeping it would insert an extra blank line on every save.
                text = raw_line.rstrip('\r\n')
                match = RE_LINE.match(text)
                if match:
                    lines.append(ParsedLine(text, match.group(1), match.group(2)))
                else:
                    # comment or other unknown line
                    lines.append(ParsedLine(text))
        return lines

    def save(self):
        """Write all lines back to the file, one per line."""
        # Mode 'w' truncates, so an empty list leaves an empty file.
        with open(self.filename, 'w') as fp:
            fp.writelines(parsedline.line + "\n" for parsedline in self._lines)

    def users(self):
        """Return an iterator over the user names of all entries."""
        return (parsedline.username for parsedline in self._lines if parsedline.is_entry)

    def set_password(self, username, password):
        """
        Insert or replace a user password.

        The first existing entry for ``username`` is replaced in place, any
        further entries for the same user are dropped. When the user has no
        entry yet, a new one is appended at the end.

        :raises ValueError: if ``password`` is ``None``.
        """
        new_line = ParsedLine.create(username, self._encrypt(password, username))
        replaced = False
        new_lines = []
        for parsedline in self._lines:
            if parsedline.is_entry and parsedline.username == username:
                if not replaced:  # Second occurrence is skipped
                    new_lines.append(new_line)
                    replaced = True
                continue
            # Preserve old line
            new_lines.append(parsedline)
        if not replaced:
            new_lines.append(new_line)
        self._lines = new_lines

    def verify(self, username, password):
        """
        Tell whether the file already stores ``password`` for ``username``.

        Only the first entry for the user is checked. Returns ``False`` when
        the user has no entry.
        """
        for parsedline in self._lines:
            if parsedline.is_entry and parsedline.username == username:
                line_pw = parsedline.password
                if line_pw.startswith('md5') and len(line_pw) == 35:
                    # Password is MD5 encrypted (32 chars + "md5" prefix)
                    return self._encrypt(password, username, 'md5') == line_pw
                # Password is plain text.
                return line_pw == password
        return False

    def delete(self, username):
        """Remove every entry for ``username``; other lines are kept."""
        self._lines = [
            parsedline for parsedline in self._lines
            if not parsedline.is_entry or parsedline.username != username
        ]
def check_file_attrs(module, changed, message):
    """
    Apply the common file arguments of the module and report the outcome.

    Returns ``(message, changed)``. When ``set_fs_attributes_if_different``
    reports a change, a note is appended to ``message`` (joined with " and "
    if something had already changed) and ``changed`` becomes ``True``;
    otherwise both values are returned as given.
    """
    file_args = module.load_file_common_arguments(module.params)
    if not module.set_fs_attributes_if_different(file_args, False):
        return message, changed

    joiner = " and " if changed else ""
    return message + joiner + "ownership, perms or SE linux context changed", True
def present(dest, username, password, crypt_scheme, create, check_mode):
    """
    Ensures user is present

    :param dest: Path of the user list file.
    :param username: User whose entry must exist.
    :param password: Password to store. May be ``None`` only when the user
        already has an entry, in which case the entry is left untouched.
    :param crypt_scheme: Scheme used when a password is written.
    :param create: Whether a missing file may be created.
    :param check_mode: When true, report what would change without writing.
    :raises ValueError: if the file is missing and ``create`` is false, or if
        the user has to be added but no password was given.

    Returns (msg, changed)
    """
    if not os.path.exists(dest):
        if not create:
            raise ValueError('Destination %s does not exist' % dest)
        # Checked before the check-mode shortcut so a dry run fails the same
        # way the real run would.
        if password is None:
            raise ValueError('A password is required to add %s' % username)
        if check_mode:
            return ("Create %s" % dest, True)
        # Honour the requested scheme for a freshly created file as well.
        pwfile = PostgresPasswdFile(dest, default_scheme=crypt_scheme)
        pwfile.set_password(username, password)
        pwfile.save()
        return ("Created %s and added %s" % (dest, username), True)

    pwfile = PostgresPasswdFile(dest, default_scheme=crypt_scheme)
    if password is None:
        # Without a password there is nothing to compare or update: an
        # existing entry is good enough, a missing one cannot be created.
        if username in pwfile.users():
            return ("%s already present" % username, False)
        raise ValueError('A password is required to add %s' % username)

    if pwfile.verify(username, password):
        return ("%s already present" % username, False)

    if not check_mode:
        pwfile.set_password(username, password)
        pwfile.save()
    return ("Add/update %s" % username, True)
def absent(dest, username, check_mode):
    """
    Ensures user is absent

    Every entry of ``username`` is removed from the file ``dest``; in check
    mode the file is left untouched and only the outcome is reported.

    :raises ValueError: if ``dest`` does not exist.

    Returns (msg, changed)
    """
    if not os.path.exists(dest):
        raise ValueError("%s does not exists" % dest)

    pwfile = PostgresPasswdFile(dest)
    if username in pwfile.users():
        if not check_mode:
            pwfile.delete(username)
            pwfile.save()
        return ("Remove %s" % username, True)
    return ("%s not present" % username, False)
def main():
    """
    Module entry point.

    Reads the module arguments, makes sure the user entry is present or
    absent as requested, applies the common file attributes to the file and
    reports the result through ``exit_json`` / ``fail_json``.
    """
    module = AnsibleModule(
        argument_spec=dict(
            path=dict(default="/etc/pgbouncer/userlist.txt", required=False, aliases=["dest", "destfile"]),
            name=dict(default=None, required=True, aliases=["username"]),
            # no_log keeps the password out of logs and task output.
            password=dict(default=None, required=False, no_log=True),
            crypt_scheme=dict(required=False, default='md5'),
            state=dict(required=False, default="present"),
            create=dict(type='bool', default='yes'),
        ),
        add_file_common_args=True,
        supports_check_mode=True,
    )

    path = module.params['path']
    username = module.params['name']
    password = module.params['password']
    crypt_scheme = module.params['crypt_scheme']
    state = module.params['state']
    create = module.params['create']
    check_mode = module.check_mode

    try:
        if state == 'present':
            (msg, changed) = present(path, username, password, crypt_scheme, create, check_mode)
        elif state == 'absent':
            (msg, changed) = absent(path, username, check_mode)
        else:
            module.fail_json(msg="Invalid state: %s" % state)
            return  # noqa

        # Take over the result, so a change of ownership/permissions alone is
        # reported as a change too.
        (msg, changed) = check_file_attrs(module, changed, msg)
        module.exit_json(msg=msg, changed=changed)
    except Exception as e:
        module.fail_json(msg=str(e))


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment