Last active
August 29, 2015 13:57
-
-
Save bootc/9583012 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# | |
# pam_python module to authenticate using SSH keys in an SSH agent | |
# Copyright (C) 2014 Chris Boot <bootc@bootc.net> | |
# | |
# This program is free software; you can redistribute it and/or | |
# modify it under the terms of the GNU General Public License | |
# as published by the Free Software Foundation; either version 2 | |
# of the License, or (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with this program; if not, write to the Free Software | |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |
from __future__ import print_function | |
from binascii import hexlify | |
from paramiko import Message | |
from paramiko.agent import Agent | |
from paramiko.common import rng | |
from paramiko.dsskey import DSSKey | |
from paramiko.rsakey import RSAKey | |
import argparse | |
import base64 | |
import grp | |
import os | |
import pwd | |
import stat | |
import sys | |
import traceback | |
### | |
### Constants | |
### | |
AUTHORIZED_KEYS = '.ssh/authorized_keys_pam' | |
COUNT_SIGN_BYTES = 128 | |
### | |
### Globals | |
### | |
args = { | |
'debug': False, | |
} | |
auth_success = None | |
### | |
### Classes | |
### | |
class InvalidAuthorizedKey(Exception): | |
def __init__(self, line, exc): | |
self.line = line | |
self.exc = exc | |
self.args = (line, exc) | |
class AuthorizedKeysEntry: | |
def __init__(self, key=None, comment=None): | |
self.valid = (key is not None) | |
self.key = key | |
self.comment = comment | |
def from_line(cls, line): | |
fields = line.split(' ', 3) | |
if len(fields) < 2: | |
# Bad number of fields | |
return None | |
if len(fields) < 3: | |
fields.append(None) | |
keytype, key, comment = fields | |
# Decide what kind of key we're looking at and create an object | |
# to hold it accordingly. | |
try: | |
if keytype == 'ssh-rsa': | |
key = RSAKey(data=base64.decodestring(key)) | |
elif keytype == 'ssh-dss': | |
key = DSSKey(data=base64.decodestring(key)) | |
else: | |
return None | |
except binascii.Error, e: | |
raise InvalidAuthorizedKey(line, e) | |
return cls(key, comment) | |
from_line = classmethod(from_line) | |
def to_line(self): | |
if self.valid: | |
return '%s %s %s\n' % (self.key.get_name(), self.key.get_base64(), | |
self.comment or '') | |
return None | |
def __repr__(self): | |
return '<AuthorizedKeysEntry %r: %r>' % (self.key, self.comment) | |
class AuthorizedKeys: | |
def __init__(self, filename=None): | |
self.entries = [] | |
if filename is not None: | |
self.load(filename) | |
def load(self, filename): | |
f = open(filename, 'r') | |
for line in f: | |
line = line.strip() | |
if (len(line) == 0) or (line[0] == '#'): | |
continue | |
e = AuthorizedKeysEntry.from_line(line) | |
if e is not None: | |
self.entries.append(e) | |
f.close() | |
### | |
### Functions | |
### | |
def debug(s): | |
global args | |
if args.debug: | |
print(s, file=sys.stderr) | |
def fpr_hex(key): | |
fpr_hex = hexlify(key.get_fingerprint()) | |
return ':'.join(fpr_hex[i:i+2] for i in range(0, len(fpr_hex), 2)) | |
def parse_args(args_in): | |
global args | |
argp = argparse.ArgumentParser( | |
prog='pam_auth_ssh_agent.py', | |
description='PAM SSH Agent script', | |
epilog="""\ | |
This script is designed to run from PAM under pam_python, and will | |
authenticate requests using your SSH agent against a set of authorized keys. | |
This is broadly similar to how SSH can use SSH keys for login, but can be | |
used for authenticating sudo requests or similar. | |
""") | |
argp.add_argument('--debug', action='store_true', | |
help='enables verbose logging') | |
args = argp.parse_args(args_in) | |
def secure_path(filename, uid, home): | |
""" | |
Logic borrowed from OpenSSH's auth_secure_path() | |
""" | |
path = os.path.realpath(filename) | |
st = os.stat(filename) | |
home = os.path.realpath(home) | |
if not stat.S_ISREG(st.st_mode): | |
print('{} is not a regular file'.format(filename), file=sys.stderr) | |
return False | |
if not secure_permissions(st, uid): | |
print('bad ownership or modes for file {}'.format(filename), | |
file=sys.stderr) | |
return False | |
# for each component of the canonical path, walking upwards | |
while True: | |
path = os.path.dirname(path) | |
st = os.stat(path) | |
if not secure_permissions(st, uid): | |
print('bad ownership or modes for directory {}'.format(path), | |
file=sys.stderr) | |
return False | |
if path == home: | |
break | |
if path == '/' or path == '.': | |
break | |
return True | |
def secure_permissions(st, uid): | |
""" | |
Ensures the file is owned by uid or root, and that it is not world | |
writeable. If it is group writeable, the group must have exactly one member: | |
the user given as a parameter to this function. Logic borrowed from Debian's | |
user-group-modes.patch for OpenSSH. | |
""" | |
# Must be owned by root or user we are checking | |
if st.st_uid != 0 and st.st_uid != uid: | |
return False | |
# Must not be world writeable | |
if st.st_mode & 0002: | |
return False | |
# Additional checks if it's group writeable | |
if st.st_mode & 0020: | |
members = 0 | |
# Look up the file's group owner | |
try: | |
grent = grp.getgrgid(st.st_gid) | |
except KeyError: | |
return False | |
# Iterate the passwd database and check users' primary GIDs | |
for pwent in pwd.getpwall(): | |
if pwent.pw_gid == grent.gr_gid: | |
members += 1 | |
if pwent.pw_uid != uid: | |
return False | |
# Look up the file's user owner | |
try: | |
pwent = pwd.getpwuid(st.st_uid) | |
except KeyError: | |
return False | |
# Check supplementary group members | |
if len(grent.gr_mem): | |
members += 1 | |
if grent.gr_mem[0] != pwent.pw_name or len(grent.gr_mem) > 1: | |
return False | |
if not members: | |
return False | |
return True | |
def find_matching_keys(a, b): | |
for key_a in a: | |
for key_b in b: | |
if str(key_a) == str(key_b): | |
return [ key_a, key_b ] | |
return None | |
def pam_sm_authenticate(pamh, flags, args_in): | |
global args, auth_success | |
if auth_success != None: | |
print('Invalid internal state', file=sys.stderr) | |
return pamh.PAM_SERVICE_ERR | |
auth_success = False | |
try: | |
pam_user = pamh.get_user(None) | |
except pamh.exception, e: | |
return e.pam_result | |
if pam_user == None: | |
return pamh.PAM_USER_UNKNOWN | |
if os.getenv('SSH_AUTH_SOCK') == None: | |
return pamh.PAM_USER_UNKNOWN | |
args_in.pop(0) | |
# Parse command-line arguments first | |
parse_args(args_in) | |
# Look up the user to authenticate given by PAM | |
try: | |
pwent = pwd.getpwnam(pam_user) | |
except KeyError: | |
print('Invalid PAM_USER given: {}'.format(pam_user), | |
file=sys.stderr) | |
return pamh.PAM_USER_UNKNOWN | |
debug('Authenticating user "{}"'.format(pwent.pw_name)) | |
# Ensure the user's home directory exists | |
if not os.path.exists(pwent.pw_dir): | |
print('{}: does not exist'.format(pwent.pw_dir), file=sys.stderr) | |
return pamh.PAM_AUTH_ERR | |
# Work out the path to the authorized_keys file | |
authorized_keys_file = os.path.join(pwent.pw_dir, AUTHORIZED_KEYS) | |
if not os.path.isfile(authorized_keys_file): | |
print('{}: does not exist'.format(authorized_keys_file), | |
file=sys.stderr) | |
return pamh.PAM_AUTH_ERR | |
debug('Looking for keys in {}'.format(authorized_keys_file)) | |
# Perform security checks on the authorized_keys file | |
if not secure_path(authorized_keys_file, pwent.pw_uid, pwent.pw_dir): | |
print('Path security checks failed.', file=sys.stderr) | |
return pamh.PAM_AUTH_ERR | |
# Read authorized keys from disk | |
try: | |
authorized_keys_obj = AuthorizedKeys(authorized_keys_file) | |
except IOError, e: | |
print('{}: \'{}\''.format(e.strerror, e.filename)) | |
return pamh.PAM_AUTH_ERR | |
authorized_keys = map(lambda x: x.key, authorized_keys_obj.entries) | |
# Connect to the SSH agent and fetch its keys | |
agent = Agent() | |
agent_keys = agent.get_keys() | |
# Find the first key in the agent that matches an authorized key | |
match = find_matching_keys(authorized_keys, agent_keys) | |
if not match: | |
print('No agent keys match authorized_keys', file=sys.stderr) | |
return pamh.PAM_AUTH_ERR | |
authorized_key, agent_key = match | |
debug('Attempting authentication with key {}'.format( | |
fpr_hex(authorized_key))) | |
plaintext = rng.read(COUNT_SIGN_BYTES) | |
sig = agent_key.sign_ssh_data(rng, plaintext) | |
if not authorized_key.verify_ssh_sig(plaintext, Message(sig)): | |
print('Signature verification failed', file=sys.stderr) | |
return pamh.PAM_AUTH_ERR | |
print('Authenticated using key: {}'.format(fpr_hex(authorized_key))) | |
# Success | |
return pamh.PAM_SUCCESS | |
def pam_sm_setcred(pamh, flags, argv): | |
if auth_success: | |
return pamh.PAM_SUCCESS | |
else: | |
return pamh.PAM_CRED_UNAVAIL | |
# vim: ts=4 sts=4 sw=4 tw=80 et smarttab |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment