Last active
August 29, 2015 14:10
-
-
Save sigmaris/07041c168768af0712c3 to your computer and use it in GitHub Desktop.
Example SASL code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import base64 | |
import struct | |
import sys | |
from gssapi import sec_contexts, names | |
from gssapi.raw.types import NameType, RequirementFlag | |
# Security-layer bit flags carried in the first octet of the SASL GSSAPI
# negotiation messages; they are OR-ed together / tested with `&` below.
SEC_LAYER_NONE = 1  # authentication only, no security layer
SEC_LAYER_INT = 2   # integrity protection
SEC_LAYER_CONF = 4  # integrity and confidentiality protection
def abort(error):
    """Print ``error`` prefixed with ``ERROR: `` and exit with status 1.

    Args:
        error: Message (or any object accepted by ``str.format``) describing
            why the exchange cannot continue.

    Raises:
        SystemExit: Always, with exit code 1.
    """
    print("ERROR: {0}".format(error))
    sys.exit(1)
# Example SASL GSSAPI client: performs the GSSAPI handshake over copy-pasted
# base64 lines ("C:" is what we send, "S>" prompts for what the server sent),
# then negotiates the security layer.

try:
    read_line = raw_input  # Python 2
except NameError:
    read_line = input  # Python 3


def emit_token(token):
    """Print a client token as a base64 line; ``None`` becomes an empty line."""
    # ctx.step() may yield no token on the final leg; the protocol still
    # expects an (empty) client response, so encode b'' in that case.
    encoded = base64.b64encode(token or b'').decode('ascii')
    print("C: {0}".format(encoded))


parser = argparse.ArgumentParser(description='Example SASL client.')
parser.add_argument('target_name', help='Name of the target service')
parser.add_argument('--authzid', help='Authorization identity')
parser.add_argument('--maxsize', default=0x010000, type=int,
                    help='Maximum message size')
parser.add_argument('--security', choices=('auth', 'int', 'conf'),
                    default='auth', help='security layer, "auth" for auth only,'
                    ' "int" for auth and integrity protection,'
                    ' "conf" for auth, integrity, and confidentiality protection.')
args = parser.parse_args()

# The maximum size travels in three octets, so it must fit in 24 bits
# (the server script enforces the same limit).
if args.maxsize > 0xFFFFFF:
    abort("maxsize must be no more than 16777215")

target_name = names.Name(args.target_name, NameType.hostbased_service)
flags = [RequirementFlag.integrity]
if args.security != 'auth':
    flags.extend([
        RequirementFlag.mutual_authentication,
        RequirementFlag.out_of_sequence_detection
    ])
if args.security == 'conf':
    flags.append(RequirementFlag.confidentiality)

# --- GSSAPI context establishment -------------------------------------------
ctx = sec_contexts.SecurityContext(name=target_name, flags=flags)
emit_token(ctx.step())
while not ctx.complete:
    server_msg = base64.b64decode(read_line("S> "))
    client_msg = ctx.step(server_msg)
    emit_token(client_msg)

# --- Security layer negotiation ---------------------------------------------
# The server's wrapped message is 4 octets: one octet of offered layer bits
# followed by a 3-octet big-endian maximum buffer size.
res = ctx.unwrap(base64.b64decode(read_line("S> ")))
if len(res.message) != 4:
    abort("Server's security layer message was not exactly 4 octets long.")
# bytearray indexing yields an int on both Python 2 and Python 3.
security = bytearray(res.message)[0]
# struct.unpack always returns a tuple; pad to 4 octets and take element 0.
max_length = struct.unpack('!I', b'\x00' + res.message[1:4])[0]

our_max_length = args.maxsize
if args.security == 'conf':
    layer_bits = SEC_LAYER_CONF
    if not (security & SEC_LAYER_CONF):
        abort("Server does not support confidentiality security layer.")
    if not (ctx.actual_flags & RequirementFlag.confidentiality):
        abort("Confidentiality is not supported on this GSSAPI context.")
elif args.security == 'int':
    layer_bits = SEC_LAYER_INT
    if not (security & SEC_LAYER_INT):
        abort("Server does not support integrity security layer.")
    if not (ctx.actual_flags & RequirementFlag.integrity):
        abort("Integrity is not supported on this GSSAPI context.")
elif args.security == 'auth':
    layer_bits = SEC_LAYER_NONE
    # With no security layer the maximum buffer size must be sent as zero.
    our_max_length = 0
    if not (security & SEC_LAYER_NONE):
        abort("Server does not support auth-only, i.e. no security layer.")

# Byte strings are immutable, so build the reply in a single pack: the chosen
# layer occupies the top octet, our maximum size the low three octets.
client_bytes = struct.pack('!I', (layer_bits << 24) | our_max_length)
if args.authzid:
    client_bytes += args.authzid.encode('UTF-8')
# ctx.wrap() returns a result tuple; only its .message field is the token.
emit_token(ctx.wrap(client_bytes, False).message)

print("Authentication complete.")
print("Client's name: {0}".format(ctx.initiator_name))
print("Server's name: {0}".format(ctx.target_name))
print("Server's maximum buffer length: {0}".format(max_length))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import base64 | |
import struct | |
import sys | |
from gssapi import creds, names, sec_contexts | |
from gssapi.raw.types import NameType, MechType, RequirementFlag | |
# Security-layer bit flags carried in the first octet of the SASL GSSAPI
# negotiation messages; they are OR-ed together / tested with `&` below.
SEC_LAYER_NONE = 1  # authentication only, no security layer
SEC_LAYER_INT = 2   # integrity protection
SEC_LAYER_CONF = 4  # integrity and confidentiality protection
def abort(error):
    """Print ``error`` prefixed with ``ERROR: `` and exit with status 1.

    Args:
        error: Message (or any object accepted by ``str.format``) describing
            why the exchange cannot continue.

    Raises:
        SystemExit: Always, with exit code 1.
    """
    print("ERROR: {0}".format(error))
    sys.exit(1)
# Example SASL GSSAPI server: accepts the GSSAPI handshake over copy-pasted
# base64 lines ("S:" is what we send, "C>" prompts for what the client sent),
# then negotiates the security layer.

try:
    read_line = raw_input  # Python 2
except NameError:
    read_line = input  # Python 3


def emit_token(token):
    """Print a server token as a base64 line."""
    print("S: {0}".format(base64.b64encode(token).decode('ascii')))


parser = argparse.ArgumentParser(description='Example SASL server.')
parser.add_argument('--name', help='Name of this service')
parser.add_argument('--maxsize', default=0x010000, type=int,
                    help='Maximum message size')
parser.add_argument('--security', nargs='*', choices=('auth', 'int', 'conf'),
                    default=['auth','int','conf'],
                    help='Supported security layers, "auth" for auth only,'
                    ' "int" for integrity protection,'
                    ' "conf" for integrity and confidentiality protection.')
args = parser.parse_args()

# The maximum size travels in three octets, so it must fit in 24 bits.
if args.maxsize > 0xFFFFFF:
    abort("maxsize must be no more than 16777215")

sec_ctx_args = {}
if args.name:
    name = names.Name(args.name, NameType.hostbased_service)
    # NOTE(review): keyword names (desired_name/desired_mechs) are kept as in
    # the original - confirm against the installed python-gssapi version.
    sec_ctx_args['creds'] = creds.Credentials(
        desired_name=name, desired_mechs=[MechType.kerberos], usage='accept'
    )

# --- GSSAPI context establishment -------------------------------------------
ctx = sec_contexts.SecurityContext(**sec_ctx_args)
while not ctx.complete:
    client_msg = base64.b64decode(read_line("C> "))
    server_msg = ctx.step(client_msg)
    if server_msg:
        emit_token(server_msg)

# NOTE(review): `mech_type` is kept as in the original - confirm the attribute
# name against the installed python-gssapi version.
if ctx.mech_type != MechType.kerberos:
    abort("Mechanism type was not Kerberos 5.")
if args.name and name != ctx.target_name:
    abort("Target name was not the name of this service.")

# If our last step produced a token, the client still owes us an (empty)
# response before the security layer negotiation starts.
if server_msg:
    print("Expecting an empty response from the client:")
    read_line("C> ")

# --- Security layer negotiation ---------------------------------------------
layer_bits = 0
if 'auth' in args.security:
    layer_bits |= SEC_LAYER_NONE
if 'int' in args.security:
    layer_bits |= SEC_LAYER_INT
if 'conf' in args.security:
    if ctx.actual_flags & RequirementFlag.confidentiality:
        layer_bits |= SEC_LAYER_CONF
    else:
        print(
            "Server cannot offer confidentiality "
            "as it is unsupported by the security context"
        )

if 'int' not in args.security and 'conf' not in args.security:
    print(
        "Server does not support any security layer, "
        "forcing maximum buffer size to 0."
    )
    our_max_length = 0
else:
    our_max_length = args.maxsize

# Byte strings are immutable, so build the offer in a single pack: the layer
# bit-mask occupies the top octet, our maximum size the low three octets.
server_bytes = struct.pack('!I', (layer_bits << 24) | our_max_length)
# ctx.wrap() returns a result tuple; only its .message field is the token.
emit_token(ctx.wrap(server_bytes, False).message)

client_msg = base64.b64decode(read_line("C> "))
res = ctx.unwrap(client_msg)
# The reply is one octet of selected layer, a 3-octet big-endian maximum
# buffer size, then an optional authorization identity.
if len(res.message) < 4:
    abort("Client's security layer message was shorter than 4 octets.")
# bytearray indexing yields an int on both Python 2 and Python 3.
client_security_layer = bytearray(res.message)[0]
# struct.unpack always returns a tuple; pad to 4 octets and take element 0.
max_length = struct.unpack('!I', b'\x00' + res.message[1:4])[0]
authz_identity = res.message[4:]

if not (client_security_layer & layer_bits):
    abort(
        "Client requested security layer ({0}) ".format(client_security_layer)
        + "that did not match our supported layers ({0})".format(layer_bits)
    )
if (client_security_layer & SEC_LAYER_NONE) and (max_length != 0):
    abort(
        "Client requested no security layer "
        "but sent a maximum buffer length of {0}".format(max_length)
    )

print("Authentication successful.")
print("Client's name: {0}".format(ctx.initiator_name))
print("Client's maximum buffer length: {0}".format(max_length))
if len(authz_identity) > 0:
    print("Client's authorization identity: {0}".format(authz_identity))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment