Skip to content

Instantly share code, notes, and snippets.

@adiroiban
Last active July 8, 2022 11:09
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save adiroiban/59eb28cf767aec9535fda8ac1162401f to your computer and use it in GitHub Desktop.
Save adiroiban/59eb28cf767aec9535fda8ac1162401f to your computer and use it in GitHub Desktop.
Python RADIUS client CHAP / MS-CHAP-v1 / MS-CHAP-v2
# Code under public domain.
"""
Authentication based on a remote RADIUS server.
"""
from __future__ import absolute_import, unicode_literals
import contextlib
import os
import socket
import struct
import hashlib
import radius
from cryptography.hazmat.primitives.ciphers.algorithms import TripleDES
from cryptography.hazmat.primitives.ciphers import Cipher
from cryptography.hazmat.primitives.ciphers import modes
from twisted.internet import defer, threads
from zope.interface import implementer
from chevah.server import force_unicode
from chevah.server.authentication.base import (
apply_group_mapping,
check_password_credentials_validity,
)
from chevah.server.authentication.configuration import (
_AuthenticationConfigurationBase,
)
from chevah.server.commons.constant import (
ADDRESS_UNKNOWN,
TYPE_NAME,
)
from chevah.server.commons.credential import PasswordCredentials
from chevah.server.commons.exception import (
CredentialsForbidden,
InvalidAuthentication,
ServerException,
)
from chevah.server.commons.interface import (
IApplicationAccountConfiguration,
IAuthentication,
IRADIUSAuthenticationConfiguration,
)
from chevah.server.commons.match import get_matched_groups
from chevah.server.commons.runnable import Runnable
from chevah.server.configuration.model import (
WritableBoolean,
WritableNumber,
WritableString,
WritableStringCaseInsensitive,
WritableStringMultipleNonDuplicateTuples,
)
from chevah.server.configuration.option import (
AddressPortOptionsMixin,
)
from chevah.server.identity.model import ExternalAccountConfigurationAbstract
def patched_update(self, data):
    """
    Replacement for the upstream `update` which accepts repeated attributes.

    `data` maps each key to a sequence of values. Each value is assigned on
    its own via `__setitem__()`, so that validation is invoked for every one
    of them.
    """
    flattened = (
        (key, value)
        for key, values in data.items()
        for value in values
        )
    for key, value in flattened:
        self[key] = value
@staticmethod
def patched_unpack(data):
    """
    Unpack the raw attributes from `data` into an Attributes instance.

    This patches the upstream code to handle multiple values for the same
    attribute: the values are collected in a list, in the order in which
    they are found in `data`.

    Raises ValueError when an attribute declares a length lower than 2.
    The length covers the type and the length bytes themselves, so a lower
    value is invalid and would prevent the parser from advancing.
    """
    pos, attrs = 0, {}
    while pos < len(data):
        code, length = struct.unpack('BB', data[pos:pos + 2])
        if length < 2:
            # A zero length would loop forever and a length of one would
            # make the remaining bytes be parsed from the wrong offset.
            raise ValueError(
                'Invalid length %d for RADIUS attribute %d.' % (
                    length, code))
        attrs.setdefault(code, []).append(data[pos + 2:pos + length])
        pos += length
    return radius.Attributes(attrs)
# Replace the upstream methods with the patched versions which handle
# multiple values for the same attribute.
radius.Attributes.update = patched_update
radius.Attributes.unpack = patched_unpack
# There is a bug upstream in which this is 4.
ATTR_CHAP_PASSWORD = 3
radius.ATTRS[ATTR_CHAP_PASSWORD] = 'CHAP-Password'
# 311 is the vendor ID for Microsoft.
MS_VENDOR_ID = struct.pack('>I', 311)
# See: https://datatracker.ietf.org/doc/html/rfc2548
# While MS-CHAP is a 3-way protocol, for RADIUS we do a 2-way protocol
# and the client generates its own random challenge instead of
# receiving it from the server.
MS_CHAP_CHALLENGE_TYPE = struct.pack('>B', 11)
MS_CHAP_V1_RESPONSE_TYPE = struct.pack('>B', 1)
# Size for response is always 52.
# Same size for V1 and V2.
MS_CHAP_RESPONSE_LENGTH = struct.pack('>B', 52)
# Identifier is the same for V1 and V2.
# In PPP this is used to track packets, but RADIUS has a separate
# packet tracking field, so this is not used.
MS_CHAP_RESPONSE_IDENTIFIER = b'\x00'
MS_CHAP_V1_RESPONSE_FLAGS = b'\x01' # Prefer NT password aka MD4.
MS_CHAP_V2_RESPONSE_TYPE = struct.pack('>B', 25)
MS_CHAP_V2_RESPONSE_FLAGS = b'\x00' # Reserved.
class ChevahRadius(radius.Radius):
    """
    RADIUS client with a patched `send_message`, kept here while the
    upstream project is re-enabled.
    """

    def send_message(self, message):
        """
        Send `message` and return the reply, once it was verified.

        A reply which was received but couldn't be validated is raised as
        `radius.VerificationError`, instead of using the same error as for
        a timeout.
        Other socket errors are raised wrapped in `radius.SocketError`.
        `radius.NoResponse` is raised when the attempts are over and there
        is no other error to report.
        """
        payload = message.pack()
        endpoints = socket.getaddrinfo(
            self.host, self.port, 0, socket.SOCK_DGRAM)

        def exchange(endpoint):
            """
            Do a single send/receive round-trip over a dedicated socket.
            """
            family, kind, protocol, _name, target = endpoint
            channel = socket.socket(family, kind, protocol)
            # The socket is closed even when the exchange fails.
            with contextlib.closing(channel):
                channel.settimeout(self.timeout)
                channel.connect(target)
                channel.send(payload)
                raw_reply = channel.recv(radius.PACKET_MAX)
            return message.verify(raw_reply)

        pending_error = None
        give_up = False
        for _ in range(self.retries):
            for endpoint in endpoints:
                try:
                    return exchange(endpoint)
                except socket.timeout:
                    # Just try again on timeout.
                    pending_error = None
                except radius.VerificationError as error:
                    if isinstance(pending_error, radius.VerificationError):
                        # Second invalid reply in a row. No need to retry,
                        # as most probably we have an invalid shared
                        # secret.
                        give_up = True
                        break
                    # The RFC asks for invalid replies to be silently
                    # discarded, without any further processing.
                    # It also states that an implementation SHOULD be able
                    # to log the error, together with the content of the
                    # discarded packet, and SHOULD count the event.
                    # The logging is done in the authentication failed
                    # event.
                    pending_error = error
                except socket.error as error:
                    pending_error = radius.SocketError(error)
            if give_up:
                break

        if pending_error is None:
            raise radius.NoResponse()
        raise pending_error
@implementer(IRADIUSAuthenticationConfiguration)
class RADIUSAuthenticationConfiguration(
        _AuthenticationConfigurationBase, AddressPortOptionsMixin):
    """
    Configuration options for the RADIUS authentication method.
    """

    PORT_DEFAULT = 1812

    # Secret shared with the remote RADIUS server.
    shared_secret = WritableString('shared_secret')

    # Whether rejected credentials are left to be checked by other
    # authentication methods.
    continue_authentication = WritableBoolean(
        'continue_authentication',
        default=False,
        )

    group_mapping = WritableStringMultipleNonDuplicateTuples(
        'group_mapping')

    # Strictly positive value.
    timeout = WritableNumber(
        'timeout',
        default=60,
        allow_negative=False,
        allow_zero=False,
        )

    authentication_type = WritableStringCaseInsensitive(
        'authentication_type',
        default=TYPE_NAME.MS_CHAP_V2,
        )

    debug = WritableBoolean('debug', default=False)

    nas_port = WritableNumber('nas_port', default=0)
@implementer(IApplicationAccountConfiguration)
class RADIUSApplicationAccountConfiguration(
        ExternalAccountConfigurationAbstract):
    """
    Configuration of an account which was obtained via the RADIUS method.
    """

    @property
    def kind_name(self):
        """
        See: `ExternalAccountConfigurationAbstract`.

        Always the RADIUS type name.
        """
        radius_kind = TYPE_NAME.RADIUS
        return radius_kind
@implementer(IAuthentication)
class RADIUSAuthentication(Runnable):
    """
    Authenticate against a remote RADIUS server.

    The Access-Request is created based on the configured
    `authentication_type` and is sent from a separate thread.
    """

    PROPERTIES_REQUIRING_RESTART = (
        'address',
        'port',
        'shared_secret',
        'group_mapping',
        'timeout',
        'authentication_type',
        'nas_port',
        'debug',
        )

    # NAS-Port is sent packed as an unsigned 32 bits integer.
    _NAS_PORT_MAX = 0xFFFFFFFF

    # The remote RADIUS server that was available at startup.
    _server = None
    # List of list of group mapping definition. Already validated.
    _group_mapping = None
    # Number of the port to request RADIUS access.
    _nas_port = None
    # Timeout passed to the RADIUS client.
    _timeout = None
    # Updated at runtime with available authentication type.
    _authentication_types = None
    # Updated after start with configured authentication method.
    _accessMethod = None
    _debug = False

    def _resetState(self):
        """
        Initialize the internal state.
        """
        self._group_mapping = None
        self._server = None
        self._timeout = None
        self._nas_port = None
        self._debug = False
        self._accessMethod = None
        self._authentication_types = {
            TYPE_NAME.PAP: self._access_request_PAP,
            TYPE_NAME.CHAP: self._access_request_CHAP,
            TYPE_NAME.MS_CHAP_V1: self._access_request_MSCHAPv1,
            TYPE_NAME.MS_CHAP_V2: self._access_request_MSCHAPv2,
            }

    @defer.inlineCallbacks
    def _onStart(self, avatar=None):
        """
        See: Runnable.

        Validate the configuration, create the RADIUS client and check the
        connection by authenticating a set of test credentials.

        Raises ServerException when the configuration is not valid or
        when the test authentication fails with InvalidAuthentication.
        """
        self._resetState()
        self._validateConfiguration()
        self._debug = self._configuration.debug

        # Configuration and RADIUS environment can be changed at any time.
        # At least try to connect to check that the server is available
        # and do a simple configuration validation.
        # In this way we try to prevent late errors raised due to
        # misconfiguration.
        self._timeout = self._configuration.timeout
        self._server = ChevahRadius(
            self._configuration.shared_secret.encode('utf-8'),
            host=self._configuration.address,
            port=self._configuration.port,
            retries=3,
            timeout=self._timeout,
            )

        test_credentials = PasswordCredentials(  # noqa:bandit
            username='connection-test-user',
            password='connection-test-password',
            peer=ADDRESS_UNKNOWN,
            creator=self,
            )

        # We run a simple authentication to see that we get a reject
        # response that will signal that the server is up and the shared
        # secret is valid.
        # If any of these are not correct we will not receive a response
        # from the remote RADIUS server.
        try:
            yield self.authenticate(test_credentials)
        except CredentialsForbidden:
            # All good. We expect to have the test credentials rejected.
            pass
        except InvalidAuthentication as error:
            raise ServerException(
                'Invalid RADIUS configuration at start. %s' % (
                    error.message,))

    def _onStop(self, avatar=None):
        """
        See: Runnable.
        """
        self._resetState()

    def _validateConfiguration(self):
        """
        Check that configuration is valid.

        On success, the NAS port, the group mapping and the method used to
        create the access requests are stored on the instance.

        Raises ServerException for any invalid configuration value.
        """
        if not self._configuration.shared_secret:
            raise ServerException('Missing shared_secret configuration.')

        self._nas_port = int(self._configuration.nas_port)
        if self._nas_port < 0:
            raise ServerException(
                'NAS-Port should be greater or equal to zero.')
        if self._nas_port > self._NAS_PORT_MAX:
            # Fail here with a clear message, as otherwise packing the
            # value for the request fails later with a low-level error.
            raise ServerException(
                'NAS-Port should be lower or equal to %s.' % (
                    self._NAS_PORT_MAX,))

        self._group_mapping = self._configuration.group_mapping
        if self._group_mapping:
            # The first rule is the fallback group.
            fallback = self._group_mapping[0]
            if len(fallback) > 1:
                raise ServerException(
                    'The fallback group should be a single value.')

            for rule in self._group_mapping[1:]:
                if len(rule) != 3:
                    raise ServerException(
                        'The group mapping should have 3 elements.')
                # Check that expression is valid.
                get_matched_groups(
                    value='', rule=rule[1], usage='group_mapping')

        authentication_type = self._configuration.authentication_type
        self._accessMethod = self._authentication_types.get(
            authentication_type, None)
        if not self._accessMethod:
            raise ServerException(
                'Unknown "authentication_type" configured: "%s".' % (
                    authentication_type,))

    def authenticate(self, credentials):
        """
        See: `IAuthentication`

        Return a deferred. On accepted credentials it fires with the result
        from `_cbGotResponse`. Failures are handled by `_ebCheckError`.
        """
        # FIXME:5695:
        # Check if the connection from which the credentials were created
        # has a lower timeout than the radius timeout and temporarily
        # increase the timeout.

        def cb_request_auth(ignored):
            """
            Called when credentials are valid.
            """
            request = self._accessMethod(
                credentials.username,
                credentials.password,
                )
            request.attributes["NAS-Port"] = struct.pack(
                '>I', self._nas_port)

            if self._debug:
                entry = _resolve_attributes(request.attributes)
                self.emitEvent(
                    '20000',
                    data={
                        'details': 'REQUEST(%s):%s' % (request.code, entry),
                        },
                    )

            # The RADIUS client uses blocking sockets.
            return threads.deferToThread(
                self._server.send_message,
                request,
                )

        deferred = check_password_credentials_validity(
            'RADIUS', credentials)
        deferred.addCallback(cb_request_auth)
        deferred.addCallback(self._cbGotResponse, credentials)
        deferred.addErrback(self._ebCheckError)
        return deferred

    def _access_request_PAP(self, username, password):
        """
        Return message for PAP Auth-Type.
        """
        return self._server.access_request_message(
            username.encode('utf-8'), password.encode('utf-8'))

    def _access_request_CHAP(self, username, password):
        """
        Return message for CHAP Auth-Type.

        The request authenticator is used as the CHAP challenge.
        """
        message = radius.Message(
            self._server.secret, radius.CODE_ACCESS_REQUEST)
        message.attributes['User-Name'] = username.encode('utf-8')

        # Single byte identifier.
        # struct is used as it returns the same byte string on both
        # Python 2 and Python 3.
        chap_id = struct.pack('B', message.id)
        password_hash = hashlib.md5(
            chap_id +
            password.encode('utf-8') +
            message.authenticator
            ).digest()
        message.attributes['CHAP-Password'] = chap_id + password_hash
        return message

    def _access_request_MSCHAPv1(self, username, password):
        """
        Return message for MS-CHAP-V1 Auth-Type.

        The message has 2 Vendor-Specific values: the challenge, which is
        generated here, and the response to that challenge.
        """
        message = radius.Message(
            self._server.secret, radius.CODE_ACCESS_REQUEST)
        message.attributes['User-Name'] = username.encode('utf-8')

        challenge = os.urandom(8)
        # Size for request contains the CHAP type and size itself.
        # This is why we have extra 2 bytes.
        challenge_length = struct.pack('>B', len(challenge) + 2)
        message.attributes['Vendor-Specific'] = (
            MS_VENDOR_ID +
            MS_CHAP_CHALLENGE_TYPE +
            challenge_length +
            challenge
            )

        # It should contain the password hashed in both LAN Manager (LM)
        # format as well as NT format.
        # We only provide the NT password as LM is obsolete and v2 only
        # uses the NT password anyway.
        # Payload is just the raw challenge encrypted with the password.
        message.attributes['Vendor-Specific'] = (
            MS_VENDOR_ID +
            MS_CHAP_V1_RESPONSE_TYPE +
            MS_CHAP_RESPONSE_LENGTH +
            MS_CHAP_RESPONSE_IDENTIFIER +
            MS_CHAP_V1_RESPONSE_FLAGS +
            b'\x00' * 24 +  # Don't compute LM password
            _encrypt_nt_password(challenge, password)
            )
        return message

    def _access_request_MSCHAPv2(self, username, password):
        """
        Return message for MS-CHAP-V2 Auth-Type.

        The request authenticator is sent as the challenge, while the peer
        challenge is generated here.
        """
        message = radius.Message(
            self._server.secret, radius.CODE_ACCESS_REQUEST)
        message.attributes['User-Name'] = username.encode('utf-8')

        # Size for request contains the CHAP type and size itself.
        # This is why we have extra 2 bytes.
        challenge_length = struct.pack(
            '>B', len(message.authenticator) + 2)
        message.attributes['Vendor-Specific'] = (
            MS_VENDOR_ID +
            MS_CHAP_CHALLENGE_TYPE +
            challenge_length +
            message.authenticator
            )

        peer_challenge = os.urandom(16)
        # Only the first 8 bytes of the hash are encrypted.
        ms_chap_v2_challenge = hashlib.sha1(
            peer_challenge +
            message.authenticator +
            username.encode('utf-8')
            ).digest()[:8]

        # https://datatracker.ietf.org/doc/html/rfc2548#section-2.3.2
        message.attributes['Vendor-Specific'] = (
            MS_VENDOR_ID +
            MS_CHAP_V2_RESPONSE_TYPE +
            MS_CHAP_RESPONSE_LENGTH +
            MS_CHAP_RESPONSE_IDENTIFIER +
            MS_CHAP_V2_RESPONSE_FLAGS +
            peer_challenge +
            b'\x00' * 8 +  # Reserved.
            _encrypt_nt_password(ms_chap_v2_challenge, password)
            )
        return message

    def _cbGotResponse(self, reply, credentials):
        """
        Called when we got a response from the server.

        Return a tuple of (account, None) when the access was accepted.

        Raises CredentialsForbidden for any other type of response.
        """
        # Resolve from RADIUS number attributes, to attribute names
        entry = _resolve_attributes(reply.attributes)

        if self._debug:
            self.emitEvent(
                '20000',
                data={'details': 'RESPONSE(%s):%s' % (reply.code, entry)})

        if reply.code == radius.CODE_ACCESS_REJECT:
            raise CredentialsForbidden('Access rejected.')
        elif reply.code == radius.CODE_ACCESS_CHALLENGE:
            raise CredentialsForbidden('Access challenge not supported.')

        if reply.code != radius.CODE_ACCESS_ACCEPT:
            # Use a different message to make it easier to identify this
            # error case in a support request.
            raise CredentialsForbidden('Access denied.')

        external = apply_group_mapping(self._group_mapping, {}, entry)
        account = RADIUSApplicationAccountConfiguration(
            parent=self.root.configuration.identity.accounts,
            name=credentials.username,
            external=external,
            )
        # We return no explicit group configuration and let the common code
        # to resolve the group found in the account's configuration.
        return account, None

    def _ebCheckError(self, failure):
        """
        Called as the last errback for the authentication process.

        A CredentialsForbidden failure is passed along, or replaced with
        None when `continue_authentication` is enabled.
        Any other failure is raised as InvalidAuthentication.
        """
        if failure.check(CredentialsForbidden):
            if self._configuration.continue_authentication:
                # Allow other authentication to check the credentials.
                return None
            return failure

        if failure.check(radius.SocketError):
            raise InvalidAuthentication('RADIUS connection failed. %s' % (
                force_unicode(failure.value)))

        if failure.check(radius.NoResponse):
            raise InvalidAuthentication(
                'RADIUS connection failed. Timeout error.')

        if failure.check(radius.VerificationError):
            raise InvalidAuthentication(
                'RADIUS connection failed. '
                'Probably due to invalid shared secret. '
                'If configured with a valid shared secret '
                'this might be a sign of a security attack.')

        # Wrap any error into an InvalidAuthentication... but this
        # is more like an ISE.
        raise InvalidAuthentication(
            'RADIUS authentication failed with an internal error. %s' % (
                force_unicode(failure.value)))

    def authenticateAdministrator(self, credentials):
        """
        See: `IAuthentication`

        Always return a failed deferred, as administrators are not
        supported by this method.
        """
        return defer.fail(InvalidAuthentication(
            'RADIUS administrators are not yet supported.'))
def _resolve_attributes(attributes):
    """
    Return a dict with the values of `attributes` stored by attribute name.

    The names are taken from `radius.ATTRS`. A code which is not found
    there is named `Attr-CODE`.
    """
    return {
        radius.ATTRS.get(code, 'Attr-%s' % (code,)): value
        for code, value in attributes.items()
        }
def _encrypt_nt_password(challenge, password):
    """
    Return `challenge` encrypted with the NT format (MD4) of `password`,
    to be used for the body of an MS-CHAP response.

    Based on https://stackoverflow.com/q/20037402/539264:

    1. The password is encoded as UTF-16 little-endian, without BOM.
    2. The encoded password is hashed with MD4 --> NT hash.
    3. The NT hash is padded at the end with five zero bytes.
    4. The padded hash is split into three chunks of 7 bytes.
    5. Each chunk is expanded into an 8 bytes key, by adding the positions
       for the parity bits. The parity is ignored anyway.
    6. The challenge is encrypted three times, each time with a different
       key from step 5, and the results are concatenated.
    """
    nt_hash = hashlib.new('md4', password.encode('utf-16le')).digest()
    padded_hash = nt_hash + b'\x00' * 5

    mode = modes.CBC(b'\x00' * 8)
    encrypted_parts = []
    for start in (0, 7, 14):
        des_key = _key56_to_key64(padded_hash[start:start + 7])
        # TripleDES is used with a single 8 bytes key.
        encryptor = Cipher(TripleDES(des_key), mode).encryptor()
        encrypted_parts.append(
            encryptor.update(challenge) + encryptor.finalize())

    return b''.join(encrypted_parts)
def _key56_to_key64(key_56):
"""
Insert dummy parity bit.
This converts the 7 bytes key to 8 bytes key while keeping all the
important bytes used by DES.
This will need to be update on Python 3 migration.
"""
# Start with a empty value.
key = [0] * 8
key[0] = ord(key_56[0])
key[1] = ((ord(key_56[0]) << 7) & 0xFF) | (ord(key_56[1]) >> 1)
key[2] = ((ord(key_56[1]) << 6) & 0xFF) | (ord(key_56[2]) >> 2)
key[3] = ((ord(key_56[2]) << 5) & 0xFF) | (ord(key_56[3]) >> 3)
key[4] = ((ord(key_56[3]) << 4) & 0xFF) | (ord(key_56[4]) >> 4)
key[5] = ((ord(key_56[4]) << 3) & 0xFF) | (ord(key_56[5]) >> 5)
key[6] = ((ord(key_56[5]) << 2) & 0xFF) | (ord(key_56[6]) >> 6)
key[7] = (ord(key_56[6]) << 1) & 0xFF
return b''.join([chr(b) for b in key])
server default {
listen {
# Allowed values are:
# auth listen for authentication packets
# acct listen for accounting packets
type = auth
ipaddr = *
port = 1812
# interface = eth0
limit {
max_connections = 16
# The lifetime, in seconds, of a TCP connection. After
# this lifetime, the connection will be closed.
lifetime = 0
# The idle timeout, in seconds, of a TCP connection.
idle_timeout = 30
}
}
listen {
type = auth
ipv6addr = ::
port = 1812
# interface = eth0
limit {
max_connections = 16
lifetime = 0
idle_timeout = 30
}
}
authorize {
chap
files
mschap
digest
pap
}
authenticate {
Auth-Type PAP {
pap
}
Auth-Type CHAP {
chap
}
# The Auth-Type attribute is automatically set by `mschap` module
# when MS-CHAP attributes are detected.
Auth-Type MSCHAP {
mschap
}
digest
}
post-auth {
}
}
# See https://freeradius.org/radiusd/man/users.html
# User with spaces in name and password.
# Allowed only for NAS-Port 0.
# Allowed for any authentication type.
"radius CI v1" Cleartext-Password := "Radius Password", NAS-Port == 0
Framed-Filter-Id = "group=ftp-users;limit=123",
Reply-Message = "Hi %{User-Name}",
Fall-Through = Yes
# User allowed only on NAS-Port 42.
"port42 CI v1" Cleartext-Password := "Radius 42", NAS-Port == 42
Framed-Filter-Id = "group=ftp-users;limit=123",
Reply-Message = "Hi %{User-Name}",
Fall-Through = Yes
# Only PAP auth.
# We can't match on User-Password as freeradius will ignore it.
# So we go with rejecting CHAP or MS-CHAP.
"pap CI v1" Cleartext-Password := "Radius pap", CHAP-Password !* ANY, MS-CHAP-Challenge !* ANY
Framed-Filter-Id = "group=pap-users;limit=234",
Reply-Message = "Hi PAP %{User-Name}",
# Only CHAP auth
"chap CI v1" Cleartext-Password := "Radius chap", CHAP-Password =* ANY
Framed-Filter-Id = "group=chap-users;limit=345",
Reply-Message = "Hi CHAP %{User-Name}",
# Only MS-CHAP-V1 auth.
"ms-chap-v1 CI v1" Cleartext-Password := "Radius ms-chap-v1", MS-CHAP-Use-NTLM-Auth := No, MS-CHAP-Response =* ANY
Framed-Filter-Id = "group=ms-chap-v1-users;limit=456",
Reply-Message = "Hi MS-CHAP-v1 %{User-Name}",
# Only MS-CHAP-V2 auth.
"ms-chap-v2 CI v1" Cleartext-Password := "Radius ms-chap-v2", MS-CHAP-Use-NTLM-Auth := No, MS-CHAP2-Response =* ANY
Framed-Filter-Id = "group=ms-chap-v2-users;limit=567",
Reply-Message = "Hi MS-CHAP-v2 %{User-Name}",
# Add an extra filter ID to all responses,
# including failed responses.
DEFAULT
Framed-Filter-Id += "group=ppp-users",
# Add a default message if not already set.
Reply-Message = "Bye bye!"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment