Skip to content

Instantly share code, notes, and snippets.

@bootc
Last active August 29, 2015 13:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bootc/9583012 to your computer and use it in GitHub Desktop.
Save bootc/9583012 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
#
# pam_python module to authenticate using SSH keys in an SSH agent
# Copyright (C) 2014 Chris Boot <bootc@bootc.net>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from __future__ import print_function
from binascii import hexlify
from paramiko import Message
from paramiko.agent import Agent
from paramiko.common import rng
from paramiko.dsskey import DSSKey
from paramiko.rsakey import RSAKey
import argparse
import base64
import grp
import os
import pwd
import stat
import sys
import traceback
###
### Constants
###
# Path of the authorized-keys file, joined onto the authenticating user's home
# directory by pam_sm_authenticate().
AUTHORIZED_KEYS = '.ssh/authorized_keys_pam'
# Number of random bytes read from the RNG and handed to the agent key to sign
# as the authentication challenge.
COUNT_SIGN_BYTES = 128
###
### Globals
###
# Parsed module options. parse_args() replaces this with the argparse result;
# the default is a Namespace (not a dict) so that attribute access such as
# ``args.debug`` in debug() also works before parse_args() has been called.
args = argparse.Namespace(debug=False)
# State shared between the PAM entry points: None until pam_sm_authenticate()
# has been entered; pam_sm_setcred() tests its truthiness.
auth_success = None
###
### Classes
###
class InvalidAuthorizedKey(Exception):
    """Signals that a line of the authorized-keys file could not be decoded.

    Attributes:
        line: the offending line, as passed to the constructor.
        exc:  the underlying exception, as passed to the constructor.
    """

    def __init__(self, line, exc):
        # The base initialiser stores its positional arguments as self.args,
        # i.e. (line, exc).
        super(InvalidAuthorizedKey, self).__init__(line, exc)
        self.line, self.exc = line, exc
class AuthorizedKeysEntry:
    """One public key, with its optional comment, from an authorized-keys file.

    Attributes:
        valid:   True when the entry holds a key object.
        key:     the key object (RSAKey or DSSKey when built by from_line()).
        comment: the trailing comment text, or None.
    """

    def __init__(self, key=None, comment=None):
        self.valid = (key is not None)
        self.key = key
        self.comment = comment

    @classmethod
    def from_line(cls, line):
        """Build an entry from a ``<keytype> <base64-key> [comment]`` line.

        Returns None when the line has fewer than two fields or the key type
        is neither 'ssh-rsa' nor 'ssh-dss'.

        Raises InvalidAuthorizedKey when the key field is not valid base64.
        """
        # Imported locally: the file's top-level imports only bring in
        # ``hexlify`` from binascii, not the module itself.
        import binascii

        # Split on any whitespace, at most twice, so that a comment containing
        # spaces stays in one piece instead of breaking the field count.
        fields = line.split(None, 2)
        if len(fields) < 2:
            # Bad number of fields
            return None
        keytype, blob = fields[0], fields[1]
        comment = fields[2] if len(fields) > 2 else None

        # Decide what kind of key we're looking at.
        if keytype == 'ssh-rsa':
            key_class = RSAKey
        elif keytype == 'ssh-dss':
            key_class = DSSKey
        else:
            return None

        try:
            # b64decode exists on both Python 2 and 3; it reports bad input
            # as TypeError on Python 2 and binascii.Error on Python 3.
            data = base64.b64decode(blob)
        except (binascii.Error, TypeError) as e:
            raise InvalidAuthorizedKey(line, e)
        return cls(key_class(data=data), comment)

    def to_line(self):
        """Return the entry as an authorized-keys line, or None if invalid."""
        if self.valid:
            return '%s %s %s\n' % (self.key.get_name(), self.key.get_base64(),
                                   self.comment or '')
        return None

    def __repr__(self):
        return '<AuthorizedKeysEntry %r: %r>' % (self.key, self.comment)
class AuthorizedKeys:
    """The list of entries read from an authorized-keys file.

    Attributes:
        entries: list of AuthorizedKeysEntry objects, in file order.
    """

    def __init__(self, filename=None):
        self.entries = []
        if filename is not None:
            self.load(filename)

    def load(self, filename):
        """Append every entry parsed from *filename* to self.entries.

        Blank lines and lines starting with '#' are skipped, as are lines for
        which AuthorizedKeysEntry.from_line() returns None. Exceptions from
        open() or from_line() propagate to the caller.
        """
        # The context manager closes the file even if parsing a line raises.
        with open(filename, 'r') as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                entry = AuthorizedKeysEntry.from_line(line)
                if entry is not None:
                    self.entries.append(entry)
###
### Functions
###
def debug(s):
    """Print *s* to stderr, but only when the ``debug`` option is set."""
    # The module-level ``args`` is only read here, so no ``global``
    # declaration is required.
    if not args.debug:
        return
    print(s, file=sys.stderr)
def fpr_hex(key):
    """Return the key's fingerprint as colon-separated lowercase hex pairs.

    *key* must provide get_fingerprint() returning the raw fingerprint bytes.
    An empty fingerprint yields an empty string.
    """
    # Iterating a bytearray yields integers on both Python 2 and Python 3, so
    # the result is always a native string.
    return ':'.join('{:02x}'.format(octet)
                    for octet in bytearray(key.get_fingerprint()))
def parse_args(args_in):
    """Parse the module options and store the result in the global ``args``.

    *args_in* is the list of option strings handed to
    ArgumentParser.parse_args(). Nothing is returned.
    """
    global args

    usage_notes = """\
This script is designed to run from PAM under pam_python, and will
authenticate requests using your SSH agent against a set of authorized keys.
This is broadly similar to how SSH can use SSH keys for login, but can be
used for authenticating sudo requests or similar.
"""
    parser = argparse.ArgumentParser(
        prog='pam_auth_ssh_agent.py',
        description='PAM SSH Agent script',
        epilog=usage_notes)
    parser.add_argument(
        '--debug',
        action='store_true',
        help='enables verbose logging')

    args = parser.parse_args(args_in)
def secure_path(filename, uid, home):
    """
    Check that *filename* and its parent directories are safely owned.

    Logic borrowed from OpenSSH's auth_secure_path(). *filename* must be a
    regular file that passes secure_permissions() for *uid*. The same holds
    for every directory above it, walking upwards until *home* (or the
    filesystem root) has been checked.

    Returns True when all checks pass, False otherwise. A reason is printed
    to stderr on failure, including when a path cannot be stat()ed.
    """
    path = os.path.realpath(filename)
    try:
        st = os.stat(filename)
    except OSError as e:
        # Fail closed instead of letting the exception escape the check.
        print('cannot stat {}: {}'.format(filename, e.strerror),
              file=sys.stderr)
        return False
    home = os.path.realpath(home)

    if not stat.S_ISREG(st.st_mode):
        print('{} is not a regular file'.format(filename), file=sys.stderr)
        return False
    if not secure_permissions(st, uid):
        print('bad ownership or modes for file {}'.format(filename),
              file=sys.stderr)
        return False

    # for each component of the canonical path, walking upwards
    while True:
        path = os.path.dirname(path)
        try:
            st = os.stat(path)
        except OSError as e:
            print('cannot stat {}: {}'.format(path, e.strerror),
                  file=sys.stderr)
            return False
        if not secure_permissions(st, uid):
            print('bad ownership or modes for directory {}'.format(path),
                  file=sys.stderr)
            return False
        # Stop once the home directory has been checked...
        if path == home:
            break
        # ...or when there is nothing further up to inspect.
        if path == '/' or path == '.':
            break
    return True
def secure_permissions(st, uid):
    """
    Decide whether a stat result describes a safely-owned file or directory.

    The object must be owned by *uid* or by root, and must not be world
    writeable. If it is group writeable, the owning group must effectively
    contain a single user: every account whose primary group it is must have
    uid *uid*, and its supplementary member list may hold at most one name,
    which must be the owner's login name. A group with no members at all is
    rejected. Logic borrowed from Debian's user-group-modes.patch for OpenSSH.

    *st* needs the attributes st_uid, st_gid and st_mode. Returns True or
    False.
    """
    # Must be owned by root or user we are checking
    if st.st_uid != 0 and st.st_uid != uid:
        return False

    # Must not be world writeable
    if st.st_mode & stat.S_IWOTH:
        return False

    # Additional checks if it's group writeable
    if st.st_mode & stat.S_IWGRP:
        members = 0

        # Look up the file's group owner
        try:
            grent = grp.getgrgid(st.st_gid)
        except KeyError:
            return False

        # Iterate the passwd database and check users' primary GIDs
        for pwent in pwd.getpwall():
            if pwent.pw_gid == grent.gr_gid:
                members += 1
                if pwent.pw_uid != uid:
                    return False

        # Look up the file's user owner
        try:
            owner = pwd.getpwuid(st.st_uid)
        except KeyError:
            return False

        # Check supplementary group members
        if grent.gr_mem:
            members += 1
            if grent.gr_mem[0] != owner.pw_name or len(grent.gr_mem) > 1:
                return False

        if not members:
            return False

    return True
def find_matching_keys(a, b):
    """Return the first ``[key_a, key_b]`` pair whose str() forms are equal.

    Pairs are tried in order: each element of *a* against every element of
    *b*. Returns None when no pair matches.
    """
    candidates = (
        [left, right]
        for left in a
        for right in b
        if str(left) == str(right)
    )
    return next(candidates, None)
def pam_sm_authenticate(pamh, flags, args_in):
    """
    PAM authentication entry point: authenticate via the user's SSH agent.

    The user named by PAM is accepted when a key offered by the SSH agent
    also appears in the user's AUTHORIZED_KEYS file, and the agent's
    signature over a random challenge verifies against that authorized key.

    pamh    -- the PAM handle; supplies get_user(), the exception type and
               the PAM_* result codes.
    flags   -- unused.
    args_in -- module arguments; element 0 is skipped and the rest is given
               to parse_args(). The list itself is not modified.

    Returns a PAM_* result code taken from *pamh*. On success the global
    auth_success is set to True so that pam_sm_setcred() reports success.
    """
    global auth_success

    if auth_success is not None:
        print('Invalid internal state', file=sys.stderr)
        return pamh.PAM_SERVICE_ERR
    auth_success = False

    try:
        pam_user = pamh.get_user(None)
    except pamh.exception as e:
        return e.pam_result
    if pam_user is None:
        return pamh.PAM_USER_UNKNOWN

    if os.getenv('SSH_AUTH_SOCK') is None:
        return pamh.PAM_USER_UNKNOWN

    # Parse command-line arguments first. Slice rather than pop() so the
    # caller's list stays intact and an empty list does not raise.
    parse_args(list(args_in[1:]))

    # Look up the user to authenticate given by PAM
    try:
        pwent = pwd.getpwnam(pam_user)
    except KeyError:
        print('Invalid PAM_USER given: {}'.format(pam_user),
              file=sys.stderr)
        return pamh.PAM_USER_UNKNOWN
    debug('Authenticating user "{}"'.format(pwent.pw_name))

    # Ensure the user's home directory exists
    if not os.path.exists(pwent.pw_dir):
        print('{}: does not exist'.format(pwent.pw_dir), file=sys.stderr)
        return pamh.PAM_AUTH_ERR

    # Work out the path to the authorized_keys file
    authorized_keys_file = os.path.join(pwent.pw_dir, AUTHORIZED_KEYS)
    if not os.path.isfile(authorized_keys_file):
        print('{}: does not exist'.format(authorized_keys_file),
              file=sys.stderr)
        return pamh.PAM_AUTH_ERR
    debug('Looking for keys in {}'.format(authorized_keys_file))

    # Perform security checks on the authorized_keys file
    if not secure_path(authorized_keys_file, pwent.pw_uid, pwent.pw_dir):
        print('Path security checks failed.', file=sys.stderr)
        return pamh.PAM_AUTH_ERR

    # Read authorized keys from disk
    try:
        authorized_keys_obj = AuthorizedKeys(authorized_keys_file)
    except IOError as e:
        print('{}: \'{}\''.format(e.strerror, e.filename), file=sys.stderr)
        return pamh.PAM_AUTH_ERR
    except InvalidAuthorizedKey as e:
        # A malformed key line is an authentication failure, not a crash.
        print('{}: invalid key: {}'.format(authorized_keys_file, e.exc),
              file=sys.stderr)
        return pamh.PAM_AUTH_ERR
    authorized_keys = [entry.key for entry in authorized_keys_obj.entries]

    # Connect to the SSH agent and fetch its keys
    agent = Agent()
    try:
        agent_keys = agent.get_keys()

        # Find the first key in the agent that matches an authorized key
        match = find_matching_keys(authorized_keys, agent_keys)
        if not match:
            print('No agent keys match authorized_keys', file=sys.stderr)
            return pamh.PAM_AUTH_ERR
        authorized_key, agent_key = match
        debug('Attempting authentication with key {}'.format(
            fpr_hex(authorized_key)))

        # Challenge: have the agent sign fresh random data.
        plaintext = rng.read(COUNT_SIGN_BYTES)
        sig = agent_key.sign_ssh_data(rng, plaintext)
    finally:
        # Release the agent connection on every exit path.
        agent.close()

    # Verify with the key read from disk, not the one the agent presented.
    if not authorized_key.verify_ssh_sig(plaintext, Message(sig)):
        print('Signature verification failed', file=sys.stderr)
        return pamh.PAM_AUTH_ERR
    print('Authenticated using key: {}'.format(fpr_hex(authorized_key)))

    # Success: record it so pam_sm_setcred() can report PAM_SUCCESS.
    auth_success = True
    return pamh.PAM_SUCCESS
def pam_sm_setcred(pamh, flags, argv):
    """PAM credential hook: report the state held in ``auth_success``.

    Returns pamh.PAM_SUCCESS when the global auth_success is truthy and
    pamh.PAM_CRED_UNAVAIL otherwise. *flags* and *argv* are unused.
    """
    return pamh.PAM_SUCCESS if auth_success else pamh.PAM_CRED_UNAVAIL
# vim: ts=4 sts=4 sw=4 tw=80 et smarttab
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment