Skip to content

Instantly share code, notes, and snippets.

@sigmaris
Last active August 29, 2015 14:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sigmaris/07041c168768af0712c3 to your computer and use it in GitHub Desktop.
Save sigmaris/07041c168768af0712c3 to your computer and use it in GitHub Desktop.
Example SASL code
import argparse
import base64
import struct
import sys
from gssapi import sec_contexts, names
from gssapi.raw.types import NameType, RequirementFlag
# Bit values for the security-layer mask carried in the first octet of the
# negotiation messages exchanged after the GSSAPI context is established.
# No security layer (authentication only).
SEC_LAYER_NONE = 1
# Integrity protection.
SEC_LAYER_INT = 2
# Confidentiality protection.
SEC_LAYER_CONF = 4
def abort(error):
    """Print *error* prefixed with ``ERROR: `` and exit with status 1."""
    message = "ERROR: {0}".format(error)
    print(message)
    raise SystemExit(1)
def token_text(token):
    """Return *token* base64-encoded as text; ``None`` is treated as empty.

    ``b64encode`` returns bytes on Python 3, so the result is decoded to keep
    the printed line free of a ``b'...'`` wrapper. A final context step may
    produce no token at all, in which case an empty response is shown.
    """
    return base64.b64encode(token or b'').decode('ascii')


def parse_server_offer(message):
    """Decode the server's security-layer offer.

    Args:
        message: unwrapped bytes from the server. The first octet is the
            bit-mask of offered layers; the next three octets are the
            server's maximum buffer size in network byte order.

    Returns:
        A ``(layers, max_size)`` tuple of ints.

    Raises:
        ValueError: if *message* is shorter than four octets.
    """
    if len(message) < 4:
        raise ValueError(
            "Server negotiation token is too short "
            "({0} octets, expected 4).".format(len(message))
        )
    # Read all four octets as one big-endian word, then split off the mask.
    word = struct.unpack('!I', message[:4])[0]
    return word >> 24, word & 0xFFFFFF


def build_client_response(layer, max_size, authzid=b''):
    """Encode the client's reply to the server's security-layer offer.

    Args:
        layer: the selected security-layer bit (fits in one octet).
        max_size: the client's maximum buffer size (fits in three octets).
        authzid: optional authorization identity, already encoded as bytes.

    Returns:
        The layer octet, the 3-octet size in network byte order, then
        *authzid*.

    Raises:
        ValueError: if *layer* or *max_size* is out of range.
    """
    if not 0 <= layer <= 0xFF:
        raise ValueError("security layer must fit in one octet")
    if not 0 <= max_size <= 0xFFFFFF:
        raise ValueError("maximum size must fit in three octets")
    return struct.pack('!I', (layer << 24) | max_size) + authzid


def main():
    """Run the interactive SASL GSSAPI client exchange on stdin/stdout.

    Tokens are shown base64-encoded after ``C: `` and the server's tokens are
    read base64-encoded at the ``S> `` prompt.
    """
    # raw_input only exists on Python 2; input is its Python 3 equivalent.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    parser = argparse.ArgumentParser(description='Example SASL client.')
    parser.add_argument('target_name', help='Name of the target service')
    parser.add_argument('--authzid', help='Authorization identity')
    parser.add_argument('--maxsize', default=0x010000, type=int,
                        help='Maximum message size')
    parser.add_argument('--security', choices=('auth', 'int', 'conf'),
                        default='auth',
                        help='security layer, "auth" for auth only,'
                             ' "int" for auth and integrity protection,'
                             ' "conf" for auth, integrity, and'
                             ' confidentiality protection.')
    args = parser.parse_args()

    # The size travels in three octets, so it must fit in 24 bits.
    if args.maxsize > 0xFFFFFF:
        abort("maxsize must be no more than 16777215")
    if args.maxsize < 0:
        abort("maxsize must not be negative")

    target_name = names.Name(args.target_name, NameType.hostbased_service)
    flags = [RequirementFlag.integrity]
    if args.security != 'auth':
        flags.extend([
            RequirementFlag.mutual_authentication,
            RequirementFlag.out_of_sequence_detection,
        ])
    if args.security == 'conf':
        flags.append(RequirementFlag.confidentiality)

    ctx = sec_contexts.SecurityContext(name=target_name, flags=flags)
    print("C: {0}".format(token_text(ctx.step())))
    while not ctx.complete:
        server_msg = base64.b64decode(read_line("S> "))
        print("C: {0}".format(token_text(ctx.step(server_msg))))

    res = ctx.unwrap(base64.b64decode(read_line("S> ")))
    try:
        security, max_length = parse_server_offer(res.message)
    except ValueError as exc:
        abort(str(exc))

    our_max_length = args.maxsize
    if args.security == 'conf':
        layer_bits = SEC_LAYER_CONF
        if not (security & SEC_LAYER_CONF):
            abort("Server does not support confidentiality security layer.")
        if not (ctx.actual_flags & RequirementFlag.confidentiality):
            abort("Confidentiality is not supported on this GSSAPI context.")
    elif args.security == 'int':
        layer_bits = SEC_LAYER_INT
        if not (security & SEC_LAYER_INT):
            abort("Server does not support integrity security layer.")
        if not (ctx.actual_flags & RequirementFlag.integrity):
            abort("Integrity is not supported on this GSSAPI context.")
    elif args.security == 'auth':
        layer_bits = SEC_LAYER_NONE
        # Without a security layer the advertised buffer size is zero.
        our_max_length = 0
        if not (security & SEC_LAYER_NONE):
            abort("Server does not support auth-only, i.e. no security layer.")

    authzid = args.authzid or b''
    # On Python 2 the argument is already a byte string; only text is encoded.
    if not isinstance(authzid, bytes):
        authzid = authzid.encode('UTF-8')
    client_bytes = build_client_response(layer_bits, our_max_length, authzid)

    wrapped = ctx.wrap(client_bytes, False)
    # Released python-gssapi returns a WrapResult whose token is in .message;
    # fall back to the value itself in case this version returns raw bytes.
    print("C: {0}".format(token_text(getattr(wrapped, 'message', wrapped))))

    print("Authentication complete.")
    print("Client's name: {0}".format(ctx.initiator_name))
    print("Server's name: {0}".format(ctx.target_name))
    print("Server's maximum buffer length: {0}".format(max_length))


if __name__ == '__main__':
    main()
import argparse
import base64
import struct
import sys
from gssapi import creds, names, sec_contexts
from gssapi.raw.types import NameType, MechType, RequirementFlag
# Bit values OR-ed together to form the mask of security layers this server
# offers, sent as the first octet of its negotiation message.
# No security layer (selected by the "auth" option).
SEC_LAYER_NONE = 1
# Integrity protection (selected by the "int" option).
SEC_LAYER_INT = 2
# Confidentiality protection (selected by the "conf" option).
SEC_LAYER_CONF = 4
def abort(error):
    """Write ``ERROR: <error>`` to standard output, then exit with status 1."""
    print("ERROR: {0}".format(error))
    raise SystemExit(1)
def token_text(token):
    """Return *token* base64-encoded as text; ``None`` is treated as empty.

    ``b64encode`` returns bytes on Python 3, so the result is decoded to keep
    the printed line free of a ``b'...'`` wrapper.
    """
    return base64.b64encode(token or b'').decode('ascii')


def build_server_offer(layers, max_size):
    """Encode the server's security-layer offer as four octets.

    Args:
        layers: bit-mask of offered security layers (fits in one octet).
        max_size: the server's maximum buffer size (fits in three octets).

    Returns:
        The mask octet followed by the 3-octet size in network byte order.

    Raises:
        ValueError: if *layers* or *max_size* is out of range.
    """
    if not 0 <= layers <= 0xFF:
        raise ValueError("security layers must fit in one octet")
    if not 0 <= max_size <= 0xFFFFFF:
        raise ValueError("maximum size must fit in three octets")
    return struct.pack('!I', (layers << 24) | max_size)


def parse_client_response(message):
    """Decode the client's reply to the security-layer offer.

    Args:
        message: unwrapped bytes from the client. The first octet is the
            selected layer; the next three are the client's maximum buffer
            size in network byte order; anything after that is the
            authorization identity.

    Returns:
        A ``(layer, max_size, authz_identity)`` tuple; the identity is bytes.

    Raises:
        ValueError: if *message* is shorter than four octets.
    """
    if len(message) < 4:
        raise ValueError(
            "Client negotiation token is too short "
            "({0} octets, expected at least 4).".format(len(message))
        )
    # Read all four octets as one big-endian word, then split off the layer.
    word = struct.unpack('!I', message[:4])[0]
    return word >> 24, word & 0xFFFFFF, message[4:]


def main():
    """Run the interactive SASL GSSAPI server exchange on stdin/stdout.

    Tokens are shown base64-encoded after ``S: `` and the client's tokens are
    read base64-encoded at the ``C> `` prompt.
    """
    # raw_input only exists on Python 2; input is its Python 3 equivalent.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    parser = argparse.ArgumentParser(description='Example SASL server.')
    parser.add_argument('--name', help='Name of this service')
    parser.add_argument('--maxsize', default=0x010000, type=int,
                        help='Maximum message size')
    parser.add_argument('--security', nargs='*',
                        choices=('auth', 'int', 'conf'),
                        default=['auth', 'int', 'conf'],
                        help='Supported security layers, "auth" for auth only,'
                             ' "int" for integrity protection,'
                             ' "conf" for integrity and confidentiality'
                             ' protection.')
    args = parser.parse_args()

    # The size travels in three octets, so it must fit in 24 bits.
    if args.maxsize > 0xFFFFFF:
        abort("maxsize must be no more than 16777215")
    if args.maxsize < 0:
        abort("maxsize must not be negative")

    sec_ctx_args = {}
    if args.name:
        name = names.Name(args.name, NameType.hostbased_service)
        # NOTE(review): released python-gssapi spells these keywords
        # name= / mechs= -- confirm against the installed version.
        sec_ctx_args['creds'] = creds.Credentials(
            desired_name=name, desired_mechs=[MechType.kerberos],
            usage='accept'
        )
    ctx = sec_contexts.SecurityContext(**sec_ctx_args)

    while not ctx.complete:
        client_msg = base64.b64decode(read_line("C> "))
        server_msg = ctx.step(client_msg)
        if server_msg:
            print("S: {0}".format(token_text(server_msg)))

    # NOTE(review): released python-gssapi exposes this attribute as
    # ctx.mech -- confirm against the installed version.
    if ctx.mech_type != MechType.kerberos:
        abort("Mechanism type was not Kerberos 5.")
    if args.name and name != ctx.target_name:
        abort("Target name was not the name of this service.")

    if server_msg:
        # The last context token came from us, so the client answers with an
        # empty response before the security-layer negotiation starts.
        print("Expecting an empty response from the client:")
        read_line("C> ")

    layer_bits = 0
    if 'auth' in args.security:
        layer_bits |= SEC_LAYER_NONE
    if 'int' in args.security:
        layer_bits |= SEC_LAYER_INT
    if 'conf' in args.security:
        if ctx.actual_flags & RequirementFlag.confidentiality:
            layer_bits |= SEC_LAYER_CONF
        else:
            print(
                "Server cannot offer confidentiality "
                "as it is unsupported by the security context"
            )

    # Decide from the layers actually offered, not the ones requested:
    # "conf" may have been dropped above.
    if not (layer_bits & (SEC_LAYER_INT | SEC_LAYER_CONF)):
        print(
            "Server does not support any security layer, "
            "forcing maximum buffer size to 0."
        )
        our_max_length = 0
    else:
        our_max_length = args.maxsize

    server_bytes = build_server_offer(layer_bits, our_max_length)
    wrapped = ctx.wrap(server_bytes, False)
    # Released python-gssapi returns a WrapResult whose token is in .message;
    # fall back to the value itself in case this version returns raw bytes.
    print("S: {0}".format(token_text(getattr(wrapped, 'message', wrapped))))

    client_msg = base64.b64decode(read_line("C> "))
    res = ctx.unwrap(client_msg)
    try:
        client_security_layer, max_length, authz_identity = (
            parse_client_response(res.message)
        )
    except ValueError as exc:
        abort(str(exc))

    if not (client_security_layer & layer_bits):
        abort(
            "Client requested security layer ({0}) "
            "that did not match our supported layers ({1})".format(
                client_security_layer, layer_bits
            )
        )
    if (client_security_layer & SEC_LAYER_NONE) and (max_length != 0):
        abort(
            "Client requested no security layer "
            "but sent a maximum buffer length of {0}".format(max_length)
        )

    print("Authentication successful.")
    print("Client's name: {0}".format(ctx.initiator_name))
    print("Client's maximum buffer length: {0}".format(max_length))
    if len(authz_identity) > 0:
        # On Python 3 decode the bytes for display; on Python 2 bytes is
        # already str and is printed unchanged.
        if not isinstance(authz_identity, str):
            authz_identity = authz_identity.decode('UTF-8', 'replace')
        print("Client's authorization identity: {0}".format(authz_identity))


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment