Skip to content

Instantly share code, notes, and snippets.

@frennkie
Created October 27, 2020 20:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save frennkie/2d66bd8383efa102a27e16c9a09b7701 to your computer and use it in GitHub Desktop.
Save frennkie/2d66bd8383efa102a27e16c9a09b7701 to your computer and use it in GitHub Desktop.
import logging
import ssl
import uuid
from urllib.parse import urlparse
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from django.db import models
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _
from ldap3 import Connection, Tls, Server
from ldap3.core.exceptions import LDAPBindError
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class LdapCertBackend(models.Model):
    """Configuration of one LDAP server that is searched for X.509 certificates.

    Stores the connection settings (URI, bind credentials, STARTTLS flag), the
    search base and the name of the LDAP attribute that holds the DER-encoded
    certificate.  ``get_conn`` opens a bound connection and ``fetch`` looks up
    certificates by ``mail`` address.
    """

    id = models.UUIDField(
        primary_key=True,
        editable=False,
        default=uuid.uuid4,
        help_text='Unique ID for backend',
        verbose_name=_('id'),
    )

    name = models.CharField(
        help_text=_('The name of this backend'),
        max_length=140,
        unique=True,
        verbose_name=_('name'),
    )

    uri = models.CharField(
        help_text='URI (e.g. ldap://server1.example.com:389)',
        max_length=2100,
        verbose_name=_('uri'),
    )

    # NOTE(review): this relation is stored but not consulted by get_conn();
    # confirm whether it is meant to feed the TLS trust configuration.
    ca_cert = models.ForeignKey(
        "CaCertificate",
        blank=True,
        null=True,
        on_delete=models.SET_NULL,
    )

    use_starttls = models.BooleanField(
        default=False,
        help_text='Use STARTTLS after connection start',
        verbose_name=_('use_starttls'),
    )

    username = models.CharField(
        blank=True,
        default="",
        help_text='Bind DN (leave empty for anonymous).',
        max_length=100,
        verbose_name=_('username'),
    )

    # NOTE(review): this is a plain CharField; any masking or encryption of
    # the bind password has to happen elsewhere -- confirm.
    password = models.CharField(
        blank=True,
        default="",
        help_text='Bind Password (hidden/masked).',
        max_length=100,
        verbose_name=_('password'),
    )

    # NOTE(review): this flag is stored but not consulted by get_conn().
    is_ad = models.BooleanField(
        default=False,
        help_text='Use NTLM auth for Microsoft Active Directory',
        verbose_name=_('is_ad'),
    )

    base = models.CharField(
        blank=True,
        default="",
        help_text='Search Base',
        max_length=100,
        verbose_name=_('search base'),
    )

    field = models.CharField(
        default='userCertificate;binary',
        help_text=_('LDAP field of certificate (default: userCertificate;binary)'),
        max_length=1000,
        verbose_name=_('field'),
    )

    class Meta:
        verbose_name = _('LDAP Backend')
        verbose_name_plural = _('LDAP Backends')

    def __str__(self):
        """Return the backend name followed by its URI (or a placeholder)."""
        if self.uri:
            return f'{self.name} ({self.uri})'
        return f'{self.name} (ldap://...)'

    @cached_property
    def parsed_uri(self):
        """Return ``urlparse(self.uri)``.

        The result is cached per instance, so a later change to ``uri`` on the
        same instance is not reflected here.
        """
        return urlparse(self.uri)

    @property
    def ldap_hostname(self):
        """Host name component of ``uri`` (``None`` if the URI has none)."""
        return self.parsed_uri.hostname

    @property
    def ldap_port(self):
        """Port component of ``uri`` as an int (``None`` if not given)."""
        # Previously returned the host name by mistake.
        return self.parsed_uri.port

    @property
    def ldap_scheme(self):
        """Scheme component of ``uri`` (e.g. ``ldap`` or ``ldaps``)."""
        return self.parsed_uri.scheme

    def get_conn(self):
        """Open an LDAP connection to ``uri`` and bind with the stored credentials.

        STARTTLS is issued before the bind when ``use_starttls`` is set.
        Server certificates are always validated (``ssl.CERT_REQUIRED``).

        Returns:
            The bound ``ldap3.Connection``; the caller is responsible for
            unbinding it.

        Raises:
            LDAPBindError: if the server rejects the bind.
            Exception: any error raised by ldap3 while connecting or starting
                TLS is propagated unchanged.
        """
        # Do not pin the protocol to TLS 1.0 (ssl.PROTOCOL_TLSv1): it is
        # deprecated and commonly disabled.  Leaving ``version`` unset lets
        # ldap3 fall back to its default TLS configuration.
        tls_configuration = Tls(validate=ssl.CERT_REQUIRED)
        server = Server(self.uri, tls=tls_configuration)
        conn = Connection(server, self.username, self.password, auto_bind=False)

        try:
            if self.use_starttls:
                conn.start_tls()

            if not conn.bind():
                logger.error('error on ldap bind: %s', conn.result)
                raise LDAPBindError(conn.result.get('message'))
        except Exception:
            # Do not leave a half-initialised connection behind on failure.
            conn.unbind()
            raise

        return conn

    def fetch(self, value=None, conn=None):
        """Fetch the certificates stored for a mail address.

        Args:
            value: Mail address to look up (matched against the ``mail``
                attribute).  It is escaped before being placed in the search
                filter, so LDAP filter metacharacters are matched literally.
                When ``None`` nothing is searched and an empty list is
                returned.
            conn: Optional already-bound connection.  When omitted a new one
                is obtained from ``get_conn()``.

        Returns:
            A list of ``cryptography`` X.509 certificate objects, one per
            matching entry that carries a value in ``field`` (only the first
            value of each entry is used).

        Note:
            The connection is always unbound before returning, including a
            connection supplied by the caller and including error paths.
        """
        # Local import: keeps this block self-contained without touching the
        # module's import section.
        from ldap3.utils.conv import escape_filter_chars

        if conn is None:
            conn = self.get_conn()

        if value is None:
            logger.warning('not specified what to fetch.. check DB and fetch all for backend')
            values = []
        else:
            values = [value]

        x509_certs = []
        try:
            for val in values:
                # Escape the value so it cannot alter the filter structure.
                search_filter = f'(mail={escape_filter_chars(str(val))})'
                if not conn.search(self.base, search_filter, attributes=[self.field]):
                    continue

                for entry in conn.entries:
                    cert_values = entry[self.field].values
                    if not cert_values:
                        # Entry matched but carries no certificate value.
                        continue
                    x509_certs.append(
                        x509.load_der_x509_certificate(cert_values[0], backend=default_backend())
                    )
        finally:
            # Release the connection even if the search or parsing failed.
            conn.unbind()

        return x509_certs
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment